GRPC C++  1.23.0
method_handler_impl.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
20 #define GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
21 
26 
27 namespace grpc {
28 
29 namespace internal {
30 
31 // Invoke the method handler and return the Status it produces.
32 // When GRPC_ALLOW_EXCEPTIONS is enabled, an exception escaping the handler is
33 // caught and reported as a StatusCode::UNKNOWN status instead of propagating.
34 // Exception handling is 0-cost in most compiler/library implementations
35 // (except when an exception is actually thrown), so this adds no overhead
36 // in the common case. The caller is not told whether an exception was
37 // caught; the resulting Status is handled the same way in either case.
38 template <class Callable>
39 Status CatchingFunctionHandler(Callable&& handler) {
40 #if GRPC_ALLOW_EXCEPTIONS
41  try {
42  return handler();
43  } catch (...) {
    // Any exception type is mapped to the same generic UNKNOWN status.
44  return Status(StatusCode::UNKNOWN, "Unexpected error in RPC handling");
45  }
46 #else // GRPC_ALLOW_EXCEPTIONS
    // Exceptions are disabled: call straight through with no try/catch.
47  return handler();
48 #endif // GRPC_ALLOW_EXCEPTIONS
49 }
50 
// A wrapper class of an application provided rpc method handler.
// It stores the handler callable plus the service object it is invoked on,
// runs the handler for one request, and sends the response and status.
// NOTE(review): this listing is an extract with gaps. Listing lines 51, 55,
// 74-75 and 106 are absent; see the inline notes where code is affected.
52 template <class ServiceType, class RequestType, class ResponseType>
53 class RpcMethodHandler : public MethodHandler {
54  public:
    // NOTE(review): listing line 55 (the constructor name) is absent here. The
    // index at the end of this page places `RpcMethodHandler(` at that line.
    // Parameters: `func` is the handler to invoke; `service` is passed to it as
    // its first argument.
56  std::function<Status(ServiceType*, ::grpc_impl::ServerContext*,
57  const RequestType*, ResponseType*)>
58  func,
59  ServiceType* service)
60  : func_(func), service_(service) {}
61 
    // Runs the handler for one call and sends initial metadata, the response
    // message (only when the status is OK) and the final status in one batch.
62  void RunHandler(const HandlerParameter& param) final {
63  ResponseType rsp;
64  Status status = param.status;
    // The handler is only invoked when the incoming status is OK.
65  if (status.ok()) {
66  status = CatchingFunctionHandler([this, &param, &rsp] {
67  return func_(service_, param.server_context,
68  static_cast<RequestType*>(param.request), &rsp);
69  });
    // Explicit destructor call, no deallocation. Presumably param.request is
    // the object Deserialize() placement-constructed in call-arena memory;
    // confirm against the caller.
70  static_cast<RequestType*>(param.request)->~RequestType();
71  }
72 
    // Initial metadata must not have been sent yet at this point.
73  GPR_CODEGEN_ASSERT(!param.server_context->sent_initial_metadata_);
    // NOTE(review): listing lines 74-75, which declare the type of `ops`, are
    // absent. Presumably a CallOpSet<...> instantiation (see call_op_set.h);
    // confirm against the real header.
76  ops;
77  ops.SendInitialMetadata(&param.server_context->initial_metadata_,
78  param.server_context->initial_metadata_flags());
79  if (param.server_context->compression_level_set()) {
80  ops.set_compression_level(param.server_context->compression_level());
81  }
    // A failure to attach the response message replaces the handler's status.
82  if (status.ok()) {
83  status = ops.SendMessagePtr(&rsp);
84  }
85  ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
    // Start the batch, then pluck `ops` from the call's completion queue.
86  param.call->PerformOps(&ops);
87  param.call->cq()->Pluck(&ops);
88  }
89 
    // Deserializes `req` into a RequestType constructed in memory obtained from
    // grpc_call_arena_alloc(call, ...). Writes the result to `*status`.
    // Returns the request on success. On failure the request is destroyed and
    // nullptr is returned. `handler_data` is not used by this implementation.
90  void* Deserialize(grpc_call* call, grpc_byte_buffer* req, Status* status,
91  void** handler_data) final {
92  ByteBuffer buf;
93  buf.set_buffer(req);
94  auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc(
95  call, sizeof(RequestType))) RequestType();
96  *status = SerializationTraits<RequestType>::Deserialize(&buf, request);
    // Release() makes `buf` forget `req` without destroying it.
97  buf.Release();
98  if (status->ok()) {
99  return request;
100  }
    // Placement-new'd object: run the destructor only, never delete.
101  request->~RequestType();
102  return nullptr;
103  }
104 
105  private:
    // Application-supplied handler that RunHandler() invokes.
107  std::function<Status(ServiceType*, ::grpc_impl::ServerContext*,
108  const RequestType*, ResponseType*)>
109  func_;
110  // The class the above handler function lives in.
111  ServiceType* service_;
112 };
113 
// A wrapper class of an application provided client streaming handler.
// The handler gets a ServerReader<RequestType> to read requests from and
// fills in a single ResponseType.
// NOTE(review): this listing is an extract with gaps. Listing lines 114, 118
// and 132-133 are absent; see the inline notes where code is affected.
115 template <class ServiceType, class RequestType, class ResponseType>
116 class ClientStreamingHandler : public MethodHandler {
117  public:
    // NOTE(review): listing line 118 (the constructor name) is absent here. The
    // index at the end of this page places `ClientStreamingHandler(` there.
    // Parameters: `func` is the handler to invoke; `service` is passed to it as
    // its first argument.
119  std::function<Status(ServiceType*, ::grpc_impl::ServerContext*,
120  ServerReader<RequestType>*, ResponseType*)>
121  func,
122  ServiceType* service)
123  : func_(func), service_(service) {}
124 
    // Runs the handler with a reader bound to this call, then sends initial
    // metadata (if not already sent), the response message (only when the
    // status is OK) and the final status.
