GRPC C++  1.34.0
server_callback_handlers.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
20 
26 
27 namespace grpc {
28 namespace internal {
29 
30 template <class RequestType, class ResponseType>
31 class CallbackUnaryHandler : public ::grpc::internal::MethodHandler {
32  public:
35  const RequestType*, ResponseType*)>
36  get_reactor)
37  : get_reactor_(std::move(get_reactor)) {}
38 
41  allocator) {
42  allocator_ = allocator;
43  }
44 
45  void RunHandler(const HandlerParameter& param) final {
46  // Arena allocate a controller structure (that includes request/response)
48  auto* allocator_state = static_cast<
50  param.internal_data);
51 
53  param.call->call(), sizeof(ServerCallbackUnaryImpl)))
54  ServerCallbackUnaryImpl(
55  static_cast<::grpc::CallbackServerContext*>(param.server_context),
56  param.call, allocator_state, std::move(param.call_requester));
57  param.server_context->BeginCompletionOp(
58  param.call, [call](bool) { call->MaybeDone(); }, call);
59 
60  ServerUnaryReactor* reactor = nullptr;
61  if (param.status.ok()) {
62  reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
63  get_reactor_,
64  static_cast<::grpc::CallbackServerContext*>(param.server_context),
65  call->request(), call->response());
66  }
67 
68  if (reactor == nullptr) {
69  // if deserialization or reactor creator failed, we need to fail the call
71  param.call->call(), sizeof(UnimplementedUnaryReactor)))
74  }
75 
77  call->SetupReactor(reactor);
78  }
79 
81  ::grpc::Status* status, void** handler_data) final {
83  buf.set_buffer(req);
84  RequestType* request = nullptr;
86  allocator_state = nullptr;
87  if (allocator_ != nullptr) {
88  allocator_state = allocator_->AllocateMessages();
89  } else {
90  allocator_state =
94  }
95  *handler_data = allocator_state;
96  request = allocator_state->request();
97  *status =
99  buf.Release();
100  if (status->ok()) {
101  return request;
102  }
103  // Clean up on deserialization failure.
104  allocator_state->Release();
105  return nullptr;
106  }
107 
108  private:
110  const RequestType*, ResponseType*)>
111  get_reactor_;
113  allocator_ = nullptr;
114 
115  class ServerCallbackUnaryImpl : public ServerCallbackUnary {
116  public:
// Completes the unary RPC by starting the finish op batch. Initial metadata is
// folded into this batch if it has not been sent yet. With an OK status the
// response is queued via SendMessagePtr() and the status it returns is what is
// handed to ServerSendStatus(); with a non-OK status only `s` is sent.
117  void Finish(::grpc::Status s) override {
118  // A callback that only contains a call to MaybeDone can be run as an
119  // inline callback regardless of whether or not OnDone is inlineable
120  // because if the actual OnDone callback needs to be scheduled, MaybeDone
121  // is responsible for dispatching to an executor thread if needed. Thus,
122  // when setting up the finish_tag_, we can mark its own callback as
123  // inlineable.
124  finish_tag_.Set(
125  call_.call(),
126  [this](bool) {
127  this->MaybeDone(
128  reactor_.load(std::memory_order_relaxed)->InternalInlineable());
129  },
130  &finish_ops_, /*can_inline=*/true);
131  finish_ops_.set_core_cq_tag(&finish_tag_);
132 
133  if (!ctx_->sent_initial_metadata_) {
134  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
135  ctx_->initial_metadata_flags());
136  if (ctx_->compression_level_set()) {
137  finish_ops_.set_compression_level(ctx_->compression_level());
138  }
139  ctx_->sent_initial_metadata_ = true;
140  }
141  // The response is dropped if the status is not OK.
142  if (s.ok()) {
143  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
144  finish_ops_.SendMessagePtr(response()));
145  } else {
146  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
147  }
     // NOTE(review): the core CQ tag was already set right after
     // finish_tag_.Set() above, so this second call looks redundant; confirm
     // before removing.
