GRPC C++  1.22.0-dev
client_callback.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
20 #define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
21 
22 #include <functional>
23 
31 
32 namespace grpc_impl {
33 class Channel;
34 }
35 
36 namespace grpc {
37 
38 class ClientContext;
39 
40 namespace internal {
41 class RpcMethod;
42 
45 template <class InputMessage, class OutputMessage>
46 void CallbackUnaryCall(ChannelInterface* channel, const RpcMethod& method,
47  ClientContext* context, const InputMessage* request,
48  OutputMessage* result,
49  std::function<void(Status)> on_completion) {
51  channel, method, context, request, result, on_completion);
52 }
53 
54 template <class InputMessage, class OutputMessage>
56  public:
58  ClientContext* context, const InputMessage* request,
59  OutputMessage* result,
60  std::function<void(Status)> on_completion) {
61  CompletionQueue* cq = channel->CallbackCQ();
62  GPR_CODEGEN_ASSERT(cq != nullptr);
63  Call call(channel->CreateCall(method, context, cq));
64 
65  using FullCallOpSet =
69 
71  call.call(), sizeof(FullCallOpSet))) FullCallOpSet;
72 
74  call.call(), sizeof(CallbackWithStatusTag)))
75  CallbackWithStatusTag(call.call(), on_completion, ops);
76 
77  // TODO(vjpai): Unify code with sync API as much as possible
78  Status s = ops->SendMessagePtr(request);
79  if (!s.ok()) {
80  tag->force_run(s);
81  return;
82  }
83  ops->SendInitialMetadata(&context->send_initial_metadata_,
84  context->initial_metadata_flags());
85  ops->RecvInitialMetadata(context);
86  ops->RecvMessage(result);
87  ops->AllowNoMessage();
88  ops->ClientSendClose();
89  ops->ClientRecvStatus(context, tag->status_ptr());
90  ops->set_core_cq_tag(tag);
91  call.PerformOps(ops);
92  }
93 };
94 } // namespace internal
95 
96 namespace experimental {
97 
98 // Forward declarations
99 template <class Request, class Response>
101 template <class Response>
103 template <class Request>
105 class ClientUnaryReactor;
106 
107 // NOTE: The streaming objects are not actually implemented in the public API.
108 // These interfaces are provided for mocking only. Typical applications
109 // will interact exclusively with the reactors that they define.
110 template <class Request, class Response>
112  public:
114  virtual void StartCall() = 0;
115  virtual void Write(const Request* req, WriteOptions options) = 0;
116  virtual void WritesDone() = 0;
117  virtual void Read(Response* resp) = 0;
118  virtual void AddHold(int holds) = 0;
119  virtual void RemoveHold() = 0;
120 
121  protected:
123  reactor->BindStream(this);
124  }
125 };
126 
127 template <class Response>
129  public:
131  virtual void StartCall() = 0;
132  virtual void Read(Response* resp) = 0;
133  virtual void AddHold(int holds) = 0;
134  virtual void RemoveHold() = 0;
135 
136  protected:
138  reactor->BindReader(this);
139  }
140 };
141 
142 template <class Request>
144  public:
146  virtual void StartCall() = 0;
147  void Write(const Request* req) { Write(req, WriteOptions()); }
148  virtual void Write(const Request* req, WriteOptions options) = 0;
149  void WriteLast(const Request* req, WriteOptions options) {
150  Write(req, options.set_last_message());
151  }
152  virtual void WritesDone() = 0;
153 
154  virtual void AddHold(int holds) = 0;
155  virtual void RemoveHold() = 0;
156 
157  protected:
159  reactor->BindWriter(this);
160  }
161 };
162 
164  public:
165  virtual ~ClientCallbackUnary() {}
166  virtual void StartCall() = 0;
167 
168  protected:
169  void BindReactor(ClientUnaryReactor* reactor);
170 };
171 
172 // The following classes are the reactor interfaces that are to be implemented
173 // by the user. They are passed in to the library as an argument to a call on a
174 // stub (either a codegen-ed call or a generic call). The streaming RPC is
175 // activated by calling StartCall, possibly after initiating StartRead,
176 // StartWrite, or AddHold operations on the streaming object. Note that none of
177 // the classes are pure; all reactions have a default empty reaction so that the
178 // user class only needs to override those classes that it cares about.
179 // The reactor must be passed to the stub invocation before any of the below
180 // operations can be called.
181 
183 template <class Request, class Response>
184 class ClientBidiReactor {
185  public:
186  virtual ~ClientBidiReactor() {}
187 
192  void StartCall() { stream_->StartCall(); }
193 
199  void StartRead(Response* resp) { stream_->Read(resp); }
200 
207  void StartWrite(const Request* req) { StartWrite(req, WriteOptions()); }
208 
215  void StartWrite(const Request* req, WriteOptions options) {
216  stream_->Write(req, std::move(options));
217  }
218 
228  void StartWriteLast(const Request* req, WriteOptions options) {
229  StartWrite(req, std::move(options.set_last_message()));
230  }
231 
237  void StartWritesDone() { stream_->WritesDone(); }
238 
260  void AddHold() { AddMultipleHolds(1); }
261  void AddMultipleHolds(int holds) { stream_->AddHold(holds); }
262  void RemoveHold() { stream_->RemoveHold(); }
263 
268  virtual void OnDone(const Status& s) {}
269 
277  virtual void OnReadInitialMetadataDone(bool ok) {}
278 
283  virtual void OnReadDone(bool ok) {}
284 
289  virtual void OnWriteDone(bool ok) {}
290 
297  virtual void OnWritesDoneDone(bool ok) {}
298 
299  private:
300  friend class ClientCallbackReaderWriter<Request, Response>;
301  void BindStream(ClientCallbackReaderWriter<Request, Response>* stream) {
302  stream_ = stream;
303  }
305 };
306 
309 template <class Response>
310 class ClientReadReactor {
311  public:
312  virtual ~ClientReadReactor() {}
313 
314  void StartCall() { reader_->StartCall(); }
315  void StartRead(Response* resp) { reader_->Read(resp); }
316 
317  void AddHold() { AddMultipleHolds(1); }
318  void AddMultipleHolds(int holds) { reader_->AddHold(holds); }
319  void RemoveHold() { reader_->RemoveHold(); }
320 
321  virtual void OnDone(const Status& s) {}
322  virtual void OnReadInitialMetadataDone(bool ok) {}
323  virtual void OnReadDone(bool ok) {}
324 
325  private:
326  friend class ClientCallbackReader<Response>;
327  void BindReader(ClientCallbackReader<Response>* reader) { reader_ = reader; }
329 };
330 
333 template <class Request>
334 class ClientWriteReactor {
335  public:
336  virtual ~ClientWriteReactor() {}
337 
338  void StartCall() { writer_->StartCall(); }
339  void StartWrite(const Request* req) { StartWrite(req, WriteOptions()); }
340  void StartWrite(const Request* req, WriteOptions options) {
341  writer_->Write(req, std::move(options));
342  }
343  void StartWriteLast(const Request* req, WriteOptions options) {
344  StartWrite(req, std::move(options.set_last_message()));
345  }
346  void StartWritesDone() { writer_->WritesDone(); }
347 
348  void AddHold() { AddMultipleHolds(1); }
349  void AddMultipleHolds(int holds) { writer_->AddHold(holds); }
350  void RemoveHold() { writer_->RemoveHold(); }
351 
352  virtual void OnDone(const Status& s) {}
353  virtual void OnReadInitialMetadataDone(bool ok) {}
354  virtual void OnWriteDone(bool ok) {}
355  virtual void OnWritesDoneDone(bool ok) {}
356 
357  private:
358  friend class ClientCallbackWriter<Request>;
359  void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; }
361 };
362 
375  public:
376  virtual ~ClientUnaryReactor() {}
377 
378  void StartCall() { call_->StartCall(); }
379  virtual void OnDone(const Status& s) {}
380  virtual void OnReadInitialMetadataDone(bool ok) {}
381 
382  private:
383  friend class ClientCallbackUnary;
384  void BindCall(ClientCallbackUnary* call) { call_ = call; }
385  ClientCallbackUnary* call_;
386 };
387 
388 // Define function out-of-line from class to avoid forward declaration issue
389 inline void ClientCallbackUnary::BindReactor(ClientUnaryReactor* reactor) {
390  reactor->BindCall(this);
391 }
392 
393 } // namespace experimental
394 
395 namespace internal {
396 
397 // Forward declare factory classes for friendship
398 template <class Request, class Response>
399 class ClientCallbackReaderWriterFactory;
400 template <class Response>
401 class ClientCallbackReaderFactory;
402 template <class Request>
403 class ClientCallbackWriterFactory;
404 
405 template <class Request, class Response>
408  Response> {
409  public:
410  // always allocated against a call arena, no memory free required
411  static void operator delete(void* ptr, std::size_t size) {
412  assert(size == sizeof(ClientCallbackReaderWriterImpl));
413  }
414 
415  // This operator should never be called as the memory should be freed as part
416  // of the arena destruction. It only exists to provide a matching operator
417  // delete to the operator new so that some compilers will not complain (see
418  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
419  // there are no tests catching the compiler warning.
420  static void operator delete(void*, void*) { assert(0); }
421 
422  void MaybeFinish() {
423  if (--callbacks_outstanding_ == 0) {
424  Status s = std::move(finish_status_);
425  auto* reactor = reactor_;
426  auto* call = call_.call();
429  reactor->OnDone(s);
430  }
431  }
432 
433  void StartCall() override {
434  // This call initiates two batches, plus any backlog, each with a callback
435  // 1. Send initial metadata (unless corked) + recv initial metadata
436  // 2. Any read backlog
437  // 3. Any write backlog
438  // 4. Recv trailing metadata, on_completion callback
439  started_ = true;
440 
441  start_tag_.Set(call_.call(),
442  [this](bool ok) {
443  reactor_->OnReadInitialMetadataDone(ok);
444  MaybeFinish();
445  },
446  &start_ops_);
447  if (!start_corked_) {
448  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
449  context_->initial_metadata_flags());
450  }
451  start_ops_.RecvInitialMetadata(context_);
452  start_ops_.set_core_cq_tag(&start_tag_);
453  call_.PerformOps(&start_ops_);
454 
455  // Also set up the read and write tags so that they don't have to be set up
456  // each time
457  write_tag_.Set(call_.call(),
458  [this](bool ok) {
459  reactor_->OnWriteDone(ok);
460  MaybeFinish();
461  },
462  &write_ops_);
463  write_ops_.set_core_cq_tag(&write_tag_);
464 
465  read_tag_.Set(call_.call(),
466  [this](bool ok) {
467  reactor_->OnReadDone(ok);
468  MaybeFinish();
469  },
470  &read_ops_);
471  read_ops_.set_core_cq_tag(&read_tag_);
472  if (read_ops_at_start_) {
473  call_.PerformOps(&read_ops_);
474  }
475 
476  if (write_ops_at_start_) {
477  call_.PerformOps(&write_ops_);
478  }
479 
480  if (writes_done_ops_at_start_) {
481  call_.PerformOps(&writes_done_ops_);
482  }
483 
484  finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
485  &finish_ops_);
486  finish_ops_.ClientRecvStatus(context_, &finish_status_);
487  finish_ops_.set_core_cq_tag(&finish_tag_);
488  call_.PerformOps(&finish_ops_);
489  }
490 
491  void Read(Response* msg) override {
492  read_ops_.RecvMessage(msg);
493  callbacks_outstanding_++;
494  if (started_) {
495  call_.PerformOps(&read_ops_);
496  } else {
497  read_ops_at_start_ = true;
498  }
499  }
500 
501  void Write(const Request* msg, WriteOptions options) override {
502  if (start_corked_) {
503  write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
504  context_->initial_metadata_flags());
505  start_corked_ = false;
506  }
507 
508  if (options.is_last_message()) {
509  options.set_buffer_hint();
510  write_ops_.ClientSendClose();
511  }
512  // TODO(vjpai): don't assert
513  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
514  callbacks_outstanding_++;
515  if (started_) {
516  call_.PerformOps(&write_ops_);
517  } else {
518  write_ops_at_start_ = true;
519  }
520  }
521  void WritesDone() override {
522  if (start_corked_) {
523  writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
524  context_->initial_metadata_flags());
525  start_corked_ = false;
526  }
527  writes_done_ops_.ClientSendClose();
528  writes_done_tag_.Set(call_.call(),
529  [this](bool ok) {
530  reactor_->OnWritesDoneDone(ok);
531  MaybeFinish();
532  },
533  &writes_done_ops_);
534  writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
535  callbacks_outstanding_++;
536  if (started_) {
537  call_.PerformOps(&writes_done_ops_);
538  } else {
539  writes_done_ops_at_start_ = true;
540  }
541  }
542 
543  virtual void AddHold(int holds) override { callbacks_outstanding_ += holds; }
544  virtual void RemoveHold() override { MaybeFinish(); }
545 
546  private:
547  friend class ClientCallbackReaderWriterFactory<Request, Response>;
548 
550  Call call, ClientContext* context,
552  : context_(context),
553  call_(call),
554  reactor_(reactor),
555  start_corked_(context_->initial_metadata_corked_) {
556  this->BindReactor(reactor);
557  }
558 
559  ClientContext* const context_;
560  Call call_;
562 
564  CallbackWithSuccessTag start_tag_;
565  bool start_corked_;
566 
568  CallbackWithSuccessTag finish_tag_;
569  Status finish_status_;
570 
572  write_ops_;
573  CallbackWithSuccessTag write_tag_;
574  bool write_ops_at_start_{false};
575 
577  CallbackWithSuccessTag writes_done_tag_;
578  bool writes_done_ops_at_start_{false};
579 
581  CallbackWithSuccessTag read_tag_;
582  bool read_ops_at_start_{false};
583 
584  // Minimum of 2 callbacks to pre-register for start and finish
585  std::atomic_int callbacks_outstanding_{2};
586  bool started_{false};
587 };
588 
589 template <class Request, class Response>
591  public:
592  static void Create(
593  ChannelInterface* channel, const ::grpc::internal::RpcMethod& method,
594  ClientContext* context,
596  Call call = channel->CreateCall(method, context, channel->CallbackCQ());
597 
602  reactor);
603  }
604 };
605 
606 template <class Response>
608  : public ::grpc::experimental::ClientCallbackReader<Response> {
609  public:
610  // always allocated against a call arena, no memory free required
611  static void operator delete(void* ptr, std::size_t size) {
612  assert(size == sizeof(ClientCallbackReaderImpl));
613  }
614 
615  // This operator should never be called as the memory should be freed as part
616  // of the arena destruction. It only exists to provide a matching operator
617  // delete to the operator new so that some compilers will not complain (see
618  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
619  // there are no tests catching the compiler warning.
620  static void operator delete(void*, void*) { assert(0); }
621 
622  void MaybeFinish() {
623  if (--callbacks_outstanding_ == 0) {
624  Status s = std::move(finish_status_);
625  auto* reactor = reactor_;
626  auto* call = call_.call();
627  this->~ClientCallbackReaderImpl();
629  reactor->OnDone(s);
630  }
631  }
632 
633  void StartCall() override {
634  // This call initiates two batches, plus any backlog, each with a callback
635  // 1. Send initial metadata (unless corked) + recv initial metadata
636  // 2. Any backlog
637  // 3. Recv trailing metadata, on_completion callback
638  started_ = true;
639 
640  start_tag_.Set(call_.call(),
641  [this](bool ok) {
642  reactor_->OnReadInitialMetadataDone(ok);
643  MaybeFinish();
644  },
645  &start_ops_);
646  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
647  context_->initial_metadata_flags());
648  start_ops_.RecvInitialMetadata(context_);
649  start_ops_.set_core_cq_tag(&start_tag_);
650  call_.PerformOps(&start_ops_);
651 
652  // Also set up the read tag so it doesn't have to be set up each time
653  read_tag_.Set(call_.call(),
654  [this](bool ok) {
655  reactor_->OnReadDone(ok);
656  MaybeFinish();
657  },
658  &read_ops_);
659  read_ops_.set_core_cq_tag(&read_tag_);
660  if (read_ops_at_start_) {
661  call_.PerformOps(&read_ops_);
662  }
663 
664  finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
665  &finish_ops_);
666  finish_ops_.ClientRecvStatus(context_, &finish_status_);
667  finish_ops_.set_core_cq_tag(&finish_tag_);
668  call_.PerformOps(&finish_ops_);
669  }
670 
671  void Read(Response* msg) override {
672  read_ops_.RecvMessage(msg);
673  callbacks_outstanding_++;
674  if (started_) {
675  call_.PerformOps(&read_ops_);
676  } else {
677  read_ops_at_start_ = true;
678  }
679  }
680 
681  virtual void AddHold(int holds) override { callbacks_outstanding_ += holds; }
682  virtual void RemoveHold() override { MaybeFinish(); }
683 
684  private:
685  friend class ClientCallbackReaderFactory<Response>;
686 
687  template <class Request>
689  Call call, ClientContext* context, Request* request,
691  : context_(context), call_(call), reactor_(reactor) {
692  this->BindReactor(reactor);
693  // TODO(vjpai): don't assert
694  GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
695  start_ops_.ClientSendClose();
696  }
697 
698  ClientContext* const context_;
699  Call call_;
701 
704  start_ops_;
705  CallbackWithSuccessTag start_tag_;
706 
708  CallbackWithSuccessTag finish_tag_;
709  Status finish_status_;
710 
712  CallbackWithSuccessTag read_tag_;
713  bool read_ops_at_start_{false};
714 
715  // Minimum of 2 callbacks to pre-register for start and finish
716  std::atomic_int callbacks_outstanding_{2};
717  bool started_{false};
718 };
719 
720 template <class Response>
722  public:
723  template <class Request>
724  static void Create(
725  ChannelInterface* channel, const ::grpc::internal::RpcMethod& method,
726  ClientContext* context, const Request* request,
728  Call call = channel->CreateCall(method, context, channel->CallbackCQ());
729 
732  call.call(), sizeof(ClientCallbackReaderImpl<Response>)))
733  ClientCallbackReaderImpl<Response>(call, context, request, reactor);
734  }
735 };
736 
737 template <class Request>
740  public:
741  // always allocated against a call arena, no memory free required
742  static void operator delete(void* ptr, std::size_t size) {
743  assert(size == sizeof(ClientCallbackWriterImpl));
744  }
745 
746  // This operator should never be called as the memory should be freed as part
747  // of the arena destruction. It only exists to provide a matching operator
748  // delete to the operator new so that some compilers will not complain (see
749  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
750  // there are no tests catching the compiler warning.
751  static void operator delete(void*, void*) { assert(0); }
752 
753  void MaybeFinish() {
754  if (--callbacks_outstanding_ == 0) {
755  Status s = std::move(finish_status_);
756  auto* reactor = reactor_;
757  auto* call = call_.call();
758  this->~ClientCallbackWriterImpl();
760  reactor->OnDone(s);
761  }
762  }
763 
764  void StartCall() override {
765  // This call initiates two batches, plus any backlog, each with a callback
766  // 1. Send initial metadata (unless corked) + recv initial metadata
767  // 2. Any backlog
768  // 3. Recv trailing metadata, on_completion callback
769  started_ = true;
770 
771  start_tag_.Set(call_.call(),
772  [this](bool ok) {
773  reactor_->OnReadInitialMetadataDone(ok);
774  MaybeFinish();
775  },
776  &start_ops_);
777  if (!start_corked_) {
778  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
779  context_->initial_metadata_flags());
780  }
781  start_ops_.RecvInitialMetadata(context_);
782  start_ops_.set_core_cq_tag(&start_tag_);
783  call_.PerformOps(&start_ops_);
784 
785  // Also set up the read and write tags so that they don't have to be set up
786  // each time
787  write_tag_.Set(call_.call(),
788  [this](bool ok) {
789  reactor_->OnWriteDone(ok);
790  MaybeFinish();
791  },
792  &write_ops_);
793  write_ops_.set_core_cq_tag(&write_tag_);
794 
795  if (write_ops_at_start_) {
796  call_.PerformOps(&write_ops_);
797  }
798 
799  if (writes_done_ops_at_start_) {
800  call_.PerformOps(&writes_done_ops_);
801  }
802 
803  finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
804  &finish_ops_);
805  finish_ops_.ClientRecvStatus(context_, &finish_status_);
806  finish_ops_.set_core_cq_tag(&finish_tag_);
807  call_.PerformOps(&finish_ops_);
808  }
809 
810  void Write(const Request* msg, WriteOptions options) override {
811  if (start_corked_) {
812  write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
813  context_->initial_metadata_flags());
814  start_corked_ = false;
815  }
816 
817  if (options.is_last_message()) {
818  options.set_buffer_hint();
819  write_ops_.ClientSendClose();
820  }
821  // TODO(vjpai): don't assert
822  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
823  callbacks_outstanding_++;
824  if (started_) {
825  call_.PerformOps(&write_ops_);
826  } else {
827  write_ops_at_start_ = true;
828  }
829  }
830  void WritesDone() override {
831  if (start_corked_) {
832  writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
833  context_->initial_metadata_flags());
834  start_corked_ = false;
835  }
836  writes_done_ops_.ClientSendClose();
837  writes_done_tag_.Set(call_.call(),
838  [this](bool ok) {
839  reactor_->OnWritesDoneDone(ok);
840  MaybeFinish();
841  },
842  &writes_done_ops_);
843  writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
844  callbacks_outstanding_++;
845  if (started_) {
846  call_.PerformOps(&writes_done_ops_);
847  } else {
848  writes_done_ops_at_start_ = true;
849  }
850  }
851 
852  virtual void AddHold(int holds) override { callbacks_outstanding_ += holds; }
853  virtual void RemoveHold() override { MaybeFinish(); }
854 
855  private:
856  friend class ClientCallbackWriterFactory<Request>;
857 
858  template <class Response>
860  Call call, ClientContext* context, Response* response,
862  : context_(context),
863  call_(call),
864  reactor_(reactor),
865  start_corked_(context_->initial_metadata_corked_) {
866  this->BindReactor(reactor);
867  finish_ops_.RecvMessage(response);
868  finish_ops_.AllowNoMessage();
869  }
870 
871  ClientContext* const context_;
872  Call call_;
874 
876  CallbackWithSuccessTag start_tag_;
877  bool start_corked_;
878 
880  CallbackWithSuccessTag finish_tag_;
881  Status finish_status_;
882 
884  write_ops_;
885  CallbackWithSuccessTag write_tag_;
886  bool write_ops_at_start_{false};
887 
889  CallbackWithSuccessTag writes_done_tag_;
890  bool writes_done_ops_at_start_{false};
891 
892  // Minimum of 2 callbacks to pre-register for start and finish
893  std::atomic_int callbacks_outstanding_{2};
894  bool started_{false};
895 };
896 
897 template <class Request>
899  public:
900  template <class Response>
901  static void Create(
902  ChannelInterface* channel, const ::grpc::internal::RpcMethod& method,
903  ClientContext* context, Response* response,
905  Call call = channel->CreateCall(method, context, channel->CallbackCQ());
906 
909  call.call(), sizeof(ClientCallbackWriterImpl<Request>)))
910  ClientCallbackWriterImpl<Request>(call, context, response, reactor);
911  }
912 };
913 
916  public:
917  // always allocated against a call arena, no memory free required
918  static void operator delete(void* ptr, std::size_t size) {
919  assert(size == sizeof(ClientCallbackUnaryImpl));
920  }
921 
922  // This operator should never be called as the memory should be freed as part
923  // of the arena destruction. It only exists to provide a matching operator
924  // delete to the operator new so that some compilers will not complain (see
925  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
926  // there are no tests catching the compiler warning.
927  static void operator delete(void*, void*) { assert(0); }
928 
929  void StartCall() override {
930  // This call initiates two batches, each with a callback
931  // 1. Send initial metadata + write + writes done + recv initial metadata
932  // 2. Read message, recv trailing metadata
933  started_ = true;
934 
935  start_tag_.Set(call_.call(),
936  [this](bool ok) {
937  reactor_->OnReadInitialMetadataDone(ok);
938  MaybeFinish();
939  },
940  &start_ops_);
941  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
942  context_->initial_metadata_flags());
943  start_ops_.RecvInitialMetadata(context_);
944  start_ops_.set_core_cq_tag(&start_tag_);
945  call_.PerformOps(&start_ops_);
946 
947  finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
948  &finish_ops_);
949  finish_ops_.ClientRecvStatus(context_, &finish_status_);
950  finish_ops_.set_core_cq_tag(&finish_tag_);
951  call_.PerformOps(&finish_ops_);
952  }
953 
954  void MaybeFinish() {
955  if (--callbacks_outstanding_ == 0) {
956  Status s = std::move(finish_status_);
957  auto* reactor = reactor_;
958  auto* call = call_.call();
959  this->~ClientCallbackUnaryImpl();
961  reactor->OnDone(s);
962  }
963  }
964 
965  private:
967 
968  template <class Request, class Response>
969  ClientCallbackUnaryImpl(Call call, ClientContext* context, Request* request,
970  Response* response,
972  : context_(context), call_(call), reactor_(reactor) {
973  this->BindReactor(reactor);
974  // TODO(vjpai): don't assert
975  GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
976  start_ops_.ClientSendClose();
977  finish_ops_.RecvMessage(response);
978  finish_ops_.AllowNoMessage();
979  }
980 
981  ClientContext* const context_;
982  Call call_;
984 
987  start_ops_;
988  CallbackWithSuccessTag start_tag_;
989 
991  CallbackWithSuccessTag finish_tag_;
992  Status finish_status_;
993 
994  // This call will have 2 callbacks: start and finish
995  std::atomic_int callbacks_outstanding_{2};
996  bool started_{false};
997 };
998 
1000  public:
1001  template <class Request, class Response>
1002  static void Create(ChannelInterface* channel,
1003  const ::grpc::internal::RpcMethod& method,
1004  ClientContext* context, const Request* request,
1005  Response* response,
1007  Call call = channel->CreateCall(method, context, channel->CallbackCQ());
1008 
1010 
1012  call.call(), sizeof(ClientCallbackUnaryImpl)))
1013  ClientCallbackUnaryImpl(call, context, request, response, reactor);
1014  }
1015 };
1016 
1017 } // namespace internal
1018 } // namespace grpc
1019 
1020 #endif // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
virtual void AddHold(int holds) override
Definition: client_callback.h:681
virtual ~ClientCallbackUnary()
Definition: client_callback.h:165
virtual ~ClientCallbackReaderWriter()
Definition: client_callback.h:113
void StartCall() override
Definition: client_callback.h:633
virtual ~ClientWriteReactor()
Definition: client_callback.h:336
virtual void OnReadInitialMetadataDone(bool ok)
Definition: client_callback.h:353
Definition: channel_interface.h:59
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call_op_set.h:125
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:145
virtual void grpc_call_ref(grpc_call *call)=0
Definition: client_callback.h:914
virtual ~ClientCallbackReader()
Definition: client_callback.h:130
void AddHold()
Holds are needed if (and only if) this stream has operations that take place on it after StartCall bu...
Definition: client_callback.h:260
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:164
virtual void RemoveHold() override
Definition: client_callback.h:544
ClientReadReactor is the interface for a server-streaming RPC.
Definition: client_callback.h:102
static void Create(ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ClientContext *context, Response *response, ::grpc::experimental::ClientWriteReactor< Request > *reactor)
Definition: client_callback.h:901
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:826
virtual ~ClientReadReactor()
Definition: client_callback.h:312
virtual void grpc_call_unref(grpc_call *call)=0
void WritesDone() override
Definition: client_callback.h:830
void StartRead(Response *resp)
Definition: client_callback.h:315
CallbackUnaryCallImpl(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(Status)> on_completion)
Definition: client_callback.h:57
Definition: client_callback.h:111
Definition: channel_interface.h:63
void Write(const Request *req)
Definition: client_callback.h:147
void StartCall() override
Definition: client_callback.h:433
void Read(Response *msg) override
Definition: client_callback.h:491
Definition: channel_interface.h:49
void RemoveHold()
Definition: client_callback.h:262
static void Create(ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ClientContext *context, ::grpc::experimental::ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback.h:592
grpc_call * call() const
Definition: call.h:72
void WriteLast(const Request *req, WriteOptions options)
Definition: client_callback.h:149
void StartWritesDone()
Indicate that the RPC will have no more write operations.
Definition: client_callback.h:237
virtual void OnWriteDone(bool ok)
Definition: client_callback.h:354
virtual void OnDone(const Status &s)
Definition: client_callback.h:352
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:179
void Read(Response *msg) override
Definition: client_callback.h:671
static void Create(ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ClientContext *context, const Request *request, ::grpc::experimental::ClientReadReactor< Response > *reactor)
Definition: client_callback.h:724
virtual ~ClientCallbackWriter()
Definition: client_callback.h:145
virtual void RemoveHold() override
Definition: client_callback.h:853
virtual void OnDone(const Status &s)
Definition: client_callback.h:321
Descriptor of an RPC method.
Definition: rpc_method.h:29
void StartWrite(const Request *req, WriteOptions options)
Initiate/post a write operation with specified options.
Definition: client_callback.h:215
Definition: client_callback.h:163
void AddMultipleHolds(int holds)
Definition: client_callback.h:261
virtual void AddHold(int holds) override
Definition: client_callback.h:852
::grpc_impl::Channel Channel
Definition: channel.h:26
Definition: client_callback.h:128
Definition: call_op_set.h:223
void StartWrite(const Request *req)
Initiate a write operation (or post it for later initiation if StartCall has not yet been invoked)...
Definition: client_callback.h:207
void Write(const Request *msg, WriteOptions options) override
Definition: client_callback.h:810
Definition: call_op_set.h:701
Definition: client_callback.h:999
virtual void OnReadDone(bool ok)
Definition: client_callback.h:323
void StartWriteLast(const Request *req, WriteOptions options)
Definition: client_callback.h:343
Definition: call_op_set.h:293
virtual ~ClientBidiReactor()
Definition: client_callback.h:186
void StartWrite(const Request *req, WriteOptions options)
Definition: client_callback.h:340
virtual ~ClientUnaryReactor()
Definition: client_callback.h:376
void WritesDone() override
Definition: client_callback.h:521
This header provides an object that reads bytes directly from a grpc::ByteBuffer, via the ZeroCopyInp...
Definition: alarm.h:24
void StartRead(Response *resp)
Initiate a read operation (or post it for later initiation if StartCall has not yet been invoked)...
Definition: client_callback.h:199
ClientWriteReactor is the interface for a client-streaming RPC.
Definition: client_callback.h:104
virtual void OnWritesDoneDone(bool ok)
Notifies the application that a StartWritesDone operation completed.
Definition: client_callback.h:297
void StartCall()
Activate the RPC and initiate any reads or writes that have been Start&#39;ed before this call...
Definition: client_callback.h:192
Codegen interface for grpc::Channel.
Definition: channel_interface.h:69
Definition: client_callback.h:406
virtual void AddHold(int holds) override
Definition: client_callback.h:543
CoreCodegenInterface * g_core_codegen_interface
Definition: call_op_set.h:51
virtual void RemoveHold() override
Definition: client_callback.h:682
void MaybeFinish()
Definition: client_callback.h:422
ClientBidiReactor is the interface for a bidirectional streaming RPC.
Definition: client_callback.h:100
void AddMultipleHolds(int holds)
Definition: client_callback.h:349
virtual void OnWritesDoneDone(bool ok)
Definition: client_callback.h:355
Definition: byte_buffer.h:41
Per-message write options.
Definition: call_op_set.h:85
Definition: client_callback.h:738
void MaybeFinish()
Definition: client_callback.h:753
Definition: channel_interface.h:61
ClientUnaryReactor is a reactor-style interface for a unary RPC.
Definition: client_callback.h:374
void StartWritesDone()
Definition: client_callback.h:346
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm_impl.h:33
Definition: call_op_set.h:599
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
virtual void OnDone(const Status &s)
Definition: client_callback.h:379
void Write(const Request *msg, WriteOptions options) override
Definition: client_callback.h:501
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming o...
Definition: callback_common.h:132
bool ok() const
Is the status OK?
Definition: status.h:118
void StartCall() override
Definition: client_callback.h:764
Definition: client_callback.h:143
void StartCall()
Definition: client_callback.h:314
virtual void OnDone(const Status &s)
Notifies the application that all operations associated with this RPC have completed and provides the...
Definition: client_callback.h:268
void AddMultipleHolds(int holds)
Definition: client_callback.h:318
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue.h).
Definition: completion_queue_impl.h:103
Did it work? If it didn&#39;t, why?
Definition: status.h:31
virtual void OnReadDone(bool ok)
Notifies the application that a StartRead operation completed.
Definition: client_callback.h:283
void BindReactor(ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback.h:122
Definition: callback_common.h:68
virtual void OnWriteDone(bool ok)
Notifies the application that a StartWrite operation completed.
Definition: client_callback.h:289
void AddHold()
Definition: client_callback.h:348
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:189
Definition: client_callback.h:607
void RemoveHold()
Definition: client_callback.h:350
Definition: call_op_set.h:749
void StartCall()
Definition: client_callback.h:378
void BindReactor(ClientReadReactor< Response > *reactor)
Definition: client_callback.h:137
void AddHold()
Definition: client_callback.h:317
void StartWriteLast(const Request *req, WriteOptions options)
Initiate/post a write operation with specified options and an indication that this is the last write ...
Definition: client_callback.h:228
void MaybeFinish()
Definition: client_callback.h:622
virtual void OnReadInitialMetadataDone(bool ok)
Notifies the application that a read of initial metadata from the server is done. ...
Definition: client_callback.h:277
void BindReactor(ClientWriteReactor< Request > *reactor)
Definition: client_callback.h:158
virtual void OnReadInitialMetadataDone(bool ok)
Definition: client_callback.h:380
void RemoveHold()
Definition: client_callback.h:319
virtual void OnReadInitialMetadataDone(bool ok)
Definition: client_callback.h:322
static void Create(ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ClientContext *context, const Request *request, Response *response, ::grpc::experimental::ClientUnaryReactor *reactor)
Definition: client_callback.h:1002
void StartCall() override
Definition: client_callback.h:929
void StartWrite(const Request *req)
Definition: client_callback.h:339
Straightforward wrapping of the C call object.
Definition: call.h:38
void MaybeFinish()
Definition: client_callback.h:954
void StartCall()
Definition: client_callback.h:338
void CallbackUnaryCall(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(Status)> on_completion)
Perform a callback-based unary call TODO(vjpai): Combine as much as possible with the blocking unary ...
Definition: client_callback.h:46