125  void RunHandler(const HandlerParameter& param) final {
126  ServerReader<RequestType> reader(param.call, param.server_context);
127  ResponseType rsp;
    // Unlike the unary handlers on this page, param.status is not checked
    // before the handler runs.
128  Status status = CatchingFunctionHandler([this, &param, &reader, &rsp] {
129  return func_(service_, param.server_context, &reader, &rsp);
130  });
131 
    // NOTE(review): listing lines 132-133, which declare the type of `ops`, are
    // absent. Presumably a CallOpSet<...> instantiation; confirm against the
    // real header.
134  ops;
    // Initial metadata and compression level are added only if not yet sent.
135  if (!param.server_context->sent_initial_metadata_) {
136  ops.SendInitialMetadata(&param.server_context->initial_metadata_,
137  param.server_context->initial_metadata_flags());
138  if (param.server_context->compression_level_set()) {
139  ops.set_compression_level(param.server_context->compression_level());
140  }
141  }
    // A failure to attach the response message replaces the handler's status.
142  if (status.ok()) {
143  status = ops.SendMessagePtr(&rsp);
144  }
145  ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
    // Start the batch, then pluck `ops` from the call's completion queue.
146  param.call->PerformOps(&ops);
147  param.call->cq()->Pluck(&ops);
148  }
149 
150  private:
    // Application-supplied handler that RunHandler() invokes.
151  std::function<Status(ServiceType*, ::grpc_impl::ServerContext*,
152  ServerReader<RequestType>*, ResponseType*)>
153  func_;
    // Service instance passed as the handler's first argument.
