packio
dispatcher.h
Go to the documentation of this file.
1 // This Source Code Form is subject to the terms of the Mozilla Public
2 // License, v. 2.0. If a copy of the MPL was not distributed with this
3 // file, You can obtain one at https://mozilla.org/MPL/2.0/.
4 
5 #ifndef PACKIO_DISPATCHER_H
6 #define PACKIO_DISPATCHER_H
7 
10 
11 #include <functional>
12 #include <memory>
13 #include <mutex>
14 #include <optional>
15 #include <string_view>
16 #include <tuple>
17 
18 #include "args_specs.h"
19 #include "handler.h"
20 #include "internal/config.h"
21 #include "internal/movable_function.h"
22 #include "internal/rpc.h"
23 #include "internal/utils.h"
24 #include "traits.h"
25 
26 namespace packio {
27 
32 template <typename Rpc, template <class...> class Map = default_map, typename Lockable = default_mutex>
33 class dispatcher {
34 public:
36  using rpc_type = Rpc;
38  using mutex_type = Lockable;
40  using args_type = decltype(typename rpc_type::request_type{}.args);
42  using function_type = internal::movable_function<
45  using function_ptr_type = std::shared_ptr<function_type>;
46 
51  template <typename SyncProcedure>
52  bool add(
53  std::string_view name,
54  args_specs<SyncProcedure> arg_specs,
55  SyncProcedure&& fct)
56  {
57  PACKIO_STATIC_ASSERT_TRAIT(SyncProcedure);
58  std::unique_lock lock{map_mutex_};
59  return function_map_
60  .emplace(
61  name,
62  std::make_shared<function_type>(wrap_sync(
63  std::forward<SyncProcedure>(fct), std::move(arg_specs))))
64  .second;
65  }
66 
68  template <typename SyncProcedure>
69  bool add(std::string_view name, SyncProcedure&& fct)
70  {
71  return add<SyncProcedure>(name, {}, std::forward<SyncProcedure>(fct));
72  }
73 
78  template <typename AsyncProcedure>
79  bool add_async(
80  std::string_view name,
82  AsyncProcedure&& fct)
83  {
84  PACKIO_STATIC_ASSERT_TTRAIT(AsyncProcedure, rpc_type);
85  std::unique_lock lock{map_mutex_};
86  return function_map_
87  .emplace(
88  name,
89  std::make_shared<function_type>(wrap_async(
90  std::forward<AsyncProcedure>(fct), std::move(arg_specs))))
91  .second;
92  }
93 
95  template <typename AsyncProcedure>
96  bool add_async(std::string_view name, AsyncProcedure&& fct)
97  {
98  return add_async<AsyncProcedure>(
99  name, {}, std::forward<AsyncProcedure>(fct));
100  }
101 
102 #if defined(PACKIO_HAS_CO_AWAIT)
108  template <
109  typename Executor,
110  typename CoroProcedure,
111  std::size_t N = internal::func_traits<CoroProcedure>::args_count>
112  bool add_coro(
113  std::string_view name,
114  const Executor& executor,
115  args_specs<CoroProcedure> arg_specs,
116  CoroProcedure&& coro)
117  {
118  PACKIO_STATIC_ASSERT_TRAIT(CoroProcedure);
119  std::unique_lock lock{map_mutex_};
120  return function_map_
121  .emplace(
122  name,
123  std::make_shared<function_type>(wrap_coro(
124  executor,
125  std::forward<CoroProcedure>(coro),
126  std::move(arg_specs))))
127  .second;
128  }
129 
131  template <typename Executor, typename CoroProcedure>
132  bool add_coro(std::string_view name, const Executor& executor, CoroProcedure&& coro)
133  {
134  return add_coro<Executor, CoroProcedure>(
135  name, executor, {}, std::forward<CoroProcedure>(coro));
136  }
137 
139  template <
140  typename ExecutionContext,
141  typename CoroProcedure,
142  std::size_t N = internal::func_traits<CoroProcedure>::args_count>
143  bool add_coro(
144  std::string_view name,
145  ExecutionContext& ctx,
146  args_specs<CoroProcedure> arg_specs,
147  CoroProcedure&& coro)
148  {
149  return add_coro<decltype(ctx.get_executor()), CoroProcedure, N>(
150  name,
151  ctx.get_executor(),
152  std::move(arg_specs),
153  std::forward<CoroProcedure>(coro));
154  }
155 
157  template <typename ExecutionContext, typename CoroProcedure>
158  bool add_coro(std::string_view name, ExecutionContext& ctx, CoroProcedure&& coro)
159  {
160  return add_coro<ExecutionContext, CoroProcedure>(
161  name, ctx, {}, std::forward<CoroProcedure>(coro));
162  }
163 #endif // defined(PACKIO_HAS_CO_AWAIT)
164 
168  bool remove(const std::string& name)
169  {
170  std::unique_lock lock{map_mutex_};
171  return function_map_.erase(name);
172  }
173 
177  bool has(const std::string& name) const
178  {
179  std::unique_lock lock{map_mutex_};
180  return function_map_.find(name) != function_map_.end();
181  }
182 
185  size_t clear()
186  {
187  std::unique_lock lock{map_mutex_};
188  size_t size = function_map_.size();
189  function_map_.clear();
190  return size;
191  }
192 
195  std::vector<std::string> known() const
196  {
197  std::unique_lock lock{map_mutex_};
198  std::vector<std::string> names;
199  names.reserve(function_map_.size());
200  std::transform(
201  function_map_.begin(),
202  function_map_.end(),
203  std::back_inserter(names),
204  [](const typename decltype(function_map_)::value_type& pair) {
205  return pair.first;
206  });
207  return names;
208  }
209 
210  function_ptr_type get(const std::string& name) const
211  {
212  std::unique_lock lock{map_mutex_};
213  auto it = function_map_.find(name);
214  if (it == function_map_.end()) {
215  return {};
216  }
217  else {
218  return it->second;
219  }
220  }
221 
222 private:
223  using function_map_type = Map<std::string, function_ptr_type>;
224 
225  template <typename F>
226  auto wrap_sync(F&& fct, args_specs<F> args_specs)
227  {
228  using value_args =
229  internal::decay_tuple_t<typename internal::func_traits<F>::args_type>;
230  using result_type = typename internal::func_traits<F>::result_type;
231 
232  return
233  [fct = std::forward<F>(fct), args_specs = std::move(args_specs)](
234  completion_handler<rpc_type> handler, args_type&& args) mutable {
235  auto typed_args = rpc_type::template extract_args<value_args>(
236  std::move(args), args_specs);
237  if (!typed_args) {
238  PACKIO_DEBUG(typed_args.error());
239  handler.set_error(typed_args.error());
240  return;
241  }
242 
243  if constexpr (std::is_void_v<result_type>) {
244  std::apply(fct, std::move(*typed_args));
245  handler();
246  }
247  else {
248  handler(std::apply(fct, std::move(*typed_args)));
249  }
250  };
251  }
252 
253  template <typename F>
254  auto wrap_async(F&& fct, args_specs<F> args_specs)
255  {
256  using args = typename internal::func_traits<F>::args_type;
257  using value_args =
258  internal::decay_tuple_t<internal::left_shift_tuple_t<args>>;
259 
260  return
261  [fct = std::forward<F>(fct), args_specs = std::move(args_specs)](
262  completion_handler<rpc_type> handler, args_type&& args) mutable {
263  auto typed_args = rpc_type::template extract_args<value_args>(
264  std::move(args), args_specs);
265  if (!typed_args) {
266  PACKIO_DEBUG(typed_args.error());
267  handler.set_error(typed_args.error());
268  return;
269  }
270 
271  std::apply(
272  [&](auto&&... args) {
273  fct(std::move(handler),
274  std::forward<decltype(args)>(args)...);
275  },
276  std::move(*typed_args));
277  };
278  }
279 
280 #if defined(PACKIO_HAS_CO_AWAIT)
281  template <typename E, typename C>
282  auto wrap_coro(const E& executor, C&& coro, args_specs<C> args_specs)
283  {
284  using value_args =
285  internal::decay_tuple_t<typename internal::func_traits<C>::args_type>;
286  using result_type =
287  typename internal::func_traits<C>::result_type::value_type;
288 
289  return [executor,
290  coro = std::forward<C>(coro),
291  args_specs = std::move(args_specs)](
292  completion_handler<rpc_type> handler,
293  args_type&& args) mutable {
294  auto typed_args = rpc_type::template extract_args<value_args>(
295  std::move(args), args_specs);
296  if (!typed_args) {
297  PACKIO_DEBUG(typed_args.error());
298  handler.set_error(typed_args.error());
299  return;
300  }
301 
302  net::co_spawn(
303  executor,
304  [typed_args = std::move(*typed_args),
305  handler = std::move(handler),
306  coro = std::forward<C>(coro)]() mutable -> net::awaitable<void> {
307  if constexpr (std::is_void_v<result_type>) {
308  co_await std::apply(coro, std::move(typed_args));
309  handler();
310  }
311  else {
312  handler(co_await std::apply(coro, std::move(typed_args)));
313  }
314  },
315  [](std::exception_ptr exc) {
316  if (exc) {
317  std::rethrow_exception(exc);
318  }
319  });
320  };
321  }
322 #endif // defined(PACKIO_HAS_CO_AWAIT)
323 
324  mutable mutex_type map_mutex_;
325  function_map_type function_map_;
326 };
327 
328 } // packio
329 
330 #endif // PACKIO_DISPATCHER_H
Class args_specs.
Procedure arguments specifications.
Definition: args_specs.h:174
The completion_handler class.
Definition: handler.h:27
The dispatcher class, used to store and dispatch procedures.
Definition: dispatcher.h:33
bool add_coro(std::string_view name, ExecutionContext &ctx, CoroProcedure &&coro)
This is an overloaded member function, provided for convenience. It differs from the above function o...
Definition: dispatcher.h:158
std::shared_ptr< function_type > function_ptr_type
A shared pointer to function_type.
Definition: dispatcher.h:45
bool add_async(std::string_view name, AsyncProcedure &&fct)
This is an overloaded member function, provided for convenience. It differs from the above function o...
Definition: dispatcher.h:96
bool remove(const std::string &name)
Remove a procedure from the dispatcher.
Definition: dispatcher.h:168
size_t clear()
Remove all procedures.
Definition: dispatcher.h:185
bool add_coro(std::string_view name, const Executor &executor, CoroProcedure &&coro)
This is an overloaded member function, provided for convenience. It differs from the above function o...
Definition: dispatcher.h:132
bool has(const std::string &name) const
Check if a procedure is registered.
Definition: dispatcher.h:177
bool add_coro(std::string_view name, const Executor &executor, args_specs< CoroProcedure > arg_specs, CoroProcedure &&coro)
Add a coroutine to the dispatcher.
Definition: dispatcher.h:112
bool add(std::string_view name, args_specs< SyncProcedure > arg_specs, SyncProcedure &&fct)
Add a synchronous procedure to the dispatcher.
Definition: dispatcher.h:52
std::vector< std::string > known() const
Get the name of all known procedures.
Definition: dispatcher.h:195
bool add(std::string_view name, SyncProcedure &&fct)
This is an overloaded member function, provided for convenience. It differs from the above function o...
Definition: dispatcher.h:69
internal::movable_function< void(completion_handler< rpc_type >, args_type &&args)> function_type
The type of function stored in the dispatcher.
Definition: dispatcher.h:43
bool add_async(std::string_view name, args_specs< AsyncProcedure > arg_specs, AsyncProcedure &&fct)
Add an asynchronous procedure to the dispatcher.
Definition: dispatcher.h:79
decltype(typename rpc_type::request_type{}.args) args_type
The type of the arguments used by the RPC protocol.
Definition: dispatcher.h:40
Rpc rpc_type
The RPC protocol type.
Definition: dispatcher.h:36
bool add_coro(std::string_view name, ExecutionContext &ctx, args_specs< CoroProcedure > arg_specs, CoroProcedure &&coro)
This is an overloaded member function, provided for convenience. It differs from the above function o...
Definition: dispatcher.h:143
Lockable mutex_type
The mutex type used to protect the procedure map.
Definition: dispatcher.h:38
Class completion_handler.
The packio namespace.
Definition: arg.h:14
Traits definition.