5 #ifndef PACKIO_SERVER_SESSION_H
6 #define PACKIO_SERVER_SESSION_H
15 #include "internal/config.h"
16 #include "internal/log.h"
17 #include "internal/manual_strand.h"
18 #include "internal/rpc.h"
19 #include "internal/utils.h"
24 template <
typename Rpc,
typename Socket,
typename Dispatcher>
26 :
public std::enable_shared_from_this<server_session<Rpc, Socket, Dispatcher>> {
30 typename socket_type::protocol_type;
32 typename socket_type::executor_type;
34 using std::enable_shared_from_this<server_session<Rpc, Socket, Dispatcher>>::shared_from_this;
40 : socket_{std::move(sock)},
41 dispatcher_ptr_{std::move(dispatcher_ptr)},
58 buffer_reserve_size_ = size;
63 return buffer_reserve_size_;
69 net::dispatch(strand_, [
self = shared_from_this()]() {
70 self->async_read(parser_type{});
75 using parser_type =
typename Rpc::incremental_parser_type;
76 using request_type =
typename Rpc::request_type;
78 void async_read(parser_type&& parser)
80 assert(strand_.running_in_this_thread());
83 if (!socket_.is_open()) {
87 parser.reserve_buffer(buffer_reserve_size_);
88 auto buffer = net::buffer(parser.buffer(), parser.buffer_capacity());
90 socket_.async_read_some(
92 internal::bind_executor(
94 [
self = shared_from_this(), parser = std::move(parser)](
95 error_code ec,
size_t length)
mutable {
96 assert(self->strand_.running_in_this_thread());
99 PACKIO_WARN(
"read error: {}", ec.message());
100 self->close_connection();
104 PACKIO_TRACE(
"read: {}", length);
105 parser.buffer_consumed(length);
108 auto request = parser.get_request();
110 PACKIO_INFO(
"stop reading: {}", request.error());
118 self->get_executor(),
119 [
self, request = std::move(*request)]()
mutable {
120 self->async_handle_request(std::move(request));
124 self->async_read(std::move(parser));
128 void async_handle_request(request_type&& request)
130 completion_handler<Rpc> handler(
132 [type = request.type,
id = request.id,
self = shared_from_this()](
133 auto&& response_buffer) {
134 if (type == call_type::request) {
135 PACKIO_TRACE(
"result (id={})", Rpc::format_id(
id));
137 self->async_send_response(std::move(response_buffer));
141 const auto function = dispatcher_ptr_->get(request.method);
144 "call: {} (id={})", request.method, Rpc::format_id(request.id));
145 (*function)(std::move(handler), std::move(request.args));
148 PACKIO_DEBUG(
"unknown function {}", request.method);
149 handler.set_error(
"unknown function");
153 template <
typename Buffer>
154 void async_send_response(Buffer&& response_buffer)
157 if (!socket_.is_open()) {
161 auto message_ptr = internal::to_unique_ptr(std::move(response_buffer));
164 self = shared_from_this(),
165 message_ptr = std::move(message_ptr)]()
mutable {
166 assert(strand_.running_in_this_thread());
167 auto buf = Rpc::buffer(*message_ptr);
171 internal::bind_executor(
173 [
self = std::move(
self), message_ptr = std::move(message_ptr)](
174 error_code ec,
size_t length) {
175 self->wstrand_.next();
178 PACKIO_WARN(
"write error: {}", ec.message());
179 self->close_connection();
183 PACKIO_TRACE(
"write: {}", length);
189 void close_connection()
194 PACKIO_WARN(
"close error: {}", ec.message());
200 std::shared_ptr<Dispatcher> dispatcher_ptr_;
202 net::strand<executor_type> strand_;
203 internal::manual_strand<executor_type> wstrand_;
The server_session class, created by the server.
Definition: server_session.h:26
void set_buffer_reserve_size(std::size_t size) noexcept
Set the size reserved by the reception buffer.
Definition: server_session.h:56
Socket socket_type
The socket type.
Definition: server_session.h:28
const socket_type & socket() const
Get the underlying socket, const.
Definition: server_session.h:50
typename socket_type::executor_type executor_type
The executor type.
Definition: server_session.h:32
typename socket_type::protocol_type protocol_type
The protocol type.
Definition: server_session.h:30
std::size_t get_buffer_reserve_size() const noexcept
Get the size reserved by the reception buffer.
Definition: server_session.h:61
socket_type & socket()
Get the underlying socket.
Definition: server_session.h:48
static constexpr size_t kDefaultBufferReserveSize
The default size reserved by the reception buffer.
Definition: server_session.h:37
void start()
Start the session.
Definition: server_session.h:67
executor_type get_executor()
Get the executor associated with the object.
Definition: server_session.h:53
Class completion_handler.
The packio namespace.
Definition: arg.h:14