154  ServiceType* service_;
155 };
156 
// A wrapper class of an application provided server streaming handler.
// The handler receives one deserialized request and a
// ServerWriter<ResponseType> to write responses to.
// NOTE(review): this listing is an extract with gaps. Listing lines 157, 161
// and 179 are absent; see the inline notes where code is affected.
158 template <class ServiceType, class RequestType, class ResponseType>
159 class ServerStreamingHandler : public MethodHandler {
160  public:
    // NOTE(review): listing line 161 (the constructor name) is absent here. The
    // index at the end of this page places `ServerStreamingHandler(` there.
    // Parameters: `func` is the handler to invoke; `service` is passed to it as
    // its first argument.
162  std::function<Status(ServiceType*, ::grpc_impl::ServerContext*,
163  const RequestType*, ServerWriter<ResponseType>*)>
164  func,
165  ServiceType* service)
166  : func_(func), service_(service) {}
167 
    // Runs the handler (only when the incoming status is OK), then sends
    // initial metadata if it has not been sent, followed by the final status.
168  void RunHandler(const HandlerParameter& param) final {
169  Status status = param.status;
170  if (status.ok()) {
171  ServerWriter<ResponseType> writer(param.call, param.server_context);
172  status = CatchingFunctionHandler([this, &param, &writer] {
173  return func_(service_, param.server_context,
174  static_cast<RequestType*>(param.request), &writer);
175  });
    // Explicit destructor call, no deallocation. Presumably param.request is
    // the object Deserialize() placement-constructed in call-arena memory;
    // confirm against the caller.
176  static_cast<RequestType*>(param.request)->~RequestType();
177  }
178 
    // NOTE(review): listing line 179, which declares `ops`, is absent.
    // Presumably a CallOpSet<...> instantiation; confirm against the real
    // header.
180  if (!param.server_context->sent_initial_metadata_) {
181  ops.SendInitialMetadata(&param.server_context->initial_metadata_,
182  param.server_context->initial_metadata_flags());
183  if (param.server_context->compression_level_set()) {
184  ops.set_compression_level(param.server_context->compression_level());
185  }
186  }
187  ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
188  param.call->PerformOps(&ops);
    // If the context has pending ops, pluck them before plucking this batch.
189  if (param.server_context->has_pending_ops_) {
190  param.call->cq()->Pluck(&param.server_context->pending_ops_);
191  }
192  param.call->cq()->Pluck(&ops);
193  }
194 
    // Deserializes `req` into a RequestType constructed in memory obtained from
    // grpc_call_arena_alloc(call, ...). Writes the result to `*status`.
    // Returns the request on success. On failure the request is destroyed and
    // nullptr is returned. `handler_data` is not used by this implementation.
195  void* Deserialize(grpc_call* call, grpc_byte_buffer* req, Status* status,
196  void** handler_data) final {
197  ByteBuffer buf;
198  buf.set_buffer(req);
199  auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc(
200  call, sizeof(RequestType))) RequestType();
201  *status = SerializationTraits<RequestType>::Deserialize(&buf, request);
    // Release() makes `buf` forget `req` without destroying it.
202  buf.Release();
203  if (status->ok()) {
204  return request;
205  }
    // Placement-new'd object: run the destructor only, never delete.
206  request->~RequestType();
207  return nullptr;
208  }
209 
210  private:
    // Application-supplied handler that RunHandler() invokes.
211  std::function<Status(ServiceType*, ::grpc_impl::ServerContext*,
212  const RequestType*, ServerWriter<ResponseType>*)>
213  func_;
    // Service instance passed as the handler's first argument.
214  ServiceType* service_;
215 };
216 
// Handler shared by the bidi-style handlers defined after it. It is
// parameterised on the streamer type handed to the application callback and
// on whether the callback is required to have written a response.
// NOTE(review): this listing is an extract with gaps. Listing lines 217-223,
// 225, 227 and 237 are absent; see the inline notes where code is affected.
224 template <class Streamer, bool WriteNeeded>
    // NOTE(review): listing line 225 (the class header) is absent here. The
    // index names the constructor TemplatedBidiStreamingHandler. Presumably the
    // class derives from MethodHandler like the other handlers on this page
    // (RunHandler below is `final`); confirm against the real header.
226  public:
    // NOTE(review): listing line 227 (the constructor name) is absent here. The
    // index places `TemplatedBidiStreamingHandler(` at that line.