148  finish_ops_.set_core_cq_tag(&finish_tag_);
149  call_.PerformOps(&finish_ops_);
150  }
151 
// Sends the initial metadata as its own op batch. Asserts that it has not been
// sent before. Ref() is called before the op starts and the completion
// callback ends with MaybeDone(), which presumably balances that ref (see the
// refcount note on callbacks_outstanding_; confirm in the base class).
152  void SendInitialMetadata() override {
153  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
154  this->Ref();
155  // The callback for this function should not be marked inline because it
156  // is directly invoking a user-controlled reaction
157  // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor
158  // thread. However, any OnDone needed after that can be inlined because it
159  // is already running on an executor thread.
160  meta_tag_.Set(
161  call_.call(),
162  [this](bool ok) {
163  ServerUnaryReactor* reactor =
164  reactor_.load(std::memory_order_relaxed);
165  reactor->OnSendInitialMetadataDone(ok);
166  this->MaybeDone(/*inlineable_ondone=*/true);
167  },
168  &meta_ops_, /*can_inline=*/false);
169  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
170  ctx_->initial_metadata_flags());
171  if (ctx_->compression_level_set()) {
172  meta_ops_.set_compression_level(ctx_->compression_level());
173  }
174  ctx_->sent_initial_metadata_ = true;
175  meta_ops_.set_core_cq_tag(&meta_tag_);
176  call_.PerformOps(&meta_ops_);
177  }
178 
179  private:
180  friend class CallbackUnaryHandler<RequestType, ResponseType>;
181 
182  ServerCallbackUnaryImpl(
185  allocator_state,
186  std::function<void()> call_requester)
187  : ctx_(ctx),
188  call_(*call),
189  allocator_state_(allocator_state),
190  call_requester_(std::move(call_requester)) {
191  ctx_->set_message_allocator_state(allocator_state);
192  }
193 
// Last step of RunHandler: stores the reactor so later callbacks can load it,
// binds it to this call object, then calls MaybeCallOnCancel() and MaybeDone()
// in that order.
198  void SetupReactor(ServerUnaryReactor* reactor) {
199  reactor_.store(reactor, std::memory_order_relaxed);
200  this->BindReactor(reactor);
201  this->MaybeCallOnCancel(reactor);
     // NOTE(review): presumably this MaybeDone() releases the "start" slot
     // reserved in callbacks_outstanding_; confirm against the base class.
202  this->MaybeDone(reactor->InternalInlineable());
203  }
204 
// Accessors for the request and response messages held by allocator_state_.
205  const RequestType* request() { return allocator_state_->request(); }
206  ResponseType* response() { return allocator_state_->response(); }
207 
208  void CallOnDone() override {
209  reactor_.load(std::memory_order_relaxed)->OnDone();
210  grpc_call* call = call_.call();
211  auto call_requester = std::move(call_requester_);
212  allocator_state_->Release();
213  this->~ServerCallbackUnaryImpl(); // explicitly call destructor
215  call_requester();
216  }
217 
// Returns the reactor stored by SetupReactor() (relaxed load).
218  ServerReactor* reactor() override {
219  return reactor_.load(std::memory_order_relaxed);
220  }
221 
223  meta_ops_;
228  finish_ops_;
230 
231  ::grpc::CallbackServerContext* const ctx_;
234  allocator_state_;
235  std::function<void()> call_requester_;
236  // reactor_ can always be loaded/stored with relaxed memory ordering because
237  // its value is only set once, independently of other data in the object,
238  // and the loads that use it will always actually come provably later even
239  // though they are from different threads since they are triggered by
240  // actions initiated only by the setting up of the reactor_ variable. In
241  // a sense, it's a delayed "const": it gets its value from the SetupReactor
242  // method (not the constructor, so it's not a true const), but it doesn't
243  // change after that and it only gets used by actions caused, directly or
244  // indirectly, by that setup. This comment also applies to the reactor_
245  // variables of the other streaming objects in this file.
246  std::atomic<ServerUnaryReactor*> reactor_;
247  // callbacks_outstanding_ follows a refcount pattern
248  std::atomic<intptr_t> callbacks_outstanding_{
249  3}; // reserve for start, Finish, and CompletionOp
250  };
251 };
252 
253 template <class RequestType, class ResponseType>
254 class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler {
255  public:
257  std::function<ServerReadReactor<RequestType>*(
258  ::grpc::CallbackServerContext*, ResponseType*)>
259  get_reactor)
260  : get_reactor_(std::move(get_reactor)) {}
261  void RunHandler(const HandlerParameter& param) final {
262  // Arena allocate a reader structure (that includes response)
263  ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
264 
266  param.call->call(), sizeof(ServerCallbackReaderImpl)))
267  ServerCallbackReaderImpl(
268  static_cast<::grpc::CallbackServerContext*>(param.server_context),
269  param.call, std::move(param.call_requester));
270  // Inlineable OnDone can be false in the CompletionOp callback because there
271  // is no read reactor that has an inlineable OnDone; this only applies to
272  // the DefaultReactor (which is unary).
273  param.server_context->BeginCompletionOp(
274  param.call,
275  [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); },
276  reader);
277 
278  ServerReadReactor<RequestType>* reactor = nullptr;
279  if (param.status.ok()) {
282  get_reactor_,
283  static_cast<::grpc::CallbackServerContext*>(param.server_context),
284  reader->response());
285  }
286 
287  if (reactor == nullptr) {
288  // if deserialization or reactor creator failed, we need to fail the call
290  param.call->call(), sizeof(UnimplementedReadReactor<RequestType>)))
293  }
294 
295  reader->SetupReactor(reactor);
296  }
297 
298  private:
299  std::function<ServerReadReactor<RequestType>*(::grpc::CallbackServerContext*,
300  ResponseType*)>
301  get_reactor_;
302 
303  class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
304  public:
// Completes the client-streaming RPC by starting the finish op batch. Initial
// metadata is folded into this batch if it has not been sent yet. With an OK
// status the response held in resp_ is queued via SendMessagePtr() and the
// status it returns is what is handed to ServerSendStatus(); with a non-OK
// status only `s` is sent.
305  void Finish(::grpc::Status s) override {
306  // A finish tag whose callback only calls MaybeDone can be inlined even
307  // if OnDone is not inlineable, because this callback just
308  // checks a ref and then decides whether or not to dispatch OnDone.
309  finish_tag_.Set(
310  call_.call(),
311  [this](bool) {
312  // Inlineable OnDone can be false here because there is
313  // no read reactor that has an inlineable OnDone; this
314  // only applies to the DefaultReactor (which is unary).
315  this->MaybeDone(/*inlineable_ondone=*/false);
316  },
317  &finish_ops_, /*can_inline=*/true);
318  if (!ctx_->sent_initial_metadata_) {
319  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
320  ctx_->initial_metadata_flags());
321  if (ctx_->compression_level_set()) {
322  finish_ops_.set_compression_level(ctx_->compression_level());
323  }
324  ctx_->sent_initial_metadata_ = true;
325  }
326  // The response is dropped if the status is not OK.
327  if (s.ok()) {
328  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
329  finish_ops_.SendMessagePtr(&resp_));
330  } else {
331  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
332  }
333  finish_ops_.set_core_cq_tag(&finish_tag_);
334  call_.PerformOps(&finish_ops_);
335  }
336 
// Sends the initial metadata as its own op batch. Asserts that it has not been
// sent before. Ref() is called before the op starts; the completion callback
// notifies the reactor and then calls MaybeDone().
337  void SendInitialMetadata() override {
338  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
339  this->Ref();
340  // The callback for this function should not be inlined because it invokes
341  // a user-controlled reaction, but any resulting OnDone can be inlined in
342  // the executor to which this callback is dispatched.
343  meta_tag_.Set(
344  call_.call(),
345  [this](bool ok) {
346  ServerReadReactor<RequestType>* reactor =
347  reactor_.load(std::memory_order_relaxed);
348  reactor->OnSendInitialMetadataDone(ok);
349  this->MaybeDone(/*inlineable_ondone=*/true);
350  },
351  &meta_ops_, /*can_inline=*/false);
352  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
353  ctx_->initial_metadata_flags());
354  if (ctx_->compression_level_set()) {
355  meta_ops_.set_compression_level(ctx_->compression_level());
356  }
357  ctx_->sent_initial_metadata_ = true;
358  meta_ops_.set_core_cq_tag(&meta_tag_);
359  call_.PerformOps(&meta_ops_);
360  }
361 
// Starts a read of the next request message into `req`. The completion tag for
// read_ops_ is installed once in SetupReactor(); its callback calls
// OnReadDone(ok) on the reactor and then MaybeDone(). Ref() is called here
// before the op starts.
362  void Read(RequestType* req) override {
363  this->Ref();
364  read_ops_.RecvMessage(req);
365  call_.PerformOps(&read_ops_);
366  }
367 
368  private:
369  friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
370 
// Private constructor, used by the friend handler class. Keeps the context
// pointer, initializes call_ from *call, and takes over call_requester, which
// is invoked at the end of CallOnDone().
371  ServerCallbackReaderImpl(::grpc::CallbackServerContext* ctx,
372  ::grpc::internal::Call* call,
373  std::function<void()> call_requester)
374  : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
375 
// Last step of RunHandler: stores the reactor, installs the reusable read
// completion tag on read_ops_, binds the reactor to this call object, then
// calls MaybeCallOnCancel() and MaybeDone() in that order.
376  void SetupReactor(ServerReadReactor<RequestType>* reactor) {
377  reactor_.store(reactor, std::memory_order_relaxed);
378  // The read callback should not be inlined because it invokes
379  // a user-controlled reaction, but any resulting OnDone can be inlined in
380  // the executor to which this callback is dispatched.
381  read_tag_.Set(
382  call_.call(),
383  [this, reactor](bool ok) {
384  reactor->OnReadDone(ok);
385  this->MaybeDone(/*inlineable_ondone=*/true);
386  },
387  &read_ops_, /*can_inline=*/false);
388  read_ops_.set_core_cq_tag(&read_tag_);
389  this->BindReactor(reactor);
390  this->MaybeCallOnCancel(reactor);
391  // Inlineable OnDone can be false here because there is no read
392  // reactor that has an inlineable OnDone; this only applies to the
393  // DefaultReactor (which is unary).
394  this->MaybeDone(/*inlineable_ondone=*/false);
395  }
396 
// Empty destructor; it is invoked explicitly from CallOnDone().
397  ~ServerCallbackReaderImpl() {}
398 
// Returns the response message stored inside this object (resp_).
399  ResponseType* response() { return &resp_; }
400 
401  void CallOnDone() override {
402  reactor_.load(std::memory_order_relaxed)->OnDone();
403  grpc_call* call = call_.call();
404  auto call_requester = std::move(call_requester_);
405  this->~ServerCallbackReaderImpl(); // explicitly call destructor
407  call_requester();
408  }
409 
// Returns the reactor stored by SetupReactor() (relaxed load).
410  ServerReactor* reactor() override {
411  return reactor_.load(std::memory_order_relaxed);
412  }
413 
415  meta_ops_;
420  finish_ops_;
424  read_ops_;
426 
427  ::grpc::CallbackServerContext* const ctx_;
429  ResponseType resp_;
430  std::function<void()> call_requester_;
431  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
432  std::atomic<ServerReadReactor<RequestType>*> reactor_;
433  // callbacks_outstanding_ follows a refcount pattern
434  std::atomic<intptr_t> callbacks_outstanding_{
435  3}; // reserve for OnStarted, Finish, and CompletionOp
436  };
437 };
438 
439 template <class RequestType, class ResponseType>
440 class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler {
441  public:
443  std::function<ServerWriteReactor<ResponseType>*(
444  ::grpc::CallbackServerContext*, const RequestType*)>
445  get_reactor)
446  : get_reactor_(std::move(get_reactor)) {}
447  void RunHandler(const HandlerParameter& param) final {
448  // Arena allocate a writer structure
449  ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
450 
452  param.call->call(), sizeof(ServerCallbackWriterImpl)))
453  ServerCallbackWriterImpl(
454  static_cast<::grpc::CallbackServerContext*>(param.server_context),
455  param.call, static_cast<RequestType*>(param.request),
456  std::move(param.call_requester));
457  // Inlineable OnDone can be false in the CompletionOp callback because there
458  // is no write reactor that has an inlineable OnDone; this only applies to
459  // the DefaultReactor (which is unary).
460  param.server_context->BeginCompletionOp(
461  param.call,
462  [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); },
463  writer);
464 
465  ServerWriteReactor<ResponseType>* reactor = nullptr;
466  if (param.status.ok()) {
469  get_reactor_,
470  static_cast<::grpc::CallbackServerContext*>(param.server_context),
471  writer->request());
472  }
473  if (reactor == nullptr) {
474  // if deserialization or reactor creator failed, we need to fail the call
476  param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>)))
479  }
480 
481  writer->SetupReactor(reactor);
482  }
483 
485  ::grpc::Status* status, void** /*handler_data*/) final {
486  ::grpc::ByteBuffer buf;
487  buf.set_buffer(req);
488  auto* request =
490  call, sizeof(RequestType))) RequestType();
491  *status =
493  buf.Release();
494  if (status->ok()) {
495  return request;
496  }
497  request->~RequestType();
498  return nullptr;
499  }
500 
501  private:
502  std::function<ServerWriteReactor<ResponseType>*(
503  ::grpc::CallbackServerContext*, const RequestType*)>
504  get_reactor_;
505 
506  class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
507  public:
// Completes the server-streaming RPC by starting the finish op batch with
// status `s`. Initial metadata is folded into this batch if it has not been
// sent yet. WriteAndFinish() queues a final message on finish_ops_ before
// calling this.
508  void Finish(::grpc::Status s) override {
509  // A finish tag whose callback only calls MaybeDone can be inlined even
510  // if OnDone is not inlineable, because this callback just
511  // checks a ref and then decides whether or not to dispatch OnDone.
512  finish_tag_.Set(
513  call_.call(),
514  [this](bool) {
515  // Inlineable OnDone can be false here because there is
516  // no write reactor that has an inlineable OnDone; this
517  // only applies to the DefaultReactor (which is unary).
518  this->MaybeDone(/*inlineable_ondone=*/false);
519  },
520  &finish_ops_, /*can_inline=*/true);
521  finish_ops_.set_core_cq_tag(&finish_tag_);
522 
523  if (!ctx_->sent_initial_metadata_) {
524  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
525  ctx_->initial_metadata_flags());
526  if (ctx_->compression_level_set()) {
527  finish_ops_.set_compression_level(ctx_->compression_level());
528  }
529  ctx_->sent_initial_metadata_ = true;
530  }
531  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
532  call_.PerformOps(&finish_ops_);
533  }
534 
// Sends the initial metadata as its own op batch. Asserts that it has not been
// sent before. Ref() is called before the op starts; the completion callback
// notifies the reactor and then calls MaybeDone().
536  void SendInitialMetadata() override {
537  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
538  this->Ref();
539  // The callback for this function should not be inlined because it invokes
540  // a user-controlled reaction, but any resulting OnDone can be inlined in
541  // the executor to which this callback is dispatched.
542  meta_tag_.Set(
543  call_.call(),
544  [this](bool ok) {
545  ServerWriteReactor<ResponseType>* reactor =
546  reactor_.load(std::memory_order_relaxed);
547  reactor->OnSendInitialMetadataDone(ok);
548  this->MaybeDone(/*inlineable_ondone=*/true);
549  },
550  &meta_ops_, /*can_inline=*/false);
551  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
552  ctx_->initial_metadata_flags());
553  if (ctx_->compression_level_set()) {
554  meta_ops_.set_compression_level(ctx_->compression_level());
555  }
556  ctx_->sent_initial_metadata_ = true;
557  meta_ops_.set_core_cq_tag(&meta_tag_);
558  call_.PerformOps(&meta_ops_);
559  }
559 
// Starts a write of `resp`. If `options` marks this as the last message, the
// buffer hint is also set on it. Initial metadata is folded into the write
// batch if it has not been sent yet. Completion is reported through the write
// tag installed in SetupReactor(). Ref() is called here before the op starts.
560  void Write(const ResponseType* resp,
561  ::grpc::WriteOptions options) override {
562  this->Ref();
563  if (options.is_last_message()) {
564  options.set_buffer_hint();
565  }
566  if (!ctx_->sent_initial_metadata_) {
567  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
568  ctx_->initial_metadata_flags());
569  if (ctx_->compression_level_set()) {
570  write_ops_.set_compression_level(ctx_->compression_level());
571  }
572  ctx_->sent_initial_metadata_ = true;
573  }
574  // TODO(vjpai): don't assert
     // The assertion fires when SendMessagePtr() reports a non-OK status.
