GRPC C++  1.16.0-dev
method_handler_impl.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
20 #define GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
21 
26 
27 namespace grpc {
28 
29 namespace internal {
30 
31 // Invoke the method handler, fill in the status, and
32 // return whether or not we finished safely (without an exception).
33 // Note that exception handling is 0-cost in most compiler/library
34 // implementations (except when an exception is actually thrown),
35 // so this process doesn't require additional overhead in the common case.
36 // Additionally, we don't need to return if we caught an exception or not;
37 // the handling is the same in either case.
38 template <class Callable>
39 Status CatchingFunctionHandler(Callable&& handler) {
40 #if GRPC_ALLOW_EXCEPTIONS
41  try {
42  return handler();
43  } catch (...) {
44  return Status(StatusCode::UNKNOWN, "Unexpected error in RPC handling");
45  }
46 #else // GRPC_ALLOW_EXCEPTIONS
47  return handler();
48 #endif // GRPC_ALLOW_EXCEPTIONS
49 }
50 
52 template <class ServiceType, class RequestType, class ResponseType>
53 class RpcMethodHandler : public MethodHandler {
54  public:
55  RpcMethodHandler(std::function<Status(ServiceType*, ServerContext*,
56  const RequestType*, ResponseType*)>
57  func,
58  ServiceType* service)
59  : func_(func), service_(service) {}
60 
61  void RunHandler(const HandlerParameter& param) final {
62  RequestType req;
64  param.request.bbuf_ptr(), &req);
65  ResponseType rsp;
66  if (status.ok()) {
67  status = CatchingFunctionHandler([this, &param, &req, &rsp] {
68  return func_(service_, param.server_context, &req, &rsp);
69  });
70  }
71 
72  GPR_CODEGEN_ASSERT(!param.server_context->sent_initial_metadata_);
75  ops;
76  ops.SendInitialMetadata(param.server_context->initial_metadata_,
77  param.server_context->initial_metadata_flags());
78  if (param.server_context->compression_level_set()) {
79  ops.set_compression_level(param.server_context->compression_level());
80  }
81  if (status.ok()) {
82  status = ops.SendMessage(rsp);
83  }
84  ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
85  param.call->PerformOps(&ops);
86  param.call->cq()->Pluck(&ops);
87  }
88 
89  private:
91  std::function<Status(ServiceType*, ServerContext*, const RequestType*,
92  ResponseType*)>
93  func_;
94  // The class the above handler function lives in.
95  ServiceType* service_;
96 };
97 
99 template <class ServiceType, class RequestType, class ResponseType>
100 class ClientStreamingHandler : public MethodHandler {
101  public:
103  std::function<Status(ServiceType*, ServerContext*,
104  ServerReader<RequestType>*, ResponseType*)>
105  func,
106  ServiceType* service)
107  : func_(func), service_(service) {}
108 
109  void RunHandler(const HandlerParameter& param) final {
110  ServerReader<RequestType> reader(param.call, param.server_context);
111  ResponseType rsp;
112  Status status = CatchingFunctionHandler([this, &param, &reader, &rsp] {
113  return func_(service_, param.server_context, &reader, &rsp);
114  });
115 
118  ops;
119  if (!param.server_context->sent_initial_metadata_) {
120  ops.SendInitialMetadata(param.server_context->initial_metadata_,
121  param.server_context->initial_metadata_flags());
122  if (param.server_context->compression_level_set()) {
123  ops.set_compression_level(param.server_context->compression_level());
124  }
125  }
126  if (status.ok()) {
127  status = ops.SendMessage(rsp);
128  }
129  ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
130  param.call->PerformOps(&ops);
131  param.call->cq()->Pluck(&ops);
132  }
133 
134  private:
135  std::function<Status(ServiceType*, ServerContext*, ServerReader<RequestType>*,
136  ResponseType*)>
137  func_;
138  ServiceType* service_;
139 };
140 
142 template <class ServiceType, class RequestType, class ResponseType>
143 class ServerStreamingHandler : public MethodHandler {
144  public:
146  std::function<Status(ServiceType*, ServerContext*, const RequestType*,
148  func,
149  ServiceType* service)
150  : func_(func), service_(service) {}
151 
152  void RunHandler(const HandlerParameter& param) final {
153  RequestType req;
155  param.request.bbuf_ptr(), &req);
156 
157  if (status.ok()) {
158  ServerWriter<ResponseType> writer(param.call, param.server_context);
159  status = CatchingFunctionHandler([this, &param, &req, &writer] {
160  return func_(service_, param.server_context, &req, &writer);
161  });
162  }
163 
165  if (!param.server_context->sent_initial_metadata_) {
166  ops.SendInitialMetadata(param.server_context->initial_metadata_,
167  param.server_context->initial_metadata_flags());
168  if (param.server_context->compression_level_set()) {
169  ops.set_compression_level(param.server_context->compression_level());
170  }
171  }
172  ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
173  param.call->PerformOps(&ops);
174  if (param.server_context->has_pending_ops_) {
175  param.call->cq()->Pluck(&param.server_context->pending_ops_);
176  }
177  param.call->cq()->Pluck(&ops);
178  }
179 
180  private:
181  std::function<Status(ServiceType*, ServerContext*, const RequestType*,
183  func_;
184  ServiceType* service_;
185 };
186 
194 template <class Streamer, bool WriteNeeded>
196  public:
198  std::function<Status(ServerContext*, Streamer*)> func)
199  : func_(func), write_needed_(WriteNeeded) {}
200 
201  void RunHandler(const HandlerParameter& param) final {
202  Streamer stream(param.call, param.server_context);
203  Status status = CatchingFunctionHandler([this, &param, &stream] {
204  return func_(param.server_context, &stream);
205  });
206 
208  if (!param.server_context->sent_initial_metadata_) {
209  ops.SendInitialMetadata(param.server_context->initial_metadata_,
210  param.server_context->initial_metadata_flags());
211  if (param.server_context->compression_level_set()) {
212  ops.set_compression_level(param.server_context->compression_level());
213  }
214  if (write_needed_ && status.ok()) {
215  // If we needed a write but never did one, we need to mark the
216  // status as a fail
217  status = Status(StatusCode::INTERNAL,
218  "Service did not provide response message");
219  }
220  }
221  ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
222  param.call->PerformOps(&ops);
223  if (param.server_context->has_pending_ops_) {
224  param.call->cq()->Pluck(&param.server_context->pending_ops_);
225  }
226  param.call->cq()->Pluck(&ops);
227  }
228 
229  private:
230  std::function<Status(ServerContext*, Streamer*)> func_;
231  const bool write_needed_;
232 };
233 
234 template <class ServiceType, class RequestType, class ResponseType>
237  ServerReaderWriter<ResponseType, RequestType>, false> {
238  public:
240  std::function<Status(ServiceType*, ServerContext*,
242  func,
243  ServiceType* service)
245  ServerReaderWriter<ResponseType, RequestType>, false>(std::bind(
246  func, service, std::placeholders::_1, std::placeholders::_2)) {}
247 };
248 
249 template <class RequestType, class ResponseType>
252  ServerUnaryStreamer<RequestType, ResponseType>, true> {
253  public:
255  std::function<Status(ServerContext*,
257  func)
259  ServerUnaryStreamer<RequestType, ResponseType>, true>(func) {}
260 };
261 
262 template <class RequestType, class ResponseType>
265  ServerSplitStreamer<RequestType, ResponseType>, false> {
266  public:
268  std::function<Status(ServerContext*,
270  func)
272  ServerSplitStreamer<RequestType, ResponseType>, false>(func) {}
273 };
274 
277 template <StatusCode code>
278 class ErrorMethodHandler : public MethodHandler {
279  public:
280  template <class T>
281  static void FillOps(ServerContext* context, T* ops) {
282  Status status(code, "");
283  if (!context->sent_initial_metadata_) {
284  ops->SendInitialMetadata(context->initial_metadata_,
285  context->initial_metadata_flags());
286  if (context->compression_level_set()) {
287  ops->set_compression_level(context->compression_level());
288  }
289  context->sent_initial_metadata_ = true;
290  }
291  ops->ServerSendStatus(context->trailing_metadata_, status);
292  }
293 
294  void RunHandler(const HandlerParameter& param) final {
296  FillOps(param.server_context, &ops);
297  param.call->PerformOps(&ops);
298  param.call->cq()->Pluck(&ops);
299  // We also have to destroy any request payload in the handler parameter
300  ByteBuffer* payload = param.request.bbuf_ptr();
301  if (payload != nullptr) {
302  payload->Clear();
303  }
304  }
305 };
306 
310 
311 } // namespace internal
312 } // namespace grpc
313 
314 #endif // GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
void Clear()
Remove all data.
Definition: byte_buffer.h:102
A wrapper class of an application provided server streaming handler.
Definition: byte_buffer.h:47
A wrapper class of an application provided bidi-streaming handler.
Definition: completion_queue.h:82
ServerStreamingHandler(std::function< Status(ServiceType *, ServerContext *, const RequestType *, ServerWriter< ResponseType > *)> func, ServiceType *service)
Definition: method_handler_impl.h:145
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:109
StreamedUnaryHandler(std::function< Status(ServerContext *, ServerUnaryStreamer< RequestType, ResponseType > *)> func)
Definition: method_handler_impl.h:254
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:138
A class to represent a flow-controlled server-side streaming call.
Definition: sync_stream.h:878
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:294
A class to represent a flow-controlled unary call.
Definition: sync_stream.h:813
Primary implementation of CallOpSetInterface.
Definition: call.h:618
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:152
ClientStreamingHandler(std::function< Status(ServiceType *, ServerContext *, ServerReader< RequestType > *, ResponseType *)> func, ServiceType *service)
Definition: method_handler_impl.h:102
Definition: async_unary_call.h:303
RpcMethodHandler(std::function< Status(ServiceType *, ServerContext *, const RequestType *, ResponseType *)> func, ServiceType *service)
Definition: method_handler_impl.h:55
grpc_compression_level compression_level() const
Return the compression algorithm to be used by the server call.
Definition: server_context.h:176
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:201
Synchronous (blocking) server-side API for doing client-streaming RPCs, where the incoming message st...
Definition: completion_queue.h:53
ErrorMethodHandler< StatusCode::RESOURCE_EXHAUSTED > ResourceExhaustedHandler
Definition: method_handler_impl.h:309
SplitServerStreamingHandler(std::function< Status(ServerContext *, ServerSplitStreamer< RequestType, ResponseType > *)> func)
Definition: method_handler_impl.h:267
bool compression_level_set() const
Return a bool indicating whether the compression level for this call has been set (either implicitly ...
Definition: server_context.h:191
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
Definition: method_handler_impl.h:263
Definition: call.h:266
ErrorMethodHandler< StatusCode::UNIMPLEMENTED > UnknownMethodHandler
Definition: method_handler_impl.h:307
An Alarm posts the user provided tag to its associated completion queue upon expiry or cancellation...
Definition: alarm.h:31
A wrapper class of an application provided client streaming handler.
Definition: completion_queue.h:76
A wrapper class of an application provided rpc method handler.
Definition: byte_buffer.h:45
Definition: rpc_service_method.h:42
A ServerContext allows the person implementing a service handler to:
Definition: server_context.h:97
Synchronous (blocking) server-side API for doing server-streaming RPCs, where the outgoing message stream coming from the server has messages of type W.
Definition: completion_queue.h:55
bool ok() const
Is the status OK?
Definition: status.h:118
TemplatedBidiStreamingHandler(std::function< Status(ServerContext *, Streamer *)> func)
Definition: method_handler_impl.h:197
Synchronous (blocking) server-side API for a bidirectional streaming call, where the incoming message...
Definition: sync_stream.h:772
Base class for running an RPC handler.
Definition: rpc_service_method.h:39
General method handler class for errors that prevent real method use e.g., handle unknown method by r...
Definition: byte_buffer.h:49
Did it work? If it didn't, why?
Definition: status.h:31
Definition: method_handler_impl.h:250
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:61
Definition: completion_queue.h:80
Status CatchingFunctionHandler(Callable &&handler)
Definition: method_handler_impl.h:39
Unknown error.
Definition: status_code_enum.h:35
Internal errors.
Definition: status_code_enum.h:119
A sequence of bytes.
Definition: byte_buffer.h:55
static void FillOps(ServerContext *context, T *ops)
Definition: method_handler_impl.h:281
BidiStreamingHandler(std::function< Status(ServiceType *, ServerContext *, ServerReaderWriter< ResponseType, RequestType > *)> func, ServiceType *service)
Definition: method_handler_impl.h:239