GRPC C++  1.8.0
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
method_handler_impl.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCXX_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
20 #define GRPCXX_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
21 
26 
27 namespace grpc {
28 
29 namespace internal {
31 template <class ServiceType, class RequestType, class ResponseType>
32 class RpcMethodHandler : public MethodHandler {
33  public:
34  RpcMethodHandler(std::function<Status(ServiceType*, ServerContext*,
35  const RequestType*, ResponseType*)>
36  func,
37  ServiceType* service)
38  : func_(func), service_(service) {}
39 
40  void RunHandler(const HandlerParameter& param) final {
41  RequestType req;
43  param.request.bbuf_ptr(), &req);
44  ResponseType rsp;
45  if (status.ok()) {
46  status = func_(service_, param.server_context, &req, &rsp);
47  }
48 
49  GPR_CODEGEN_ASSERT(!param.server_context->sent_initial_metadata_);
52  ops;
53  ops.SendInitialMetadata(param.server_context->initial_metadata_,
54  param.server_context->initial_metadata_flags());
55  if (param.server_context->compression_level_set()) {
56  ops.set_compression_level(param.server_context->compression_level());
57  }
58  if (status.ok()) {
59  status = ops.SendMessage(rsp);
60  }
61  ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
62  param.call->PerformOps(&ops);
63  param.call->cq()->Pluck(&ops);
64  }
65 
66  private:
68  std::function<Status(ServiceType*, ServerContext*, const RequestType*,
69  ResponseType*)>
70  func_;
71  // The class the above handler function lives in.
72  ServiceType* service_;
73 };
74 
76 template <class ServiceType, class RequestType, class ResponseType>
77 class ClientStreamingHandler : public MethodHandler {
78  public:
80  std::function<Status(ServiceType*, ServerContext*,
81  ServerReader<RequestType>*, ResponseType*)>
82  func,
83  ServiceType* service)
84  : func_(func), service_(service) {}
85 
86  void RunHandler(const HandlerParameter& param) final {
87  ServerReader<RequestType> reader(param.call, param.server_context);
88  ResponseType rsp;
89  Status status = func_(service_, param.server_context, &reader, &rsp);
90 
91  GPR_CODEGEN_ASSERT(!param.server_context->sent_initial_metadata_);
94  ops;
95  ops.SendInitialMetadata(param.server_context->initial_metadata_,
96  param.server_context->initial_metadata_flags());
97  if (param.server_context->compression_level_set()) {
98  ops.set_compression_level(param.server_context->compression_level());
99  }
100  if (status.ok()) {
101  status = ops.SendMessage(rsp);
102  }
103  ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
104  param.call->PerformOps(&ops);
105  param.call->cq()->Pluck(&ops);
106  }
107 
108  private:
109  std::function<Status(ServiceType*, ServerContext*, ServerReader<RequestType>*,
110  ResponseType*)>
111  func_;
112  ServiceType* service_;
113 };
114 
116 template <class ServiceType, class RequestType, class ResponseType>
117 class ServerStreamingHandler : public MethodHandler {
118  public:
120  std::function<Status(ServiceType*, ServerContext*, const RequestType*,
122  func,
123  ServiceType* service)
124  : func_(func), service_(service) {}
125 
126  void RunHandler(const HandlerParameter& param) final {
127  RequestType req;
129  param.request.bbuf_ptr(), &req);
130 
131  if (status.ok()) {
132  ServerWriter<ResponseType> writer(param.call, param.server_context);
133  status = func_(service_, param.server_context, &req, &writer);
134  }
135 
137  if (!param.server_context->sent_initial_metadata_) {
138  ops.SendInitialMetadata(param.server_context->initial_metadata_,
139  param.server_context->initial_metadata_flags());
140  if (param.server_context->compression_level_set()) {
141  ops.set_compression_level(param.server_context->compression_level());
142  }
143  }
144  ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
145  param.call->PerformOps(&ops);
146  if (param.server_context->has_pending_ops_) {
147  param.call->cq()->Pluck(&param.server_context->pending_ops_);
148  }
149  param.call->cq()->Pluck(&ops);
150  }
151 
152  private:
153  std::function<Status(ServiceType*, ServerContext*, const RequestType*,
155  func_;
156  ServiceType* service_;
157 };
158 
166 template <class Streamer, bool WriteNeeded>
167 class TemplatedBidiStreamingHandler : public MethodHandler {
168  public:
170  std::function<Status(ServerContext*, Streamer*)> func)
171  : func_(func), write_needed_(WriteNeeded) {}
172 
173  void RunHandler(const HandlerParameter& param) final {
174  Streamer stream(param.call, param.server_context);
175  Status status = func_(param.server_context, &stream);
176 
178  if (!param.server_context->sent_initial_metadata_) {
179  ops.SendInitialMetadata(param.server_context->initial_metadata_,
180  param.server_context->initial_metadata_flags());
181  if (param.server_context->compression_level_set()) {
182  ops.set_compression_level(param.server_context->compression_level());
183  }
184  if (write_needed_ && status.ok()) {
185  // If we needed a write but never did one, we need to mark the
186  // status as a fail
187  status = Status(StatusCode::INTERNAL,
188  "Service did not provide response message");
189  }
190  }
191  ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
192  param.call->PerformOps(&ops);
193  if (param.server_context->has_pending_ops_) {
194  param.call->cq()->Pluck(&param.server_context->pending_ops_);
195  }
196  param.call->cq()->Pluck(&ops);
197  }
198 
199  private:
200  std::function<Status(ServerContext*, Streamer*)> func_;
201  const bool write_needed_;
202 };
203 
204 template <class ServiceType, class RequestType, class ResponseType>
205 class BidiStreamingHandler
206  : public TemplatedBidiStreamingHandler<
207  ServerReaderWriter<ResponseType, RequestType>, false> {
208  public:
210  std::function<Status(ServiceType*, ServerContext*,
212  func,
213  ServiceType* service)
215  ServerReaderWriter<ResponseType, RequestType>, false>(std::bind(
216  func, service, std::placeholders::_1, std::placeholders::_2)) {}
217 };
218 
219 template <class RequestType, class ResponseType>
222  ServerUnaryStreamer<RequestType, ResponseType>, true> {
223  public:
225  std::function<Status(ServerContext*,
227  func)
229  ServerUnaryStreamer<RequestType, ResponseType>, true>(func) {}
230 };
231 
232 template <class RequestType, class ResponseType>
235  ServerSplitStreamer<RequestType, ResponseType>, false> {
236  public:
238  std::function<Status(ServerContext*,
240  func)
242  ServerSplitStreamer<RequestType, ResponseType>, false>(func) {}
243 };
244 
247  public:
248  template <class T>
249  static void FillOps(ServerContext* context, T* ops) {
250  Status status(StatusCode::UNIMPLEMENTED, "");
251  if (!context->sent_initial_metadata_) {
252  ops->SendInitialMetadata(context->initial_metadata_,
253  context->initial_metadata_flags());
254  if (context->compression_level_set()) {
255  ops->set_compression_level(context->compression_level());
256  }
257  context->sent_initial_metadata_ = true;
258  }
259  ops->ServerSendStatus(context->trailing_metadata_, status);
260  }
261 
262  void RunHandler(const HandlerParameter& param) final {
264  FillOps(param.server_context, &ops);
265  param.call->PerformOps(&ops);
266  param.call->cq()->Pluck(&ops);
267  }
268 };
269 
270 } // namespace internal
271 } // namespace grpc
272 
273 #endif // GRPCXX_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
grpc_compression_level compression_level() const
Return the compression algorithm to be used by the server call.
Definition: server_context.h:171
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:262
A wrapper class of an application provided bidi-streaming handler.
Definition: completion_queue.h:83
ServerStreamingHandler(std::function< Status(ServiceType *, ServerContext *, const RequestType *, ServerWriter< ResponseType > *)> func, ServiceType *service)
Definition: method_handler_impl.h:119
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:86
StreamedUnaryHandler(std::function< Status(ServerContext *, ServerUnaryStreamer< RequestType, ResponseType > *)> func)
Definition: method_handler_impl.h:224
A class to represent a flow-controlled server-side streaming call.
Definition: sync_stream.h:878
static void FillOps(ServerContext *context, T *ops)
Definition: method_handler_impl.h:249
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:135
A class to represent a flow-controlled unary call.
Definition: sync_stream.h:813
Primary implementaiton of CallOpSetInterface.
Definition: call.h:627
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:126
ClientStreamingHandler(std::function< Status(ServiceType *, ServerContext *, ServerReader< RequestType > *, ResponseType *)> func, ServiceType *service)
Definition: method_handler_impl.h:79
Handle unknown method by returning UNIMPLEMENTED error.
Definition: method_handler_impl.h:246
RpcMethodHandler(std::function< Status(ServiceType *, ServerContext *, const RequestType *, ResponseType *)> func, ServiceType *service)
Definition: method_handler_impl.h:34
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:173
Synchronous (blocking) server-side API for doing client-streaming RPCs, where the incoming message st...
Definition: completion_queue.h:53
SplitServerStreamingHandler(std::function< Status(ServerContext *, ServerSplitStreamer< RequestType, ResponseType > *)> func)
Definition: method_handler_impl.h:237
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
Definition: method_handler_impl.h:233
Definition: call.h:285
Definition: rpc_service_method.h:41
A ServerContext allows the person implementing a service handler to:
Definition: server_context.h:96
Synchronous (blocking) server-side API for doing for doing a server-streaming RPCs, where the outgoing message stream coming from the server has messages of type W.
Definition: completion_queue.h:55
TemplatedBidiStreamingHandler(std::function< Status(ServerContext *, Streamer *)> func)
Definition: method_handler_impl.h:169
Base class for running an RPC handler.
Definition: rpc_service_method.h:38
Synchronous (blocking) server-side API for a bidirectional streaming call, where the incoming message...
Definition: sync_stream.h:772
bool ok() const
Is the status OK?
Definition: status.h:64
Did it work? If it didn't, why?
Definition: status.h:30
Definition: method_handler_impl.h:220
Operation is not implemented or not supported/enabled in this service.
Definition: status_code_enum.h:115
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:40
bool compression_level_set() const
Return a bool indicating whether the compression level for this call has been set (either implicitly ...
Definition: server_context.h:186
Internal errors.
Definition: status_code_enum.h:119
BidiStreamingHandler(std::function< Status(ServiceType *, ServerContext *, ServerReaderWriter< ResponseType, RequestType > *)> func, ServiceType *service)
Definition: method_handler_impl.h:209