GRPC C++  1.20.0
method_handler_impl.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
20 #define GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
21 
26 
27 namespace grpc {
28 
29 namespace internal {
30 
31 // Invoke the method handler, fill in the status, and
32 // return whether or not we finished safely (without an exception).
33 // Note that exception handling is 0-cost in most compiler/library
34 // implementations (except when an exception is actually thrown),
35 // so this process doesn't require additional overhead in the common case.
36 // Additionally, we don't need to return if we caught an exception or not;
37 // the handling is the same in either case.
38 template <class Callable>
39 Status CatchingFunctionHandler(Callable&& handler) {
40 #if GRPC_ALLOW_EXCEPTIONS
41  try {
42  return handler();
43  } catch (...) {
44  return Status(StatusCode::UNKNOWN, "Unexpected error in RPC handling");
45  }
46 #else // GRPC_ALLOW_EXCEPTIONS
47  return handler();
48 #endif // GRPC_ALLOW_EXCEPTIONS
49 }
50 
52 template <class ServiceType, class RequestType, class ResponseType>
53 class RpcMethodHandler : public MethodHandler {
54  public:
55  RpcMethodHandler(std::function<Status(ServiceType*, ServerContext*,
56  const RequestType*, ResponseType*)>
57  func,
58  ServiceType* service)
59  : func_(func), service_(service) {}
60 
61  void RunHandler(const HandlerParameter& param) final {
62  ResponseType rsp;
63  Status status = param.status;
64  if (status.ok()) {
65  status = CatchingFunctionHandler([this, &param, &rsp] {
66  return func_(service_, param.server_context,
67  static_cast<RequestType*>(param.request), &rsp);
68  });
69  static_cast<RequestType*>(param.request)->~RequestType();
70  }
71 
72  GPR_CODEGEN_ASSERT(!param.server_context->sent_initial_metadata_);
75  ops;
76  ops.SendInitialMetadata(&param.server_context->initial_metadata_,
77  param.server_context->initial_metadata_flags());
78  if (param.server_context->compression_level_set()) {
79  ops.set_compression_level(param.server_context->compression_level());
80  }
81  if (status.ok()) {
82  status = ops.SendMessagePtr(&rsp);
83  }
84  ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
85  param.call->PerformOps(&ops);
86  param.call->cq()->Pluck(&ops);
87  }
88 
90  Status* status) final {
91  ByteBuffer buf;
92  buf.set_buffer(req);
93  auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc(
94  call, sizeof(RequestType))) RequestType();
95  *status = SerializationTraits<RequestType>::Deserialize(&buf, request);
96  buf.Release();
97  if (status->ok()) {
98  return request;
99  }
100  request->~RequestType();
101  return nullptr;
102  }
103 
104  private:
106  std::function<Status(ServiceType*, ServerContext*, const RequestType*,
107  ResponseType*)>
108  func_;
109  // The class the above handler function lives in.
110  ServiceType* service_;
111 };
112 
114 template <class ServiceType, class RequestType, class ResponseType>
115 class ClientStreamingHandler : public MethodHandler {
116  public:
118  std::function<Status(ServiceType*, ServerContext*,
119  ServerReader<RequestType>*, ResponseType*)>
120  func,
121  ServiceType* service)
122  : func_(func), service_(service) {}
123 
124  void RunHandler(const HandlerParameter& param) final {
125  ServerReader<RequestType> reader(param.call, param.server_context);
126  ResponseType rsp;
127  Status status = CatchingFunctionHandler([this, &param, &reader, &rsp] {
128  return func_(service_, param.server_context, &reader, &rsp);
129  });
130 
133  ops;
134  if (!param.server_context->sent_initial_metadata_) {
135  ops.SendInitialMetadata(&param.server_context->initial_metadata_,
136  param.server_context->initial_metadata_flags());
137  if (param.server_context->compression_level_set()) {
138  ops.set_compression_level(param.server_context->compression_level());
139  }
140  }
141  if (status.ok()) {
142  status = ops.SendMessagePtr(&rsp);
143  }
144  ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
145  param.call->PerformOps(&ops);
146  param.call->cq()->Pluck(&ops);
147  }
148 
149  private:
150  std::function<Status(ServiceType*, ServerContext*, ServerReader<RequestType>*,
151  ResponseType*)>
152  func_;
153  ServiceType* service_;
154 };
155 
157 template <class ServiceType, class RequestType, class ResponseType>
158 class ServerStreamingHandler : public MethodHandler {
159  public:
161  std::function<Status(ServiceType*, ServerContext*, const RequestType*,
163  func,
164  ServiceType* service)
165  : func_(func), service_(service) {}
166 
167  void RunHandler(const HandlerParameter& param) final {
168  Status status = param.status;
169  if (status.ok()) {
170  ServerWriter<ResponseType> writer(param.call, param.server_context);
171  status = CatchingFunctionHandler([this, &param, &writer] {
172  return func_(service_, param.server_context,
173  static_cast<RequestType*>(param.request), &writer);
174  });
175  static_cast<RequestType*>(param.request)->~RequestType();
176  }
177 
179  if (!param.server_context->sent_initial_metadata_) {
180  ops.SendInitialMetadata(&param.server_context->initial_metadata_,
181  param.server_context->initial_metadata_flags());
182  if (param.server_context->compression_level_set()) {
183  ops.set_compression_level(param.server_context->compression_level());
184  }
185  }
186  ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
187  param.call->PerformOps(&ops);
188  if (param.server_context->has_pending_ops_) {
189  param.call->cq()->Pluck(&param.server_context->pending_ops_);
190  }
191  param.call->cq()->Pluck(&ops);
192  }
193 
195  Status* status) final {
196  ByteBuffer buf;
197  buf.set_buffer(req);
198  auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc(
199  call, sizeof(RequestType))) RequestType();
200  *status = SerializationTraits<RequestType>::Deserialize(&buf, request);
201  buf.Release();
202  if (status->ok()) {
203  return request;
204  }
205  request->~RequestType();
206  return nullptr;
207  }
208 
209  private:
210  std::function<Status(ServiceType*, ServerContext*, const RequestType*,
212  func_;
213  ServiceType* service_;
214 };
215 
223 template <class Streamer, bool WriteNeeded>
225  public:
227  std::function<Status(ServerContext*, Streamer*)> func)
228  : func_(func), write_needed_(WriteNeeded) {}
229 
230  void RunHandler(const HandlerParameter& param) final {
231  Streamer stream(param.call, param.server_context);
232  Status status = CatchingFunctionHandler([this, &param, &stream] {
233  return func_(param.server_context, &stream);
234  });
235 
237  if (!param.server_context->sent_initial_metadata_) {
238  ops.SendInitialMetadata(&param.server_context->initial_metadata_,
239  param.server_context->initial_metadata_flags());
240  if (param.server_context->compression_level_set()) {
241  ops.set_compression_level(param.server_context->compression_level());
242  }
243  if (write_needed_ && status.ok()) {
244  // If we needed a write but never did one, we need to mark the
245  // status as a fail
246  status = Status(StatusCode::INTERNAL,
247  "Service did not provide response message");
248  }
249  }
250  ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
251  param.call->PerformOps(&ops);
252  if (param.server_context->has_pending_ops_) {
253  param.call->cq()->Pluck(&param.server_context->pending_ops_);
254  }
255  param.call->cq()->Pluck(&ops);
256  }
257 
258  private:
259  std::function<Status(ServerContext*, Streamer*)> func_;
260  const bool write_needed_;
261 };
262 
263 template <class ServiceType, class RequestType, class ResponseType>
266  ServerReaderWriter<ResponseType, RequestType>, false> {
267  public:
269  std::function<Status(ServiceType*, ServerContext*,
271  func,
272  ServiceType* service)
274  ServerReaderWriter<ResponseType, RequestType>, false>(std::bind(
275  func, service, std::placeholders::_1, std::placeholders::_2)) {}
276 };
277 
278 template <class RequestType, class ResponseType>
281  ServerUnaryStreamer<RequestType, ResponseType>, true> {
282  public:
284  std::function<Status(ServerContext*,
286  func)
288  ServerUnaryStreamer<RequestType, ResponseType>, true>(func) {}
289 };
290 
291 template <class RequestType, class ResponseType>
294  ServerSplitStreamer<RequestType, ResponseType>, false> {
295  public:
297  std::function<Status(ServerContext*,
299  func)
301  ServerSplitStreamer<RequestType, ResponseType>, false>(func) {}
302 };
303 
306 template <StatusCode code>
307 class ErrorMethodHandler : public MethodHandler {
308  public:
309  template <class T>
310  static void FillOps(ServerContext* context, T* ops) {
311  Status status(code, "");
312  if (!context->sent_initial_metadata_) {
313  ops->SendInitialMetadata(&context->initial_metadata_,
314  context->initial_metadata_flags());
315  if (context->compression_level_set()) {
316  ops->set_compression_level(context->compression_level());
317  }
318  context->sent_initial_metadata_ = true;
319  }
320  ops->ServerSendStatus(&context->trailing_metadata_, status);
321  }
322 
323  void RunHandler(const HandlerParameter& param) final {
325  FillOps(param.server_context, &ops);
326  param.call->PerformOps(&ops);
327  param.call->cq()->Pluck(&ops);
328  }
329 
331  Status* status) final {
332  // We have to destroy any request payload
333  if (req != nullptr) {
335  }
336  return nullptr;
337  }
338 };
339 
343 
344 } // namespace internal
345 } // namespace grpc
346 
347 #endif // GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:70
A wrapper class of an application provided server streaming handler.
Definition: byte_buffer.h:47
A wrapper class of an application provided bidi-streaming handler.
Definition: completion_queue.h:82
ServerStreamingHandler(std::function< Status(ServiceType *, ServerContext *, const RequestType *, ServerWriter< ResponseType > *)> func, ServiceType *service)
Definition: method_handler_impl.h:160
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:124
StreamedUnaryHandler(std::function< Status(ServerContext *, ServerUnaryStreamer< RequestType, ResponseType > *)> func)
Definition: method_handler_impl.h:283
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:144
A class to represent a flow-controlled server-side streaming call.
Definition: sync_stream.h:878
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:323
A class to represent a flow-controlled unary call.
Definition: sync_stream.h:813
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:828
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, Status *status) final
Definition: method_handler_impl.h:330
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:167
ClientStreamingHandler(std::function< Status(ServiceType *, ServerContext *, ServerReader< RequestType > *, ResponseType *)> func, ServiceType *service)
Definition: method_handler_impl.h:117
Definition: async_unary_call.h:304
Definition: grpc_types.h:40
RpcMethodHandler(std::function< Status(ServiceType *, ServerContext *, const RequestType *, ResponseType *)> func, ServiceType *service)
Definition: method_handler_impl.h:55
grpc_compression_level compression_level() const
Return the compression algorithm to be used by the server call.
Definition: server_context.h:203
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:230
Synchronous (blocking) server-side API for doing client-streaming RPCs, where the incoming message st...
Definition: completion_queue.h:53
ErrorMethodHandler< StatusCode::RESOURCE_EXHAUSTED > ResourceExhaustedHandler
Definition: method_handler_impl.h:342
::google::protobuf::util::Status Status
Definition: config_protobuf.h:93
SplitServerStreamingHandler(std::function< Status(ServerContext *, ServerSplitStreamer< RequestType, ResponseType > *)> func)
Definition: method_handler_impl.h:296
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, Status *status) final
Definition: method_handler_impl.h:89
bool compression_level_set() const
Return a bool indicating whether the compression level for this call has been set (either implicitly ...
Definition: server_context.h:218
virtual void grpc_byte_buffer_destroy(grpc_byte_buffer *bb)=0
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
Definition: call_op_set.h:636
Definition: call_op_set.h:224
Definition: method_handler_impl.h:292
void Release()
Forget underlying byte buffer without destroying Use this only for un-owned byte buffers.
Definition: byte_buffer.h:143
Definition: call_op_set.h:294
ErrorMethodHandler< StatusCode::UNIMPLEMENTED > UnknownMethodHandler
Definition: method_handler_impl.h:340
This header provides an object that reads bytes directly from a grpc::ByteBuffer, via the ZeroCopyInp...
Definition: alarm.h:24
A wrapper class of an application provided client streaming handler.
Definition: completion_queue.h:76
A wrapper class of an application provided rpc method handler.
Definition: byte_buffer.h:45
CoreCodegenInterface * g_core_codegen_interface
Definition: call_op_set.h:51
Definition: rpc_service_method.h:42
A ServerContext allows the person implementing a service handler to:
Definition: server_context.h:110
Synchronous (blocking) server-side API for doing for doing a server-streaming RPCs, where the outgoing message stream coming from the server has messages of type W.
Definition: completion_queue.h:55
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
bool ok() const
Is the status OK?
Definition: status.h:118
TemplatedBidiStreamingHandler(std::function< Status(ServerContext *, Streamer *)> func)
Definition: method_handler_impl.h:226
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, Status *status) final
Definition: method_handler_impl.h:194
Synchronous (blocking) server-side API for a bidirectional streaming call, where the incoming message...
Definition: sync_stream.h:772
Base class for running an RPC handler.
Definition: rpc_service_method.h:39
General method handler class for errors that prevent real method use e.g., handle unknown method by r...
Definition: byte_buffer.h:53
Did it work? If it didn&#39;t, why?
Definition: status.h:31
Definition: method_handler_impl.h:279
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:61
Definition: completion_queue.h:80
Status CatchingFunctionHandler(Callable &&handler)
Definition: method_handler_impl.h:39
Unknown error.
Definition: status_code_enum.h:35
Internal errors.
Definition: status_code_enum.h:119
A sequence of bytes.
Definition: byte_buffer.h:64
static void FillOps(ServerContext *context, T *ops)
Definition: method_handler_impl.h:310
BidiStreamingHandler(std::function< Status(ServiceType *, ServerContext *, ServerReaderWriter< ResponseType, RequestType > *)> func, ServiceType *service)
Definition: method_handler_impl.h:268