575  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
576  call_.PerformOps(&write_ops_);
577  }
578 
// Queues `resp` on finish_ops_ and then calls Finish(s), so the last message
// and the status go out in the same op batch.
579  void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
580  ::grpc::Status s) override {
581  // This combines the write into the finish callback: no separate
582  // write completion is delivered. TODO(vjpai): don't assert
583  GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
584  Finish(std::move(s));
585  }
586 
587  private:
588  friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
589 
// Private constructor, used by the friend handler class. Keeps the context
// pointer, initializes call_ from *call, remembers the request pointer (its
// destructor is run from ~ServerCallbackWriterImpl), and takes over
// call_requester, which is invoked at the end of CallOnDone().
590  ServerCallbackWriterImpl(::grpc::CallbackServerContext* ctx,
591  ::grpc::internal::Call* call,
592  const RequestType* req,
593  std::function<void()> call_requester)
594  : ctx_(ctx),
595  call_(*call),
596  req_(req),
597  call_requester_(std::move(call_requester)) {}
598 
// Last step of RunHandler: stores the reactor, installs the reusable write
// completion tag on write_ops_, binds the reactor to this call object, then
// calls MaybeCallOnCancel() and MaybeDone() in that order.
599  void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
600  reactor_.store(reactor, std::memory_order_relaxed);
601  // The write callback should not be inlined because it invokes
602  // a user-controlled reaction, but any resulting OnDone can be inlined in
603  // the executor to which this callback is dispatched.
604  write_tag_.Set(
605  call_.call(),
606  [this, reactor](bool ok) {
607  reactor->OnWriteDone(ok);
608  this->MaybeDone(/*inlineable_ondone=*/true);
609  },
610  &write_ops_, /*can_inline=*/false);
611  write_ops_.set_core_cq_tag(&write_tag_);
612  this->BindReactor(reactor);
613  this->MaybeCallOnCancel(reactor);
614  // Inlineable OnDone can be false here because there is no write
615  // reactor that has an inlineable OnDone; this only applies to the
616  // DefaultReactor (which is unary).
617  this->MaybeDone(/*inlineable_ondone=*/false);
618  }
// Runs the destructor of the request object referenced by req_. req_ can be
// null: RunHandler builds this object from param.request before it checks
// param.status, and Deserialize returns nullptr when deserialization fails.
// Invoking a destructor through a null pointer is undefined behaviour, so the
// call is guarded.
619  ~ServerCallbackWriterImpl() {
       if (req_ != nullptr) {
         req_->~RequestType();
       }
     }
