GRPC C++  1.23.0
client_callback_impl.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_IMPL_H
19 #define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_IMPL_H
20 #include <atomic>
21 #include <functional>
22 
30 
31 namespace grpc {
32 namespace internal {
33 class RpcMethod;
34 } // namespace internal
35 } // namespace grpc
36 
37 namespace grpc_impl {
38 class Channel;
39 class ClientContext;
40 
41 namespace internal {
42 
45 template <class InputMessage, class OutputMessage>
47  const ::grpc::internal::RpcMethod& method,
48  ::grpc_impl::ClientContext* context,
49  const InputMessage* request, OutputMessage* result,
50  std::function<void(::grpc::Status)> on_completion) {
52  channel, method, context, request, result, on_completion);
53 }
54 
55 template <class InputMessage, class OutputMessage>
57  public:
59  const ::grpc::internal::RpcMethod& method,
60  ::grpc_impl::ClientContext* context,
61  const InputMessage* request, OutputMessage* result,
62  std::function<void(::grpc::Status)> on_completion) {
63  ::grpc_impl::CompletionQueue* cq = channel->CallbackCQ();
64  GPR_CODEGEN_ASSERT(cq != nullptr);
65  grpc::internal::Call call(channel->CreateCall(method, context, cq));
66 
67  using FullCallOpSet = grpc::internal::CallOpSet<
74 
76  call.call(), sizeof(FullCallOpSet))) FullCallOpSet;
77 
79  call.call(), sizeof(grpc::internal::CallbackWithStatusTag)))
80  grpc::internal::CallbackWithStatusTag(call.call(), on_completion, ops);
81 
82  // TODO(vjpai): Unify code with sync API as much as possible
83  ::grpc::Status s = ops->SendMessagePtr(request);
84  if (!s.ok()) {
85  tag->force_run(s);
86  return;
87  }
88  ops->SendInitialMetadata(&context->send_initial_metadata_,
89  context->initial_metadata_flags());
90  ops->RecvInitialMetadata(context);
91  ops->RecvMessage(result);
92  ops->AllowNoMessage();
93  ops->ClientSendClose();
94  ops->ClientRecvStatus(context, tag->status_ptr());
95  ops->set_core_cq_tag(tag);
96  call.PerformOps(ops);
97  }
98 };
99 } // namespace internal
100 
101 namespace experimental {
102 
103 // Forward declarations
104 template <class Request, class Response>
106 template <class Response>
108 template <class Request>
110 class ClientUnaryReactor;
111 
112 // NOTE: The streaming objects are not actually implemented in the public API.
113 // These interfaces are provided for mocking only. Typical applications
114 // will interact exclusively with the reactors that they define.
115 template <class Request, class Response>
117  public:
119  virtual void StartCall() = 0;
120  virtual void Write(const Request* req, ::grpc::WriteOptions options) = 0;
121  virtual void WritesDone() = 0;
122  virtual void Read(Response* resp) = 0;
123  virtual void AddHold(int holds) = 0;
124  virtual void RemoveHold() = 0;
125 
126  protected:
128  reactor->BindStream(this);
129  }
130 };
131 
132 template <class Response>
134  public:
136  virtual void StartCall() = 0;
137  virtual void Read(Response* resp) = 0;
138  virtual void AddHold(int holds) = 0;
139  virtual void RemoveHold() = 0;
140 
141  protected:
143  reactor->BindReader(this);
144  }
145 };
146 
147 template <class Request>
149  public:
151  virtual void StartCall() = 0;
152  void Write(const Request* req) { Write(req, ::grpc::WriteOptions()); }
153  virtual void Write(const Request* req, ::grpc::WriteOptions options) = 0;
154  void WriteLast(const Request* req, ::grpc::WriteOptions options) {
155  Write(req, options.set_last_message());
156  }
157  virtual void WritesDone() = 0;
158 
159  virtual void AddHold(int holds) = 0;
160  virtual void RemoveHold() = 0;
161 
162  protected:
164  reactor->BindWriter(this);
165  }
166 };
167 
169  public:
170  virtual ~ClientCallbackUnary() {}
171  virtual void StartCall() = 0;
172 
173  protected:
174  void BindReactor(ClientUnaryReactor* reactor);
175 };
176 
177 // The following classes are the reactor interfaces that are to be implemented
178 // by the user. They are passed in to the library as an argument to a call on a
179 // stub (either a codegen-ed call or a generic call). The streaming RPC is
180 // activated by calling StartCall, possibly after initiating StartRead,
181 // StartWrite, or AddHold operations on the streaming object. Note that none of
182 // the classes are pure; all reactions have a default empty reaction so that the
183 // user class only needs to override those classes that it cares about.
184 // The reactor must be passed to the stub invocation before any of the below
185 // operations can be called.
186 
188 template <class Request, class Response>
189 class ClientBidiReactor {
190  public:
191  virtual ~ClientBidiReactor() {}
192 
197  void StartCall() { stream_->StartCall(); }
198 
204  void StartRead(Response* resp) { stream_->Read(resp); }
205 
212  void StartWrite(const Request* req) {
213  StartWrite(req, ::grpc::WriteOptions());
214  }
215 
222  void StartWrite(const Request* req, ::grpc::WriteOptions options) {
223  stream_->Write(req, std::move(options));
224  }
225 
235  void StartWriteLast(const Request* req, ::grpc::WriteOptions options) {
236  StartWrite(req, std::move(options.set_last_message()));
237  }
238 
244  void StartWritesDone() { stream_->WritesDone(); }
245 
267  void AddHold() { AddMultipleHolds(1); }
268  void AddMultipleHolds(int holds) { stream_->AddHold(holds); }
269  void RemoveHold() { stream_->RemoveHold(); }
270 
275  virtual void OnDone(const ::grpc::Status& s) {}
276 
284  virtual void OnReadInitialMetadataDone(bool ok) {}
285 
290  virtual void OnReadDone(bool ok) {}
291 
296  virtual void OnWriteDone(bool ok) {}
297 
304  virtual void OnWritesDoneDone(bool ok) {}
305 
306  private:
307  friend class ClientCallbackReaderWriter<Request, Response>;
308  void BindStream(ClientCallbackReaderWriter<Request, Response>* stream) {
309  stream_ = stream;
310  }
312 };
313 
316 template <class Response>
317 class ClientReadReactor {
318  public:
319  virtual ~ClientReadReactor() {}
320 
321  void StartCall() { reader_->StartCall(); }
322  void StartRead(Response* resp) { reader_->Read(resp); }
323 
324  void AddHold() { AddMultipleHolds(1); }
325  void AddMultipleHolds(int holds) { reader_->AddHold(holds); }
326  void RemoveHold() { reader_->RemoveHold(); }
327 
328  virtual void OnDone(const ::grpc::Status& s) {}
329  virtual void OnReadInitialMetadataDone(bool ok) {}
330  virtual void OnReadDone(bool ok) {}
331 
332  private:
333  friend class ClientCallbackReader<Response>;
334  void BindReader(ClientCallbackReader<Response>* reader) { reader_ = reader; }
336 };
337 
340 template <class Request>
341 class ClientWriteReactor {
342  public:
343  virtual ~ClientWriteReactor() {}
344 
345  void StartCall() { writer_->StartCall(); }
346  void StartWrite(const Request* req) {
347  StartWrite(req, ::grpc::WriteOptions());
348  }
349  void StartWrite(const Request* req, ::grpc::WriteOptions options) {
350  writer_->Write(req, std::move(options));
351  }
352  void StartWriteLast(const Request* req, ::grpc::WriteOptions options) {
353  StartWrite(req, std::move(options.set_last_message()));
354  }
355  void StartWritesDone() { writer_->WritesDone(); }
356 
357  void AddHold() { AddMultipleHolds(1); }
358  void AddMultipleHolds(int holds) { writer_->AddHold(holds); }
359  void RemoveHold() { writer_->RemoveHold(); }
360 
361  virtual void OnDone(const ::grpc::Status& s) {}
362  virtual void OnReadInitialMetadataDone(bool ok) {}
363  virtual void OnWriteDone(bool ok) {}
364  virtual void OnWritesDoneDone(bool ok) {}
365 
366  private:
367  friend class ClientCallbackWriter<Request>;
368  void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; }
370 };
371 
384  public:
385  virtual ~ClientUnaryReactor() {}
386 
387  void StartCall() { call_->StartCall(); }
388  virtual void OnDone(const ::grpc::Status& s) {}
389  virtual void OnReadInitialMetadataDone(bool ok) {}
390 
391  private:
392  friend class ClientCallbackUnary;
393  void BindCall(ClientCallbackUnary* call) { call_ = call; }
394  ClientCallbackUnary* call_;
395 };
396 
397 // Define function out-of-line from class to avoid forward declaration issue
398 inline void ClientCallbackUnary::BindReactor(ClientUnaryReactor* reactor) {
399  reactor->BindCall(this);
400 }
401 
402 } // namespace experimental
403 
404 namespace internal {
405 
406 // Forward declare factory classes for friendship
407 template <class Request, class Response>
408 class ClientCallbackReaderWriterFactory;
409 template <class Response>
410 class ClientCallbackReaderFactory;
411 template <class Request>
412 class ClientCallbackWriterFactory;
413 
414 template <class Request, class Response>
416  : public experimental::ClientCallbackReaderWriter<Request, Response> {
417  public:
418  // always allocated against a call arena, no memory free required
419  static void operator delete(void* ptr, std::size_t size) {
420  assert(size == sizeof(ClientCallbackReaderWriterImpl));
421  }
422 
423  // This operator should never be called as the memory should be freed as part
424  // of the arena destruction. It only exists to provide a matching operator
425  // delete to the operator new so that some compilers will not complain (see
426  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
427  // there are no tests catching the compiler warning.
428  static void operator delete(void*, void*) { assert(0); }
429 
430  void MaybeFinish() {
431  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
432  1, std::memory_order_acq_rel) == 1)) {
433  ::grpc::Status s = std::move(finish_status_);
434  auto* reactor = reactor_;
435  auto* call = call_.call();
438  reactor->OnDone(s);
439  }
440  }
441 
442  void StartCall() override {
443  // This call initiates two batches, plus any backlog, each with a callback
444  // 1. Send initial metadata (unless corked) + recv initial metadata
445  // 2. Any read backlog
446  // 3. Any write backlog
447  // 4. Recv trailing metadata, on_completion callback
448  started_ = true;
449 
450  start_tag_.Set(call_.call(),
451  [this](bool ok) {
452  reactor_->OnReadInitialMetadataDone(ok);
453  MaybeFinish();
454  },
455  &start_ops_);
456  if (!start_corked_) {
457  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
458  context_->initial_metadata_flags());
459  }
460  start_ops_.RecvInitialMetadata(context_);
461  start_ops_.set_core_cq_tag(&start_tag_);
462  call_.PerformOps(&start_ops_);
463 
464  // Also set up the read and write tags so that they don't have to be set up
465  // each time
466  write_tag_.Set(call_.call(),
467  [this](bool ok) {
468  reactor_->OnWriteDone(ok);
469  MaybeFinish();
470  },
471  &write_ops_);
472  write_ops_.set_core_cq_tag(&write_tag_);
473 
474  read_tag_.Set(call_.call(),
475  [this](bool ok) {
476  reactor_->OnReadDone(ok);
477  MaybeFinish();
478  },
479  &read_ops_);
480  read_ops_.set_core_cq_tag(&read_tag_);
481  if (read_ops_at_start_) {
482  call_.PerformOps(&read_ops_);
483  }
484 
485  if (write_ops_at_start_) {
486  call_.PerformOps(&write_ops_);
487  }
488 
489  if (writes_done_ops_at_start_) {
490  call_.PerformOps(&writes_done_ops_);
491  }
492 
493  finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
494  &finish_ops_);
495  finish_ops_.ClientRecvStatus(context_, &finish_status_);
496  finish_ops_.set_core_cq_tag(&finish_tag_);
497  call_.PerformOps(&finish_ops_);
498  }
499 
500  void Read(Response* msg) override {
501  read_ops_.RecvMessage(msg);
502  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
503  if (started_) {
504  call_.PerformOps(&read_ops_);
505  } else {
506  read_ops_at_start_ = true;
507  }
508  }
509 
510  void Write(const Request* msg, ::grpc::WriteOptions options) override {
511  if (start_corked_) {
512  write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
513  context_->initial_metadata_flags());
514  start_corked_ = false;
515  }
516 
517  if (options.is_last_message()) {
518  options.set_buffer_hint();
519  write_ops_.ClientSendClose();
520  }
521  // TODO(vjpai): don't assert
522  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
523  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
524  if (started_) {
525  call_.PerformOps(&write_ops_);
526  } else {
527  write_ops_at_start_ = true;
528  }
529  }
530  void WritesDone() override {
531  if (start_corked_) {
532  writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
533  context_->initial_metadata_flags());
534  start_corked_ = false;
535  }
536  writes_done_ops_.ClientSendClose();
537  writes_done_tag_.Set(call_.call(),
538  [this](bool ok) {
539  reactor_->OnWritesDoneDone(ok);
540  MaybeFinish();
541  },
542  &writes_done_ops_);
543  writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
544  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
545  if (started_) {
546  call_.PerformOps(&writes_done_ops_);
547  } else {
548  writes_done_ops_at_start_ = true;
549  }
550  }
551 
552  void AddHold(int holds) override {
553  callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
554  }
555  void RemoveHold() override { MaybeFinish(); }
556 
557  private:
558  friend class ClientCallbackReaderWriterFactory<Request, Response>;
559 
563  : context_(context),
564  call_(call),
565  reactor_(reactor),
566  start_corked_(context_->initial_metadata_corked_) {
567  this->BindReactor(reactor);
568  }
569 
570  ::grpc_impl::ClientContext* const context_;
571  grpc::internal::Call call_;
573 
576  start_ops_;
578  bool start_corked_;
579 
582  ::grpc::Status finish_status_;
583 
584  grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
587  write_ops_;
589  bool write_ops_at_start_{false};
590 
591  grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
593  writes_done_ops_;
595  bool writes_done_ops_at_start_{false};
596 
598  read_ops_;
600  bool read_ops_at_start_{false};
601 
602  // Minimum of 2 callbacks to pre-register for start and finish
603  std::atomic<intptr_t> callbacks_outstanding_{2};
604  bool started_{false};
605 };
606 
607 template <class Request, class Response>
609  public:
610  static void Create(
611  ::grpc::ChannelInterface* channel,
612  const ::grpc::internal::RpcMethod& method,
613  ::grpc_impl::ClientContext* context,
615  grpc::internal::Call call =
616  channel->CreateCall(method, context, channel->CallbackCQ());
617 
622  reactor);
623  }
624 };
625 
626 template <class Response>
628  : public experimental::ClientCallbackReader<Response> {
629  public:
630  // always allocated against a call arena, no memory free required
631  static void operator delete(void* ptr, std::size_t size) {
632  assert(size == sizeof(ClientCallbackReaderImpl));
633  }
634 
635  // This operator should never be called as the memory should be freed as part
636  // of the arena destruction. It only exists to provide a matching operator
637  // delete to the operator new so that some compilers will not complain (see
638  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
639  // there are no tests catching the compiler warning.
640  static void operator delete(void*, void*) { assert(0); }
641 
642  void MaybeFinish() {
643  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
644  1, std::memory_order_acq_rel) == 1)) {
645  ::grpc::Status s = std::move(finish_status_);
646  auto* reactor = reactor_;
647  auto* call = call_.call();
648  this->~ClientCallbackReaderImpl();
650  reactor->OnDone(s);
651  }
652  }
653 
654  void StartCall() override {
655  // This call initiates two batches, plus any backlog, each with a callback
656  // 1. Send initial metadata (unless corked) + recv initial metadata
657  // 2. Any backlog
658  // 3. Recv trailing metadata, on_completion callback
659  started_ = true;
660 
661  start_tag_.Set(call_.call(),
662  [this](bool ok) {
663  reactor_->OnReadInitialMetadataDone(ok);
664  MaybeFinish();
665  },
666  &start_ops_);
667  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
668  context_->initial_metadata_flags());
669  start_ops_.RecvInitialMetadata(context_);
670  start_ops_.set_core_cq_tag(&start_tag_);
671  call_.PerformOps(&start_ops_);
672 
673  // Also set up the read tag so it doesn't have to be set up each time
674  read_tag_.Set(call_.call(),
675  [this](bool ok) {
676  reactor_->OnReadDone(ok);
677  MaybeFinish();
678  },
679  &read_ops_);
680  read_ops_.set_core_cq_tag(&read_tag_);
681  if (read_ops_at_start_) {
682  call_.PerformOps(&read_ops_);
683  }
684 
685  finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
686  &finish_ops_);
687  finish_ops_.ClientRecvStatus(context_, &finish_status_);
688  finish_ops_.set_core_cq_tag(&finish_tag_);
689  call_.PerformOps(&finish_ops_);
690  }
691 
692  void Read(Response* msg) override {
693  read_ops_.RecvMessage(msg);
694  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
695  if (started_) {
696  call_.PerformOps(&read_ops_);
697  } else {
698  read_ops_at_start_ = true;
699  }
700  }
701 
702  void AddHold(int holds) override {
703  callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
704  }
705  void RemoveHold() override { MaybeFinish(); }
706 
707  private:
708  friend class ClientCallbackReaderFactory<Response>;
709 
710  template <class Request>
712  ::grpc_impl::ClientContext* context,
713  Request* request,
715  : context_(context), call_(call), reactor_(reactor) {
716  this->BindReactor(reactor);
717  // TODO(vjpai): don't assert
718  GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
719  start_ops_.ClientSendClose();
720  }
721 
722  ::grpc_impl::ClientContext* const context_;
723  grpc::internal::Call call_;
725 
730  start_ops_;
732 
735  ::grpc::Status finish_status_;
736 
738  read_ops_;
740  bool read_ops_at_start_{false};
741 
742  // Minimum of 2 callbacks to pre-register for start and finish
743  std::atomic<intptr_t> callbacks_outstanding_{2};
744  bool started_{false};
745 };
746 
747 template <class Response>
749  public:
750  template <class Request>
751  static void Create(::grpc::ChannelInterface* channel,
752  const ::grpc::internal::RpcMethod& method,
753  ::grpc_impl::ClientContext* context,
754  const Request* request,
756  grpc::internal::Call call =
757  channel->CreateCall(method, context, channel->CallbackCQ());
758 
761  call.call(), sizeof(ClientCallbackReaderImpl<Response>)))
762  ClientCallbackReaderImpl<Response>(call, context, request, reactor);
763  }
764 };
765 
766 template <class Request>
768  : public experimental::ClientCallbackWriter<Request> {
769  public:
770  // always allocated against a call arena, no memory free required
771  static void operator delete(void* ptr, std::size_t size) {
772  assert(size == sizeof(ClientCallbackWriterImpl));
773  }
774 
775  // This operator should never be called as the memory should be freed as part
776  // of the arena destruction. It only exists to provide a matching operator
777  // delete to the operator new so that some compilers will not complain (see
778  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
779  // there are no tests catching the compiler warning.
780  static void operator delete(void*, void*) { assert(0); }
781 
782  void MaybeFinish() {
783  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
784  1, std::memory_order_acq_rel) == 1)) {
785  ::grpc::Status s = std::move(finish_status_);
786  auto* reactor = reactor_;
787  auto* call = call_.call();
788  this->~ClientCallbackWriterImpl();
790  reactor->OnDone(s);
791  }
792  }
793 
794  void StartCall() override {
795  // This call initiates two batches, plus any backlog, each with a callback
796  // 1. Send initial metadata (unless corked) + recv initial metadata
797  // 2. Any backlog
798  // 3. Recv trailing metadata, on_completion callback
799  started_ = true;
800 
801  start_tag_.Set(call_.call(),
802  [this](bool ok) {
803  reactor_->OnReadInitialMetadataDone(ok);
804  MaybeFinish();
805  },
806  &start_ops_);
807  if (!start_corked_) {
808  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
809  context_->initial_metadata_flags());
810  }
811  start_ops_.RecvInitialMetadata(context_);
812  start_ops_.set_core_cq_tag(&start_tag_);
813  call_.PerformOps(&start_ops_);
814 
815  // Also set up the read and write tags so that they don't have to be set up
816  // each time
817  write_tag_.Set(call_.call(),
818  [this](bool ok) {
819  reactor_->OnWriteDone(ok);
820  MaybeFinish();
821  },
822  &write_ops_);
823  write_ops_.set_core_cq_tag(&write_tag_);
824 
825  if (write_ops_at_start_) {
826  call_.PerformOps(&write_ops_);
827  }
828 
829  if (writes_done_ops_at_start_) {
830  call_.PerformOps(&writes_done_ops_);
831  }
832 
833  finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
834  &finish_ops_);
835  finish_ops_.ClientRecvStatus(context_, &finish_status_);
836  finish_ops_.set_core_cq_tag(&finish_tag_);
837  call_.PerformOps(&finish_ops_);
838  }
839 
840  void Write(const Request* msg, ::grpc::WriteOptions options) override {
841  if (start_corked_) {
842  write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
843  context_->initial_metadata_flags());
844  start_corked_ = false;
845  }
846 
847  if (options.is_last_message()) {
848  options.set_buffer_hint();
849  write_ops_.ClientSendClose();
850  }
851  // TODO(vjpai): don't assert
852  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
853  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
854  if (started_) {
855  call_.PerformOps(&write_ops_);
856  } else {
857  write_ops_at_start_ = true;
858  }
859  }
860  void WritesDone() override {
861  if (start_corked_) {
862  writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
863  context_->initial_metadata_flags());
864  start_corked_ = false;
865  }
866  writes_done_ops_.ClientSendClose();
867  writes_done_tag_.Set(call_.call(),
868  [this](bool ok) {
869  reactor_->OnWritesDoneDone(ok);
870  MaybeFinish();
871  },
872  &writes_done_ops_);
873  writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
874  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
875  if (started_) {
876  call_.PerformOps(&writes_done_ops_);
877  } else {
878  writes_done_ops_at_start_ = true;
879  }
880  }
881 
882  void AddHold(int holds) override {
883  callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
884  }
885  void RemoveHold() override { MaybeFinish(); }
886 
887  private:
888  friend class ClientCallbackWriterFactory<Request>;
889 
890  template <class Response>
892  ::grpc_impl::ClientContext* context,
893  Response* response,
895  : context_(context),
896  call_(call),
897  reactor_(reactor),
898  start_corked_(context_->initial_metadata_corked_) {
899  this->BindReactor(reactor);
900  finish_ops_.RecvMessage(response);
901  finish_ops_.AllowNoMessage();
902  }
903 
904  ::grpc_impl::ClientContext* const context_;
905  grpc::internal::Call call_;
907 
910  start_ops_;
912  bool start_corked_;
913 
916  finish_ops_;
918  ::grpc::Status finish_status_;
919 
920  grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
923  write_ops_;
925  bool write_ops_at_start_{false};
926 
927  grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
929  writes_done_ops_;
931  bool writes_done_ops_at_start_{false};
932 
933  // Minimum of 2 callbacks to pre-register for start and finish
934  std::atomic<intptr_t> callbacks_outstanding_{2};
935  bool started_{false};
936 };
937 
938 template <class Request>
940  public:
941  template <class Response>
942  static void Create(::grpc::ChannelInterface* channel,
943  const ::grpc::internal::RpcMethod& method,
944  ::grpc_impl::ClientContext* context, Response* response,
946  grpc::internal::Call call =
947  channel->CreateCall(method, context, channel->CallbackCQ());
948 
951  call.call(), sizeof(ClientCallbackWriterImpl<Request>)))
952  ClientCallbackWriterImpl<Request>(call, context, response, reactor);
953  }
954 };
955 
957  public:
958  // always allocated against a call arena, no memory free required
959  static void operator delete(void* ptr, std::size_t size) {
960  assert(size == sizeof(ClientCallbackUnaryImpl));
961  }
962 
963  // This operator should never be called as the memory should be freed as part
964  // of the arena destruction. It only exists to provide a matching operator
965  // delete to the operator new so that some compilers will not complain (see
966  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
967  // there are no tests catching the compiler warning.
968  static void operator delete(void*, void*) { assert(0); }
969 
970  void StartCall() override {
971  // This call initiates two batches, each with a callback
972  // 1. Send initial metadata + write + writes done + recv initial metadata
973  // 2. Read message, recv trailing metadata
974  started_ = true;
975 
976  start_tag_.Set(call_.call(),
977  [this](bool ok) {
978  reactor_->OnReadInitialMetadataDone(ok);
979  MaybeFinish();
980  },
981  &start_ops_);
982  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
983  context_->initial_metadata_flags());
984  start_ops_.RecvInitialMetadata(context_);
985  start_ops_.set_core_cq_tag(&start_tag_);
986  call_.PerformOps(&start_ops_);
987 
988  finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
989  &finish_ops_);
990  finish_ops_.ClientRecvStatus(context_, &finish_status_);
991  finish_ops_.set_core_cq_tag(&finish_tag_);
992  call_.PerformOps(&finish_ops_);
993  }
994 
995  void MaybeFinish() {
996  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
997  1, std::memory_order_acq_rel) == 1)) {
998  ::grpc::Status s = std::move(finish_status_);
999  auto* reactor = reactor_;
1000  auto* call = call_.call();
1001  this->~ClientCallbackUnaryImpl();
1003  reactor->OnDone(s);
1004  }
1005  }
1006 
1007  private:
1009 
1010  template <class Request, class Response>
1012  ::grpc_impl::ClientContext* context, Request* request,
1013  Response* response,
1015  : context_(context), call_(call), reactor_(reactor) {
1016  this->BindReactor(reactor);
1017  // TODO(vjpai): don't assert
1018  GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
1019  start_ops_.ClientSendClose();
1020  finish_ops_.RecvMessage(response);
1021  finish_ops_.AllowNoMessage();
1022  }
1023 
1024  ::grpc_impl::ClientContext* const context_;
1025  grpc::internal::Call call_;
1026  experimental::ClientUnaryReactor* const reactor_;
1027 
1032  start_ops_;
1034 
1037  finish_ops_;
1039  ::grpc::Status finish_status_;
1040 
1041  // This call will have 2 callbacks: start and finish
1042  std::atomic<intptr_t> callbacks_outstanding_{2};
1043  bool started_{false};
1044 };
1045 
1047  public:
1048  template <class Request, class Response>
1049  static void Create(::grpc::ChannelInterface* channel,
1050  const ::grpc::internal::RpcMethod& method,
1051  ::grpc_impl::ClientContext* context,
1052  const Request* request, Response* response,
1054  grpc::internal::Call call =
1055  channel->CreateCall(method, context, channel->CallbackCQ());
1056 
1058 
1060  call.call(), sizeof(ClientCallbackUnaryImpl)))
1061  ClientCallbackUnaryImpl(call, context, request, response, reactor);
1062  }
1063 };
1064 
1065 } // namespace internal
1066 } // namespace grpc_impl
1067 #endif // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_IMPL_H
virtual void OnReadDone(bool ok)
Notifies the application that a StartRead operation completed.
Definition: client_callback_impl.h:290
CallbackUnaryCallImpl(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc_impl::ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(::grpc::Status)> on_completion)
Definition: client_callback_impl.h:58
::grpc_impl::ClientContext ClientContext
Definition: client_context.h:26
void BindReactor(ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback_impl.h:127
void Write(const Request *req)
Definition: client_callback_impl.h:152
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call_op_set.h:125
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:145
void BindReactor(ClientWriteReactor< Request > *reactor)
Definition: client_callback_impl.h:163
void StartCall() override
Definition: client_callback_impl.h:970
virtual void grpc_call_ref(grpc_call *call)=0
virtual void OnReadInitialMetadataDone(bool ok)
Definition: client_callback_impl.h:329
void StartWrite(const Request *req)
Initiate a write operation (or post it for later initiation if StartCall has not yet been invoked)...
Definition: client_callback_impl.h:212
void StartWrite(const Request *req)
Definition: client_callback_impl.h:346
virtual ~ClientUnaryReactor()
Definition: client_callback_impl.h:385
void Read(Response *msg) override
Definition: client_callback_impl.h:500
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:164
void RemoveHold()
Definition: client_callback_impl.h:326
Definition: client_callback_impl.h:1046
virtual void OnReadDone(bool ok)
Definition: client_callback_impl.h:330
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:821
void Read(Response *msg) override
Definition: client_callback_impl.h:692
void StartWriteLast(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback_impl.h:352
virtual void OnWriteDone(bool ok)
Notifies the application that a StartWrite operation completed.
Definition: client_callback_impl.h:296
Definition: channel_interface.h:38
#define GPR_UNLIKELY(x)
Definition: port_platform.h:654
ClientBidiReactor is the interface for a bidirectional streaming RPC.
Definition: client_callback_impl.h:105
virtual void grpc_call_unref(grpc_call *call)=0
Definition: client_callback_impl.h:148
void Write(const Request *msg, ::grpc::WriteOptions options) override
Definition: client_callback_impl.h:510
virtual ~ClientReadReactor()
Definition: client_callback_impl.h:319
void BindReactor(ClientReadReactor< Response > *reactor)
Definition: client_callback_impl.h:142
Definition: channel_interface.h:50
ClientUnaryReactor is a reactor-style interface for a unary RPC.
Definition: client_callback_impl.h:383
grpc_call * call() const
Definition: call.h:72
virtual ~ClientCallbackReader()
Definition: client_callback_impl.h:135
Definition: client_callback_impl.h:133
void StartCall()
Definition: client_callback_impl.h:345
Definition: client_callback_impl.h:767
::google::protobuf::util::Status Status
Definition: config_protobuf.h:96
Definition: client_callback_impl.h:168
void AddHold()
Definition: client_callback_impl.h:357
void StartWrite(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback_impl.h:349
void RemoveHold() override
Definition: client_callback_impl.h:705
void Write(const Request *msg, ::grpc::WriteOptions options) override
Definition: client_callback_impl.h:840
void AddHold(int holds) override
Definition: client_callback_impl.h:702
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc_impl::ClientContext *context, const Request *request, Response *response, experimental::ClientUnaryReactor *reactor)
Definition: client_callback_impl.h:1049
virtual ~ClientCallbackWriter()
Definition: client_callback_impl.h:150
void StartCall() override
Definition: client_callback_impl.h:794
void StartWrite(const Request *req, ::grpc::WriteOptions options)
Initiate/post a write operation with specified options.
Definition: client_callback_impl.h:222
ClientReadReactor is the interface for a server-streaming RPC.
Definition: client_callback_impl.h:107
::grpc_impl::Channel Channel
Definition: channel.h:26
virtual void OnDone(const ::grpc::Status &s)
Definition: client_callback_impl.h:328
Definition: call_op_set.h:218
void AddMultipleHolds(int holds)
Definition: client_callback_impl.h:358
virtual void OnReadInitialMetadataDone(bool ok)
Definition: client_callback_impl.h:389
Definition: call_op_set.h:696
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc_impl::ClientContext *context, const Request *request, experimental::ClientReadReactor< Response > *reactor)
Definition: client_callback_impl.h:751
virtual ~ClientWriteReactor()
Definition: client_callback_impl.h:343
virtual void OnReadInitialMetadataDone(bool ok)
Definition: client_callback_impl.h:362
Definition: call_op_set.h:288
void WritesDone() override
Definition: client_callback_impl.h:860
virtual void OnDone(const ::grpc::Status &s)
Notifies the application that all operations associated with this RPC have completed and provides the...
Definition: client_callback_impl.h:275
ClientWriteReactor is the interface for a client-streaming RPC.
Definition: client_callback_impl.h:109
This header provides an object that reads bytes directly from a grpc::ByteBuffer, via the ZeroCopyInp...
Definition: alarm.h:24
void RemoveHold()
Definition: client_callback_impl.h:359
Codegen interface for grpc::Channel.
Definition: channel_interface.h:70
void StartCall()
Definition: client_callback_impl.h:387
void AddHold(int holds) override
Definition: client_callback_impl.h:882
CoreCodegenInterface * g_core_codegen_interface
Definition: completion_queue_impl.h:91
virtual void OnDone(const ::grpc::Status &s)
Definition: client_callback_impl.h:361
void StartRead(Response *resp)
Initiate a read operation (or post it for later initiation if StartCall has not yet been invoked)...
Definition: client_callback_impl.h:204
Definition: byte_buffer.h:52
void AddMultipleHolds(int holds)
Definition: client_callback_impl.h:325
void MaybeFinish()
Definition: client_callback_impl.h:430
Per-message write options.
Definition: call_op_set.h:85
void MaybeFinish()
Definition: client_callback_impl.h:995
virtual void OnWriteDone(bool ok)
Definition: client_callback_impl.h:363
void WriteLast(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback_impl.h:154
virtual void OnWritesDoneDone(bool ok)
Notifies the application that a StartWritesDone operation completed.
Definition: client_callback_impl.h:304
void RemoveHold() override
Definition: client_callback_impl.h:555
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm_impl.h:33
Definition: call_op_set.h:594
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
void MaybeFinish()
Definition: client_callback_impl.h:642
virtual ~ClientCallbackReaderWriter()
Definition: client_callback_impl.h:118
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming o...
Definition: callback_common.h:132
virtual ~ClientCallbackUnary()
Definition: client_callback_impl.h:170
bool ok() const
Is the status OK?
Definition: status.h:118
virtual void OnReadInitialMetadataDone(bool ok)
Notifies the application that a read of initial metadata from the server is done. ...
Definition: client_callback_impl.h:284
void StartWritesDone()
Indicate that the RPC will have no more write operations.
Definition: client_callback_impl.h:244
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue.h).
Definition: completion_queue_impl.h:101
void AddMultipleHolds(int holds)
Definition: client_callback_impl.h:268
void AddHold(int holds) override
Definition: client_callback_impl.h:552
void AddHold()
Definition: client_callback_impl.h:324
void StartCall()
Activate the RPC and initiate any reads or writes that have been Start&#39;ed before this call...
Definition: client_callback_impl.h:197
A ClientContext allows the person implementing a service client to:
Definition: client_context_impl.h:180
void StartWriteLast(const Request *req, ::grpc::WriteOptions options)
Initiate/post a write operation with specified options and an indication that this is the last write ...
Definition: client_callback_impl.h:235
Did it work? If it didn&#39;t, why?
Definition: status.h:31
void StartRead(Response *resp)
Definition: client_callback_impl.h:322
void StartCall() override
Definition: client_callback_impl.h:654
Definition: callback_common.h:68
void RemoveHold()
Definition: client_callback_impl.h:269
void RemoveHold() override
Definition: client_callback_impl.h:885
Definition: channel_interface.h:52
Definition: call_op_set.h:516
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:189
void StartWritesDone()
Definition: client_callback_impl.h:355
void WritesDone() override
Definition: client_callback_impl.h:530
Definition: call_op_set.h:744
virtual void OnDone(const ::grpc::Status &s)
Definition: client_callback_impl.h:388
Definition: client_callback_impl.h:627
virtual void OnWritesDoneDone(bool ok)
Definition: client_callback_impl.h:364
void StartCall()
Definition: client_callback_impl.h:321
void StartCall() override
Definition: client_callback_impl.h:442
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc_impl::ClientContext *context, experimental::ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback_impl.h:610
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc_impl::ClientContext *context, Response *response, experimental::ClientWriteReactor< Request > *reactor)
Definition: client_callback_impl.h:942
void AddHold()
Holds are needed if (and only if) this stream has operations that take place on it after StartCall bu...
Definition: client_callback_impl.h:267
virtual ~ClientBidiReactor()
Definition: client_callback_impl.h:191
Definition: client_callback_impl.h:956
Definition: client_callback_impl.h:415
Straightforward wrapping of the C call object.
Definition: call.h:38
void CallbackUnaryCall(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc_impl::ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(::grpc::Status)> on_completion)
Perform a callback-based unary call TODO(vjpai): Combine as much as possible with the blocking unary ...
Definition: client_callback_impl.h:46
Definition: client_callback_impl.h:116
void MaybeFinish()
Definition: client_callback_impl.h:782