GRPC C++  1.33.1
method_handler.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_H
20 #define GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_H
21 
26 
27 namespace grpc {
28 
29 namespace internal {
30 
31 // Invoke the method handler, fill in the status, and
32 // return whether or not we finished safely (without an exception).
33 // Note that exception handling is 0-cost in most compiler/library
34 // implementations (except when an exception is actually thrown),
35 // so this process doesn't require additional overhead in the common case.
36 // Additionally, we don't need to return if we caught an exception or not;
37 // the handling is the same in either case.
38 template <class Callable>
40 #if GRPC_ALLOW_EXCEPTIONS
41  try {
42  return handler();
43  } catch (...) {
45  "Unexpected error in RPC handling");
46  }
47 #else // GRPC_ALLOW_EXCEPTIONS
48  return handler();
49 #endif // GRPC_ALLOW_EXCEPTIONS
50 }
51 
53 template <class ServiceType, class RequestType, class ResponseType>
54 class RpcMethodHandler : public ::grpc::internal::MethodHandler {
55  public:
57  std::function<::grpc::Status(ServiceType*, ::grpc::ServerContext*,
58  const RequestType*, ResponseType*)>
59  func,
60  ServiceType* service)
61  : func_(func), service_(service) {}
62 
63  void RunHandler(const HandlerParameter& param) final {
64  ResponseType rsp;
65  ::grpc::Status status = param.status;
66  if (status.ok()) {
67  status = CatchingFunctionHandler([this, &param, &rsp] {
68  return func_(service_,
69  static_cast<::grpc::ServerContext*>(param.server_context),
70  static_cast<RequestType*>(param.request), &rsp);
71  });
72  static_cast<RequestType*>(param.request)->~RequestType();
73  }
74 
75  GPR_CODEGEN_ASSERT(!param.server_context->sent_initial_metadata_);
79  ops;
80  ops.SendInitialMetadata(&param.server_context->initial_metadata_,
81  param.server_context->initial_metadata_flags());
82  if (param.server_context->compression_level_set()) {
83  ops.set_compression_level(param.server_context->compression_level());
84  }
85  if (status.ok()) {
86  status = ops.SendMessagePtr(&rsp);
87  }
88  ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
89  param.call->PerformOps(&ops);
90  param.call->cq()->Pluck(&ops);
91  }
92 
94  ::grpc::Status* status, void** /*handler_data*/) final {
96  buf.set_buffer(req);
97  auto* request =
99  call, sizeof(RequestType))) RequestType();
100  *status =
102  buf.Release();
103  if (status->ok()) {
104  return request;
105  }
106  request->~RequestType();
107  return nullptr;
108  }
109 
110  private:
112  std::function<::grpc::Status(ServiceType*, ::grpc::ServerContext*,
113  const RequestType*, ResponseType*)>
114  func_;
115  // The class the above handler function lives in.
116  ServiceType* service_;
117 };
118 
120 template <class ServiceType, class RequestType, class ResponseType>
121 class ClientStreamingHandler : public ::grpc::internal::MethodHandler {
122  public:
124  std::function<::grpc::Status(ServiceType*, ::grpc::ServerContext*,
125  ServerReader<RequestType>*, ResponseType*)>
126  func,
127  ServiceType* service)
128  : func_(func), service_(service) {}
129 
130  void RunHandler(const HandlerParameter& param) final {
132  param.call, static_cast<::grpc::ServerContext*>(param.server_context));
133  ResponseType rsp;
134  ::grpc::Status status = CatchingFunctionHandler([this, &param, &reader,
135  &rsp] {
136  return func_(service_,
137  static_cast<::grpc::ServerContext*>(param.server_context),
138  &reader, &rsp);
139  });
140 
144  ops;
145  if (!param.server_context->sent_initial_metadata_) {
146  ops.SendInitialMetadata(&param.server_context->initial_metadata_,
147  param.server_context->initial_metadata_flags());
148  if (param.server_context->compression_level_set()) {
149  ops.set_compression_level(param.server_context->compression_level());
150  }
151  }
152  if (status.ok()) {
153  status = ops.SendMessagePtr(&rsp);
154  }
155  ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
156  param.call->PerformOps(&ops);
157  param.call->cq()->Pluck(&ops);
158  }
159 
160  private:
161  std::function<::grpc::Status(ServiceType*, ::grpc::ServerContext*,
162  ServerReader<RequestType>*, ResponseType*)>
163  func_;
164  ServiceType* service_;
165 };
166 
168 template <class ServiceType, class RequestType, class ResponseType>
169 class ServerStreamingHandler : public ::grpc::internal::MethodHandler {
170  public:
172  ServiceType*, ::grpc::ServerContext*,
173  const RequestType*, ServerWriter<ResponseType>*)>
174  func,
175  ServiceType* service)
176  : func_(func), service_(service) {}
177 
178  void RunHandler(const HandlerParameter& param) final {
179  ::grpc::Status status = param.status;
180  if (status.ok()) {
182  param.call,
183  static_cast<::grpc::ServerContext*>(param.server_context));
184  status = CatchingFunctionHandler([this, &param, &writer] {
185  return func_(service_,
186  static_cast<::grpc::ServerContext*>(param.server_context),
187  static_cast<RequestType*>(param.request), &writer);
188  });
189  static_cast<RequestType*>(param.request)->~RequestType();
190  }
191 
194  ops;
195  if (!param.server_context->sent_initial_metadata_) {
196  ops.SendInitialMetadata(&param.server_context->initial_metadata_,
197  param.server_context->initial_metadata_flags());
198  if (param.server_context->compression_level_set()) {
199  ops.set_compression_level(param.server_context->compression_level());
200  }
201  }
202  ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
203  param.call->PerformOps(&ops);
204  if (param.server_context->has_pending_ops_) {
205  param.call->cq()->Pluck(&param.server_context->pending_ops_);
206  }
207  param.call->cq()->Pluck(&ops);
208  }
209 
211  ::grpc::Status* status, void** /*handler_data*/) final {
212  ::grpc::ByteBuffer buf;
213  buf.set_buffer(req);
214  auto* request =
216  call, sizeof(RequestType))) RequestType();
217  *status =
219  buf.Release();
220  if (status->ok()) {
221  return request;
222  }
223  request->~RequestType();
224  return nullptr;
225  }
226 
227  private:
228  std::function<::grpc::Status(ServiceType*, ::grpc::ServerContext*,
229  const RequestType*, ServerWriter<ResponseType>*)>
230  func_;
231  ServiceType* service_;
232 };
233 
241 template <class Streamer, bool WriteNeeded>
242 class TemplatedBidiStreamingHandler : public ::grpc::internal::MethodHandler {
243  public:
245  std::function<::grpc::Status(::grpc::ServerContext*, Streamer*)> func)
246  : func_(func), write_needed_(WriteNeeded) {}
247 
248  void RunHandler(const HandlerParameter& param) final {
249  Streamer stream(param.call,
250  static_cast<::grpc::ServerContext*>(param.server_context));
251  ::grpc::Status status = CatchingFunctionHandler([this, &param, &stream] {
252  return func_(static_cast<::grpc::ServerContext*>(param.server_context),
253  &stream);
254  });
255 
258  ops;
259  if (!param.server_context->sent_initial_metadata_) {
260  ops.SendInitialMetadata(&param.server_context->initial_metadata_,
261  param.server_context->initial_metadata_flags());
262  if (param.server_context->compression_level_set()) {
263  ops.set_compression_level(param.server_context->compression_level());
264  }
265  if (write_needed_ && status.ok()) {
266  // If we needed a write but never did one, we need to mark the
267  // status as a fail
269  "Service did not provide response message");
270  }
271  }
272  ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
273  param.call->PerformOps(&ops);
274  if (param.server_context->has_pending_ops_) {
275  param.call->cq()->Pluck(&param.server_context->pending_ops_);
276  }
277  param.call->cq()->Pluck(&ops);
278  }
279 
280  private:
281  std::function<::grpc::Status(::grpc::ServerContext*, Streamer*)> func_;
282  const bool write_needed_;
283 };
284 
285 template <class ServiceType, class RequestType, class ResponseType>
288  ServerReaderWriter<ResponseType, RequestType>, false> {
289  public:
291  ServiceType*, ::grpc::ServerContext*,
293  func,
294  ServiceType* service)
295  // TODO(vjpai): When gRPC supports C++14, move-capture func in the below
297  ServerReaderWriter<ResponseType, RequestType>, false>(
298  [func, service](
299  ::grpc::ServerContext* ctx,
300  ServerReaderWriter<ResponseType, RequestType>* streamer) {
301  return func(service, ctx, streamer);
302  }) {}
303 };
304 
305 template <class RequestType, class ResponseType>
308  ServerUnaryStreamer<RequestType, ResponseType>, true> {
309  public:
311  std::function<
314  func)
316  ServerUnaryStreamer<RequestType, ResponseType>, true>(
317  std::move(func)) {}
318 };
319 
320 template <class RequestType, class ResponseType>
323  ServerSplitStreamer<RequestType, ResponseType>, false> {
324  public:
326  std::function<
329  func)
331  ServerSplitStreamer<RequestType, ResponseType>, false>(
332  std::move(func)) {}
333 };
334 
337 template <::grpc::StatusCode code>
338 class ErrorMethodHandler : public ::grpc::internal::MethodHandler {
339  public:
340  template <class T>
341  static void FillOps(::grpc::ServerContextBase* context, T* ops) {
342  ::grpc::Status status(code, "");
343  if (!context->sent_initial_metadata_) {
344  ops->SendInitialMetadata(&context->initial_metadata_,
345  context->initial_metadata_flags());
346  if (context->compression_level_set()) {
347  ops->set_compression_level(context->compression_level());
348  }
349  context->sent_initial_metadata_ = true;
350  }
351  ops->ServerSendStatus(&context->trailing_metadata_, status);
352  }
353 
354  void RunHandler(const HandlerParameter& param) final {
357  ops;
358  FillOps(param.server_context, &ops);
359  param.call->PerformOps(&ops);
360  param.call->cq()->Pluck(&ops);
361  }
362 
363  void* Deserialize(grpc_call* /*call*/, grpc_byte_buffer* req,
364  ::grpc::Status* /*status*/, void** /*handler_data*/) final {
365  // We have to destroy any request payload
366  if (req != nullptr) {
368  }
369  return nullptr;
370  }
371 };
372 
373 typedef ErrorMethodHandler<::grpc::StatusCode::UNIMPLEMENTED>
377 
378 } // namespace internal
379 } // namespace grpc
380 
381 #endif // GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_H
grpc::internal::ResourceExhaustedHandler
ErrorMethodHandler<::grpc::StatusCode::RESOURCE_EXHAUSTED > ResourceExhaustedHandler
Definition: method_handler.h:376
grpc::internal::ClientStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: method_handler.h:130
grpc::ServerContext
A ServerContext or CallbackServerContext allows the code implementing a service handler to:
Definition: server_context.h:527
grpc::internal::CallOpServerSendStatus
Definition: call_op_set.h:653
rpc_service_method.h
grpc
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
grpc::internal::CallOpSet
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:850
grpc::internal::CallOpSendMessage
Definition: call_op_set.h:287
grpc::internal::ErrorMethodHandler
General method handler class for errors that prevent real method use e.g., handle unknown method by r...
Definition: byte_buffer.h:48
grpc::internal::ServerStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: method_handler.h:178
grpc::CoreCodegenInterface::grpc_call_arena_alloc
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
grpc::internal::StreamedUnaryHandler::StreamedUnaryHandler
StreamedUnaryHandler(std::function< ::grpc::Status(::grpc::ServerContext *, ServerUnaryStreamer< RequestType, ResponseType > *)> func)
Definition: method_handler.h:310
grpc::ServerWriter
Synchronous (blocking) server-side API for doing server-streaming RPCs,...
Definition: completion_queue.h:57
grpc::internal::SplitServerStreamingHandler
Definition: method_handler.h:321
grpc::internal::SplitServerStreamingHandler::SplitServerStreamingHandler
SplitServerStreamingHandler(std::function< ::grpc::Status(::grpc::ServerContext *, ServerSplitStreamer< RequestType, ResponseType > *)> func)
Definition: method_handler.h:325
grpc::internal::TemplatedBidiStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: method_handler.h:248
grpc::ServerContextBase
Base class of ServerContext. Experimental until callback API is final.
Definition: server_context.h:125
core_codegen_interface.h
grpc::internal::CallOpSendInitialMetadata
Definition: call_op_set.h:217
grpc::internal::CallOpServerSendStatus::ServerSendStatus
void ServerSendStatus(std::multimap< std::string, std::string > *trailing_metadata, const Status &status)
Definition: call_op_set.h:657
grpc::Status::ok
bool ok() const
Is the status OK?
Definition: status.h:118
grpc::CoreCodegenInterface::grpc_byte_buffer_destroy
virtual void grpc_byte_buffer_destroy(grpc_byte_buffer *bb)=0
byte_buffer.h
grpc::ServerSplitStreamer
A class to represent a flow-controlled server-side streaming call.
Definition: sync_stream.h:886
grpc::internal::RpcMethodHandler::Deserialize
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, ::grpc::Status *status, void **) final
Definition: method_handler.h:93
grpc::internal::ClientStreamingHandler::ClientStreamingHandler
ClientStreamingHandler(std::function<::grpc::Status(ServiceType *, ::grpc::ServerContext *, ServerReader< RequestType > *, ResponseType *)> func, ServiceType *service)
Definition: method_handler.h:123
grpc::Status
Did it work? If it didn't, why?
Definition: status.h:31
grpc::internal::ServerStreamingHandler::ServerStreamingHandler
ServerStreamingHandler(std::function<::grpc::Status(ServiceType *, ::grpc::ServerContext *, const RequestType *, ServerWriter< ResponseType > *)> func, ServiceType *service)
Definition: method_handler.h:171
grpc::ServerReader
Synchronous (blocking) server-side API for doing client-streaming RPCs, where the incoming message st...
Definition: completion_queue.h:55
grpc_call
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:70
grpc_byte_buffer
Definition: grpc_types.h:40
grpc::ByteBuffer
A sequence of bytes.
Definition: byte_buffer.h:60
grpc::ServerContextBase::compression_level_set
bool compression_level_set() const
Return a bool indicating whether the compression level for this call has been set (either implicitly ...
Definition: server_context.h:242
grpc::SerializationTraits
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
grpc::internal::ErrorMethodHandler::Deserialize
void * Deserialize(grpc_call *, grpc_byte_buffer *req, ::grpc::Status *, void **) final
Definition: method_handler.h:363
grpc::internal::MethodHandler
Base class for running an RPC handler.
Definition: rpc_service_method.h:38
grpc::protobuf::util::Status
::google::protobuf::util::Status Status
Definition: config_protobuf.h:90
grpc::ServerReaderWriter
Synchronous (blocking) server-side API for a bidirectional streaming call, where the incoming message...
Definition: sync_stream.h:779
grpc::internal::ServerStreamingHandler::Deserialize
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, ::grpc::Status *status, void **) final
Definition: method_handler.h:210
grpc::internal::TemplatedBidiStreamingHandler
A wrapper class of an application provided bidi-streaming handler.
Definition: completion_queue.h:69
grpc::internal::BidiStreamingHandler
Definition: method_handler.h:286
grpc::internal::BidiStreamingHandler::BidiStreamingHandler
BidiStreamingHandler(std::function<::grpc::Status(ServiceType *, ::grpc::ServerContext *, ServerReaderWriter< ResponseType, RequestType > *)> func, ServiceType *service)
Definition: method_handler.h:290
std
Definition: async_unary_call.h:301
grpc::internal::UnknownMethodHandler
ErrorMethodHandler<::grpc::StatusCode::UNIMPLEMENTED > UnknownMethodHandler
Definition: method_handler.h:374
grpc::internal::TemplatedBidiStreamingHandler::TemplatedBidiStreamingHandler
TemplatedBidiStreamingHandler(std::function<::grpc::Status(::grpc::ServerContext *, Streamer *)> func)
Definition: method_handler.h:244
grpc::internal::ErrorMethodHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: method_handler.h:354
grpc::internal::CatchingFunctionHandler
::grpc::Status CatchingFunctionHandler(Callable &&handler)
Definition: method_handler.h:39
grpc::ServerUnaryStreamer
A class to represent a flow-controlled unary call.
Definition: sync_stream.h:820
grpc::g_core_codegen_interface
CoreCodegenInterface * g_core_codegen_interface
Definition: completion_queue.h:90
GPR_CODEGEN_ASSERT
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:146
grpc::UNKNOWN
@ UNKNOWN
Unknown error.
Definition: status_code_enum.h:35
grpc::internal::ErrorMethodHandler::FillOps
static void FillOps(::grpc::ServerContextBase *context, T *ops)
Definition: method_handler.h:341
grpc::internal::RpcMethodHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: method_handler.h:63
grpc::internal::RpcMethodHandler::RpcMethodHandler
RpcMethodHandler(std::function<::grpc::Status(ServiceType *, ::grpc::ServerContext *, const RequestType *, ResponseType *)> func, ServiceType *service)
Definition: method_handler.h:56
grpc::ByteBuffer::Release
void Release()
Forget underlying byte buffer without destroying it. Use this only for un-owned byte buffers.
Definition: byte_buffer.h:139
grpc::internal::StreamedUnaryHandler
Definition: method_handler.h:306
sync_stream.h
grpc::INTERNAL
@ INTERNAL
Internal errors.
Definition: status_code_enum.h:119
grpc::ServerContextBase::compression_level
grpc_compression_level compression_level() const
Return the compression algorithm to be used by the server call.
Definition: server_context.h:227