GRPC C++  1.6.0
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
method_handler_impl.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCXX_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
20 #define GRPCXX_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
21 
25 
26 namespace grpc {
27 
29 template <class ServiceType, class RequestType, class ResponseType>
30 class RpcMethodHandler : public MethodHandler {
31  public:
32  RpcMethodHandler(std::function<Status(ServiceType*, ServerContext*,
33  const RequestType*, ResponseType*)>
34  func,
35  ServiceType* service)
36  : func_(func), service_(service) {}
37 
38  void RunHandler(const HandlerParameter& param) final {
39  RequestType req;
40  Status status =
42  ResponseType rsp;
43  if (status.ok()) {
44  status = func_(service_, param.server_context, &req, &rsp);
45  }
46 
47  GPR_CODEGEN_ASSERT(!param.server_context->sent_initial_metadata_);
50  ops;
51  ops.SendInitialMetadata(param.server_context->initial_metadata_,
52  param.server_context->initial_metadata_flags());
53  if (param.server_context->compression_level_set()) {
54  ops.set_compression_level(param.server_context->compression_level());
55  }
56  if (status.ok()) {
57  status = ops.SendMessage(rsp);
58  }
59  ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
60  param.call->PerformOps(&ops);
61  param.call->cq()->Pluck(&ops);
62  }
63 
64  private:
66  std::function<Status(ServiceType*, ServerContext*, const RequestType*,
67  ResponseType*)>
68  func_;
69  // The class the above handler function lives in.
70  ServiceType* service_;
71 };
72 
74 template <class ServiceType, class RequestType, class ResponseType>
75 class ClientStreamingHandler : public MethodHandler {
76  public:
78  std::function<Status(ServiceType*, ServerContext*,
79  ServerReader<RequestType>*, ResponseType*)>
80  func,
81  ServiceType* service)
82  : func_(func), service_(service) {}
83 
84  void RunHandler(const HandlerParameter& param) final {
85  ServerReader<RequestType> reader(param.call, param.server_context);
86  ResponseType rsp;
87  Status status = func_(service_, param.server_context, &reader, &rsp);
88 
89  GPR_CODEGEN_ASSERT(!param.server_context->sent_initial_metadata_);
92  ops;
93  ops.SendInitialMetadata(param.server_context->initial_metadata_,
94  param.server_context->initial_metadata_flags());
95  if (param.server_context->compression_level_set()) {
96  ops.set_compression_level(param.server_context->compression_level());
97  }
98  if (status.ok()) {
99  status = ops.SendMessage(rsp);
100  }
101  ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
102  param.call->PerformOps(&ops);
103  param.call->cq()->Pluck(&ops);
104  }
105 
106  private:
107  std::function<Status(ServiceType*, ServerContext*, ServerReader<RequestType>*,
108  ResponseType*)>
109  func_;
110  ServiceType* service_;
111 };
112 
114 template <class ServiceType, class RequestType, class ResponseType>
115 class ServerStreamingHandler : public MethodHandler {
116  public:
118  std::function<Status(ServiceType*, ServerContext*, const RequestType*,
120  func,
121  ServiceType* service)
122  : func_(func), service_(service) {}
123 
124  void RunHandler(const HandlerParameter& param) final {
125  RequestType req;
126  Status status =
128 
129  if (status.ok()) {
130  ServerWriter<ResponseType> writer(param.call, param.server_context);
131  status = func_(service_, param.server_context, &req, &writer);
132  }
133 
135  if (!param.server_context->sent_initial_metadata_) {
136  ops.SendInitialMetadata(param.server_context->initial_metadata_,
137  param.server_context->initial_metadata_flags());
138  if (param.server_context->compression_level_set()) {
139  ops.set_compression_level(param.server_context->compression_level());
140  }
141  }
142  ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
143  param.call->PerformOps(&ops);
144  if (param.server_context->has_pending_ops_) {
145  param.call->cq()->Pluck(&param.server_context->pending_ops_);
146  }
147  param.call->cq()->Pluck(&ops);
148  }
149 
150  private:
151  std::function<Status(ServiceType*, ServerContext*, const RequestType*,
153  func_;
154  ServiceType* service_;
155 };
156 
164 template <class Streamer, bool WriteNeeded>
166  public:
168  std::function<Status(ServerContext*, Streamer*)> func)
169  : func_(func), write_needed_(WriteNeeded) {}
170 
171  void RunHandler(const HandlerParameter& param) final {
172  Streamer stream(param.call, param.server_context);
173  Status status = func_(param.server_context, &stream);
174 
176  if (!param.server_context->sent_initial_metadata_) {
177  ops.SendInitialMetadata(param.server_context->initial_metadata_,
178  param.server_context->initial_metadata_flags());
179  if (param.server_context->compression_level_set()) {
180  ops.set_compression_level(param.server_context->compression_level());
181  }
182  if (write_needed_ && status.ok()) {
183  // If we needed a write but never did one, we need to mark the
184  // status as a fail
185  status = Status(StatusCode::INTERNAL,
186  "Service did not provide response message");
187  }
188  }
189  ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
190  param.call->PerformOps(&ops);
191  if (param.server_context->has_pending_ops_) {
192  param.call->cq()->Pluck(&param.server_context->pending_ops_);
193  }
194  param.call->cq()->Pluck(&ops);
195  }
196 
197  private:
198  std::function<Status(ServerContext*, Streamer*)> func_;
199  const bool write_needed_;
200 };
201 
202 template <class ServiceType, class RequestType, class ResponseType>
203 class BidiStreamingHandler
204  : public TemplatedBidiStreamingHandler<
205  ServerReaderWriter<ResponseType, RequestType>, false> {
206  public:
208  std::function<Status(ServiceType*, ServerContext*,
210  func,
211  ServiceType* service)
213  ServerReaderWriter<ResponseType, RequestType>, false>(std::bind(
214  func, service, std::placeholders::_1, std::placeholders::_2)) {}
215 };
216 
217 template <class RequestType, class ResponseType>
220  ServerUnaryStreamer<RequestType, ResponseType>, true> {
221  public:
223  std::function<Status(ServerContext*,
225  func)
227  ServerUnaryStreamer<RequestType, ResponseType>, true>(func) {}
228 };
229 
230 template <class RequestType, class ResponseType>
233  ServerSplitStreamer<RequestType, ResponseType>, false> {
234  public:
236  std::function<Status(ServerContext*,
238  func)
240  ServerSplitStreamer<RequestType, ResponseType>, false>(func) {}
241 };
242 
245  public:
246  template <class T>
247  static void FillOps(ServerContext* context, T* ops) {
248  Status status(StatusCode::UNIMPLEMENTED, "");
249  if (!context->sent_initial_metadata_) {
250  ops->SendInitialMetadata(context->initial_metadata_,
251  context->initial_metadata_flags());
252  if (context->compression_level_set()) {
253  ops->set_compression_level(context->compression_level());
254  }
255  context->sent_initial_metadata_ = true;
256  }
257  ops->ServerSendStatus(context->trailing_metadata_, status);
258  }
259 
260  void RunHandler(const HandlerParameter& param) final {
262  FillOps(param.server_context, &ops);
263  param.call->PerformOps(&ops);
264  param.call->cq()->Pluck(&ops);
265  }
266 };
267 
268 } // namespace grpc
269 
270 #endif // GRPCXX_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
SplitServerStreamingHandler(std::function< Status(ServerContext *, ServerSplitStreamer< RequestType, ResponseType > *)> func)
Definition: method_handler_impl.h:235
grpc_compression_level compression_level() const
Return the compression algorithm to be used by the server call.
Definition: server_context.h:170
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:84
Base class for running an RPC handler.
Definition: rpc_service_method.h:41
Definition: method_handler_impl.h:231
A class to represent a flow-controlled server-side streaming call.
Definition: sync_stream.h:799
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:126
ClientStreamingHandler(std::function< Status(ServiceType *, ServerContext *, ServerReader< RequestType > *, ResponseType *)> func, ServiceType *service)
Definition: method_handler_impl.h:77
A class to represent a flow-controlled unary call.
Definition: sync_stream.h:736
StreamedUnaryHandler(std::function< Status(ServerContext *, ServerUnaryStreamer< RequestType, ResponseType > *)> func)
Definition: method_handler_impl.h:222
ServerStreamingHandler(std::function< Status(ServiceType *, ServerContext *, const RequestType *, ServerWriter< ResponseType > *)> func, ServiceType *service)
Definition: method_handler_impl.h:117
static void FillOps(ServerContext *context, T *ops)
Definition: method_handler_impl.h:247
Definition: method_handler_impl.h:218
Definition: call.h:448
Synchronous (blocking) server-side API for doing client-streaming RPCs, where the incoming message st...
Definition: completion_queue.h:53
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:260
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:49
Handle unknown method by returning UNIMPLEMENTED error.
Definition: method_handler_impl.h:244
Definition: call.h:256
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:38
A wrapper class of an application provided bidi-streaming handler.
Definition: method_handler_impl.h:165
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:171
RpcMethodHandler(std::function< Status(ServiceType *, ServerContext *, const RequestType *, ResponseType *)> func, ServiceType *service)
Definition: method_handler_impl.h:32
Primary implementaiton of CallOpSetInterface.
Definition: call.h:591
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:124
A ServerContext allows the person implementing a service handler to:
Definition: server_context.h:95
Synchronous (blocking) server-side API for doing for doing a server-streaming RPCs, where the outgoing message stream coming from the server has messages of type W.
Definition: completion_queue.h:55
BidiStreamingHandler(std::function< Status(ServiceType *, ServerContext *, ServerReaderWriter< ResponseType, RequestType > *)> func, ServiceType *service)
Definition: method_handler_impl.h:207
Synchronous (blocking) server-side API for a bidirectional streaming call, where the incoming message...
Definition: sync_stream.h:698
bool ok() const
Is the status OK?
Definition: status.h:64
Definition: rpc_service_method.h:44
Did it work? If it didn't, why?
Definition: status.h:30
Operation is not implemented or not supported/enabled in this service.
Definition: status_code_enum.h:115
Definition: call.h:203
bool compression_level_set() const
Return a bool indicating whether the compression level for this call has been set (either implicitly ...
Definition: server_context.h:185
Internal errors.
Definition: status_code_enum.h:119
TemplatedBidiStreamingHandler(std::function< Status(ServerContext *, Streamer *)> func)
Definition: method_handler_impl.h:167