packio
rpc.h
1 // This Source Code Form is subject to the terms of the Mozilla Public
2 // License, v. 2.0. If a copy of the MPL was not distributed with this
3 // file, You can obtain one at https://mozilla.org/MPL/2.0/.
4 
5 #ifndef PACKIO_MSGPACK_RPC_RPC_H
6 #define PACKIO_MSGPACK_RPC_RPC_H
7 
8 #include <optional>
9 
10 #include <msgpack.hpp>
11 
12 #include "../arg.h"
13 #include "../args_specs.h"
14 #include "../internal/config.h"
15 #include "../internal/expected.h"
16 #include "../internal/log.h"
17 #include "../internal/rpc.h"
18 
19 namespace packio {
20 namespace msgpack_rpc {
21 namespace internal {
22 
23 template <typename... Args>
24 constexpr bool positional_args_v = (!is_arg_v<Args> && ...);
25 
//! Integer tag stored as the first element of a msgpack-RPC message
enum class msgpack_rpc_type : int {
    request = 0,
    response = 1,
    notification = 2,
};
27 
28 using id_type = uint32_t;
29 using native_type = ::msgpack::object;
30 using packio::internal::expected;
31 using packio::internal::unexpected;
32 
34 struct request {
35  call_type type;
36  id_type id;
37  std::string method;
38  native_type args;
39 
40  std::unique_ptr<::msgpack::zone> zone;
41 };
42 
44 struct response {
45  id_type id;
46  native_type result;
47  native_type error;
48 
49  std::unique_ptr<::msgpack::zone> zone;
50 };
51 
54 public:
55  incremental_parser() : unpacker_{std::make_unique<::msgpack::unpacker>()} {}
56 
57  expected<request, std::string> get_request()
58  {
59  try_parse_object();
60  if (!parsed_) {
61  return unexpected{"no request parsed"};
62  }
63  auto object = std::move(*parsed_);
64  parsed_.reset();
65  return parse_request(std::move(object));
66  }
67 
68  expected<response, std::string> get_response()
69  {
70  try_parse_object();
71  if (!parsed_) {
72  return unexpected{"no response parsed"};
73  }
74  auto object = std::move(*parsed_);
75  parsed_.reset();
76  return parse_response(std::move(object));
77  }
78 
79  char* buffer() const
80  { //
81  return unpacker_->buffer();
82  }
83 
84  std::size_t buffer_capacity() const { return unpacker_->buffer_capacity(); }
85 
86  void buffer_consumed(std::size_t bytes)
87  {
88  unpacker_->buffer_consumed(bytes);
89  }
90 
91  void reserve_buffer(std::size_t bytes) { unpacker_->reserve_buffer(bytes); }
92 
93 private:
94  void try_parse_object()
95  {
96  if (parsed_) {
97  return;
98  }
99  ::msgpack::object_handle object;
100  if (unpacker_->next(object)) {
101  parsed_ = std::move(object);
102  }
103  }
104 
105  static expected<response, std::string> parse_response(
106  ::msgpack::object_handle&& res)
107  {
108  if (res->type != ::msgpack::type::ARRAY) {
109  return unexpected{
110  "unexpected message type: " + std::to_string(res->type)};
111  }
112  if (res->via.array.size != 4) {
113  return unexpected{
114  "unexpected message size: " + std::to_string(res->via.array.size)};
115  }
116  int type = res->via.array.ptr[0].as<int>();
117  if (type != static_cast<int>(msgpack_rpc_type::response)) {
118  return unexpected{"unexpected type: " + std::to_string(type)};
119  }
120 
121  response parsed;
122  parsed.zone = std::move(res.zone());
123  const auto& array = res->via.array.ptr;
124 
125  parsed.id = array[1].as<id_type>();
126  if (array[2].type != ::msgpack::type::NIL) {
127  parsed.error = array[2];
128  }
129  else {
130  parsed.result = array[3];
131  }
132  return {std::move(parsed)};
133  }
134 
135  static expected<request, std::string> parse_request(::msgpack::object_handle&& req)
136  {
137  if (req->type != ::msgpack::type::ARRAY || req->via.array.size < 3) {
138  return unexpected{
139  "unexpected message type: " + std::to_string(req->type)};
140  }
141 
142  request parsed;
143  parsed.zone = std::move(req.zone());
144  const auto& array = req->via.array.ptr;
145  auto array_size = req->via.array.size;
146  ;
147 
148  try {
149  int idx = 0;
150  msgpack_rpc_type type = static_cast<msgpack_rpc_type>(
151  array[idx++].as<int>());
152 
153  std::size_t expected_size;
154  switch (type) {
155  case msgpack_rpc_type::request:
156  parsed.id = array[idx++].as<id_type>();
157  expected_size = 4;
158  parsed.type = call_type::request;
159  break;
160  case msgpack_rpc_type::notification:
161  expected_size = 3;
162  parsed.type = call_type::notification;
163  break;
164  default:
165  return unexpected{
166  "unexpected type: " + std::to_string(static_cast<int>(type))};
167  }
168 
169  if (array_size != expected_size) {
170  return unexpected{
171  "unexpected message size: " + std::to_string(array_size)};
172  }
173 
174  parsed.method = array[idx++].as<std::string>();
175  parsed.args = array[idx++];
176 
177  return {std::move(parsed)};
178  }
179  catch (::msgpack::type_error& exc) {
180  return unexpected{
181  std::string{"unexpected message content: "} + exc.what()};
182  }
183  }
184 
185  std::optional<::msgpack::object_handle> parsed_;
186  std::unique_ptr<::msgpack::unpacker> unpacker_;
187 };
188 
189 } // internal
190 
192 class rpc {
193 public:
195  using id_type = internal::id_type;
196 
198  using native_type = internal::native_type;
199 
202 
205 
208 
209  static std::string format_id(const id_type& id)
210  {
211  return std::to_string(id);
212  }
213 
214  template <typename... Args>
215  static auto serialize_notification(std::string_view method, Args&&... args)
216  -> std::enable_if_t<internal::positional_args_v<Args...>, ::msgpack::sbuffer>
217  {
218  ::msgpack::sbuffer buffer;
219  ::msgpack::pack(
220  buffer,
221  std::forward_as_tuple(
222  static_cast<int>(internal::msgpack_rpc_type::notification),
223  method,
224  std::forward_as_tuple(std::forward<Args>(args)...)));
225  return buffer;
226  }
227 
228  template <typename... Args>
229  static auto serialize_notification(std::string_view, Args&&...)
230  -> std::enable_if_t<!internal::positional_args_v<Args...>, ::msgpack::sbuffer>
231  {
232  static_assert(
233  internal::positional_args_v<Args...>,
234  "msgpack-RPC does not support named arguments");
235  }
236 
237  template <typename... Args>
238  static auto serialize_request(id_type id, std::string_view method, Args&&... args)
239  -> std::enable_if_t<internal::positional_args_v<Args...>, ::msgpack::sbuffer>
240  {
241  ::msgpack::sbuffer buffer;
242  ::msgpack::pack(
243  buffer,
244  std::forward_as_tuple(
245  static_cast<int>(internal::msgpack_rpc_type::request),
246  id,
247  method,
248  std::forward_as_tuple(std::forward<Args>(args)...)));
249  return buffer;
250  }
251 
252  template <typename... Args>
253  static auto serialize_request(id_type, std::string_view, Args&&...)
254  -> std::enable_if_t<!internal::positional_args_v<Args...>, ::msgpack::sbuffer>
255  {
256  static_assert(
257  internal::positional_args_v<Args...>,
258  "msgpack-RPC does not support named arguments");
259  }
260 
261  static ::msgpack::sbuffer serialize_response(id_type id)
262  {
263  return serialize_response(id, ::msgpack::object{});
264  }
265 
266  template <typename T>
267  static ::msgpack::sbuffer serialize_response(id_type id, T&& value)
268  {
269  ::msgpack::sbuffer buffer;
270  ::msgpack::pack(
271  buffer,
272  std::forward_as_tuple(
273  static_cast<int>(internal::msgpack_rpc_type::response),
274  id,
275  ::msgpack::object{},
276  std::forward<T>(value)));
277  return buffer;
278  }
279 
280  template <typename T>
281  static ::msgpack::sbuffer serialize_error_response(id_type id, T&& value)
282  {
283  ::msgpack::sbuffer buffer;
284  ::msgpack::pack(
285  buffer,
286  std::forward_as_tuple(
287  static_cast<int>(internal::msgpack_rpc_type::response),
288  id,
289  std::forward<T>(value),
290  ::msgpack::object{}));
291  return buffer;
292  }
293 
294  static net::const_buffer buffer(const ::msgpack::sbuffer& buf)
295  {
296  return net::const_buffer(buf.data(), buf.size());
297  }
298 
299  template <typename T, typename F>
300  static internal::expected<T, std::string> extract_args(
301  const ::msgpack::object& args,
302  const args_specs<F>& specs)
303  {
304  try {
305  if (args.type != ::msgpack::type::ARRAY) {
306  throw std::runtime_error{"arguments is not an array"};
307  }
308  return convert_positional_args<T>(args.via.array, specs);
309  }
310  catch (const std::exception& exc) {
311  return internal::unexpected{
312  std::string{"cannot convert arguments: "} + exc.what()};
313  }
314  }
315 
316 private:
317  template <typename T, typename F>
318  static constexpr T convert_positional_args(
319  const ::msgpack::object_array& array,
320  const args_specs<F>& specs)
321  {
322  return convert_positional_args<T>(
323  array, specs, std::make_index_sequence<args_specs<F>::size()>());
324  }
325 
326  template <typename T, typename F, std::size_t... Idxs>
327  static constexpr T convert_positional_args(
328  const ::msgpack::object_array& array,
329  const args_specs<F>& specs,
330  std::index_sequence<Idxs...>)
331  {
332  if (!specs.options().allow_extra_arguments
333  && array.size > std::tuple_size_v<T>) {
334  throw std::runtime_error{"too many arguments"};
335  }
336  return {[&]() {
337  if (Idxs < array.size) {
338  try {
339  return array.ptr[Idxs].as<std::tuple_element_t<Idxs, T>>();
340  }
341  catch (const ::msgpack::type_error&) {
342  throw std::runtime_error{
343  "invalid type for argument "
344  + specs.template get<Idxs>().name()};
345  }
346  }
347  if (const auto& value = specs.template get<Idxs>().default_value()) {
348  return *value;
349  }
350  throw std::runtime_error{
351  "no value for argument " + specs.template get<Idxs>().name()};
352  }()...};
353  }
354 };
355 
356 } // msgpack_rpc
357 } // packio
358 
359 #endif // PACKIO_MSGPACK_RPC_RPC_H
The incremental parser for msgpack-RPC objects.
Definition: rpc.h:53
The msgpack RPC protocol implementation.
Definition: rpc.h:192
internal::native_type native_type
The native type of the serialization library.
Definition: rpc.h:198
internal::id_type id_type
Type of the call ID.
Definition: rpc.h:195
The packio namespace.
Definition: arg.h:14
The object representing a client request.
Definition: rpc.h:34
std::unique_ptr<::msgpack::zone > zone
Msgpack zone storing the args.
Definition: rpc.h:40
The object representing the response to a call.
Definition: rpc.h:44
std::unique_ptr<::msgpack::zone > zone
Msgpack zone storing error and result.
Definition: rpc.h:49