5 #ifndef PACKIO_CLIENT_H
6 #define PACKIO_CLIENT_H
16 #include <string_view>
17 #include <type_traits>
20 #include "internal/config.h"
21 #include "internal/manual_strand.h"
22 #include "internal/movable_function.h"
23 #include "internal/rpc.h"
24 #include "internal/utils.h"
33 template <
typename Rpc,
typename Socket,
template <
class...>
class Map = default_map>
34 class client :
public std::enable_shared_from_this<client<Rpc, Socket, Map>> {
39 using id_type =
typename rpc_type::id_type;
49 using std::enable_shared_from_this<client<Rpc, Socket, Map>>::shared_from_this;
70 buffer_reserve_size_ = size;
75 return buffer_reserve_size_;
87 PACKIO_TRACE(
"cancel {}", rpc_type::format_id(
id));
88 net::dispatch(strand_, [
self = shared_from_this(),
id] {
89 auto ec = make_error_code(net::error::operation_aborted);
90 self->async_call_handler(
id, ec, {});
91 self->maybe_cancel_reading();
100 PACKIO_TRACE(
"cancel all");
101 net::dispatch(strand_, [
self = shared_from_this()] {
102 auto ec = make_error_code(net::error::operation_aborted);
103 while (!self->pending_.empty()) {
104 self->async_call_handler(self->pending_.begin()->first, ec, {});
106 self->maybe_cancel_reading();
119 PACKIO_COMPLETION_TOKEN_FOR(
void(error_code))
120 NotifyHandler PACKIO_DEFAULT_COMPLETION_TOKEN_TYPE(
executor_type),
122 typename = std::enable_if_t<internal::is_tuple_v<ArgsTuple>>>
124 std::string_view name,
126 NotifyHandler&& handler PACKIO_DEFAULT_COMPLETION_TOKEN(
executor_type))
128 return net::async_initiate<NotifyHandler, void(error_code)>(
129 initiate_async_notify(
this),
132 std::forward<ArgsTuple>(args));
137 PACKIO_COMPLETION_TOKEN_FOR(
void(error_code))
138 NotifyHandler PACKIO_DEFAULT_COMPLETION_TOKEN_TYPE(
executor_type),
139 typename = std::enable_if_t<!internal::is_tuple_v<NotifyHandler>>>
141 std::string_view name,
142 NotifyHandler&& handler PACKIO_DEFAULT_COMPLETION_TOKEN(
executor_type))
145 name, std::tuple{}, std::forward<NotifyHandler>(handler));
157 CallHandler PACKIO_DEFAULT_COMPLETION_TOKEN_TYPE(
executor_type),
159 typename = std::enable_if_t<internal::is_tuple_v<ArgsTuple>>>
161 std::string_view name,
163 CallHandler&& handler PACKIO_DEFAULT_COMPLETION_TOKEN(
executor_type),
164 std::optional<std::reference_wrapper<id_type>> call_id = std::nullopt)
166 return net::async_initiate<CallHandler, void(error_code, response_type)>(
167 initiate_async_call(
this),
170 std::forward<ArgsTuple>(args),
177 CallHandler PACKIO_DEFAULT_COMPLETION_TOKEN_TYPE(
executor_type),
178 typename = std::enable_if_t<!internal::is_tuple_v<CallHandler>>>
180 std::string_view name,
181 CallHandler&& handler PACKIO_DEFAULT_COMPLETION_TOKEN(
executor_type),
182 std::optional<std::reference_wrapper<id_type>> call_id = std::nullopt)
185 name, std::tuple{}, std::forward<CallHandler>(handler), call_id);
189 using parser_type =
typename rpc_type::incremental_parser_type;
190 using async_call_handler_type =
193 void close(error_code ec)
195 net::dispatch(strand_, [
self = shared_from_this(), ec] {
196 while (!self->pending_.empty()) {
197 self->async_call_handler(self->pending_.begin()->first, ec, {});
201 self->socket_.close(close_ec);
203 PACKIO_WARN(
"close failed: {}", close_ec.message());
208 void maybe_cancel_reading()
210 assert(strand_.running_in_this_thread());
211 if (reading_ && pending_.empty()) {
212 PACKIO_DEBUG(
"stop reading");
216 PACKIO_WARN(
"cancel failed: {}", ec.message());
221 template <
typename Buffer,
typename WriteHandler>
222 void async_send(std::unique_ptr<Buffer>&& buffer_ptr, WriteHandler&& handler)
224 wstrand_.push([
self = shared_from_this(),
225 buffer_ptr = std::move(buffer_ptr),
226 handler = std::forward<WriteHandler>(handler)]()
mutable {
227 assert(self->strand_.running_in_this_thread());
228 internal::set_no_delay(self->socket_);
230 auto buf = rpc_type::buffer(*buffer_ptr);
234 internal::bind_executor(
237 buffer_ptr = std::move(buffer_ptr),
238 handler = std::forward<WriteHandler>(handler)](
239 error_code ec,
size_t length)
mutable {
240 self->wstrand_.next();
246 void async_read(parser_type&& parser)
248 parser.reserve_buffer(buffer_reserve_size_);
249 auto buffer = net::buffer(parser.buffer(), parser.buffer_capacity());
251 assert(strand_.running_in_this_thread());
253 PACKIO_TRACE(
"reading ... {} call(s) pending", pending_.size());
254 socket_.async_read_some(
256 internal::bind_executor(
258 [
this,
self = shared_from_this(), parser = std::move(parser)](
260 error_code ec,
size_t length)
mutable {
262 assert(self->strand_.running_in_this_thread());
265 PACKIO_WARN(
"read error: {}", ec.message());
266 self->reading_ =
false;
267 if (ec != net::error::operation_aborted)
272 PACKIO_TRACE(
"read: {}", length);
273 parser.buffer_consumed(length);
276 auto response = parser.get_response();
278 PACKIO_INFO(
"stop reading: {}", response.error());
281 self->async_call_handler(std::move(*response));
284 if (self->pending_.empty()) {
285 PACKIO_TRACE(
"done reading, no more pending calls");
286 self->reading_ =
false;
290 self->async_read(std::move(parser));
294 void async_call_handler(response_type&& response)
296 auto id = response.id;
297 return async_call_handler(
id, {}, std::move(response));
300 void async_call_handler(id_type
id, error_code ec, response_type&& response)
306 self = shared_from_this(),
307 response = std::move(response)]()
mutable {
309 "calling handler for id: {}", rpc_type::format_id(
id));
311 assert(self->strand_.running_in_this_thread());
312 auto it =
self->pending_.find(
id);
313 if (it == self->pending_.end()) {
314 PACKIO_WARN(
"unexisting id: {}", rpc_type::format_id(
id));
318 auto handler = std::move(it->second);
319 self->pending_.erase(it);
326 self->socket_.get_executor(),
328 handler = std::move(handler),
329 response = std::move(response)]()
mutable {
330 handler(ec, std::move(response));
335 class initiate_async_notify {
339 explicit initiate_async_notify(
client*
self) : self_(self) {}
341 executor_type get_executor() const noexcept
343 return self_->get_executor();
346 template <
typename NotifyHandler,
typename ArgsTuple>
348 NotifyHandler&& handler,
349 std::string_view name,
350 ArgsTuple&& args)
const
352 PACKIO_STATIC_ASSERT_TRAIT(NotifyHandler);
353 PACKIO_DEBUG(
"async_notify: {}", name);
355 auto packer_buf = internal::to_unique_ptr(std::apply(
356 [&name](
auto&&... args) {
357 return rpc_type::serialize_notification(
358 name, std::forward<decltype(args)>(args)...);
360 std::forward<ArgsTuple>(args))
364 std::move(packer_buf),
365 [handler = std::forward<NotifyHandler>(handler),
366 self = self_->shared_from_this()](
367 error_code ec, std::size_t length)
mutable {
369 PACKIO_WARN(
"write error: {}", ec.message());
371 if (ec != net::error::operation_aborted)
376 PACKIO_TRACE(
"write: {}", length);
386 class initiate_async_call {
388 using executor_type =
typename client::executor_type;
390 explicit initiate_async_call(
client*
self) : self_(self) {}
392 executor_type get_executor() const noexcept
394 return self_->get_executor();
397 template <
typename CallHandler,
typename ArgsTuple>
399 CallHandler&& handler,
400 std::string_view name,
402 std::optional<std::reference_wrapper<id_type>> opt_call_id)
const
404 PACKIO_STATIC_ASSERT_TTRAIT(CallHandler, rpc_type);
405 PACKIO_DEBUG(
"async_call: {}", name);
407 id_type call_id = self_->id_.fetch_add(1, std::memory_order_acq_rel);
409 opt_call_id->get() = call_id;
412 auto packer_buf = internal::to_unique_ptr(std::apply(
413 [&name, &call_id](
auto&&... args) {
414 return rpc_type::serialize_request(
415 call_id, name, std::forward<decltype(args)>(args)...);
417 std::forward<ArgsTuple>(args)));
421 [
self = self_->shared_from_this(),
423 handler = std::forward<CallHandler>(handler),
424 packer_buf = std::move(packer_buf)]()
mutable {
427 assert(self->strand_.running_in_this_thread());
428 self->pending_.try_emplace(call_id, std::move(handler));
431 if (!self->reading_) {
432 PACKIO_DEBUG(
"start reading");
433 self->async_read(parser_type{});
438 std::move(packer_buf),
439 [
self = std::move(
self), call_id](
440 error_code ec, std::size_t length)
mutable {
442 PACKIO_WARN(
"write error: {}", ec.message());
443 if (ec != net::error::operation_aborted)
448 PACKIO_TRACE(
"write: {}", length);
459 std::size_t buffer_reserve_size_{kDefaultBufferReserveSize};
460 std::atomic<uint64_t> id_{0};
462 net::strand<executor_type> strand_;
463 internal::manual_strand<executor_type> wstrand_;
465 Map<id_type, async_call_handler_type> pending_;
466 bool reading_{
false};
473 template <
typename Rpc,
typename Socket,
template <
class...>
class Map = default_map>
476 return std::make_shared<client<Rpc, Socket, Map>>(
477 std::forward<Socket>(socket));
The client class.
Definition: client.h:34
std::size_t get_buffer_reserve_size() const noexcept
Get the size reserved by the reception buffer.
Definition: client.h:73
Rpc rpc_type
The RPC protocol type.
Definition: client.h:37
typename socket_type::protocol_type protocol_type
The protocol type.
Definition: client.h:45
client(socket_type socket)
The constructor.
Definition: client.h:56
void cancel()
Cancel all pending calls.
Definition: client.h:98
typename rpc_type::id_type id_type
The call ID type.
Definition: client.h:39
typename socket_type::executor_type executor_type
The executor type.
Definition: client.h:47
executor_type get_executor()
Get the executor associated with the object.
Definition: client.h:79
socket_type & socket() noexcept
Get the underlying socket.
Definition: client.h:62
Socket socket_type
The socket type.
Definition: client.h:43
typename rpc_type::response_type response_type
The response of a RPC call.
Definition: client.h:41
void set_buffer_reserve_size(std::size_t size) noexcept
Set the size reserved by the reception buffer.
Definition: client.h:68
auto async_call(std::string_view name, CallHandler &&handler=typename net::default_completion_token< executor_type >::type(), std::optional< std::reference_wrapper< id_type >> call_id=std::nullopt)
This is an overloaded member function, provided for convenience. It differs from the above function o...
Definition: client.h:179
auto async_notify(std::string_view name, NotifyHandler &&handler=typename net::default_completion_token< executor_type >::type())
This is an overloaded member function, provided for convenience. It differs from the above function o...
Definition: client.h:140
const socket_type & socket() const noexcept
Get the underlying socket, const.
Definition: client.h:65
void cancel(id_type id)
Cancel a pending call.
Definition: client.h:85
auto async_call(std::string_view name, ArgsTuple &&args, CallHandler &&handler=typename net::default_completion_token< executor_type >::type(), std::optional< std::reference_wrapper< id_type >> call_id=std::nullopt)
Call a remote procedure.
Definition: client.h:160
auto async_notify(std::string_view name, ArgsTuple &&args, NotifyHandler &&handler=typename net::default_completion_token< executor_type >::type())
Send a notify request to the server with argument.
Definition: client.h:123
static constexpr size_t kDefaultBufferReserveSize
The default size reserved by the reception buffer.
Definition: client.h:52
::packio::client< rpc, Socket, Map > client
The client for JSON-RPC.
Definition: json_rpc.h:31
The packio namespace.
Definition: arg.h:14
auto make_client(Socket &&socket)
Create a client from a socket.
Definition: client.h:474