GRPC C++  1.33.1
server_callback_handlers.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
20 
26 
27 namespace grpc {
28 namespace internal {
29 
30 template <class RequestType, class ResponseType>
31 class CallbackUnaryHandler : public ::grpc::internal::MethodHandler {
32  public:
35  const RequestType*, ResponseType*)>
36  get_reactor)
37  : get_reactor_(std::move(get_reactor)) {}
38 
41  allocator) {
42  allocator_ = allocator;
43  }
44 
45  void RunHandler(const HandlerParameter& param) final {
46  // Arena allocate a controller structure (that includes request/response)
48  auto* allocator_state = static_cast<
50  param.internal_data);
51 
53  param.call->call(), sizeof(ServerCallbackUnaryImpl)))
54  ServerCallbackUnaryImpl(
55  static_cast<::grpc::CallbackServerContext*>(param.server_context),
56  param.call, allocator_state, std::move(param.call_requester));
57  param.server_context->BeginCompletionOp(
58  param.call, [call](bool) { call->MaybeDone(); }, call);
59 
60  ServerUnaryReactor* reactor = nullptr;
61  if (param.status.ok()) {
62  reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
63  get_reactor_,
64  static_cast<::grpc::CallbackServerContext*>(param.server_context),
65  call->request(), call->response());
66  }
67 
68  if (reactor == nullptr) {
69  // if deserialization or reactor creator failed, we need to fail the call
71  param.call->call(), sizeof(UnimplementedUnaryReactor)))
74  }
75 
77  call->SetupReactor(reactor);
78  }
79 
81  ::grpc::Status* status, void** handler_data) final {
83  buf.set_buffer(req);
84  RequestType* request = nullptr;
86  allocator_state = nullptr;
87  if (allocator_ != nullptr) {
88  allocator_state = allocator_->AllocateMessages();
89  } else {
90  allocator_state =
94  }
95  *handler_data = allocator_state;
96  request = allocator_state->request();
97  *status =
99  buf.Release();
100  if (status->ok()) {
101  return request;
102  }
103  // Clean up on deserialization failure.
104  allocator_state->Release();
105  return nullptr;
106  }
107 
108  private:
110  const RequestType*, ResponseType*)>
111  get_reactor_;
113  allocator_ = nullptr;
114 
115  class ServerCallbackUnaryImpl : public ServerCallbackUnary {
116  public:
117  void Finish(::grpc::Status s) override {
118  // A callback that only contains a call to MaybeDone can be run as an
119  // inline callback regardless of whether or not OnDone is inlineable
120  // because if the actual OnDone callback needs to be scheduled, MaybeDone
121  // is responsible for dispatching to an executor thread if needed. Thus,
122  // when setting up the finish_tag_, we can set its own callback to
123  // inlineable.
124  finish_tag_.Set(
125  call_.call(),
126  [this](bool) {
127  this->MaybeDone(
128  reactor_.load(std::memory_order_relaxed)->InternalInlineable());
129  },
130  &finish_ops_, /*can_inline=*/true);
131  finish_ops_.set_core_cq_tag(&finish_tag_);
132 
133  if (!ctx_->sent_initial_metadata_) {
134  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
135  ctx_->initial_metadata_flags());
136  if (ctx_->compression_level_set()) {
137  finish_ops_.set_compression_level(ctx_->compression_level());
138  }
139  ctx_->sent_initial_metadata_ = true;
140  }
141  // The response is dropped if the status is not OK.
142  if (s.ok()) {
143  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
144  finish_ops_.SendMessagePtr(response()));
145  } else {
146  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
147  }
148  finish_ops_.set_core_cq_tag(&finish_tag_);
149  call_.PerformOps(&finish_ops_);
150  }
151 
152  void SendInitialMetadata() override {
153  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
154  this->Ref();
155  // The callback for this function should not be marked inline because it
156  // is directly invoking a user-controlled reaction
157  // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor
158  // thread. However, any OnDone needed after that can be inlined because it
159  // is already running on an executor thread.
160  meta_tag_.Set(call_.call(),
161  [this](bool ok) {
162  ServerUnaryReactor* reactor =
163  reactor_.load(std::memory_order_relaxed);
164  reactor->OnSendInitialMetadataDone(ok);
165  this->MaybeDone(/*inlineable_ondone=*/true);
166  },
167  &meta_ops_, /*can_inline=*/false);
168  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
169  ctx_->initial_metadata_flags());
170  if (ctx_->compression_level_set()) {
171  meta_ops_.set_compression_level(ctx_->compression_level());
172  }
173  ctx_->sent_initial_metadata_ = true;
174  meta_ops_.set_core_cq_tag(&meta_tag_);
175  call_.PerformOps(&meta_ops_);
176  }
177 
178  private:
179  friend class CallbackUnaryHandler<RequestType, ResponseType>;
180 
181  ServerCallbackUnaryImpl(
184  allocator_state,
185  std::function<void()> call_requester)
186  : ctx_(ctx),
187  call_(*call),
188  allocator_state_(allocator_state),
189  call_requester_(std::move(call_requester)) {
190  ctx_->set_message_allocator_state(allocator_state);
191  }
192 
197  void SetupReactor(ServerUnaryReactor* reactor) {
198  reactor_.store(reactor, std::memory_order_relaxed);
199  this->BindReactor(reactor);
200  this->MaybeCallOnCancel(reactor);
201  this->MaybeDone(reactor->InternalInlineable());
202  }
203 
204  const RequestType* request() { return allocator_state_->request(); }
205  ResponseType* response() { return allocator_state_->response(); }
206 
207  void CallOnDone() override {
208  reactor_.load(std::memory_order_relaxed)->OnDone();
209  grpc_call* call = call_.call();
210  auto call_requester = std::move(call_requester_);
211  allocator_state_->Release();
212  this->~ServerCallbackUnaryImpl(); // explicitly call destructor
214  call_requester();
215  }
216 
217  ServerReactor* reactor() override {
218  return reactor_.load(std::memory_order_relaxed);
219  }
220 
222  meta_ops_;
227  finish_ops_;
229 
230  ::grpc::CallbackServerContext* const ctx_;
233  allocator_state_;
234  std::function<void()> call_requester_;
235  // reactor_ can always be loaded/stored with relaxed memory ordering because
236  // its value is only set once, independently of other data in the object,
237  // and the loads that use it will always actually come provably later even
238  // though they are from different threads since they are triggered by
239  // actions initiated only by the setting up of the reactor_ variable. In
240  // a sense, it's a delayed "const": it gets its value from the SetupReactor
241  // method (not the constructor, so it's not a true const), but it doesn't
242  // change after that and it only gets used by actions caused, directly or
243  // indirectly, by that setup. This comment also applies to the reactor_
244  // variables of the other streaming objects in this file.
245  std::atomic<ServerUnaryReactor*> reactor_;
246  // callbacks_outstanding_ follows a refcount pattern
247  std::atomic<intptr_t> callbacks_outstanding_{
248  3}; // reserve for start, Finish, and CompletionOp
249  };
250 };
251 
252 template <class RequestType, class ResponseType>
253 class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler {
254  public:
256  std::function<ServerReadReactor<RequestType>*(
257  ::grpc::CallbackServerContext*, ResponseType*)>
258  get_reactor)
259  : get_reactor_(std::move(get_reactor)) {}
260  void RunHandler(const HandlerParameter& param) final {
261  // Arena allocate a reader structure (that includes response)
262  ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
263 
265  param.call->call(), sizeof(ServerCallbackReaderImpl)))
266  ServerCallbackReaderImpl(
267  static_cast<::grpc::CallbackServerContext*>(param.server_context),
268  param.call, std::move(param.call_requester));
269  // Inlineable OnDone can be false in the CompletionOp callback because there
270  // is no read reactor that has an inlineable OnDone; this only applies to
271  // the DefaultReactor (which is unary).
272  param.server_context->BeginCompletionOp(
273  param.call,
274  [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); },
275  reader);
276 
277  ServerReadReactor<RequestType>* reactor = nullptr;
278  if (param.status.ok()) {
281  get_reactor_,
282  static_cast<::grpc::CallbackServerContext*>(param.server_context),
283  reader->response());
284  }
285 
286  if (reactor == nullptr) {
287  // if deserialization or reactor creator failed, we need to fail the call
289  param.call->call(), sizeof(UnimplementedReadReactor<RequestType>)))
292  }
293 
294  reader->SetupReactor(reactor);
295  }
296 
297  private:
298  std::function<ServerReadReactor<RequestType>*(::grpc::CallbackServerContext*,
299  ResponseType*)>
300  get_reactor_;
301 
302  class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
303  public:
304  void Finish(::grpc::Status s) override {
305  // A finish tag with only MaybeDone can have its callback inlined
306  // regardless even if OnDone is not inlineable because this callback just
307  // checks a ref and then decides whether or not to dispatch OnDone.
308  finish_tag_.Set(call_.call(),
309  [this](bool) {
310  // Inlineable OnDone can be false here because there is
311  // no read reactor that has an inlineable OnDone; this
312  // only applies to the DefaultReactor (which is unary).
313  this->MaybeDone(/*inlineable_ondone=*/false);
314  },
315  &finish_ops_, /*can_inline=*/true);
316  if (!ctx_->sent_initial_metadata_) {
317  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
318  ctx_->initial_metadata_flags());
319  if (ctx_->compression_level_set()) {
320  finish_ops_.set_compression_level(ctx_->compression_level());
321  }
322  ctx_->sent_initial_metadata_ = true;
323  }
324  // The response is dropped if the status is not OK.
325  if (s.ok()) {
326  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
327  finish_ops_.SendMessagePtr(&resp_));
328  } else {
329  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
330  }
331  finish_ops_.set_core_cq_tag(&finish_tag_);
332  call_.PerformOps(&finish_ops_);
333  }
334 
335  void SendInitialMetadata() override {
336  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
337  this->Ref();
338  // The callback for this function should not be inlined because it invokes
339  // a user-controlled reaction, but any resulting OnDone can be inlined in
340  // the executor to which this callback is dispatched.
341  meta_tag_.Set(call_.call(),
342  [this](bool ok) {
343  ServerReadReactor<RequestType>* reactor =
344  reactor_.load(std::memory_order_relaxed);
345  reactor->OnSendInitialMetadataDone(ok);
346  this->MaybeDone(/*inlineable_ondone=*/true);
347  },
348  &meta_ops_, /*can_inline=*/false);
349  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
350  ctx_->initial_metadata_flags());
351  if (ctx_->compression_level_set()) {
352  meta_ops_.set_compression_level(ctx_->compression_level());
353  }
354  ctx_->sent_initial_metadata_ = true;
355  meta_ops_.set_core_cq_tag(&meta_tag_);
356  call_.PerformOps(&meta_ops_);
357  }
358 
359  void Read(RequestType* req) override {
360  this->Ref();
361  read_ops_.RecvMessage(req);
362  call_.PerformOps(&read_ops_);
363  }
364 
365  private:
366  friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
367 
368  ServerCallbackReaderImpl(::grpc::CallbackServerContext* ctx,
369  ::grpc::internal::Call* call,
370  std::function<void()> call_requester)
371  : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
372 
373  void SetupReactor(ServerReadReactor<RequestType>* reactor) {
374  reactor_.store(reactor, std::memory_order_relaxed);
375  // The callback for this function should not be inlined because it invokes
376  // a user-controlled reaction, but any resulting OnDone can be inlined in
377  // the executor to which this callback is dispatched.
378  read_tag_.Set(call_.call(),
379  [this, reactor](bool ok) {
380  reactor->OnReadDone(ok);
381  this->MaybeDone(/*inlineable_ondone=*/true);
382  },
383  &read_ops_, /*can_inline=*/false);
384  read_ops_.set_core_cq_tag(&read_tag_);
385  this->BindReactor(reactor);
386  this->MaybeCallOnCancel(reactor);
387  // Inlineable OnDone can be false here because there is no read
388  // reactor that has an inlineable OnDone; this only applies to the
389  // DefaultReactor (which is unary).
390  this->MaybeDone(/*inlineable_ondone=*/false);
391  }
392 
393  ~ServerCallbackReaderImpl() {}
394 
395  ResponseType* response() { return &resp_; }
396 
397  void CallOnDone() override {
398  reactor_.load(std::memory_order_relaxed)->OnDone();
399  grpc_call* call = call_.call();
400  auto call_requester = std::move(call_requester_);
401  this->~ServerCallbackReaderImpl(); // explicitly call destructor
403  call_requester();
404  }
405 
406  ServerReactor* reactor() override {
407  return reactor_.load(std::memory_order_relaxed);
408  }
409 
411  meta_ops_;
416  finish_ops_;
420  read_ops_;
422 
423  ::grpc::CallbackServerContext* const ctx_;
425  ResponseType resp_;
426  std::function<void()> call_requester_;
427  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
428  std::atomic<ServerReadReactor<RequestType>*> reactor_;
429  // callbacks_outstanding_ follows a refcount pattern
430  std::atomic<intptr_t> callbacks_outstanding_{
431  3}; // reserve for OnStarted, Finish, and CompletionOp
432  };
433 };
434 
435 template <class RequestType, class ResponseType>
436 class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler {
437  public:
439  std::function<ServerWriteReactor<ResponseType>*(
440  ::grpc::CallbackServerContext*, const RequestType*)>
441  get_reactor)
442  : get_reactor_(std::move(get_reactor)) {}
443  void RunHandler(const HandlerParameter& param) final {
444  // Arena allocate a writer structure
445  ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
446 
448  param.call->call(), sizeof(ServerCallbackWriterImpl)))
449  ServerCallbackWriterImpl(
450  static_cast<::grpc::CallbackServerContext*>(param.server_context),
451  param.call, static_cast<RequestType*>(param.request),
452  std::move(param.call_requester));
453  // Inlineable OnDone can be false in the CompletionOp callback because there
454  // is no write reactor that has an inlineable OnDone; this only applies to
455  // the DefaultReactor (which is unary).
456  param.server_context->BeginCompletionOp(
457  param.call,
458  [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); },
459  writer);
460 
461  ServerWriteReactor<ResponseType>* reactor = nullptr;
462  if (param.status.ok()) {
465  get_reactor_,
466  static_cast<::grpc::CallbackServerContext*>(param.server_context),
467  writer->request());
468  }
469  if (reactor == nullptr) {
470  // if deserialization or reactor creator failed, we need to fail the call
472  param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>)))
475  }
476 
477  writer->SetupReactor(reactor);
478  }
479 
481  ::grpc::Status* status, void** /*handler_data*/) final {
482  ::grpc::ByteBuffer buf;
483  buf.set_buffer(req);
484  auto* request =
486  call, sizeof(RequestType))) RequestType();
487  *status =
489  buf.Release();
490  if (status->ok()) {
491  return request;
492  }
493  request->~RequestType();
494  return nullptr;
495  }
496 
497  private:
498  std::function<ServerWriteReactor<ResponseType>*(
499  ::grpc::CallbackServerContext*, const RequestType*)>
500  get_reactor_;
501 
502  class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
503  public:
504  void Finish(::grpc::Status s) override {
505  // A finish tag with only MaybeDone can have its callback inlined
506  // regardless even if OnDone is not inlineable because this callback just
507  // checks a ref and then decides whether or not to dispatch OnDone.
508  finish_tag_.Set(call_.call(),
509  [this](bool) {
510  // Inlineable OnDone can be false here because there is
511  // no write reactor that has an inlineable OnDone; this
512  // only applies to the DefaultReactor (which is unary).
513  this->MaybeDone(/*inlineable_ondone=*/false);
514  },
515  &finish_ops_, /*can_inline=*/true);
516  finish_ops_.set_core_cq_tag(&finish_tag_);
517 
518  if (!ctx_->sent_initial_metadata_) {
519  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
520  ctx_->initial_metadata_flags());
521  if (ctx_->compression_level_set()) {
522  finish_ops_.set_compression_level(ctx_->compression_level());
523  }
524  ctx_->sent_initial_metadata_ = true;
525  }
526  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
527  call_.PerformOps(&finish_ops_);
528  }
529 
530  void SendInitialMetadata() override {
531  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
532  this->Ref();
533  // The callback for this function should not be inlined because it invokes
534  // a user-controlled reaction, but any resulting OnDone can be inlined in
535  // the executor to which this callback is dispatched.
536  meta_tag_.Set(call_.call(),
537  [this](bool ok) {
538  ServerWriteReactor<ResponseType>* reactor =
539  reactor_.load(std::memory_order_relaxed);
540  reactor->OnSendInitialMetadataDone(ok);
541  this->MaybeDone(/*inlineable_ondone=*/true);
542  },
543  &meta_ops_, /*can_inline=*/false);
544  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
545  ctx_->initial_metadata_flags());
546  if (ctx_->compression_level_set()) {
547  meta_ops_.set_compression_level(ctx_->compression_level());
548  }
549  ctx_->sent_initial_metadata_ = true;
550  meta_ops_.set_core_cq_tag(&meta_tag_);
551  call_.PerformOps(&meta_ops_);
552  }
553 
554  void Write(const ResponseType* resp,
555  ::grpc::WriteOptions options) override {
556  this->Ref();
557  if (options.is_last_message()) {
558  options.set_buffer_hint();
559  }
560  if (!ctx_->sent_initial_metadata_) {
561  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
562  ctx_->initial_metadata_flags());
563  if (ctx_->compression_level_set()) {
564  write_ops_.set_compression_level(ctx_->compression_level());
565  }
566  ctx_->sent_initial_metadata_ = true;
567  }
568  // TODO(vjpai): don't assert
569  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
570  call_.PerformOps(&write_ops_);
571  }
572 
573  void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
574  ::grpc::Status s) override {
575  // This combines the write into the finish callback
576  // TODO(vjpai): don't assert
577  GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
578  Finish(std::move(s));
579  }
580 
581  private:
582  friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
583 
584  ServerCallbackWriterImpl(::grpc::CallbackServerContext* ctx,
585  ::grpc::internal::Call* call,
586  const RequestType* req,
587  std::function<void()> call_requester)
588  : ctx_(ctx),
589  call_(*call),
590  req_(req),
591  call_requester_(std::move(call_requester)) {}
592 
593  void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
594  reactor_.store(reactor, std::memory_order_relaxed);
595  // The callback for this function should not be inlined because it invokes
596  // a user-controlled reaction, but any resulting OnDone can be inlined in
597  // the executor to which this callback is dispatched.
598  write_tag_.Set(call_.call(),
599  [this, reactor](bool ok) {
600  reactor->OnWriteDone(ok);
601  this->MaybeDone(/*inlineable_ondone=*/true);
602  },
603  &write_ops_, /*can_inline=*/false);
604  write_ops_.set_core_cq_tag(&write_tag_);
605  this->BindReactor(reactor);
606  this->MaybeCallOnCancel(reactor);
607  // Inlineable OnDone can be false here because there is no write
608  // reactor that has an inlineable OnDone; this only applies to the
609  // DefaultReactor (which is unary).
610  this->MaybeDone(/*inlineable_ondone=*/false);
611  }
612  ~ServerCallbackWriterImpl() { req_->~RequestType(); }
613 
614  const RequestType* request() { return req_; }
615 
616  void CallOnDone() override {
617  reactor_.load(std::memory_order_relaxed)->OnDone();
618  grpc_call* call = call_.call();
619  auto call_requester = std::move(call_requester_);
620  this->~ServerCallbackWriterImpl(); // explicitly call destructor
622  call_requester();
623  }
624 
625  ServerReactor* reactor() override {
626  return reactor_.load(std::memory_order_relaxed);
627  }
628 
630  meta_ops_;
635  finish_ops_;
639  write_ops_;
641 
642  ::grpc::CallbackServerContext* const ctx_;
644  const RequestType* req_;
645  std::function<void()> call_requester_;
646  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
647  std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
648  // callbacks_outstanding_ follows a refcount pattern
649  std::atomic<intptr_t> callbacks_outstanding_{
650  3}; // reserve for OnStarted, Finish, and CompletionOp
651  };
652 };
653 
654 template <class RequestType, class ResponseType>
655 class CallbackBidiHandler : public ::grpc::internal::MethodHandler {
656  public:
660  get_reactor)
661  : get_reactor_(std::move(get_reactor)) {}
662  void RunHandler(const HandlerParameter& param) final {
663  ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
664 
666  param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
667  ServerCallbackReaderWriterImpl(
668  static_cast<::grpc::CallbackServerContext*>(param.server_context),
669  param.call, std::move(param.call_requester));
670  // Inlineable OnDone can be false in the CompletionOp callback because there
671  // is no bidi reactor that has an inlineable OnDone; this only applies to
672  // the DefaultReactor (which is unary).
673  param.server_context->BeginCompletionOp(
674  param.call,
675  [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); },
676  stream);
677 
679  if (param.status.ok()) {
682  get_reactor_,
683  static_cast<::grpc::CallbackServerContext*>(param.server_context));
684  }
685 
686  if (reactor == nullptr) {
687  // if deserialization or reactor creator failed, we need to fail the call
689  param.call->call(),
693  }
694 
695  stream->SetupReactor(reactor);
696  }
697 
698  private:
699  std::function<ServerBidiReactor<RequestType, ResponseType>*(
701  get_reactor_;
702 
703  class ServerCallbackReaderWriterImpl
704  : public ServerCallbackReaderWriter<RequestType, ResponseType> {
705  public:
706  void Finish(::grpc::Status s) override {
707  // A finish tag with only MaybeDone can have its callback inlined
708  // regardless even if OnDone is not inlineable because this callback just
709  // checks a ref and then decides whether or not to dispatch OnDone.
710  finish_tag_.Set(call_.call(),
711  [this](bool) {
712  // Inlineable OnDone can be false here because there is
713  // no bidi reactor that has an inlineable OnDone; this
714  // only applies to the DefaultReactor (which is unary).
715  this->MaybeDone(/*inlineable_ondone=*/false);
716  },
717  &finish_ops_, /*can_inline=*/true);
718  finish_ops_.set_core_cq_tag(&finish_tag_);
719 
720  if (!ctx_->sent_initial_metadata_) {
721  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
722  ctx_->initial_metadata_flags());
723  if (ctx_->compression_level_set()) {
724  finish_ops_.set_compression_level(ctx_->compression_level());
725  }
726  ctx_->sent_initial_metadata_ = true;
727  }
728  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
729  call_.PerformOps(&finish_ops_);
730  }
731 
732  void SendInitialMetadata() override {
733  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
734  this->Ref();
735  // The callback for this function should not be inlined because it invokes
736  // a user-controlled reaction, but any resulting OnDone can be inlined in
737  // the executor to which this callback is dispatched.
738  meta_tag_.Set(call_.call(),
739  [this](bool ok) {
740  ServerBidiReactor<RequestType, ResponseType>* reactor =
741  reactor_.load(std::memory_order_relaxed);
742  reactor->OnSendInitialMetadataDone(ok);
743  this->MaybeDone(/*inlineable_ondone=*/true);
744  },
745  &meta_ops_, /*can_inline=*/false);
746  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
747  ctx_->initial_metadata_flags());
748  if (ctx_->compression_level_set()) {
749  meta_ops_.set_compression_level(ctx_->compression_level());
750  }
751  ctx_->sent_initial_metadata_ = true;
752  meta_ops_.set_core_cq_tag(&meta_tag_);
753  call_.PerformOps(&meta_ops_);
754  }
755 
756  void Write(const ResponseType* resp,
757  ::grpc::WriteOptions options) override {
758  this->Ref();
759  if (options.is_last_message()) {
760  options.set_buffer_hint();
761  }
762  if (!ctx_->sent_initial_metadata_) {
763  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
764  ctx_->initial_metadata_flags());
765  if (ctx_->compression_level_set()) {
766  write_ops_.set_compression_level(ctx_->compression_level());
767  }
768  ctx_->sent_initial_metadata_ = true;
769  }
770  // TODO(vjpai): don't assert
771  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
772  call_.PerformOps(&write_ops_);
773  }
774 
775  void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
776  ::grpc::Status s) override {
777  // TODO(vjpai): don't assert
778  GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
779  Finish(std::move(s));
780  }
781 
782  void Read(RequestType* req) override {
783  this->Ref();
784  read_ops_.RecvMessage(req);
785  call_.PerformOps(&read_ops_);
786  }
787 
788  private:
789  friend class CallbackBidiHandler<RequestType, ResponseType>;
790 
791  ServerCallbackReaderWriterImpl(::grpc::CallbackServerContext* ctx,
792  ::grpc::internal::Call* call,
793  std::function<void()> call_requester)
794  : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
795 
796  void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
797  reactor_.store(reactor, std::memory_order_relaxed);
798  // The callbacks for these functions should not be inlined because they
799  // invoke user-controlled reactions, but any resulting OnDones can be
800  // inlined in the executor to which a callback is dispatched.
801  write_tag_.Set(call_.call(),
802  [this, reactor](bool ok) {
803  reactor->OnWriteDone(ok);
804  this->MaybeDone(/*inlineable_ondone=*/true);
805  },
806  &write_ops_, /*can_inline=*/false);
807  write_ops_.set_core_cq_tag(&write_tag_);
808  read_tag_.Set(call_.call(),
809  [this, reactor](bool ok) {
810  reactor->OnReadDone(ok);
811  this->MaybeDone(/*inlineable_ondone=*/true);
812  },
813  &read_ops_, /*can_inline=*/false);
814  read_ops_.set_core_cq_tag(&read_tag_);
815  this->BindReactor(reactor);
816  this->MaybeCallOnCancel(reactor);
817  // Inlineable OnDone can be false here because there is no bidi
818  // reactor that has an inlineable OnDone; this only applies to the
819  // DefaultReactor (which is unary).
820  this->MaybeDone(/*inlineable_ondone=*/false);
821  }
822 
823  void CallOnDone() override {
824  reactor_.load(std::memory_order_relaxed)->OnDone();
825  grpc_call* call = call_.call();
826  auto call_requester = std::move(call_requester_);
827  this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor
829  call_requester();
830  }
831 
832  ServerReactor* reactor() override {
833  return reactor_.load(std::memory_order_relaxed);
834  }
835 
837  meta_ops_;
842  finish_ops_;
846  write_ops_;
850  read_ops_;
852 
853  ::grpc::CallbackServerContext* const ctx_;
855  std::function<void()> call_requester_;
856  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
857  std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
858  // callbacks_outstanding_ follows a refcount pattern
859  std::atomic<intptr_t> callbacks_outstanding_{
860  3}; // reserve for OnStarted, Finish, and CompletionOp
861  };
862 };
863 
864 } // namespace internal
865 } // namespace grpc
866 
867 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
grpc::ServerReadReactor
ServerReadReactor is the interface for a client-streaming RPC.
Definition: server_callback.h:182
grpc::internal::CallbackWithSuccessTag
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming o...
Definition: callback_common.h:136
grpc::ServerCallbackReaderWriter::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::internal::CallbackUnaryHandler::Deserialize
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, ::grpc::Status *status, void **handler_data) final
Definition: server_callback_handlers.h:80
grpc::internal::CallOpServerSendStatus
Definition: call_op_set.h:653
rpc_service_method.h
grpc
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
grpc::internal::CallOpSet
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:850
status.h
grpc::CoreCodegenInterface::grpc_call_ref
virtual void grpc_call_ref(grpc_call *call)=0
grpc::CallbackServerContext
Definition: server_context.h:567
grpc::internal::CallOpSendMessage
Definition: call_op_set.h:287
grpc::CoreCodegenInterface::grpc_call_arena_alloc
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
grpc::internal::Call
Straightforward wrapping of the C call object.
Definition: call.h:35
grpc::internal::CallbackClientStreamingHandler::CallbackClientStreamingHandler
CallbackClientStreamingHandler(std::function< ServerReadReactor< RequestType > *(::grpc::CallbackServerContext *, ResponseType *)> get_reactor)
Definition: server_callback_handlers.h:255
grpc::experimental::MessageAllocator< RequestType, ResponseType >
grpc::internal::CallbackClientStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:260
grpc::ServerCallbackUnary::BindReactor
void BindReactor(Reactor *reactor)
Definition: server_callback.h:201
server_callback.h
grpc::internal::CallOpSendInitialMetadata
Definition: call_op_set.h:217
grpc::internal::ServerCallbackCall::MaybeCallOnCancel
void MaybeCallOnCancel(ServerReactor *reactor)
Definition: server_callback.h:106
grpc::Status::ok
bool ok() const
Is the status OK?
Definition: status.h:118
grpc::ServerCallbackReaderWriter
Definition: server_callback.h:238
grpc::internal::CallbackUnaryHandler::SetMessageAllocator
void SetMessageAllocator(::grpc::experimental::MessageAllocator< RequestType, ResponseType > *allocator)
Definition: server_callback_handlers.h:39
grpc::Status
Did it work? If it didn't, why?
Definition: status.h:31
grpc::internal::CallbackServerStreamingHandler::Deserialize
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, ::grpc::Status *status, void **) final
Definition: server_callback_handlers.h:480
grpc_call
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:70
grpc::ServerBidiReactor
ServerBidiReactor is the interface for a bidirectional streaming RPC.
Definition: server_callback.h:186
grpc_byte_buffer
Definition: grpc_types.h:40
grpc::ByteBuffer
A sequence of bytes.
Definition: byte_buffer.h:60
grpc::ServerCallbackWriter::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::internal::DefaultMessageHolder
Definition: server_callback.h:160
grpc::internal::CallbackBidiHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:662
grpc::SerializationTraits
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
grpc::internal::CallbackUnaryHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:45
grpc::ServerCallbackReader< RequestType >::BindReactor
void BindReactor(ServerReadReactor< RequestType > *reactor)
Definition: server_callback.h:215
grpc::ServerCallbackWriter< ResponseType >::BindReactor
void BindReactor(ServerWriteReactor< ResponseType > *reactor)
Definition: server_callback.h:232
grpc::internal::FinishOnlyReactor
Definition: server_callback.h:758
grpc::internal::CallbackUnaryHandler::CallbackUnaryHandler
CallbackUnaryHandler(std::function< ServerUnaryReactor *(::grpc::CallbackServerContext *, const RequestType *, ResponseType *)> get_reactor)
Definition: server_callback_handlers.h:33
grpc::internal::MethodHandler
Base class for running an RPC handler.
Definition: rpc_service_method.h:38
grpc::CoreCodegenInterface::grpc_call_unref
virtual void grpc_call_unref(grpc_call *call)=0
grpc::internal::CallbackServerStreamingHandler::CallbackServerStreamingHandler
CallbackServerStreamingHandler(std::function< ServerWriteReactor< ResponseType > *(::grpc::CallbackServerContext *, const RequestType *)> get_reactor)
Definition: server_callback_handlers.h:438
grpc::WriteOptions
Per-message write options.
Definition: call_op_set.h:79
grpc::ServerWriteReactor
ServerWriteReactor is the interface for a server-streaming RPC.
Definition: server_callback.h:184
grpc::internal::UnimplementedUnaryReactor
FinishOnlyReactor< ServerUnaryReactor > UnimplementedUnaryReactor
Definition: server_callback.h:764
grpc::UNIMPLEMENTED
@ UNIMPLEMENTED
Operation is not implemented or not supported/enabled in this service.
Definition: status_code_enum.h:115
grpc::ServerCallbackReaderWriter< RequestType, ResponseType >::BindReactor
void BindReactor(ServerBidiReactor< RequestType, ResponseType > *reactor)
Definition: server_callback.h:250
grpc::internal::CallbackBidiHandler::CallbackBidiHandler
CallbackBidiHandler(std::function< ServerBidiReactor< RequestType, ResponseType > *(::grpc::CallbackServerContext *)> get_reactor)
Definition: server_callback_handlers.h:657
grpc::ServerCallbackWriter
Definition: server_callback.h:221
grpc::experimental::MessageHolder::response
ResponseT * response()
Definition: message_allocator.h:49
grpc::internal::Call::call
grpc_call * call() const
Definition: call.h:69
std
Definition: async_unary_call.h:301
server_context.h
grpc::WriteOptions::set_buffer_hint
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call_op_set.h:122
grpc::ServerCallbackUnary::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::experimental::ServerUnaryReactor
::grpc::ServerUnaryReactor ServerUnaryReactor
Definition: server_callback.h:788
grpc::internal::ServerCallbackCall::Ref
void Ref()
Increases the reference count.
Definition: server_callback.h:124
grpc::ServerCallbackReader
Definition: server_callback.h:207
grpc::g_core_codegen_interface
CoreCodegenInterface * g_core_codegen_interface
Definition: completion_queue.h:90
GPR_CODEGEN_ASSERT
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:146
grpc::WriteOptions::is_last_message
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:186
grpc::experimental::MessageAllocator::AllocateMessages
virtual MessageHolder< RequestT, ResponseT > * AllocateMessages()=0
grpc::internal::CallOpRecvMessage
Definition: byte_buffer.h:51
grpc::ByteBuffer::Release
void Release()
Forget underlying byte buffer without destroying Use this only for un-owned byte buffers.
Definition: byte_buffer.h:139
grpc::experimental::MessageHolder::Release
virtual void Release()=0
grpc::internal::CatchingReactorGetter
Reactor * CatchingReactorGetter(Func &&func, Args &&... args)
Definition: callback_common.h:51
grpc::ServerCallbackUnary
Definition: server_callback.h:191
grpc::ServerUnaryReactor
Definition: server_callback.h:688
message_allocator.h
grpc::internal::CallbackServerStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:443
grpc::experimental::MessageHolder< RequestType, ResponseType >
grpc::experimental::MessageHolder::request
RequestT * request()
Definition: message_allocator.h:48
grpc::ServerCallbackReader::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::internal::ServerCallbackCall::MaybeDone
void MaybeDone()
Definition: server_callback.h:92