620 
// Returns the request pointer passed to the constructor.
621  const RequestType* request() { return req_; }
622 
623  void CallOnDone() override {
624  reactor_.load(std::memory_order_relaxed)->OnDone();
625  grpc_call* call = call_.call();
626  auto call_requester = std::move(call_requester_);
627  this->~ServerCallbackWriterImpl(); // explicitly call destructor
629  call_requester();
630  }
631 
// Returns the reactor stored by SetupReactor() (relaxed load).
632  ServerReactor* reactor() override {
633  return reactor_.load(std::memory_order_relaxed);
634  }
635 
637  meta_ops_;
642  finish_ops_;
646  write_ops_;
648 
649  ::grpc::CallbackServerContext* const ctx_;
651  const RequestType* req_;
652  std::function<void()> call_requester_;
653  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
654  std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
655  // callbacks_outstanding_ follows a refcount pattern
656  std::atomic<intptr_t> callbacks_outstanding_{
657  3}; // reserve for OnStarted, Finish, and CompletionOp
658  };
659 };
660 
661 template <class RequestType, class ResponseType>
662 class CallbackBidiHandler : public ::grpc::internal::MethodHandler {
663  public:
667  get_reactor)
668  : get_reactor_(std::move(get_reactor)) {}
669  void RunHandler(const HandlerParameter& param) final {
670  ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
671 
673  param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
674  ServerCallbackReaderWriterImpl(
675  static_cast<::grpc::CallbackServerContext*>(param.server_context),
676  param.call, std::move(param.call_requester));
677  // Inlineable OnDone can be false in the CompletionOp callback because there
678  // is no bidi reactor that has an inlineable OnDone; this only applies to
679  // the DefaultReactor (which is unary).
680  param.server_context->BeginCompletionOp(
681  param.call,
682  [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); },
683  stream);
684 
686  if (param.status.ok()) {
689  get_reactor_,
690  static_cast<::grpc::CallbackServerContext*>(param.server_context));
691  }
692 
693  if (reactor == nullptr) {
694  // if deserialization or reactor creator failed, we need to fail the call
696  param.call->call(),
700  }
701 
702  stream->SetupReactor(reactor);
703  }
704 
705  private:
706  std::function<ServerBidiReactor<RequestType, ResponseType>*(
708  get_reactor_;
709 
710  class ServerCallbackReaderWriterImpl
711  : public ServerCallbackReaderWriter<RequestType, ResponseType> {
712  public:
// Completes the bidi-streaming RPC by starting the finish op batch with status
// `s`. Initial metadata is folded into this batch if it has not been sent yet.
// WriteAndFinish() queues a final message on finish_ops_ before calling this.
713  void Finish(::grpc::Status s) override {
714  // A finish tag whose callback only calls MaybeDone can be inlined even
715  // if OnDone is not inlineable, because this callback just
716  // checks a ref and then decides whether or not to dispatch OnDone.
717  finish_tag_.Set(
718  call_.call(),
719  [this](bool) {
720  // Inlineable OnDone can be false here because there is
721  // no bidi reactor that has an inlineable OnDone; this
722  // only applies to the DefaultReactor (which is unary).
723  this->MaybeDone(/*inlineable_ondone=*/false);
724  },
725  &finish_ops_, /*can_inline=*/true);
726  finish_ops_.set_core_cq_tag(&finish_tag_);
727 
728  if (!ctx_->sent_initial_metadata_) {
729  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
730  ctx_->initial_metadata_flags());
731  if (ctx_->compression_level_set()) {
732  finish_ops_.set_compression_level(ctx_->compression_level());
733  }
734  ctx_->sent_initial_metadata_ = true;
735  }
736  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
737  call_.PerformOps(&finish_ops_);
738  }
739 
// Sends the initial metadata as its own op batch. Asserts that it has not been
// sent before. Ref() is called before the op starts; the completion callback
// notifies the reactor and then calls MaybeDone().
740  void SendInitialMetadata() override {
741  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
742  this->Ref();
743  // The callback for this function should not be inlined because it invokes
744  // a user-controlled reaction, but any resulting OnDone can be inlined in
745  // the executor to which this callback is dispatched.
746  meta_tag_.Set(
747  call_.call(),
748  [this](bool ok) {
749  ServerBidiReactor<RequestType, ResponseType>* reactor =
750  reactor_.load(std::memory_order_relaxed);
751  reactor->OnSendInitialMetadataDone(ok);
752  this->MaybeDone(/*inlineable_ondone=*/true);
753  },
754  &meta_ops_, /*can_inline=*/false);
755  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
756  ctx_->initial_metadata_flags());
757  if (ctx_->compression_level_set()) {
758  meta_ops_.set_compression_level(ctx_->compression_level());
759  }
760  ctx_->sent_initial_metadata_ = true;
761  meta_ops_.set_core_cq_tag(&meta_tag_);
762  call_.PerformOps(&meta_ops_);
763  }
764 
// Starts a write of `resp`. If `options` marks this as the last message, the
// buffer hint is also set on it. Initial metadata is folded into the write
// batch if it has not been sent yet. Completion is reported through the write
// tag installed in SetupReactor(). Ref() is called here before the op starts.
765  void Write(const ResponseType* resp,
766  ::grpc::WriteOptions options) override {
767  this->Ref();
768  if (options.is_last_message()) {
769  options.set_buffer_hint();
770  }
771  if (!ctx_->sent_initial_metadata_) {
772  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
773  ctx_->initial_metadata_flags());
774  if (ctx_->compression_level_set()) {
775  write_ops_.set_compression_level(ctx_->compression_level());
776  }
777  ctx_->sent_initial_metadata_ = true;
778  }
779  // TODO(vjpai): don't assert
     // The assertion fires when SendMessagePtr() reports a non-OK status.
