packio
client.h
Go to the documentation of this file.
1 // This Source Code Form is subject to the terms of the Mozilla Public
2 // License, v. 2.0. If a copy of the MPL was not distributed with this
3 // file, You can obtain one at https://mozilla.org/MPL/2.0/.
4 
5 #ifndef PACKIO_CLIENT_H
6 #define PACKIO_CLIENT_H
7 
10 
11 #include <atomic>
12 #include <chrono>
13 #include <functional>
14 #include <memory>
15 #include <queue>
16 #include <string_view>
17 #include <type_traits>
18 #include <optional>
19 
20 #include "internal/config.h"
21 #include "internal/manual_strand.h"
22 #include "internal/movable_function.h"
23 #include "internal/rpc.h"
24 #include "internal/utils.h"
25 #include "traits.h"
26 
27 namespace packio {
28 
33 template <typename Rpc, typename Socket, template <class...> class Map = default_map>
34 class client : public std::enable_shared_from_this<client<Rpc, Socket, Map>> {
35 public:
37  using rpc_type = Rpc;
39  using id_type = typename rpc_type::id_type;
41  using response_type = typename rpc_type::response_type;
43  using socket_type = Socket;
45  using protocol_type = typename socket_type::protocol_type;
47  using executor_type = typename socket_type::executor_type;
48 
49  using std::enable_shared_from_this<client<Rpc, Socket, Map>>::shared_from_this;
50 
52  static constexpr size_t kDefaultBufferReserveSize = 4096;
53 
57  : socket_{std::move(socket)}, strand_{socket_.get_executor()}, wstrand_{strand_}
58  {
59  }
60 
62  socket_type& socket() noexcept { return socket_; }
63 
65  const socket_type& socket() const noexcept { return socket_; }
66 
68  void set_buffer_reserve_size(std::size_t size) noexcept
69  {
70  buffer_reserve_size_ = size;
71  }
73  std::size_t get_buffer_reserve_size() const noexcept
74  {
75  return buffer_reserve_size_;
76  }
77 
79  executor_type get_executor() { return socket().get_executor(); }
80 
85  void cancel(id_type id)
86  {
87  PACKIO_TRACE("cancel {}", rpc_type::format_id(id));
88  net::dispatch(strand_, [self = shared_from_this(), id] {
89  auto ec = make_error_code(net::error::operation_aborted);
90  self->async_call_handler(id, ec, {});
91  self->maybe_cancel_reading();
92  });
93  }
94 
98  void cancel()
99  {
100  PACKIO_TRACE("cancel all");
101  net::dispatch(strand_, [self = shared_from_this()] {
102  auto ec = make_error_code(net::error::operation_aborted);
103  while (!self->pending_.empty()) {
104  self->async_call_handler(self->pending_.begin()->first, ec, {});
105  }
106  self->maybe_cancel_reading();
107  });
108  }
109 
118  template <
119  PACKIO_COMPLETION_TOKEN_FOR(void(error_code))
120  NotifyHandler PACKIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type),
121  typename ArgsTuple,
122  typename = std::enable_if_t<internal::is_tuple_v<ArgsTuple>>>
124  std::string_view name,
125  ArgsTuple&& args,
126  NotifyHandler&& handler PACKIO_DEFAULT_COMPLETION_TOKEN(executor_type))
127  {
128  return net::async_initiate<NotifyHandler, void(error_code)>(
129  initiate_async_notify(this),
130  handler,
131  name,
132  std::forward<ArgsTuple>(args));
133  }
134 
136  template <
137  PACKIO_COMPLETION_TOKEN_FOR(void(error_code))
138  NotifyHandler PACKIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type),
139  typename = std::enable_if_t<!internal::is_tuple_v<NotifyHandler>>>
141  std::string_view name,
142  NotifyHandler&& handler PACKIO_DEFAULT_COMPLETION_TOKEN(executor_type))
143  {
144  return async_notify(
145  name, std::tuple{}, std::forward<NotifyHandler>(handler));
146  }
147 
155  template <
156  PACKIO_COMPLETION_TOKEN_FOR(void(error_code, response_type))
157  CallHandler PACKIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type),
158  typename ArgsTuple,
159  typename = std::enable_if_t<internal::is_tuple_v<ArgsTuple>>>
161  std::string_view name,
162  ArgsTuple&& args,
163  CallHandler&& handler PACKIO_DEFAULT_COMPLETION_TOKEN(executor_type),
164  std::optional<std::reference_wrapper<id_type>> call_id = std::nullopt)
165  {
166  return net::async_initiate<CallHandler, void(error_code, response_type)>(
167  initiate_async_call(this),
168  handler,
169  name,
170  std::forward<ArgsTuple>(args),
171  call_id);
172  }
173 
175  template <
176  PACKIO_COMPLETION_TOKEN_FOR(void(error_code, response_type))
177  CallHandler PACKIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type),
178  typename = std::enable_if_t<!internal::is_tuple_v<CallHandler>>>
180  std::string_view name,
181  CallHandler&& handler PACKIO_DEFAULT_COMPLETION_TOKEN(executor_type),
182  std::optional<std::reference_wrapper<id_type>> call_id = std::nullopt)
183  {
184  return async_call(
185  name, std::tuple{}, std::forward<CallHandler>(handler), call_id);
186  }
187 
188 private:
189  using parser_type = typename rpc_type::incremental_parser_type;
190  using async_call_handler_type =
191  internal::movable_function<void(error_code, response_type)>;
192 
193  void close(error_code ec)
194  {
195  net::dispatch(strand_, [self = shared_from_this(), ec] {
196  while (!self->pending_.empty()) {
197  self->async_call_handler(self->pending_.begin()->first, ec, {});
198  }
199 
200  error_code close_ec;
201  self->socket_.close(close_ec);
202  if (close_ec) {
203  PACKIO_WARN("close failed: {}", close_ec.message());
204  }
205  });
206  }
207 
208  void maybe_cancel_reading()
209  {
210  assert(strand_.running_in_this_thread());
211  if (reading_ && pending_.empty()) {
212  PACKIO_DEBUG("stop reading");
213  error_code ec;
214  socket_.cancel(ec);
215  if (ec) {
216  PACKIO_WARN("cancel failed: {}", ec.message());
217  }
218  }
219  }
220 
221  template <typename Buffer, typename WriteHandler>
222  void async_send(std::unique_ptr<Buffer>&& buffer_ptr, WriteHandler&& handler)
223  {
224  wstrand_.push([self = shared_from_this(),
225  buffer_ptr = std::move(buffer_ptr),
226  handler = std::forward<WriteHandler>(handler)]() mutable {
227  assert(self->strand_.running_in_this_thread());
228  internal::set_no_delay(self->socket_);
229 
230  auto buf = rpc_type::buffer(*buffer_ptr);
231  net::async_write(
232  self->socket_,
233  buf,
234  internal::bind_executor(
235  self->strand_,
236  [self,
237  buffer_ptr = std::move(buffer_ptr),
238  handler = std::forward<WriteHandler>(handler)](
239  error_code ec, size_t length) mutable {
240  self->wstrand_.next();
241  handler(ec, length);
242  }));
243  });
244  }
245 
246  void async_read(parser_type&& parser)
247  {
248  parser.reserve_buffer(buffer_reserve_size_);
249  auto buffer = net::buffer(parser.buffer(), parser.buffer_capacity());
250 
251  assert(strand_.running_in_this_thread());
252  reading_ = true;
253  PACKIO_TRACE("reading ... {} call(s) pending", pending_.size());
254  socket_.async_read_some(
255  buffer,
256  internal::bind_executor(
257  strand_,
258  [this, self = shared_from_this(), parser = std::move(parser)](
259 
260  error_code ec, size_t length) mutable {
261  // stop if there is an error or there is no more pending calls
262  assert(self->strand_.running_in_this_thread());
263 
264  if (ec) {
265  PACKIO_WARN("read error: {}", ec.message());
266  self->reading_ = false;
267  if (ec != net::error::operation_aborted)
268  self->close(ec);
269  return;
270  }
271 
272  PACKIO_TRACE("read: {}", length);
273  parser.buffer_consumed(length);
274 
275  while (true) {
276  auto response = parser.get_response();
277  if (!response) {
278  PACKIO_INFO("stop reading: {}", response.error());
279  break;
280  }
281  self->async_call_handler(std::move(*response));
282  }
283 
284  if (self->pending_.empty()) {
285  PACKIO_TRACE("done reading, no more pending calls");
286  self->reading_ = false;
287  return;
288  }
289 
290  self->async_read(std::move(parser));
291  }));
292  }
293 
294  void async_call_handler(response_type&& response)
295  {
296  auto id = response.id;
297  return async_call_handler(id, {}, std::move(response));
298  }
299 
300  void async_call_handler(id_type id, error_code ec, response_type&& response)
301  {
302  net::dispatch(
303  strand_,
304  [ec,
305  id,
306  self = shared_from_this(),
307  response = std::move(response)]() mutable {
308  PACKIO_DEBUG(
309  "calling handler for id: {}", rpc_type::format_id(id));
310 
311  assert(self->strand_.running_in_this_thread());
312  auto it = self->pending_.find(id);
313  if (it == self->pending_.end()) {
314  PACKIO_WARN("unexisting id: {}", rpc_type::format_id(id));
315  return;
316  }
317 
318  auto handler = std::move(it->second);
319  self->pending_.erase(it);
320 
321  // handle the response asynchronously (post)
322  // to schedule the next read immediately
323  // this will allow parallel response handling
324  // in multi-threaded environments
325  net::post(
326  self->socket_.get_executor(),
327  [ec,
328  handler = std::move(handler),
329  response = std::move(response)]() mutable {
330  handler(ec, std::move(response));
331  });
332  });
333  }
334 
335  class initiate_async_notify {
336  public:
337  using executor_type = typename client::executor_type;
338 
339  explicit initiate_async_notify(client* self) : self_(self) {}
340 
341  executor_type get_executor() const noexcept
342  {
343  return self_->get_executor();
344  }
345 
346  template <typename NotifyHandler, typename ArgsTuple>
347  void operator()(
348  NotifyHandler&& handler,
349  std::string_view name,
350  ArgsTuple&& args) const
351  {
352  PACKIO_STATIC_ASSERT_TRAIT(NotifyHandler);
353  PACKIO_DEBUG("async_notify: {}", name);
354 
355  auto packer_buf = internal::to_unique_ptr(std::apply(
356  [&name](auto&&... args) {
357  return rpc_type::serialize_notification(
358  name, std::forward<decltype(args)>(args)...);
359  },
360  std::forward<ArgsTuple>(args))
361 
362  );
363  self_->async_send(
364  std::move(packer_buf),
365  [handler = std::forward<NotifyHandler>(handler),
366  self = self_->shared_from_this()](
367  error_code ec, std::size_t length) mutable {
368  if (ec) {
369  PACKIO_WARN("write error: {}", ec.message());
370  handler(ec);
371  if (ec != net::error::operation_aborted)
372  self->close(ec);
373  return;
374  }
375 
376  PACKIO_TRACE("write: {}", length);
377  (void)length;
378  handler(ec);
379  });
380  }
381 
382  private:
383  client* self_;
384  };
385 
386  class initiate_async_call {
387  public:
388  using executor_type = typename client::executor_type;
389 
390  explicit initiate_async_call(client* self) : self_(self) {}
391 
392  executor_type get_executor() const noexcept
393  {
394  return self_->get_executor();
395  }
396 
397  template <typename CallHandler, typename ArgsTuple>
398  void operator()(
399  CallHandler&& handler,
400  std::string_view name,
401  ArgsTuple&& args,
402  std::optional<std::reference_wrapper<id_type>> opt_call_id) const
403  {
404  PACKIO_STATIC_ASSERT_TTRAIT(CallHandler, rpc_type);
405  PACKIO_DEBUG("async_call: {}", name);
406 
407  id_type call_id = self_->id_.fetch_add(1, std::memory_order_acq_rel);
408  if (opt_call_id) {
409  opt_call_id->get() = call_id;
410  }
411 
412  auto packer_buf = internal::to_unique_ptr(std::apply(
413  [&name, &call_id](auto&&... args) {
414  return rpc_type::serialize_request(
415  call_id, name, std::forward<decltype(args)>(args)...);
416  },
417  std::forward<ArgsTuple>(args)));
418 
419  net::dispatch(
420  self_->strand_,
421  [self = self_->shared_from_this(),
422  call_id,
423  handler = std::forward<CallHandler>(handler),
424  packer_buf = std::move(packer_buf)]() mutable {
425  // we must emplace the id and handler before sending data
426  // otherwise we might drop a fast response
427  assert(self->strand_.running_in_this_thread());
428  self->pending_.try_emplace(call_id, std::move(handler));
429 
430  // if we are not reading, start the read operation
431  if (!self->reading_) {
432  PACKIO_DEBUG("start reading");
433  self->async_read(parser_type{});
434  }
435 
436  // send the request buffer
437  self->async_send(
438  std::move(packer_buf),
439  [self = std::move(self), call_id](
440  error_code ec, std::size_t length) mutable {
441  if (ec) {
442  PACKIO_WARN("write error: {}", ec.message());
443  if (ec != net::error::operation_aborted)
444  self->close(ec);
445  return;
446  }
447 
448  PACKIO_TRACE("write: {}", length);
449  (void)length;
450  });
451  });
452  }
453 
454  private:
455  client* self_;
456  };
457 
458  socket_type socket_;
459  std::size_t buffer_reserve_size_{kDefaultBufferReserveSize};
460  std::atomic<uint64_t> id_{0};
461 
462  net::strand<executor_type> strand_;
463  internal::manual_strand<executor_type> wstrand_;
464 
465  Map<id_type, async_call_handler_type> pending_;
466  bool reading_{false};
467 };
468 
473 template <typename Rpc, typename Socket, template <class...> class Map = default_map>
474 auto make_client(Socket&& socket)
475 {
476  return std::make_shared<client<Rpc, Socket, Map>>(
477  std::forward<Socket>(socket));
478 }
479 
480 } // packio
481 
482 #endif // PACKIO_CLIENT_H
The client class.
Definition: client.h:34
std::size_t get_buffer_reserve_size() const noexcept
Get the size reserved by the reception buffer.
Definition: client.h:73
Rpc rpc_type
The RPC protocol type.
Definition: client.h:37
typename socket_type::protocol_type protocol_type
The protocol type.
Definition: client.h:45
client(socket_type socket)
The constructor.
Definition: client.h:56
void cancel()
Cancel all pending calls.
Definition: client.h:98
typename rpc_type::id_type id_type
The call ID type.
Definition: client.h:39
typename socket_type::executor_type executor_type
The executor type.
Definition: client.h:47
executor_type get_executor()
Get the executor associated with the object.
Definition: client.h:79
socket_type & socket() noexcept
Get the underlying socket.
Definition: client.h:62
Socket socket_type
The socket type.
Definition: client.h:43
typename rpc_type::response_type response_type
The response of a RPC call.
Definition: client.h:41
void set_buffer_reserve_size(std::size_t size) noexcept
Set the size reserved by the reception buffer.
Definition: client.h:68
auto async_call(std::string_view name, CallHandler &&handler=typename net::default_completion_token< executor_type >::type(), std::optional< std::reference_wrapper< id_type >> call_id=std::nullopt)
This is an overloaded member function, provided for convenience. It differs from the above function o...
Definition: client.h:179
auto async_notify(std::string_view name, NotifyHandler &&handler=typename net::default_completion_token< executor_type >::type())
This is an overloaded member function, provided for convenience. It differs from the above function o...
Definition: client.h:140
const socket_type & socket() const noexcept
Get the underlying socket, const.
Definition: client.h:65
void cancel(id_type id)
Cancel a pending call.
Definition: client.h:85
auto async_call(std::string_view name, ArgsTuple &&args, CallHandler &&handler=typename net::default_completion_token< executor_type >::type(), std::optional< std::reference_wrapper< id_type >> call_id=std::nullopt)
Call a remote procedure.
Definition: client.h:160
auto async_notify(std::string_view name, ArgsTuple &&args, NotifyHandler &&handler=typename net::default_completion_token< executor_type >::type())
Send a notify request to the server with argument.
Definition: client.h:123
static constexpr size_t kDefaultBufferReserveSize
The default size reserved by the reception buffer.
Definition: client.h:52
::packio::client< rpc, Socket, Map > client
The client for JSON-RPC.
Definition: json_rpc.h:31
The packio namespace.
Definition: arg.h:14
auto make_client(Socket &&socket)
Create a client from a socket.
Definition: client.h:474
Traits definition.