GRPC C++  1.22.0
client_callback.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
20 #define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
21 
22 #include <functional>
23 
31 
32 namespace grpc_impl {
33 class Channel;
34 class ClientContext;
35 } // namespace grpc_impl
36 
37 namespace grpc {
38 
39 namespace internal {
40 class RpcMethod;
41 
44 template <class InputMessage, class OutputMessage>
45 void CallbackUnaryCall(ChannelInterface* channel, const RpcMethod& method,
46  ClientContext* context, const InputMessage* request,
47  OutputMessage* result,
48  std::function<void(Status)> on_completion) {
50  channel, method, context, request, result, on_completion);
51 }
52 
53 template <class InputMessage, class OutputMessage>
55  public:
57  ClientContext* context, const InputMessage* request,
58  OutputMessage* result,
59  std::function<void(Status)> on_completion) {
60  CompletionQueue* cq = channel->CallbackCQ();
61  GPR_CODEGEN_ASSERT(cq != nullptr);
62  Call call(channel->CreateCall(method, context, cq));
63 
64  using FullCallOpSet =
68 
70  call.call(), sizeof(FullCallOpSet))) FullCallOpSet;
71 
73  call.call(), sizeof(CallbackWithStatusTag)))
74  CallbackWithStatusTag(call.call(), on_completion, ops);
75 
76  // TODO(vjpai): Unify code with sync API as much as possible
77  Status s = ops->SendMessagePtr(request);
78  if (!s.ok()) {
79  tag->force_run(s);
80  return;
81  }
82  ops->SendInitialMetadata(&context->send_initial_metadata_,
83  context->initial_metadata_flags());
84  ops->RecvInitialMetadata(context);
85  ops->RecvMessage(result);
86  ops->AllowNoMessage();
87  ops->ClientSendClose();
88  ops->ClientRecvStatus(context, tag->status_ptr());
89  ops->set_core_cq_tag(tag);
90  call.PerformOps(ops);
91  }
92 };
93 } // namespace internal
94 
95 namespace experimental {
96 
97 // Forward declarations
98 template <class Request, class Response>
100 template <class Response>
102 template <class Request>
104 class ClientUnaryReactor;
105 
106 // NOTE: The streaming objects are not actually implemented in the public API.
107 // These interfaces are provided for mocking only. Typical applications
108 // will interact exclusively with the reactors that they define.
109 template <class Request, class Response>
111  public:
113  virtual void StartCall() = 0;
114  virtual void Write(const Request* req, WriteOptions options) = 0;
115  virtual void WritesDone() = 0;
116  virtual void Read(Response* resp) = 0;
117  virtual void AddHold(int holds) = 0;
118  virtual void RemoveHold() = 0;
119 
120  protected:
122  reactor->BindStream(this);
123  }
124 };
125 
126 template <class Response>
128  public:
130  virtual void StartCall() = 0;
131  virtual void Read(Response* resp) = 0;
132  virtual void AddHold(int holds) = 0;
133  virtual void RemoveHold() = 0;
134 
135  protected:
137  reactor->BindReader(this);
138  }
139 };
140 
141 template <class Request>
143  public:
145  virtual void StartCall() = 0;
146  void Write(const Request* req) { Write(req, WriteOptions()); }
147  virtual void Write(const Request* req, WriteOptions options) = 0;
148  void WriteLast(const Request* req, WriteOptions options) {
149  Write(req, options.set_last_message());
150  }
151  virtual void WritesDone() = 0;
152 
153  virtual void AddHold(int holds) = 0;
154  virtual void RemoveHold() = 0;
155 
156  protected:
158  reactor->BindWriter(this);
159  }
160 };
161 
163  public:
164  virtual ~ClientCallbackUnary() {}
165  virtual void StartCall() = 0;
166 
167  protected:
168  void BindReactor(ClientUnaryReactor* reactor);
169 };
170 
171 // The following classes are the reactor interfaces that are to be implemented
172 // by the user. They are passed in to the library as an argument to a call on a
173 // stub (either a codegen-ed call or a generic call). The streaming RPC is
174 // activated by calling StartCall, possibly after initiating StartRead,
175 // StartWrite, or AddHold operations on the streaming object. Note that none of
176 // the classes are pure; all reactions have a default empty reaction so that the
177 // user class only needs to override those classes that it cares about.
178 // The reactor must be passed to the stub invocation before any of the below
179 // operations can be called.
180 
182 template <class Request, class Response>
183 class ClientBidiReactor {
184  public:
185  virtual ~ClientBidiReactor() {}
186 
191  void StartCall() { stream_->StartCall(); }
192 
198  void StartRead(Response* resp) { stream_->Read(resp); }
199 
206  void StartWrite(const Request* req) { StartWrite(req, WriteOptions()); }
207 
214  void StartWrite(const Request* req, WriteOptions options) {
215  stream_->Write(req, std::move(options));
216  }
217 
227  void StartWriteLast(const Request* req, WriteOptions options) {
228  StartWrite(req, std::move(options.set_last_message()));
229  }
230 
236  void StartWritesDone() { stream_->WritesDone(); }
237 
259  void AddHold() { AddMultipleHolds(1); }
260  void AddMultipleHolds(int holds) { stream_->AddHold(holds); }
261  void RemoveHold() { stream_->RemoveHold(); }
262 
267  virtual void OnDone(const Status& s) {}
268 
276  virtual void OnReadInitialMetadataDone(bool ok) {}
277 
282  virtual void OnReadDone(bool ok) {}
283 
288  virtual void OnWriteDone(bool ok) {}
289 
296  virtual void OnWritesDoneDone(bool ok) {}
297 
298  private:
299  friend class ClientCallbackReaderWriter<Request, Response>;
300  void BindStream(ClientCallbackReaderWriter<Request, Response>* stream) {
301  stream_ = stream;
302  }
304 };
305 
308 template <class Response>
309 class ClientReadReactor {
310  public:
311  virtual ~ClientReadReactor() {}
312 
313  void StartCall() { reader_->StartCall(); }
314  void StartRead(Response* resp) { reader_->Read(resp); }
315 
316  void AddHold() { AddMultipleHolds(1); }
317  void AddMultipleHolds(int holds) { reader_->AddHold(holds); }
318  void RemoveHold() { reader_->RemoveHold(); }
319 
320  virtual void OnDone(const Status& s) {}
321  virtual void OnReadInitialMetadataDone(bool ok) {}
322  virtual void OnReadDone(bool ok) {}
323 
324  private:
325  friend class ClientCallbackReader<Response>;
326  void BindReader(ClientCallbackReader<Response>* reader) { reader_ = reader; }
328 };
329 
332 template <class Request>
333 class ClientWriteReactor {
334  public:
335  virtual ~ClientWriteReactor() {}
336 
337  void StartCall() { writer_->StartCall(); }
338  void StartWrite(const Request* req) { StartWrite(req, WriteOptions()); }
339  void StartWrite(const Request* req, WriteOptions options) {
340  writer_->Write(req, std::move(options));
341  }
342  void StartWriteLast(const Request* req, WriteOptions options) {
343  StartWrite(req, std::move(options.set_last_message()));
344  }
345  void StartWritesDone() { writer_->WritesDone(); }
346 
347  void AddHold() { AddMultipleHolds(1); }
348  void AddMultipleHolds(int holds) { writer_->AddHold(holds); }
349  void RemoveHold() { writer_->RemoveHold(); }
350 
351  virtual void OnDone(const Status& s) {}
352  virtual void OnReadInitialMetadataDone(bool ok) {}
353  virtual void OnWriteDone(bool ok) {}
354  virtual void OnWritesDoneDone(bool ok) {}
355 
356  private:
357  friend class ClientCallbackWriter<Request>;
358  void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; }
360 };
361 
374  public:
375  virtual ~ClientUnaryReactor() {}
376 
377  void StartCall() { call_->StartCall(); }
378  virtual void OnDone(const Status& s) {}
379  virtual void OnReadInitialMetadataDone(bool ok) {}
380 
381  private:
382  friend class ClientCallbackUnary;
383  void BindCall(ClientCallbackUnary* call) { call_ = call; }
384  ClientCallbackUnary* call_;
385 };
386 
387 // Define function out-of-line from class to avoid forward declaration issue
388 inline void ClientCallbackUnary::BindReactor(ClientUnaryReactor* reactor) {
389  reactor->BindCall(this);
390 }
391 
392 } // namespace experimental
393 
394 namespace internal {
395 
396 // Forward declare factory classes for friendship
397 template <class Request, class Response>
398 class ClientCallbackReaderWriterFactory;
399 template <class Response>
400 class ClientCallbackReaderFactory;
401 template <class Request>
402 class ClientCallbackWriterFactory;
403 
404 template <class Request, class Response>
407  Response> {
408  public:
409  // always allocated against a call arena, no memory free required
410  static void operator delete(void* ptr, std::size_t size) {
411  assert(size == sizeof(ClientCallbackReaderWriterImpl));
412  }
413 
414  // This operator should never be called as the memory should be freed as part
415  // of the arena destruction. It only exists to provide a matching operator
416  // delete to the operator new so that some compilers will not complain (see
417  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
418  // there are no tests catching the compiler warning.
419  static void operator delete(void*, void*) { assert(0); }
420 
421  void MaybeFinish() {
422  if (--callbacks_outstanding_ == 0) {
423  Status s = std::move(finish_status_);
424  auto* reactor = reactor_;
425  auto* call = call_.call();
428  reactor->OnDone(s);
429  }
430  }
431 
432  void StartCall() override {
433  // This call initiates two batches, plus any backlog, each with a callback
434  // 1. Send initial metadata (unless corked) + recv initial metadata
435  // 2. Any read backlog
436  // 3. Any write backlog
437  // 4. Recv trailing metadata, on_completion callback
438  started_ = true;
439 
440  start_tag_.Set(call_.call(),
441  [this](bool ok) {
442  reactor_->OnReadInitialMetadataDone(ok);
443  MaybeFinish();
444  },
445  &start_ops_);
446  if (!start_corked_) {
447  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
448  context_->initial_metadata_flags());
449  }
450  start_ops_.RecvInitialMetadata(context_);
451  start_ops_.set_core_cq_tag(&start_tag_);
452  call_.PerformOps(&start_ops_);
453 
454  // Also set up the read and write tags so that they don't have to be set up
455  // each time
456  write_tag_.Set(call_.call(),
457  [this](bool ok) {
458  reactor_->OnWriteDone(ok);
459  MaybeFinish();
460  },
461  &write_ops_);
462  write_ops_.set_core_cq_tag(&write_tag_);
463 
464  read_tag_.Set(call_.call(),
465  [this](bool ok) {
466  reactor_->OnReadDone(ok);
467  MaybeFinish();
468  },
469  &read_ops_);
470  read_ops_.set_core_cq_tag(&read_tag_);
471  if (read_ops_at_start_) {
472  call_.PerformOps(&read_ops_);
473  }
474 
475  if (write_ops_at_start_) {
476  call_.PerformOps(&write_ops_);
477  }
478 
479  if (writes_done_ops_at_start_) {
480  call_.PerformOps(&writes_done_ops_);
481  }
482 
483  finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
484  &finish_ops_);
485  finish_ops_.ClientRecvStatus(context_, &finish_status_);
486  finish_ops_.set_core_cq_tag(&finish_tag_);
487  call_.PerformOps(&finish_ops_);
488  }
489 
490  void Read(Response* msg) override {
491  read_ops_.RecvMessage(msg);
492  callbacks_outstanding_++;
493  if (started_) {
494  call_.PerformOps(&read_ops_);
495  } else {
496  read_ops_at_start_ = true;
497  }
498  }
499 
500  void Write(const Request* msg, WriteOptions options) override {
501  if (start_corked_) {
502  write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
503  context_->initial_metadata_flags());
504  start_corked_ = false;
505  }
506 
507  if (options.is_last_message()) {
508  options.set_buffer_hint();
509  write_ops_.ClientSendClose();
510  }
511  // TODO(vjpai): don't assert
512  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
513  callbacks_outstanding_++;
514  if (started_) {
515  call_.PerformOps(&write_ops_);
516  } else {
517  write_ops_at_start_ = true;
518  }
519  }
520  void WritesDone() override {
521  if (start_corked_) {
522  writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
523  context_->initial_metadata_flags());
524  start_corked_ = false;
525  }
526  writes_done_ops_.ClientSendClose();
527  writes_done_tag_.Set(call_.call(),
528  [this](bool ok) {
529  reactor_->OnWritesDoneDone(ok);
530  MaybeFinish();
531  },
532  &writes_done_ops_);
533  writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
534  callbacks_outstanding_++;
535  if (started_) {
536  call_.PerformOps(&writes_done_ops_);
537  } else {
538  writes_done_ops_at_start_ = true;
539  }
540  }
541 
542  virtual void AddHold(int holds) override { callbacks_outstanding_ += holds; }
543  virtual void RemoveHold() override { MaybeFinish(); }
544 
545  private:
546  friend class ClientCallbackReaderWriterFactory<Request, Response>;
547 
549  Call call, ClientContext* context,
551  : context_(context),
552  call_(call),
553  reactor_(reactor),
554  start_corked_(context_->initial_metadata_corked_) {
555  this->BindReactor(reactor);
556  }
557 
558  ClientContext* const context_;
559  Call call_;
561 
563  CallbackWithSuccessTag start_tag_;
564  bool start_corked_;
565 
567  CallbackWithSuccessTag finish_tag_;
568  Status finish_status_;
569 
571  write_ops_;
572  CallbackWithSuccessTag write_tag_;
573  bool write_ops_at_start_{false};
574 
576  CallbackWithSuccessTag writes_done_tag_;
577  bool writes_done_ops_at_start_{false};
578 
580  CallbackWithSuccessTag read_tag_;
581  bool read_ops_at_start_{false};
582 
583  // Minimum of 2 callbacks to pre-register for start and finish
584  std::atomic_int callbacks_outstanding_{2};
585  bool started_{false};
586 };
587 
588 template <class Request, class Response>
590  public:
591  static void Create(
592  ChannelInterface* channel, const ::grpc::internal::RpcMethod& method,
593  ClientContext* context,
595  Call call = channel->CreateCall(method, context, channel->CallbackCQ());
596 
601  reactor);
602  }
603 };
604 
605 template <class Response>
607  : public ::grpc::experimental::ClientCallbackReader<Response> {
608  public:
609  // always allocated against a call arena, no memory free required
610  static void operator delete(void* ptr, std::size_t size) {
611  assert(size == sizeof(ClientCallbackReaderImpl));
612  }
613 
614  // This operator should never be called as the memory should be freed as part
615  // of the arena destruction. It only exists to provide a matching operator
616  // delete to the operator new so that some compilers will not complain (see
617  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
618  // there are no tests catching the compiler warning.
619  static void operator delete(void*, void*) { assert(0); }
620 
621  void MaybeFinish() {
622  if (--callbacks_outstanding_ == 0) {
623  Status s = std::move(finish_status_);
624  auto* reactor = reactor_;
625  auto* call = call_.call();
626  this->~ClientCallbackReaderImpl();
628  reactor->OnDone(s);
629  }
630  }
631 
632  void StartCall() override {
633  // This call initiates two batches, plus any backlog, each with a callback
634  // 1. Send initial metadata (unless corked) + recv initial metadata
635  // 2. Any backlog
636  // 3. Recv trailing metadata, on_completion callback
637  started_ = true;
638 
639  start_tag_.Set(call_.call(),
640  [this](bool ok) {
641  reactor_->OnReadInitialMetadataDone(ok);
642  MaybeFinish();
643  },
644  &start_ops_);
645  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
646  context_->initial_metadata_flags());
647  start_ops_.RecvInitialMetadata(context_);
648  start_ops_.set_core_cq_tag(&start_tag_);
649  call_.PerformOps(&start_ops_);
650 
651  // Also set up the read tag so it doesn't have to be set up each time
652  read_tag_.Set(call_.call(),
653  [this](bool ok) {
654  reactor_->OnReadDone(ok);
655  MaybeFinish();
656  },
657  &read_ops_);
658  read_ops_.set_core_cq_tag(&read_tag_);
659  if (read_ops_at_start_) {
660  call_.PerformOps(&read_ops_);
661  }
662 
663  finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
664  &finish_ops_);
665  finish_ops_.ClientRecvStatus(context_, &finish_status_);
666  finish_ops_.set_core_cq_tag(&finish_tag_);
667  call_.PerformOps(&finish_ops_);
668  }
669 
670  void Read(Response* msg) override {
671  read_ops_.RecvMessage(msg);
672  callbacks_outstanding_++;
673  if (started_) {
674  call_.PerformOps(&read_ops_);
675  } else {
676  read_ops_at_start_ = true;
677  }
678  }
679 
680  virtual void AddHold(int holds) override { callbacks_outstanding_ += holds; }
681  virtual void RemoveHold() override { MaybeFinish(); }
682 
683  private:
684  friend class ClientCallbackReaderFactory<Response>;
685 
686  template <class Request>
688  Call call, ClientContext* context, Request* request,
690  : context_(context), call_(call), reactor_(reactor) {
691  this->BindReactor(reactor);
692  // TODO(vjpai): don't assert
693  GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
694  start_ops_.ClientSendClose();
695  }
696 
697  ClientContext* const context_;
698  Call call_;
700 
703  start_ops_;
704  CallbackWithSuccessTag start_tag_;
705 
707  CallbackWithSuccessTag finish_tag_;
708  Status finish_status_;
709 
711  CallbackWithSuccessTag read_tag_;
712  bool read_ops_at_start_{false};
713 
714  // Minimum of 2 callbacks to pre-register for start and finish
715  std::atomic_int callbacks_outstanding_{2};
716  bool started_{false};
717 };
718 
719 template <class Response>
721  public:
722  template <class Request>
723  static void Create(
724  ChannelInterface* channel, const ::grpc::internal::RpcMethod& method,
725  ClientContext* context, const Request* request,
727  Call call = channel->CreateCall(method, context, channel->CallbackCQ());
728 
731  call.call(), sizeof(ClientCallbackReaderImpl<Response>)))
732  ClientCallbackReaderImpl<Response>(call, context, request, reactor);
733  }
734 };
735 
736 template <class Request>
739  public:
740  // always allocated against a call arena, no memory free required
741  static void operator delete(void* ptr, std::size_t size) {
742  assert(size == sizeof(ClientCallbackWriterImpl));
743  }
744 
745  // This operator should never be called as the memory should be freed as part
746  // of the arena destruction. It only exists to provide a matching operator
747  // delete to the operator new so that some compilers will not complain (see
748  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
749  // there are no tests catching the compiler warning.
750  static void operator delete(void*, void*) { assert(0); }
751 
752  void MaybeFinish() {
753  if (--callbacks_outstanding_ == 0) {
754  Status s = std::move(finish_status_);
755  auto* reactor = reactor_;
756  auto* call = call_.call();
757  this->~ClientCallbackWriterImpl();
759  reactor->OnDone(s);
760  }
761  }
762 
763  void StartCall() override {
764  // This call initiates two batches, plus any backlog, each with a callback
765  // 1. Send initial metadata (unless corked) + recv initial metadata
766  // 2. Any backlog
767  // 3. Recv trailing metadata, on_completion callback
768  started_ = true;
769 
770  start_tag_.Set(call_.call(),
771  [this](bool ok) {
772  reactor_->OnReadInitialMetadataDone(ok);
773  MaybeFinish();
774  },
775  &start_ops_);
776  if (!start_corked_) {
777  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
778  context_->initial_metadata_flags());
779  }
780  start_ops_.RecvInitialMetadata(context_);
781  start_ops_.set_core_cq_tag(&start_tag_);
782  call_.PerformOps(&start_ops_);
783 
784  // Also set up the read and write tags so that they don't have to be set up
785  // each time
786  write_tag_.Set(call_.call(),
787  [this](bool ok) {
788  reactor_->OnWriteDone(ok);
789  MaybeFinish();
790  },
791  &write_ops_);
792  write_ops_.set_core_cq_tag(&write_tag_);
793 
794  if (write_ops_at_start_) {
795  call_.PerformOps(&write_ops_);
796  }
797 
798  if (writes_done_ops_at_start_) {
799  call_.PerformOps(&writes_done_ops_);
800  }
801 
802  finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
803  &finish_ops_);
804  finish_ops_.ClientRecvStatus(context_, &finish_status_);
805  finish_ops_.set_core_cq_tag(&finish_tag_);
806  call_.PerformOps(&finish_ops_);
807  }
808 
809  void Write(const Request* msg, WriteOptions options) override {
810  if (start_corked_) {
811  write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
812  context_->initial_metadata_flags());
813  start_corked_ = false;
814  }
815 
816  if (options.is_last_message()) {
817  options.set_buffer_hint();
818  write_ops_.ClientSendClose();
819  }
820  // TODO(vjpai): don't assert
821  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
822  callbacks_outstanding_++;
823  if (started_) {
824  call_.PerformOps(&write_ops_);
825  } else {
826  write_ops_at_start_ = true;
827  }
828  }
829  void WritesDone() override {
830  if (start_corked_) {
831  writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
832  context_->initial_metadata_flags());
833  start_corked_ = false;
834  }
835  writes_done_ops_.ClientSendClose();
836  writes_done_tag_.Set(call_.call(),
837  [this](bool ok) {
838  reactor_->OnWritesDoneDone(ok);
839  MaybeFinish();
840  },
841  &writes_done_ops_);
842  writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
843  callbacks_outstanding_++;
844  if (started_) {
845  call_.PerformOps(&writes_done_ops_);
846  } else {
847  writes_done_ops_at_start_ = true;
848  }
849  }
850 
851  virtual void AddHold(int holds) override { callbacks_outstanding_ += holds; }
852  virtual void RemoveHold() override { MaybeFinish(); }
853 
854  private:
855  friend class ClientCallbackWriterFactory<Request>;
856 
857  template <class Response>
859  Call call, ClientContext* context, Response* response,
861  : context_(context),
862  call_(call),
863  reactor_(reactor),
864  start_corked_(context_->initial_metadata_corked_) {
865  this->BindReactor(reactor);
866  finish_ops_.RecvMessage(response);
867  finish_ops_.AllowNoMessage();
868  }
869 
870  ClientContext* const context_;
871  Call call_;
873 
875  CallbackWithSuccessTag start_tag_;
876  bool start_corked_;
877 
879  CallbackWithSuccessTag finish_tag_;
880  Status finish_status_;
881 
883  write_ops_;
884  CallbackWithSuccessTag write_tag_;
885  bool write_ops_at_start_{false};
886 
888  CallbackWithSuccessTag writes_done_tag_;
889  bool writes_done_ops_at_start_{false};
890 
891  // Minimum of 2 callbacks to pre-register for start and finish
892  std::atomic_int callbacks_outstanding_{2};
893  bool started_{false};
894 };
895 
896 template <class Request>
898  public:
899  template <class Response>
900  static void Create(
901  ChannelInterface* channel, const ::grpc::internal::RpcMethod& method,
902  ClientContext* context, Response* response,
904  Call call = channel->CreateCall(method, context, channel->CallbackCQ());
905 
908  call.call(), sizeof(ClientCallbackWriterImpl<Request>)))
909  ClientCallbackWriterImpl<Request>(call, context, response, reactor);
910  }
911 };
912 
915  public:
916  // always allocated against a call arena, no memory free required
917  static void operator delete(void* ptr, std::size_t size) {
918  assert(size == sizeof(ClientCallbackUnaryImpl));
919  }
920 
921  // This operator should never be called as the memory should be freed as part
922  // of the arena destruction. It only exists to provide a matching operator
923  // delete to the operator new so that some compilers will not complain (see
924  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
925  // there are no tests catching the compiler warning.
926  static void operator delete(void*, void*) { assert(0); }
927 
928  void StartCall() override {
929  // This call initiates two batches, each with a callback
930  // 1. Send initial metadata + write + writes done + recv initial metadata
931  // 2. Read message, recv trailing metadata
932  started_ = true;
933 
934  start_tag_.Set(call_.call(),
935  [this](bool ok) {
936  reactor_->OnReadInitialMetadataDone(ok);
937  MaybeFinish();
938  },
939  &start_ops_);
940  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
941  context_->initial_metadata_flags());
942  start_ops_.RecvInitialMetadata(context_);
943  start_ops_.set_core_cq_tag(&start_tag_);
944  call_.PerformOps(&start_ops_);
945 
946  finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
947  &finish_ops_);
948  finish_ops_.ClientRecvStatus(context_, &finish_status_);
949  finish_ops_.set_core_cq_tag(&finish_tag_);
950  call_.PerformOps(&finish_ops_);
951  }
952 
953  void MaybeFinish() {
954  if (--callbacks_outstanding_ == 0) {
955  Status s = std::move(finish_status_);
956  auto* reactor = reactor_;
957  auto* call = call_.call();
958  this->~ClientCallbackUnaryImpl();
960  reactor->OnDone(s);
961  }
962  }
963 
964  private:
966 
967  template <class Request, class Response>
968  ClientCallbackUnaryImpl(Call call, ClientContext* context, Request* request,
969  Response* response,
971  : context_(context), call_(call), reactor_(reactor) {
972  this->BindReactor(reactor);
973  // TODO(vjpai): don't assert
974  GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
975  start_ops_.ClientSendClose();
976  finish_ops_.RecvMessage(response);
977  finish_ops_.AllowNoMessage();
978  }
979 
980  ClientContext* const context_;
981  Call call_;
983 
986  start_ops_;
987  CallbackWithSuccessTag start_tag_;
988 
990  CallbackWithSuccessTag finish_tag_;
991  Status finish_status_;
992 
993  // This call will have 2 callbacks: start and finish
994  std::atomic_int callbacks_outstanding_{2};
995  bool started_{false};
996 };
997 
999  public:
1000  template <class Request, class Response>
1001  static void Create(ChannelInterface* channel,
1002  const ::grpc::internal::RpcMethod& method,
1003  ClientContext* context, const Request* request,
1004  Response* response,
1006  Call call = channel->CreateCall(method, context, channel->CallbackCQ());
1007 
1009 
1011  call.call(), sizeof(ClientCallbackUnaryImpl)))
1012  ClientCallbackUnaryImpl(call, context, request, response, reactor);
1013  }
1014 };
1015 
1016 } // namespace internal
1017 } // namespace grpc
1018 
1019 #endif // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
virtual void AddHold(int holds) override
Definition: client_callback.h:680
virtual ~ClientCallbackUnary()
Definition: client_callback.h:164
virtual ~ClientCallbackReaderWriter()
Definition: client_callback.h:112
void StartCall() override
Definition: client_callback.h:632
::grpc_impl::ClientContext ClientContext
Definition: client_context.h:26
virtual ~ClientWriteReactor()
Definition: client_callback.h:335
virtual void OnReadInitialMetadataDone(bool ok)
Definition: client_callback.h:352
Definition: channel_interface.h:59
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call_op_set.h:125
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:145
virtual void grpc_call_ref(grpc_call *call)=0
Definition: client_callback.h:913
virtual ~ClientCallbackReader()
Definition: client_callback.h:129
void AddHold()
Holds are needed if (and only if) this stream has operations that take place on it after StartCall bu...
Definition: client_callback.h:259
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:164
virtual void RemoveHold() override
Definition: client_callback.h:543
ClientReadReactor is the interface for a server-streaming RPC.
Definition: client_callback.h:101
static void Create(ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ClientContext *context, Response *response, ::grpc::experimental::ClientWriteReactor< Request > *reactor)
Definition: client_callback.h:900
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:821
virtual ~ClientReadReactor()
Definition: client_callback.h:311
virtual void grpc_call_unref(grpc_call *call)=0
void WritesDone() override
Definition: client_callback.h:829
void StartRead(Response *resp)
Definition: client_callback.h:314
CallbackUnaryCallImpl(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(Status)> on_completion)
Definition: client_callback.h:56
Definition: client_callback.h:110
Definition: channel_interface.h:63
void Write(const Request *req)
Definition: client_callback.h:146
void StartCall() override
Definition: client_callback.h:432
void Read(Response *msg) override
Definition: client_callback.h:490
Definition: channel_interface.h:49
void RemoveHold()
Definition: client_callback.h:261
static void Create(ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ClientContext *context, ::grpc::experimental::ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback.h:591
grpc_call * call() const
Definition: call.h:72
void WriteLast(const Request *req, WriteOptions options)
Definition: client_callback.h:148
void StartWritesDone()
Indicate that the RPC will have no more write operations.
Definition: client_callback.h:236
virtual void OnWriteDone(bool ok)
Definition: client_callback.h:353
virtual void OnDone(const Status &s)
Definition: client_callback.h:351
void Read(Response *msg) override
Definition: client_callback.h:670
static void Create(ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ClientContext *context, const Request *request, ::grpc::experimental::ClientReadReactor< Response > *reactor)
Definition: client_callback.h:723
virtual ~ClientCallbackWriter()
Definition: client_callback.h:144
virtual void RemoveHold() override
Definition: client_callback.h:852
virtual void OnDone(const Status &s)
Definition: client_callback.h:320
Descriptor of an RPC method.
Definition: rpc_method.h:29
void StartWrite(const Request *req, WriteOptions options)
Initiate/post a write operation with specified options.
Definition: client_callback.h:214
Definition: client_callback.h:162
void AddMultipleHolds(int holds)
Definition: client_callback.h:260
virtual void AddHold(int holds) override
Definition: client_callback.h:851
::grpc_impl::Channel Channel
Definition: channel.h:26
Definition: client_callback.h:127
Definition: call_op_set.h:218
void StartWrite(const Request *req)
Initiate a write operation (or post it for later initiation if StartCall has not yet been invoked)...
Definition: client_callback.h:206
void Write(const Request *msg, WriteOptions options) override
Definition: client_callback.h:809
Definition: call_op_set.h:696
Definition: client_callback.h:998
virtual void OnReadDone(bool ok)
Definition: client_callback.h:322
void StartWriteLast(const Request *req, WriteOptions options)
Definition: client_callback.h:342
Definition: call_op_set.h:288
virtual ~ClientBidiReactor()
Definition: client_callback.h:185
void StartWrite(const Request *req, WriteOptions options)
Definition: client_callback.h:339
virtual ~ClientUnaryReactor()
Definition: client_callback.h:375
void WritesDone() override
Definition: client_callback.h:520
This header provides an object that reads bytes directly from a grpc::ByteBuffer, via the ZeroCopyInp...
Definition: alarm.h:24
void StartRead(Response *resp)
Initiate a read operation (or post it for later initiation if StartCall has not yet been invoked)...
Definition: client_callback.h:198
ClientWriteReactor is the interface for a client-streaming RPC.
Definition: client_callback.h:103
virtual void OnWritesDoneDone(bool ok)
Notifies the application that a StartWritesDone operation completed.
Definition: client_callback.h:296
void StartCall()
Activate the RPC and initiate any reads or writes that have been Start&#39;ed before this call...
Definition: client_callback.h:191
Codegen interface for grpc::Channel.
Definition: channel_interface.h:69
Definition: client_callback.h:405
virtual void AddHold(int holds) override
Definition: client_callback.h:542
CoreCodegenInterface * g_core_codegen_interface
Definition: call_op_set.h:51
virtual void RemoveHold() override
Definition: client_callback.h:681
void MaybeFinish()
Definition: client_callback.h:421
ClientBidiReactor is the interface for a bidirectional streaming RPC.
Definition: client_callback.h:99
void AddMultipleHolds(int holds)
Definition: client_callback.h:348
virtual void OnWritesDoneDone(bool ok)
Definition: client_callback.h:354
Definition: byte_buffer.h:41
Per-message write options.
Definition: call_op_set.h:85
Definition: client_callback.h:737
void MaybeFinish()
Definition: client_callback.h:752
Definition: channel_interface.h:61
ClientUnaryReactor is a reactor-style interface for a unary RPC.
Definition: client_callback.h:373
void StartWritesDone()
Definition: client_callback.h:345
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm_impl.h:33
Definition: call_op_set.h:594
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
virtual void OnDone(const Status &s)
Definition: client_callback.h:378
void Write(const Request *msg, WriteOptions options) override
Definition: client_callback.h:500
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming o...
Definition: callback_common.h:132
bool ok() const
Is the status OK?
Definition: status.h:118
void StartCall() override
Definition: client_callback.h:763
Definition: client_callback.h:142
void StartCall()
Definition: client_callback.h:313
virtual void OnDone(const Status &s)
Notifies the application that all operations associated with this RPC have completed and provides the...
Definition: client_callback.h:267
void AddMultipleHolds(int holds)
Definition: client_callback.h:317
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue.h).
Definition: completion_queue_impl.h:102
A ClientContext allows the person implementing a service client to:
Definition: client_context_impl.h:178
Did it work? If it didn&#39;t, why?
Definition: status.h:31
virtual void OnReadDone(bool ok)
Notifies the application that a StartRead operation completed.
Definition: client_callback.h:282
void BindReactor(ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback.h:121
Definition: callback_common.h:68
virtual void OnWriteDone(bool ok)
Notifies the application that a StartWrite operation completed.
Definition: client_callback.h:288
void AddHold()
Definition: client_callback.h:347
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:189
Definition: client_callback.h:606
void RemoveHold()
Definition: client_callback.h:349
Definition: call_op_set.h:744
void StartCall()
Definition: client_callback.h:377
void BindReactor(ClientReadReactor< Response > *reactor)
Definition: client_callback.h:136
void AddHold()
Definition: client_callback.h:316
void StartWriteLast(const Request *req, WriteOptions options)
Initiate/post a write operation with specified options and an indication that this is the last write ...
Definition: client_callback.h:227
void MaybeFinish()
Definition: client_callback.h:621
virtual void OnReadInitialMetadataDone(bool ok)
Notifies the application that a read of initial metadata from the server is done. ...
Definition: client_callback.h:276
void BindReactor(ClientWriteReactor< Request > *reactor)
Definition: client_callback.h:157
virtual void OnReadInitialMetadataDone(bool ok)
Definition: client_callback.h:379
void RemoveHold()
Definition: client_callback.h:318
virtual void OnReadInitialMetadataDone(bool ok)
Definition: client_callback.h:321
static void Create(ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ClientContext *context, const Request *request, Response *response, ::grpc::experimental::ClientUnaryReactor *reactor)
Definition: client_callback.h:1001
void StartCall() override
Definition: client_callback.h:928
void StartWrite(const Request *req)
Definition: client_callback.h:338
Straightforward wrapping of the C call object.
Definition: call.h:38
void MaybeFinish()
Definition: client_callback.h:953
void StartCall()
Definition: client_callback.h:337
void CallbackUnaryCall(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(Status)> on_completion)
Perform a callback-based unary call TODO(vjpai): Combine as much as possible with the blocking unary ...
Definition: client_callback.h:45