228  std::function<Status(::grpc_impl::ServerContext*, Streamer*)> func)
229  : func_(func), write_needed_(WriteNeeded) {}
230 
    // Runs the callback with a streamer bound to this call, then sends initial
    // metadata if it has not been sent, followed by the final status.
231  void RunHandler(const HandlerParameter& param) final {
232  Streamer stream(param.call, param.server_context);
233  Status status = CatchingFunctionHandler([this, &param, &stream] {
234  return func_(param.server_context, &stream);
235  });
236 
    // NOTE(review): listing line 237, which declares `ops`, is absent.
    // Presumably a CallOpSet<...> instantiation; confirm against the real
    // header.
238  if (!param.server_context->sent_initial_metadata_) {
239  ops.SendInitialMetadata(&param.server_context->initial_metadata_,
240  param.server_context->initial_metadata_flags());
241  if (param.server_context->compression_level_set()) {
242  ops.set_compression_level(param.server_context->compression_level());
243  }
    // This check is only reached when initial metadata was never sent. The code
    // treats that as "no response was written", presumably because a write
    // through the streamer sends initial metadata first; confirm in the
    // streamer classes.
244  if (write_needed_ && status.ok()) {
245  // If we needed a write but never did one, we need to mark the
246  // status as a fail
247  status = Status(StatusCode::INTERNAL,
248  "Service did not provide response message");
249  }
250  }
251  ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
252  param.call->PerformOps(&ops);
    // If the context has pending ops, pluck them before plucking this batch.
253  if (param.server_context->has_pending_ops_) {
254  param.call->cq()->Pluck(&param.server_context->pending_ops_);
255  }
256  param.call->cq()->Pluck(&ops);
257  }
258 
259  private:
    // Application-supplied callback that RunHandler() invokes.
260  std::function<Status(::grpc_impl::ServerContext*, Streamer*)> func_;
    // Runtime copy of the WriteNeeded template argument.
