GRPC C++  1.17.0
server_callback.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
20 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
21 
22 #include <functional>
23 
32 
33 namespace grpc {
34 
35 // forward declarations
36 namespace internal {
37 template <class ServiceType, class RequestType, class ResponseType>
38 class CallbackUnaryHandler;
39 } // namespace internal
40 
41 namespace experimental {
42 
43 // For unary RPCs, the exposed controller class is only an interface
44 // and the actual implementation is an internal class.
46  public:
48 
49  // The method handler must call this function when it is done so that
50  // the library knows to free its resources
51  virtual void Finish(Status s) = 0;
52 
53  // Allow the method handler to push out the initial metadata before
54  // the response and status are ready
55  virtual void SendInitialMetadata(std::function<void(bool)>) = 0;
56 };
57 
58 } // namespace experimental
59 
60 namespace internal {
61 
62 template <class ServiceType, class RequestType, class ResponseType>
63 class CallbackUnaryHandler : public MethodHandler {
64  public:
66  std::function<void(ServerContext*, const RequestType*, ResponseType*,
68  func,
69  ServiceType* service)
70  : func_(func) {}
// Runs the unary callback handler for one incoming call.
//
// Takes an extra reference on the underlying call, placement-constructs a
// ServerCallbackRpcControllerImpl in memory obtained from
// grpc_call_arena_alloc, and then either invokes the user handler (when
// param.status is OK) or finishes the call with the failing status.
// NOTE(review): the release matching grpc_call_ref is not visible in this
// listing; presumably it happens on the finish path - confirm.
71  void RunHandler(const HandlerParameter& param) final {
72  // Arena allocate a controller structure (that includes request/response)
73  g_core_codegen_interface->grpc_call_ref(param.call->call());
74  auto* controller = new (g_core_codegen_interface->grpc_call_arena_alloc(
75  param.call->call(), sizeof(ServerCallbackRpcControllerImpl)))
76  ServerCallbackRpcControllerImpl(
77  param.server_context, param.call,
78  static_cast<RequestType*>(param.request),
79  std::move(param.call_requester));
80  Status status = param.status;
81 
82  if (status.ok()) {
83  // Call the actual function handler and expect the user to call finish
    // func_ is passed as an lvalue: it is a member of the handler, not of this
    // call, so it must never be handed over as an expiring value.
84  CatchingCallback(func_, param.server_context,
85  controller->request(), controller->response(),
86  controller);
87  } else {
88  // if deserialization failed, we need to fail the call
89  controller->Finish(status);
90  }
91  }
92 
94  Status* status) final {
95  ByteBuffer buf;
96  buf.set_buffer(req);
97  auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc(
98  call, sizeof(RequestType))) RequestType();
99  *status = SerializationTraits<RequestType>::Deserialize(&buf, request);
100  buf.Release();
101  if (status->ok()) {
102  return request;
103  }
104  request->~RequestType();
105  return nullptr;
106  }
107 
108  private:
109  std::function<void(ServerContext*, const RequestType*, ResponseType*,
111  func_;
112 
113  // The implementation class of ServerCallbackRpcController is a private member
114  // of CallbackUnaryHandler since it is never exposed anywhere, and this allows
115  // it to take advantage of CallbackUnaryHandler's friendships.
116  class ServerCallbackRpcControllerImpl
118  public:
// Completes the RPC with status `s`.
//
// Builds one final batch in finish_buf_: initial metadata (only if it has not
// been sent yet), then either the response message plus the status returned
// by SendMessage (when `s` is OK) or just `s` (otherwise). The batch is
// started with call_.PerformOps; finish_tag_ carries the completion callback.
119  void Finish(Status s) override {
120  finish_tag_.Set(
121  call_.call(),
122  [this](bool) {
    // Everything still needed is copied out of the members first, because
    // the object is explicitly destroyed before call_requester runs.
    // NOTE(review): the embedded numbering jumps from 126 to 128, so one
    // source line is absent from this listing (`call` is otherwise unused).
123  grpc_call* call = call_.call();
124  auto call_requester = std::move(call_requester_);
125  this->~ServerCallbackRpcControllerImpl(); // explicitly call
126  // destructor
128  call_requester();
129  },
130  &finish_buf_);
    // Piggy-back the initial metadata on the final batch if the handler never
    // sent it separately.
131  if (!ctx_->sent_initial_metadata_) {
132  finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
133  ctx_->initial_metadata_flags());
134  if (ctx_->compression_level_set()) {
135  finish_buf_.set_compression_level(ctx_->compression_level());
136  }
137  ctx_->sent_initial_metadata_ = true;
138  }
139  // The response is dropped if the status is not OK.
140  if (s.ok()) {
    // The status that goes out is whatever SendMessage(resp_) returns.
141  finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_,
142  finish_buf_.SendMessage(resp_));
143  } else {
144  finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, s);
145  }
146  finish_buf_.set_core_cq_tag(&finish_tag_);
147  call_.PerformOps(&finish_buf_);
148  }
149 
// Sends the initial metadata on its own, ahead of the response and status.
//
// `f` is stored in meta_tag_ as the callback for this batch. Asserts that
// initial metadata has not been sent already, and marks it as sent before the
// batch is started so that Finish() will not send it a second time.
150  void SendInitialMetadata(std::function<void(bool)> f) override {
151  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
152 
153  meta_tag_.Set(call_.call(), std::move(f), &meta_buf_);
154  meta_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
155  ctx_->initial_metadata_flags());
    // Forward the compression level only when one was set on the context.
