packio
server_session.h
Go to the documentation of this file.
1 // This Source Code Form is subject to the terms of the Mozilla Public
2 // License, v. 2.0. If a copy of the MPL was not distributed with this
3 // file, You can obtain one at https://mozilla.org/MPL/2.0/.
4 
5 #ifndef PACKIO_SERVER_SESSION_H
6 #define PACKIO_SERVER_SESSION_H
7 
10 
11 #include <memory>
12 #include <queue>
13 
14 #include "handler.h"
15 #include "internal/config.h"
16 #include "internal/log.h"
17 #include "internal/manual_strand.h"
18 #include "internal/rpc.h"
19 #include "internal/utils.h"
20 
21 namespace packio {
22 
24 template <typename Rpc, typename Socket, typename Dispatcher>
26  : public std::enable_shared_from_this<server_session<Rpc, Socket, Dispatcher>> {
27 public:
28  using socket_type = Socket;
29  using protocol_type =
30  typename socket_type::protocol_type;
31  using executor_type =
32  typename socket_type::executor_type;
33 
34  using std::enable_shared_from_this<server_session<Rpc, Socket, Dispatcher>>::shared_from_this;
35 
37  static constexpr size_t kDefaultBufferReserveSize = 4096;
38 
39  server_session(socket_type sock, std::shared_ptr<Dispatcher> dispatcher_ptr)
40  : socket_{std::move(sock)},
41  dispatcher_ptr_{std::move(dispatcher_ptr)},
42  strand_(socket_.get_executor()),
43  wstrand_{strand_}
44  {
45  }
46 
48  socket_type& socket() { return socket_; }
50  const socket_type& socket() const { return socket_; }
51 
53  executor_type get_executor() { return socket().get_executor(); }
54 
56  void set_buffer_reserve_size(std::size_t size) noexcept
57  {
58  buffer_reserve_size_ = size;
59  }
61  std::size_t get_buffer_reserve_size() const noexcept
62  {
63  return buffer_reserve_size_;
64  }
65 
67  void start()
68  {
69  net::dispatch(strand_, [self = shared_from_this()]() {
70  self->async_read(parser_type{});
71  });
72  }
73 
74 private:
75  using parser_type = typename Rpc::incremental_parser_type;
76  using request_type = typename Rpc::request_type;
77 
78  void async_read(parser_type&& parser)
79  {
80  assert(strand_.running_in_this_thread());
81 
82  // abort R/W on error
83  if (!socket_.is_open()) {
84  return;
85  }
86 
87  parser.reserve_buffer(buffer_reserve_size_);
88  auto buffer = net::buffer(parser.buffer(), parser.buffer_capacity());
89 
90  socket_.async_read_some(
91  buffer,
92  internal::bind_executor(
93  strand_,
94  [self = shared_from_this(), parser = std::move(parser)](
95  error_code ec, size_t length) mutable {
96  assert(self->strand_.running_in_this_thread());
97 
98  if (ec) {
99  PACKIO_WARN("read error: {}", ec.message());
100  self->close_connection();
101  return;
102  }
103 
104  PACKIO_TRACE("read: {}", length);
105  parser.buffer_consumed(length);
106 
107  while (true) {
108  auto request = parser.get_request();
109  if (!request) {
110  PACKIO_INFO("stop reading: {}", request.error());
111  break;
112  }
113  // handle the call asynchronously (post)
114  // to schedule the next read immediately
115  // this will allow parallel call handling
116  // in multi-threaded environments
117  net::post(
118  self->get_executor(),
119  [self, request = std::move(*request)]() mutable {
120  self->async_handle_request(std::move(request));
121  });
122  }
123 
124  self->async_read(std::move(parser));
125  }));
126  }
127 
128  void async_handle_request(request_type&& request)
129  {
130  completion_handler<Rpc> handler(
131  request.id,
132  [type = request.type, id = request.id, self = shared_from_this()](
133  auto&& response_buffer) {
134  if (type == call_type::request) {
135  PACKIO_TRACE("result (id={})", Rpc::format_id(id));
136  (void)id;
137  self->async_send_response(std::move(response_buffer));
138  }
139  });
140 
141  const auto function = dispatcher_ptr_->get(request.method);
142  if (function) {
143  PACKIO_TRACE(
144  "call: {} (id={})", request.method, Rpc::format_id(request.id));
145  (*function)(std::move(handler), std::move(request.args));
146  }
147  else {
148  PACKIO_DEBUG("unknown function {}", request.method);
149  handler.set_error("unknown function");
150  }
151  }
152 
153  template <typename Buffer>
154  void async_send_response(Buffer&& response_buffer)
155  {
156  // abort R/W on error
157  if (!socket_.is_open()) {
158  return;
159  }
160 
161  auto message_ptr = internal::to_unique_ptr(std::move(response_buffer));
162 
163  wstrand_.push([this,
164  self = shared_from_this(),
165  message_ptr = std::move(message_ptr)]() mutable {
166  assert(strand_.running_in_this_thread());
167  auto buf = Rpc::buffer(*message_ptr);
168  net::async_write(
169  socket_,
170  buf,
171  internal::bind_executor(
172  strand_,
173  [self = std::move(self), message_ptr = std::move(message_ptr)](
174  error_code ec, size_t length) {
175  self->wstrand_.next();
176 
177  if (ec) {
178  PACKIO_WARN("write error: {}", ec.message());
179  self->close_connection();
180  return;
181  }
182 
183  PACKIO_TRACE("write: {}", length);
184  (void)length;
185  }));
186  });
187  }
188 
189  void close_connection()
190  {
191  error_code ec;
192  socket_.close(ec);
193  if (ec) {
194  PACKIO_WARN("close error: {}", ec.message());
195  }
196  }
197 
198  socket_type socket_;
199  std::size_t buffer_reserve_size_{kDefaultBufferReserveSize};
200  std::shared_ptr<Dispatcher> dispatcher_ptr_;
201 
202  net::strand<executor_type> strand_;
203  internal::manual_strand<executor_type> wstrand_;
204 };
205 
206 } // packio
207 
208 #endif // PACKIO_SERVER_SESSION_H
The server_session class, created by the server.
Definition: server_session.h:26
void set_buffer_reserve_size(std::size_t size) noexcept
Set the size reserved by the reception buffer.
Definition: server_session.h:56
Socket socket_type
The socket type.
Definition: server_session.h:28
const socket_type & socket() const
Get the underlying socket, const.
Definition: server_session.h:50
typename socket_type::executor_type executor_type
The executor type.
Definition: server_session.h:32
typename socket_type::protocol_type protocol_type
The protocol type.
Definition: server_session.h:30
std::size_t get_buffer_reserve_size() const noexcept
Get the size reserved by the reception buffer.
Definition: server_session.h:61
socket_type & socket()
Get the underlying socket.
Definition: server_session.h:48
static constexpr size_t kDefaultBufferReserveSize
The default size reserved by the reception buffer.
Definition: server_session.h:37
void start()
Start the session.
Definition: server_session.h:67
executor_type get_executor()
Get the executor associated with the object.
Definition: server_session.h:53
Class completion_handler.
The packio namespace.
Definition: arg.h:14