261  const bool write_needed_;
262 };
263 
// Bidi-streaming handler built on the templated handler, using
// ServerReaderWriter<ResponseType, RequestType> as the streamer and
// WriteNeeded = false.
// NOTE(review): this listing is an extract with gaps. Listing lines 265-266
// (class header), 269 (constructor name; the index gives
// `BidiStreamingHandler(`), 271 (rest of the `func` parameter type) and 274
// (start of the base-class initialiser) are absent.
264 template <class ServiceType, class RequestType, class ResponseType>
267  ServerReaderWriter<ResponseType, RequestType>, false> {
268  public:
270  std::function<Status(ServiceType*, ::grpc_impl::ServerContext*,
272  func,
273  ServiceType* service)
    // std::bind fixes `service` as the first argument, so the base receives a
    // callable that takes the two remaining arguments (_1, _2).
275  ServerReaderWriter<ResponseType, RequestType>, false>(std::bind(
276  func, service, std::placeholders::_1, std::placeholders::_2)) {}
277 };
278 
// Handler built on the templated handler, using
// ServerUnaryStreamer<RequestType, ResponseType> as the streamer and
// WriteNeeded = true.
// NOTE(review): this listing is an extract with gaps. Listing lines 280-281
// (class header), 284 (constructor name; the index gives
// `StreamedUnaryHandler(`), 286 (rest of the `func` parameter type) and 288
// (start of the base-class initialiser) are absent.
279 template <class RequestType, class ResponseType>
282  ServerUnaryStreamer<RequestType, ResponseType>, true> {
283  public:
285  std::function<Status(::grpc_impl::ServerContext*,
287  func)
    // `func` is forwarded unchanged to the base-class constructor.
289  ServerUnaryStreamer<RequestType, ResponseType>, true>(func) {}
290 };
291 
// Handler built on the templated handler, using
// ServerSplitStreamer<RequestType, ResponseType> as the streamer and
// WriteNeeded = false.
// NOTE(review): this listing is an extract with gaps. Listing lines 293-294
// (class header), 297 (constructor name; the index gives
// `SplitServerStreamingHandler(`), 299 (rest of the `func` parameter type) and
// 301 (start of the base-class initialiser) are absent.
292 template <class RequestType, class ResponseType>
295  ServerSplitStreamer<RequestType, ResponseType>, false> {
296  public:
298  std::function<Status(::grpc_impl::ServerContext*,
300  func)
    // `func` is forwarded unchanged to the base-class constructor.
302  ServerSplitStreamer<RequestType, ResponseType>, false>(func) {}
303 };
304 
// General method handler for errors that prevent real method use. It never
// runs application code and only replies with the status code given as the
// template argument.
// NOTE(review): this listing is an extract with gaps. Listing lines 305-306,
// 325 and 335 are absent; see the inline notes where code is affected. The
// index also lists two aliases on the absent lines 341 and 343:
// UnknownMethodHandler = ErrorMethodHandler<StatusCode::UNIMPLEMENTED> and
// ResourceExhaustedHandler = ErrorMethodHandler<StatusCode::RESOURCE_EXHAUSTED>.
307 template <StatusCode code>
308 class ErrorMethodHandler : public MethodHandler {
309  public:
    // Fills `ops` with what is needed to fail the call: initial metadata (only
    // if not yet sent, after which the context is marked as having sent it)
    // and a final status made from `code` with an empty message.
310  template <class T>
311  static void FillOps(::grpc_impl::ServerContext* context, T* ops) {
312  Status status(code, "");
313  if (!context->sent_initial_metadata_) {
314  ops->SendInitialMetadata(&context->initial_metadata_,
315  context->initial_metadata_flags());
316  if (context->compression_level_set()) {
317  ops->set_compression_level(context->compression_level());
318  }
319  context->sent_initial_metadata_ = true;
320  }
321  ops->ServerSendStatus(&context->trailing_metadata_, status);
322  }
323 
    // Sends the error status for this call and plucks the batch.
324  void RunHandler(const HandlerParameter& param) final {
    // NOTE(review): listing line 325, which declares `ops`, is absent.
    // Presumably a CallOpSet<...> instantiation; confirm against the real
    // header.
326  FillOps(param.server_context, &ops);
327  param.call->PerformOps(&ops);
328  param.call->cq()->Pluck(&ops);
329  }
330 
    // Never produces a request object: always returns nullptr. `call`, `status`
    // and `handler_data` are not used in the lines visible here.
331  void* Deserialize(grpc_call* call, grpc_byte_buffer* req, Status* status,
332  void** handler_data) final {
333  // We have to destroy any request payload
334  if (req != nullptr) {
    // NOTE(review): listing line 335 (the body of this branch) is absent.
    // Presumably it calls grpc_byte_buffer_destroy(req), which the index lists
    // among the referenced symbols; confirm against the real header.
336  }
337  return nullptr;
338  }
339 };
340 
344 
345 } // namespace internal
346 } // namespace grpc
347 
348 #endif // GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:70
A wrapper class of an application provided server streaming handler.
Definition: byte_buffer.h:58
A wrapper class of an application provided bidi-streaming handler.
Definition: completion_queue_impl.h:82
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:125
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:145
RpcMethodHandler(std::function< Status(ServiceType *, ::grpc_impl::ServerContext *, const RequestType *, ResponseType *)> func, ServiceType *service)
Definition: method_handler_impl.h:55
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:324
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:821
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:168
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, Status *status, void **handler_data) final
Definition: method_handler_impl.h:195
Definition: async_unary_call_impl.h:302
StreamedUnaryHandler(std::function< Status(::grpc_impl::ServerContext *, ServerUnaryStreamer< RequestType, ResponseType > *)> func)
Definition: method_handler_impl.h:284
Definition: grpc_types.h:40
A ServerContext allows the person implementing a service handler to:
Definition: server_context_impl.h:118
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:231
ErrorMethodHandler< StatusCode::RESOURCE_EXHAUSTED > ResourceExhaustedHandler
Definition: method_handler_impl.h:343
::google::protobuf::util::Status Status
Definition: config_protobuf.h:96
ClientStreamingHandler(std::function< Status(ServiceType *, ::grpc_impl::ServerContext *, ServerReader< RequestType > *, ResponseType *)> func, ServiceType *service)
Definition: method_handler_impl.h:118
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, Status *status, void **handler_data) final
Definition: method_handler_impl.h:331
virtual void grpc_byte_buffer_destroy(grpc_byte_buffer *bb)=0
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
Synchronous (blocking) server-side API for doing client-streaming RPCs, where the incoming message st...
Definition: completion_queue_impl.h:57
Definition: call_op_set.h:629
SplitServerStreamingHandler(std::function< Status(::grpc_impl::ServerContext *, ServerSplitStreamer< RequestType, ResponseType > *)> func)
Definition: method_handler_impl.h:297
Definition: call_op_set.h:218
Definition: method_handler_impl.h:293
BidiStreamingHandler(std::function< Status(ServiceType *, ::grpc_impl::ServerContext *, ServerReaderWriter< ResponseType, RequestType > *)> func, ServiceType *service)
Definition: method_handler_impl.h:269
void Release()
Forget the underlying byte buffer without destroying it. Use this only for un-owned byte buffers.
Definition: byte_buffer.h:151
Definition: call_op_set.h:288
ErrorMethodHandler< StatusCode::UNIMPLEMENTED > UnknownMethodHandler
Definition: method_handler_impl.h:341
grpc_compression_level compression_level() const
Return the compression algorithm to be used by the server call.
Definition: server_context_impl.h:211
Synchronous (blocking) server-side API for doing server-streaming RPCs, where the outgoing message stream coming from the server has messages of type W.
Definition: completion_queue_impl.h:59
This header provides an object that reads bytes directly from a grpc::ByteBuffer, via the ZeroCopyInp...
Definition: alarm.h:24
A wrapper class of an application provided client streaming handler.
Definition: completion_queue_impl.h:76
A wrapper class of an application provided rpc method handler.
Definition: byte_buffer.h:56
CoreCodegenInterface * g_core_codegen_interface
Definition: completion_queue_impl.h:91
Definition: rpc_service_method.h:44
A class to represent a flow-controlled unary call.
Definition: sync_stream_impl.h:819
A class to represent a flow-controlled server-side streaming call.
Definition: sync_stream_impl.h:886
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
bool ok() const
Is the status OK?
Definition: status.h:118
static void FillOps(::grpc_impl::ServerContext *context, T *ops)
Definition: method_handler_impl.h:311
Base class for running an RPC handler.
Definition: rpc_service_method.h:41
bool compression_level_set() const
Return a bool indicating whether the compression level for this call has been set (either implicitly ...
Definition: server_context_impl.h:226
General method handler class for errors that prevent real method use e.g., handle unknown method by r...
Definition: byte_buffer.h:60
Did it work? If it didn't, why?
Definition: status.h:31
Definition: method_handler_impl.h:280
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:62
Definition: completion_queue_impl.h:80
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, Status *status, void **handler_data) final
Definition: method_handler_impl.h:90
Status CatchingFunctionHandler(Callable &&handler)
Definition: method_handler_impl.h:39
TemplatedBidiStreamingHandler(std::function< Status(::grpc_impl::ServerContext *, Streamer *)> func)
Definition: method_handler_impl.h:227
Synchronous (blocking) server-side API for a bidirectional streaming call, where the incoming message...
Definition: sync_stream_impl.h:777
Unknown error.
Definition: status_code_enum.h:35
Internal errors.
Definition: status_code_enum.h:119
A sequence of bytes.
Definition: byte_buffer.h:72
ServerStreamingHandler(std::function< Status(ServiceType *, ::grpc_impl::ServerContext *, const RequestType *, ServerWriter< ResponseType > *)> func, ServiceType *service)
Definition: method_handler_impl.h:161