GRPC C++  1.13.0-dev
method_handler_impl.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
20 #define GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
21 
26 
27 namespace grpc {
28 
29 namespace internal {
30 
31 // Invoke the method handler, fill in the status, and
32 // return whether or not we finished safely (without an exception).
33 // Note that exception handling is 0-cost in most compiler/library
34 // implementations (except when an exception is actually thrown),
35 // so this process doesn't require additional overhead in the common case.
36 // Additionally, we don't need to return if we caught an exception or not;
37 // the handling is the same in either case.
38 template <class Callable>
39 Status CatchingFunctionHandler(Callable&& handler) {
40 #if GRPC_ALLOW_EXCEPTIONS
41  try {
42  return handler();
43  } catch (...) {
44  return Status(StatusCode::UNKNOWN, "Unexpected error in RPC handling");
45  }
46 #else // GRPC_ALLOW_EXCEPTIONS
47  return handler();
48 #endif // GRPC_ALLOW_EXCEPTIONS
49 }
50 
52 template <class ServiceType, class RequestType, class ResponseType>
53 class RpcMethodHandler : public MethodHandler {
54  public:
55  RpcMethodHandler(std::function<Status(ServiceType*, ServerContext*,
56  const RequestType*, ResponseType*)>
57  func,
58  ServiceType* service)
59  : func_(func), service_(service) {}
60 
61  void RunHandler(const HandlerParameter& param) final {
62  RequestType req;
64  param.request.bbuf_ptr(), &req);
65  ResponseType rsp;
66  if (status.ok()) {
67  status = CatchingFunctionHandler([this, &param, &req, &rsp] {
68  return func_(service_, param.server_context, &req, &rsp);
69  });
70  }
71 
72  GPR_CODEGEN_ASSERT(!param.server_context->sent_initial_metadata_);
75  ops;
76  ops.SendInitialMetadata(param.server_context->initial_metadata_,
77  param.server_context->initial_metadata_flags());
78  if (param.server_context->compression_level_set()) {
79  ops.set_compression_level(param.server_context->compression_level());
80  }
81  if (status.ok()) {
82  status = ops.SendMessage(rsp);
83  }
84  ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
85  param.call->PerformOps(&ops);
86  param.call->cq()->Pluck(&ops);
87  }
88 
89  private:
91  std::function<Status(ServiceType*, ServerContext*, const RequestType*,
92  ResponseType*)>
93  func_;
94  // The class the above handler function lives in.
95  ServiceType* service_;
96 };
97 
99 template <class ServiceType, class RequestType, class ResponseType>
100 class ClientStreamingHandler : public MethodHandler {
101  public:
103  std::function<Status(ServiceType*, ServerContext*,
104  ServerReader<RequestType>*, ResponseType*)>
105  func,
106  ServiceType* service)
107  : func_(func), service_(service) {}
108 
109  void RunHandler(const HandlerParameter& param) final {
110  ServerReader<RequestType> reader(param.call, param.server_context);
111  ResponseType rsp;
112  Status status = CatchingFunctionHandler([this, &param, &reader, &rsp] {
113  return func_(service_, param.server_context, &reader, &rsp);
114  });
115 
116  GPR_CODEGEN_ASSERT(!param.server_context->sent_initial_metadata_);
119  ops;
120  ops.SendInitialMetadata(param.server_context->initial_metadata_,
121  param.server_context->initial_metadata_flags());
122  if (param.server_context->compression_level_set()) {
123  ops.set_compression_level(param.server_context->compression_level());
124  }
125  if (status.ok()) {
126  status = ops.SendMessage(rsp);
127  }
128  ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
129  param.call->PerformOps(&ops);
130  param.call->cq()->Pluck(&ops);
131  }
132 
133  private:
134  std::function<Status(ServiceType*, ServerContext*, ServerReader<RequestType>*,
135  ResponseType*)>
136  func_;
137  ServiceType* service_;
138 };
139 
141 template <class ServiceType, class RequestType, class ResponseType>
142 class ServerStreamingHandler : public MethodHandler {
143  public:
145  std::function<Status(ServiceType*, ServerContext*, const RequestType*,
147  func,
148  ServiceType* service)
149  : func_(func), service_(service) {}
150 
151  void RunHandler(const HandlerParameter& param) final {
152  RequestType req;
154  param.request.bbuf_ptr(), &req);
155 
156  if (status.ok()) {
157  ServerWriter<ResponseType> writer(param.call, param.server_context);
158  status = CatchingFunctionHandler([this, &param, &req, &writer] {
159  return func_(service_, param.server_context, &req, &writer);
160  });
161  }
162 
164  if (!param.server_context->sent_initial_metadata_) {
165  ops.SendInitialMetadata(param.server_context->initial_metadata_,
166  param.server_context->initial_metadata_flags());
167  if (param.server_context->compression_level_set()) {
168  ops.set_compression_level(param.server_context->compression_level());
169  }
170  }
171  ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
172  param.call->PerformOps(&ops);
173  if (param.server_context->has_pending_ops_) {
174  param.call->cq()->Pluck(&param.server_context->pending_ops_);
175  }
176  param.call->cq()->Pluck(&ops);
177  }
178 
179  private:
180  std::function<Status(ServiceType*, ServerContext*, const RequestType*,
182  func_;
183  ServiceType* service_;
184 };
185 
193 template <class Streamer, bool WriteNeeded>
195  public:
197  std::function<Status(ServerContext*, Streamer*)> func)
198  : func_(func), write_needed_(WriteNeeded) {}
199 
200  void RunHandler(const HandlerParameter& param) final {
201  Streamer stream(param.call, param.server_context);
202  Status status = CatchingFunctionHandler([this, &param, &stream] {
203  return func_(param.server_context, &stream);
204  });
205 
207  if (!param.server_context->sent_initial_metadata_) {
208  ops.SendInitialMetadata(param.server_context->initial_metadata_,
209  param.server_context->initial_metadata_flags());
210  if (param.server_context->compression_level_set()) {
211  ops.set_compression_level(param.server_context->compression_level());
212  }
213  if (write_needed_ && status.ok()) {
214  // If we needed a write but never did one, we need to mark the
215  // status as a fail
216  status = Status(StatusCode::INTERNAL,
217  "Service did not provide response message");
218  }
219  }
220  ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
221  param.call->PerformOps(&ops);
222  if (param.server_context->has_pending_ops_) {
223  param.call->cq()->Pluck(&param.server_context->pending_ops_);
224  }
225  param.call->cq()->Pluck(&ops);
226  }
227 
228  private:
229  std::function<Status(ServerContext*, Streamer*)> func_;
230  const bool write_needed_;
231 };
232 
233 template <class ServiceType, class RequestType, class ResponseType>
236  ServerReaderWriter<ResponseType, RequestType>, false> {
237  public:
239  std::function<Status(ServiceType*, ServerContext*,
241  func,
242  ServiceType* service)
244  ServerReaderWriter<ResponseType, RequestType>, false>(std::bind(
245  func, service, std::placeholders::_1, std::placeholders::_2)) {}
246 };
247 
248 template <class RequestType, class ResponseType>
251  ServerUnaryStreamer<RequestType, ResponseType>, true> {
252  public:
254  std::function<Status(ServerContext*,
256  func)
258  ServerUnaryStreamer<RequestType, ResponseType>, true>(func) {}
259 };
260 
261 template <class RequestType, class ResponseType>
264  ServerSplitStreamer<RequestType, ResponseType>, false> {
265  public:
267  std::function<Status(ServerContext*,
269  func)
271  ServerSplitStreamer<RequestType, ResponseType>, false>(func) {}
272 };
273 
276  public:
277  template <class T>
278  static void FillOps(ServerContext* context, T* ops) {
279  Status status(StatusCode::UNIMPLEMENTED, "");
280  if (!context->sent_initial_metadata_) {
281  ops->SendInitialMetadata(context->initial_metadata_,
282  context->initial_metadata_flags());
283  if (context->compression_level_set()) {
284  ops->set_compression_level(context->compression_level());
285  }
286  context->sent_initial_metadata_ = true;
287  }
288  ops->ServerSendStatus(context->trailing_metadata_, status);
289  }
290 
291  void RunHandler(const HandlerParameter& param) final {
293  FillOps(param.server_context, &ops);
294  param.call->PerformOps(&ops);
295  param.call->cq()->Pluck(&ops);
296  }
297 };
298 
299 } // namespace internal
300 } // namespace grpc
301 
302 #endif // GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
A wrapper class of an application provided server streaming handler.
Definition: byte_buffer.h:47
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:291
A wrapper class of an application provided bidi-streaming handler.
Definition: completion_queue.h:83
ServerStreamingHandler(std::function< Status(ServiceType *, ServerContext *, const RequestType *, ServerWriter< ResponseType > *)> func, ServiceType *service)
Definition: method_handler_impl.h:144
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:109
StreamedUnaryHandler(std::function< Status(ServerContext *, ServerUnaryStreamer< RequestType, ResponseType > *)> func)
Definition: method_handler_impl.h:253
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:138
A class to represent a flow-controlled server-side streaming call.
Definition: sync_stream.h:878
static void FillOps(ServerContext *context, T *ops)
Definition: method_handler_impl.h:278
A class to represent a flow-controlled unary call.
Definition: sync_stream.h:813
Primary implementation of CallOpSetInterface.
Definition: call.h:619
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:151
ClientStreamingHandler(std::function< Status(ServiceType *, ServerContext *, ServerReader< RequestType > *, ResponseType *)> func, ServiceType *service)
Definition: method_handler_impl.h:102
Handle unknown method by returning UNIMPLEMENTED error.
Definition: method_handler_impl.h:275
Definition: async_unary_call.h:303
RpcMethodHandler(std::function< Status(ServiceType *, ServerContext *, const RequestType *, ResponseType *)> func, ServiceType *service)
Definition: method_handler_impl.h:55
grpc_compression_level compression_level() const
Return the compression algorithm to be used by the server call.
Definition: server_context.h:175
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:200
Synchronous (blocking) server-side API for doing client-streaming RPCs, where the incoming message st...
Definition: completion_queue.h:53
SplitServerStreamingHandler(std::function< Status(ServerContext *, ServerSplitStreamer< RequestType, ResponseType > *)> func)
Definition: method_handler_impl.h:266
bool compression_level_set() const
Return a bool indicating whether the compression level for this call has been set (either implicitly ...
Definition: server_context.h:190
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
Definition: method_handler_impl.h:262
Definition: call.h:268
An Alarm posts the user provided tag to its associated completion queue upon expiry or cancellation...
Definition: alarm.h:31
A wrapper class of an application provided client streaming handler.
Definition: completion_queue.h:76
A wrapper class of an application provided rpc method handler.
Definition: byte_buffer.h:45
Definition: rpc_service_method.h:41
A ServerContext allows the person implementing a service handler to:
Definition: server_context.h:96
Synchronous (blocking) server-side API for doing server-streaming RPCs, where the outgoing message stream coming from the server has messages of type W.
Definition: completion_queue.h:55
bool ok() const
Is the status OK?
Definition: status.h:118
TemplatedBidiStreamingHandler(std::function< Status(ServerContext *, Streamer *)> func)
Definition: method_handler_impl.h:196
Synchronous (blocking) server-side API for a bidirectional streaming call, where the incoming message...
Definition: sync_stream.h:772
Base class for running an RPC handler.
Definition: rpc_service_method.h:38
Did it work? If it didn't, why?
Definition: status.h:31
Definition: method_handler_impl.h:249
Operation is not implemented or not supported/enabled in this service.
Definition: status_code_enum.h:115
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:61
Definition: completion_queue.h:80
Status CatchingFunctionHandler(Callable &&handler)
Definition: method_handler_impl.h:39
Unknown error.
Definition: status_code_enum.h:35
Internal errors.
Definition: status_code_enum.h:119
BidiStreamingHandler(std::function< Status(ServiceType *, ServerContext *, ServerReaderWriter< ResponseType, RequestType > *)> func, ServiceType *service)
Definition: method_handler_impl.h:238