780  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
781  call_.PerformOps(&write_ops_);
782  }
783 
// Queues `resp` on finish_ops_ and then calls Finish(s), so the last message
// and the status go out in the same op batch.
784  void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
785  ::grpc::Status s) override {
786  // TODO(vjpai): don't assert
787  GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
788  Finish(std::move(s));
789  }
790 
// Starts a read of the next request message into `req`. The completion tag for
// read_ops_ is installed once in SetupReactor(); its callback calls
// OnReadDone(ok) on the reactor and then MaybeDone(). Ref() is called here
// before the op starts.
791  void Read(RequestType* req) override {
792  this->Ref();
793  read_ops_.RecvMessage(req);
794  call_.PerformOps(&read_ops_);
795  }
796 
797  private:
798  friend class CallbackBidiHandler<RequestType, ResponseType>;
799 
// Private constructor, used by the friend handler class. Keeps the context
// pointer, initializes call_ from *call, and takes over call_requester, which
// is invoked at the end of CallOnDone().
800  ServerCallbackReaderWriterImpl(::grpc::CallbackServerContext* ctx,
801  ::grpc::internal::Call* call,
802  std::function<void()> call_requester)
803  : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
804 
// Last step of RunHandler: stores the reactor, installs the reusable write and
// read completion tags on write_ops_ and read_ops_, binds the reactor to this
// call object, then calls MaybeCallOnCancel() and MaybeDone() in that order.
805  void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
806  reactor_.store(reactor, std::memory_order_relaxed);
807  // The callbacks for these functions should not be inlined because they
808  // invoke user-controlled reactions, but any resulting OnDones can be
809  // inlined in the executor to which a callback is dispatched.
810  write_tag_.Set(
811  call_.call(),
812  [this, reactor](bool ok) {
813  reactor->OnWriteDone(ok);
814  this->MaybeDone(/*inlineable_ondone=*/true);
815  },
816  &write_ops_, /*can_inline=*/false);
817  write_ops_.set_core_cq_tag(&write_tag_);
818  read_tag_.Set(
819  call_.call(),
820  [this, reactor](bool ok) {
821  reactor->OnReadDone(ok);
822  this->MaybeDone(/*inlineable_ondone=*/true);
823  },
824  &read_ops_, /*can_inline=*/false);
825  read_ops_.set_core_cq_tag(&read_tag_);
826  this->BindReactor(reactor);
827  this->MaybeCallOnCancel(reactor);
828  // Inlineable OnDone can be false here because there is no bidi
829  // reactor that has an inlineable OnDone; this only applies to the
830  // DefaultReactor (which is unary).
831  this->MaybeDone(/*inlineable_ondone=*/false);
832  }
833 
834  void CallOnDone() override {
835  reactor_.load(std::memory_order_relaxed)->OnDone();
836  grpc_call* call = call_.call();
837  auto call_requester = std::move(call_requester_);
838  this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor
840  call_requester();
841  }
842 
// Returns the reactor stored by SetupReactor() (relaxed load).
843  ServerReactor* reactor() override {
844  return reactor_.load(std::memory_order_relaxed);
845  }
846 
848  meta_ops_;
853  finish_ops_;
857  write_ops_;
861  read_ops_;
863 
864  ::grpc::CallbackServerContext* const ctx_;
866  std::function<void()> call_requester_;
867  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
868  std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
869  // callbacks_outstanding_ follows a refcount pattern
870  std::atomic<intptr_t> callbacks_outstanding_{
871  3}; // reserve for OnStarted, Finish, and CompletionOp
872  };
873 };
874 
875 } // namespace internal
876 } // namespace grpc
877 
878 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
grpc::ServerReadReactor
ServerReadReactor is the interface for a client-streaming RPC.
Definition: server_callback.h:182
grpc::internal::CallbackWithSuccessTag
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming operations.
Definition: callback_common.h:136
grpc::ServerCallbackReaderWriter::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::internal::CallbackUnaryHandler::Deserialize
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, ::grpc::Status *status, void **handler_data) final
Definition: server_callback_handlers.h:80
grpc::internal::CallOpServerSendStatus
Definition: call_op_set.h:654
rpc_service_method.h
grpc
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided function on expiry or cancellation.
Definition: alarm.h:33
grpc::internal::CallOpSet
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:851
status.h
grpc::CoreCodegenInterface::grpc_call_ref
virtual void grpc_call_ref(grpc_call *call)=0
grpc::CallbackServerContext
Definition: server_context.h:577
grpc::internal::CallOpSendMessage
Definition: call_op_set.h:282
grpc::CoreCodegenInterface::grpc_call_arena_alloc
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
grpc::internal::Call
Straightforward wrapping of the C call object.
Definition: call.h:35
grpc::internal::CallbackClientStreamingHandler::CallbackClientStreamingHandler
CallbackClientStreamingHandler(std::function< ServerReadReactor< RequestType > *(::grpc::CallbackServerContext *, ResponseType *)> get_reactor)
Definition: server_callback_handlers.h:256
grpc::experimental::MessageAllocator< RequestType, ResponseType >
grpc::internal::CallbackClientStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:261
grpc::ServerCallbackUnary::BindReactor
void BindReactor(Reactor *reactor)
Definition: server_callback.h:201
server_callback.h
grpc::internal::CallOpSendInitialMetadata
Definition: call_op_set.h:212
grpc::internal::ServerCallbackCall::MaybeCallOnCancel
void MaybeCallOnCancel(ServerReactor *reactor)
Definition: server_callback.h:106
grpc::Status::ok
bool ok() const
Is the status OK?
Definition: status.h:118
grpc::ServerCallbackReaderWriter
Definition: server_callback.h:238
grpc::internal::CallbackUnaryHandler::SetMessageAllocator
void SetMessageAllocator(::grpc::experimental::MessageAllocator< RequestType, ResponseType > *allocator)
Definition: server_callback_handlers.h:39
grpc::Status
Did it work? If it didn't, why?
Definition: status.h:31
grpc::internal::CallbackServerStreamingHandler::Deserialize
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, ::grpc::Status *status, void **) final
Definition: server_callback_handlers.h:484
grpc_call
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:70
grpc::ServerBidiReactor
ServerBidiReactor is the interface for a bidirectional streaming RPC.
Definition: server_callback.h:186
grpc_byte_buffer
Definition: grpc_types.h:40
grpc::ByteBuffer
A sequence of bytes.
Definition: byte_buffer.h:61
grpc::ServerCallbackWriter::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::internal::DefaultMessageHolder
Definition: server_callback.h:160
grpc::internal::CallbackBidiHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:669
grpc::SerializationTraits
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
grpc::internal::CallbackUnaryHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:45
grpc::ServerCallbackReader< RequestType >::BindReactor
void BindReactor(ServerReadReactor< RequestType > *reactor)
Definition: server_callback.h:215
grpc::ServerCallbackWriter< ResponseType >::BindReactor
void BindReactor(ServerWriteReactor< ResponseType > *reactor)
Definition: server_callback.h:232
grpc::internal::FinishOnlyReactor
Definition: server_callback.h:758
grpc::internal::CallbackUnaryHandler::CallbackUnaryHandler
CallbackUnaryHandler(std::function< ServerUnaryReactor *(::grpc::CallbackServerContext *, const RequestType *, ResponseType *)> get_reactor)
Definition: server_callback_handlers.h:33
grpc::internal::MethodHandler
Base class for running an RPC handler.
Definition: rpc_service_method.h:38
grpc::CoreCodegenInterface::grpc_call_unref
virtual void grpc_call_unref(grpc_call *call)=0
grpc::internal::CallbackServerStreamingHandler::CallbackServerStreamingHandler
CallbackServerStreamingHandler(std::function< ServerWriteReactor< ResponseType > *(::grpc::CallbackServerContext *, const RequestType *)> get_reactor)
Definition: server_callback_handlers.h:442
grpc::WriteOptions
Per-message write options.
Definition: call_op_set.h:79
grpc::ServerWriteReactor
ServerWriteReactor is the interface for a server-streaming RPC.
Definition: server_callback.h:184
grpc::internal::UnimplementedUnaryReactor
FinishOnlyReactor< ServerUnaryReactor > UnimplementedUnaryReactor
Definition: server_callback.h:764
grpc::UNIMPLEMENTED
@ UNIMPLEMENTED
Operation is not implemented or not supported/enabled in this service.
Definition: status_code_enum.h:115
grpc::ServerCallbackReaderWriter< RequestType, ResponseType >::BindReactor
void BindReactor(ServerBidiReactor< RequestType, ResponseType > *reactor)
Definition: server_callback.h:250
grpc::internal::CallbackBidiHandler::CallbackBidiHandler
CallbackBidiHandler(std::function< ServerBidiReactor< RequestType, ResponseType > *(::grpc::CallbackServerContext *)> get_reactor)
Definition: server_callback_handlers.h:664
grpc::ServerCallbackWriter
Definition: server_callback.h:221
grpc::experimental::MessageHolder::response
ResponseT * response()
Definition: message_allocator.h:49
grpc::internal::Call::call
grpc_call * call() const
Definition: call.h:69
std
Definition: async_unary_call.h:398
server_context.h
grpc::WriteOptions::set_buffer_hint
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call_op_set.h:117
grpc::ServerCallbackUnary::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::experimental::ServerUnaryReactor
::grpc::ServerUnaryReactor ServerUnaryReactor
Definition: server_callback.h:788
grpc::internal::ServerCallbackCall::Ref
void Ref()
Increases the reference count.
Definition: server_callback.h:124
grpc::ServerCallbackReader
Definition: server_callback.h:207
grpc::g_core_codegen_interface
CoreCodegenInterface * g_core_codegen_interface
Definition: completion_queue.h:96
GPR_CODEGEN_ASSERT
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:146
grpc::WriteOptions::is_last_message
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailing metadata.
Definition: call_op_set.h:181
grpc::experimental::MessageAllocator::AllocateMessages
virtual MessageHolder< RequestT, ResponseT > * AllocateMessages()=0
grpc::internal::CallOpRecvMessage
Definition: byte_buffer.h:52
grpc::ByteBuffer::Release
void Release()
Forget the underlying byte buffer without destroying it. Use this only for un-owned byte buffers.
Definition: byte_buffer.h:140
grpc::experimental::MessageHolder::Release
virtual void Release()=0
grpc::internal::CatchingReactorGetter
Reactor * CatchingReactorGetter(Func &&func, Args &&... args)
Definition: callback_common.h:51
grpc::ServerCallbackUnary
Definition: server_callback.h:191
grpc::ServerUnaryReactor
Definition: server_callback.h:688
message_allocator.h
grpc::internal::CallbackServerStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:447
grpc::experimental::MessageHolder< RequestType, ResponseType >
grpc::experimental::MessageHolder::request
RequestT * request()
Definition: message_allocator.h:48
grpc::ServerCallbackReader::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::internal::ServerCallbackCall::MaybeDone
void MaybeDone()
Definition: server_callback.h:92