156  if (ctx_->compression_level_set()) {
157  meta_buf_.set_compression_level(ctx_->compression_level());
158  }
159  ctx_->sent_initial_metadata_ = true;
160  meta_buf_.set_core_cq_tag(&meta_tag_);
161  call_.PerformOps(&meta_buf_);
162  }
163 
164  private:
165  template <class SrvType, class ReqType, class RespType>
166  friend class CallbackUnaryHandler;
167 
// Private constructor; CallbackUnaryHandler (a friend) creates instances in
// RunHandler.
//
// ctx and req are stored as raw pointers, *call is copied into call_, and
// call_requester is moved into the controller; Finish()'s completion callback
// invokes it after the controller has been destroyed.
168  ServerCallbackRpcControllerImpl(ServerContext* ctx, Call* call,
169  RequestType* req,
170  std::function<void()> call_requester)
171  : ctx_(ctx),
172  call_(*call),
173  req_(req),
174  call_requester_(std::move(call_requester)) {}
175 
// Runs the request's destructor explicitly instead of deleting it.
// NOTE(review): presumably req_ is the object Deserialize() placement-
// constructs in call arena memory - confirm against the caller.
// Deserialize() returns nullptr when parsing fails, and RunHandler() still
// builds a controller and calls Finish() for that failed call, so req_ may be
// null here; skip the destructor call in that case.
176  ~ServerCallbackRpcControllerImpl() { if (req_ != nullptr) req_->~RequestType(); }
177 
// Accessors used by RunHandler to hand the message objects to the user
// handler. The request is held by pointer (req_), while the response is a
// member of this controller (resp_).
178  RequestType* request() { return req_; }
179  ResponseType* response() { return &resp_; }
180 
182  CallbackWithSuccessTag meta_tag_;
185  finish_buf_;
186  CallbackWithSuccessTag finish_tag_;
187 
188  ServerContext* ctx_;
189  Call call_;
190  RequestType* req_;
191  ResponseType resp_;
192  std::function<void()> call_requester_;
193  };
194 };
195 
196 } // namespace internal
197 
198 } // namespace grpc
199 
200 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:70
void RunHandler(const HandlerParameter &param) final
Definition: server_callback.h:71
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:141
virtual void grpc_call_ref(grpc_call *call)=0
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:755
virtual void grpc_call_unref(grpc_call *call)=0
virtual void SendInitialMetadata(std::function< void(bool)>)=0
Definition: grpc_types.h:40
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
Definition: call_op_set.h:563
Definition: call_op_set.h:223
void Release()
Forget underlying byte buffer without destroying it. Use this only for un-owned byte buffers.
Definition: byte_buffer.h:125
Definition: call_op_set.h:293
An Alarm posts the user provided tag to its associated completion queue upon expiry or cancellation...
Definition: alarm.h:33
CoreCodegenInterface * g_core_codegen_interface
Definition: call_op_set.h:50
Definition: rpc_service_method.h:42
A ServerContext allows the person implementing a service handler to:
Definition: server_context.h:102
CallbackUnaryHandler(std::function< void(ServerContext *, const RequestType *, ResponseType *, experimental::ServerCallbackRpcController *)> func, ServiceType *service)
Definition: server_callback.h:65
Definition: byte_buffer.h:49
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming o...
Definition: callback_common.h:116
bool ok() const
Is the status OK?
Definition: status.h:118
Did it work? If it didn't, why?
Definition: status.h:31
void CatchingCallback(Func &&func, Args &&... args)
An exception-safe way of invoking a user-specified callback function.
Definition: callback_common.h:36
Definition: server_callback.h:45
virtual ~ServerCallbackRpcController()
Definition: server_callback.h:47
A sequence of bytes.
Definition: byte_buffer.h:62
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, Status *status) final
Definition: server_callback.h:93
Straightforward wrapping of the C call object.
Definition: call.h:36