GRPC C++  1.36.1
client_callback.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
19 #define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
20 #include <atomic>
21 #include <functional>
22 
30 
31 namespace grpc {
32 class Channel;
33 class ClientContext;
34 
35 namespace internal {
36 class RpcMethod;
37 
// Function template that validates the request/response message types at
// compile time and then forwards all of its arguments to a callee.
//
// NOTE(review): the declarator line (listing line 47) and the line naming the
// callee (listing line 56) are missing from this listing. Presumably this is
// CallbackUnaryCall(::grpc::ChannelInterface* channel, ...) forwarding to
// CallbackUnaryCallImpl<BaseInputMessage, BaseOutputMessage> -- confirm
// against the real header.
//
// Template parameters:
//   InputMessage / OutputMessage  - concrete request / response types.
//   BaseInputMessage / BaseOutputMessage - types the concrete messages must
//       derive from; they default to the concrete types themselves.
// Parameters: method, context, request, result and on_completion (plus the
// `channel` named on line 57) are passed through unchanged.
44 template <class InputMessage, class OutputMessage,
45  class BaseInputMessage = InputMessage,
46  class BaseOutputMessage = OutputMessage>
48  const ::grpc::internal::RpcMethod& method,
49  ::grpc::ClientContext* context,
50  const InputMessage* request, OutputMessage* result,
51  std::function<void(::grpc::Status)> on_completion) {
// Reject instantiations whose concrete message type is unrelated to the
// declared base type.
52  static_assert(std::is_base_of<BaseInputMessage, InputMessage>::value,
53  "Invalid input message specification");
54  static_assert(std::is_base_of<BaseOutputMessage, OutputMessage>::value,
55  "Invalid output message specification");
57  channel, method, context, request, result, on_completion);
58 }
59 
// Issues a complete unary RPC as a single batch whose completion invokes
// on_completion with the final status.
//
// NOTE(review): several lines are missing from this listing: the constructor
// declarator (63), the CallOpSet template arguments (73-78), the `tag` member
// of OpSetAndTag (82) and the allocation call (86). Comments below describe
// only what the visible lines establish.
60 template <class InputMessage, class OutputMessage>
61 class CallbackUnaryCallImpl {
62  public:
64  const ::grpc::internal::RpcMethod& method,
65  ::grpc::ClientContext* context,
66  const InputMessage* request, OutputMessage* result,
67  std::function<void(::grpc::Status)> on_completion) {
// The call is created on the channel's callback completion queue, which must
// exist.
68  ::grpc::CompletionQueue* cq = channel->CallbackCQ();
69  GPR_CODEGEN_ASSERT(cq != nullptr);
70  grpc::internal::Call call(channel->CreateCall(method, context, cq));
71 
72  using FullCallOpSet = grpc::internal::CallOpSet<
79 
// The op set and its completion tag live side by side in one allocation of
// sizeof(OpSetAndTag) bytes and are constructed in place with placement new.
// NOTE(review): presumably the storage comes from the call arena -- confirm.
80  struct OpSetAndTag {
81  FullCallOpSet opset;
83  };
84  const size_t alloc_sz = sizeof(OpSetAndTag);
85  auto* const alloced = static_cast<OpSetAndTag*>(
87  alloc_sz));
88  auto* ops = new (&alloced->opset) FullCallOpSet;
89  auto* tag = new (&alloced->tag)
90  grpc::internal::CallbackWithStatusTag(call.call(), on_completion, ops);
91 
92  // TODO(vjpai): Unify code with sync API as much as possible
// If attaching the request fails, run the tag immediately with the failing
// status and return without starting the batch.
93  ::grpc::Status s = ops->SendMessagePtr(request);
94  if (!s.ok()) {
95  tag->force_run(s);
96  return;
97  }
// Fill in the rest of the batch: send initial metadata, receive initial
// metadata, receive the response (a missing response message is allowed),
// half-close, and receive the final status into the tag's status slot.
98  ops->SendInitialMetadata(&context->send_initial_metadata_,
99  context->initial_metadata_flags());
100  ops->RecvInitialMetadata(context);
101  ops->RecvMessage(result);
102  ops->AllowNoMessage();
103  ops->ClientSendClose();
104  ops->ClientRecvStatus(context, tag->status_ptr());
105  ops->set_core_cq_tag(tag);
106  call.PerformOps(ops);
107  }
108 };
109 
110 // Base class for public API classes.
// NOTE(review): the class-head line (listing line 111) is missing; the name
// internal::ClientReactor is taken from the reactor classes in this file that
// derive from it.
112  public:
// Completion reaction. The implementation classes in this file call it from
// MaybeFinish once their outstanding-callback counter reaches zero.
120  virtual void OnDone(const ::grpc::Status& /*s*/) = 0;
121 
// Used by the streaming implementations' MaybeFinish instead of calling
// OnDone directly when the final decrement did not come from a reaction.
// Declared here; its definition is not part of this listing.
129  virtual void InternalScheduleOnDone(::grpc::Status s);
130 };
131 
132 } // namespace internal
133 
134 // Forward declarations
135 template <class Request, class Response>
137 template <class Response>
139 template <class Request>
141 class ClientUnaryReactor;
142 
143 // NOTE: The streaming objects are not actually implemented in the public API.
144 // These interfaces are provided for mocking only. Typical applications
145 // will interact exclusively with the reactors that they define.
// Abstract interface of the stream object behind a bidirectional-streaming
// call. ClientBidiReactor forwards its Start*/hold operations to these
// virtuals through the pointer it receives in BindStream.
// NOTE(review): the class-head (147), listing line 149 and the BindReactor
// declarator (158) are missing from this listing; the class name
// ClientCallbackReaderWriter is taken from ClientBidiReactor's friend
// declaration.
146 template <class Request, class Response>
148  public:
150  virtual void StartCall() = 0;
151  virtual void Write(const Request* req, ::grpc::WriteOptions options) = 0;
152  virtual void WritesDone() = 0;
153  virtual void Read(Response* resp) = 0;
154  virtual void AddHold(int holds) = 0;
155  virtual void RemoveHold() = 0;
156 
157  protected:
// Body of the protected binding helper: hands this stream to the reactor so
// the reactor can forward operations to it.
159  reactor->BindStream(this);
160  }
161 };
162 
// Abstract interface of the stream object behind a server-streaming
// (read-only) call. ClientReadReactor forwards to these virtuals through the
// pointer it receives in BindReader.
// NOTE(review): the class-head (164), listing line 166 and the BindReactor
// declarator (173) are missing from this listing; the class name
// ClientCallbackReader is taken from ClientReadReactor's friend declaration.
163 template <class Response>
165  public:
167  virtual void StartCall() = 0;
168  virtual void Read(Response* resp) = 0;
169  virtual void AddHold(int holds) = 0;
170  virtual void RemoveHold() = 0;
171 
172  protected:
// Body of the protected binding helper: hands this reader to the reactor.
174  reactor->BindReader(this);
175  }
176 };
177 
// Abstract interface of the stream object behind a client-streaming
// (write-only) call. ClientWriteReactor forwards to these virtuals through
// the pointer it receives in BindWriter.
// NOTE(review): the class-head (179), listing line 181 and the BindReactor
// declarator (194) are missing from this listing; the tooltip index names
// line 181 as the virtual destructor ~ClientCallbackWriter().
178 template <class Request>
180  public:
182  virtual void StartCall() = 0;
// Convenience overload: writes with default-constructed WriteOptions.
183  void Write(const Request* req) { Write(req, ::grpc::WriteOptions()); }
184  virtual void Write(const Request* req, ::grpc::WriteOptions options) = 0;
// Convenience wrapper: marks the options as "last message" before writing.
185  void WriteLast(const Request* req, ::grpc::WriteOptions options) {
186  Write(req, options.set_last_message());
187  }
188  virtual void WritesDone() = 0;
189 
190  virtual void AddHold(int holds) = 0;
191  virtual void RemoveHold() = 0;
192 
193  protected:
// Body of the protected binding helper: hands this writer to the reactor.
195  reactor->BindWriter(this);
196  }
197 };
198 
// Abstract interface of the call object behind a reactor-based unary call.
// NOTE(review): the class-head line (listing line 199) is missing; the name
// ClientCallbackUnary is taken from the destructor below.
200  public:
201  virtual ~ClientCallbackUnary() {}
202  virtual void StartCall() = 0;
203 
204  protected:
// Only declared here; the comment at listing line 445 says the function is
// defined out-of-line to avoid a forward-declaration issue, since
// ClientUnaryReactor is only forward-declared at this point.
205  void BindReactor(ClientUnaryReactor* reactor);
206 };
207 
208 // The following classes are the reactor interfaces that are to be implemented
209 // by the user. They are passed in to the library as an argument to a call on a
210 // stub (either a codegen-ed call or a generic call). The streaming RPC is
211 // activated by calling StartCall, possibly after initiating StartRead,
212 // StartWrite, or AddHold operations on the streaming object. Note that none of
213 // the classes are pure; all reactions have a default empty reaction so that the
214 // user class only needs to override those reactions that it cares about.
215 // The reactor must be passed to the stub invocation before any of the below
216 // operations can be called.
217 
// User-facing reactor for bidirectional-streaming RPCs. The Start*/hold
// methods forward to the bound stream object; the On* methods are reactions
// with empty default bodies for the user to override.
// NOTE(review): the original doc comments were stripped from this listing and
// the declaration of the stream_ member (listing line 352) is missing.
219 template <class Request, class Response>
220 class ClientBidiReactor : public internal::ClientReactor {
221  public:
222  virtual ~ClientBidiReactor() {}
223 
// Forwards to the bound stream's StartCall().
228  void StartCall() { stream_->StartCall(); }
229 
// Forwards to the bound stream's Read(); resp is the destination message.
235  void StartRead(Response* resp) { stream_->Read(resp); }
236 
// Initiate a write operation (or post it for later initiation if StartCall
// has not yet been invoked). This overload uses default WriteOptions.
243  void StartWrite(const Request* req) {
244  StartWrite(req, ::grpc::WriteOptions());
245  }
246 
// Same as above with caller-supplied options; forwards to the stream's
// Write().
253  void StartWrite(const Request* req, ::grpc::WriteOptions options) {
254  stream_->Write(req, options);
255  }
256 
// Writes req with the "last message" option set on the supplied options.
266  void StartWriteLast(const Request* req, ::grpc::WriteOptions options) {
267  StartWrite(req, options.set_last_message());
268  }
269 
// Forwards to the bound stream's WritesDone().
275  void StartWritesDone() { stream_->WritesDone(); }
276 
// Hold management. AddHold() adds a single hold; AddMultipleHolds() adds
// `holds` at once and debug-asserts that the count is positive. RemoveHold()
// releases one hold.
299  void AddHold() { AddMultipleHolds(1); }
300  void AddMultipleHolds(int holds) {
301  GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
302  stream_->AddHold(holds);
303  }
304  void RemoveHold() { stream_->RemoveHold(); }
305 
// Reactions. All default to no-ops; the bool argument is the `ok` value the
// library passes to the corresponding completion callback.
313  void OnDone(const ::grpc::Status& /*s*/) override {}
314 
323  virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
324 
329  virtual void OnReadDone(bool /*ok*/) {}
330 
336  virtual void OnWriteDone(bool /*ok*/) {}
337 
345  virtual void OnWritesDoneDone(bool /*ok*/) {}
346 
347  private:
// The stream class is a friend so that its binding helper can call
// BindStream, which records the stream the public methods forward to.
348  friend class ClientCallbackReaderWriter<Request, Response>;
349  void BindStream(ClientCallbackReaderWriter<Request, Response>* stream) {
350  stream_ = stream;
351  }
353 };
354 
// User-facing reactor for server-streaming RPCs (client only reads). The
// Start*/hold methods forward to the bound reader; the On* reactions have
// empty default bodies.
// NOTE(review): the declaration of the reader_ member (listing line 379) is
// missing from this listing.
357 template <class Response>
358 class ClientReadReactor : public internal::ClientReactor {
359  public:
360  virtual ~ClientReadReactor() {}
361 
// Thin forwards to the bound reader object.
362  void StartCall() { reader_->StartCall(); }
363  void StartRead(Response* resp) { reader_->Read(resp); }
364 
// Hold management: AddHold() adds one hold, AddMultipleHolds() adds `holds`
// (debug-asserted positive), RemoveHold() releases one.
365  void AddHold() { AddMultipleHolds(1); }
366  void AddMultipleHolds(int holds) {
367  GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
368  reader_->AddHold(holds);
369  }
370  void RemoveHold() { reader_->RemoveHold(); }
371 
// Reactions; all default to no-ops.
372  void OnDone(const ::grpc::Status& /*s*/) override {}
373  virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
374  virtual void OnReadDone(bool /*ok*/) {}
375 
376  private:
// The reader class is a friend so that its binding helper can call
// BindReader, which records the reader the public methods forward to.
377  friend class ClientCallbackReader<Response>;
378  void BindReader(ClientCallbackReader<Response>* reader) { reader_ = reader; }
380 };
381 
// User-facing reactor for client-streaming RPCs (client only writes). The
// Start*/hold methods forward to the bound writer; the On* reactions have
// empty default bodies.
// NOTE(review): the declaration of the writer_ member (listing line 417) is
// missing from this listing.
384 template <class Request>
385 class ClientWriteReactor : public internal::ClientReactor {
386  public:
387  virtual ~ClientWriteReactor() {}
388 
389  void StartCall() { writer_->StartCall(); }
// Write with default WriteOptions.
390  void StartWrite(const Request* req) {
391  StartWrite(req, ::grpc::WriteOptions());
392  }
// Write with caller-supplied options; forwards to the writer's Write().
393  void StartWrite(const Request* req, ::grpc::WriteOptions options) {
394  writer_->Write(req, options);
395  }
// Write with the "last message" option set on the supplied options.
396  void StartWriteLast(const Request* req, ::grpc::WriteOptions options) {
397  StartWrite(req, options.set_last_message());
398  }
399  void StartWritesDone() { writer_->WritesDone(); }
400 
// Hold management: AddHold() adds one hold, AddMultipleHolds() adds `holds`
// (debug-asserted positive), RemoveHold() releases one.
401  void AddHold() { AddMultipleHolds(1); }
402  void AddMultipleHolds(int holds) {
403  GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
404  writer_->AddHold(holds);
405  }
406  void RemoveHold() { writer_->RemoveHold(); }
407 
// Reactions; all default to no-ops.
408  void OnDone(const ::grpc::Status& /*s*/) override {}
409  virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
410  virtual void OnWriteDone(bool /*ok*/) {}
411  virtual void OnWritesDoneDone(bool /*ok*/) {}
412 
413  private:
// The writer class is a friend so that its binding helper can call
// BindWriter, which records the writer the public methods forward to.
414  friend class ClientCallbackWriter<Request>;
415  void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; }
416 
418 };
419 
// User-facing reactor for unary RPCs: StartCall() forwards to the bound call
// object, and the two reactions default to no-ops.
// NOTE(review): the class-head line (listing line 431) is missing; the name
// ClientUnaryReactor is taken from the destructor below. OnDone is marked
// override, so the class derives from a base declaring it (presumably
// internal::ClientReactor, like the streaming reactors -- confirm).
432  public:
433  virtual ~ClientUnaryReactor() {}
434 
435  void StartCall() { call_->StartCall(); }
436  void OnDone(const ::grpc::Status& /*s*/) override {}
437  virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
438 
439  private:
// ClientCallbackUnary is a friend so that its BindReactor can call BindCall.
440  friend class ClientCallbackUnary;
441  void BindCall(ClientCallbackUnary* call) { call_ = call; }
// Call object that StartCall() forwards to. It has no initializer here and is
// assigned only in BindCall.
442  ClientCallbackUnary* call_;
443 };
444 
445 // Define function out-of-line from class to avoid forward declaration issue
// NOTE(review): the declarator line (listing line 446) is missing from this
// listing; presumably it is the definition of
// ClientCallbackUnary::BindReactor(ClientUnaryReactor* reactor) -- confirm.
// The body registers this call object with the reactor.
447  reactor->BindCall(this);
448 }
449 
450 namespace internal {
451 
452 // Forward declare factory classes for friendship
453 template <class Request, class Response>
454 class ClientCallbackReaderWriterFactory;
455 template <class Response>
456 class ClientCallbackReaderFactory;
457 template <class Request>
458 class ClientCallbackWriterFactory;
459 
460 template <class Request, class Response>
462  : public ClientCallbackReaderWriter<Request, Response> {
463  public:
464  // always allocated against a call arena, no memory free required
// NOTE(review): the single body line of this overload (listing line 466) is
// missing from this listing. Nothing visible here frees memory.
465  static void operator delete(void* /*ptr*/, std::size_t size) {
467  }
468 
469  // This operator should never be called as the memory should be freed as part
470  // of the arena destruction. It only exists to provide a matching operator
471  // delete to the operator new so that some compilers will not complain (see
472  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
473  // there are no tests catching the compiler warning.
// Placement-delete counterpart; asserts unconditionally if it is ever reached.
474  static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
475 
// Starts the RPC: issues the start batch, flushes any read/write/writes-done
// operations that were requested before StartCall, issues the finish batch,
// marks the call as started, and finally drops the outstanding-callback count
// that was pre-registered for StartCall itself.
476  void StartCall() override {
477  // This call initiates two batches, plus any backlog, each with a callback
478  // 1. Send initial metadata (unless corked) + recv initial metadata
479  // 2. Any read backlog
480  // 3. Any write backlog
481  // 4. Recv trailing metadata (issued unconditionally below)
// When corked, initial metadata is not sent here; Write()/WritesDone() attach
// it to their own batch while corked_write_needed_ is still set.
482  if (!start_corked_) {
483  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
484  context_->initial_metadata_flags());
485  }
486 
487  call_.PerformOps(&start_ops_);
488 
489  {
// start_mu_ guards backlog_ and serializes this flush against
// Read/Write/WritesDone calls that are recording into the backlog.
490  grpc::internal::MutexLock lock(&start_mu_);
491 
492  if (backlog_.read_ops) {
493  call_.PerformOps(&read_ops_);
494  }
495  if (backlog_.write_ops) {
496  call_.PerformOps(&write_ops_);
497  }
498  if (backlog_.writes_done_ops) {
499  call_.PerformOps(&writes_done_ops_);
500  }
501  call_.PerformOps(&finish_ops_);
502  // The last thing in this critical section is to set started_ so that it
503  // can be used lock-free as well.
504  started_.store(true, std::memory_order_release);
505  }
506  // MaybeFinish outside the lock to make sure that destruction of this object
507  // doesn't take place while holding the lock (which would cause the lock to
508  // be released after destruction)
509  this->MaybeFinish(/*from_reaction=*/false);
510  }
511 
// Requests a read into *msg. Registers one more outstanding callback, then
// either issues the read batch right away or, if StartCall has not run yet,
// records it in the backlog for StartCall to issue.
512  void Read(Response* msg) override {
513  read_ops_.RecvMessage(msg);
514  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
// started_ is checked once without the lock and, if it was false, again under
// start_mu_ so the backlog flag cannot be set after StartCall has flushed it.
515  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
516  grpc::internal::MutexLock lock(&start_mu_);
517  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
518  backlog_.read_ops = true;
519  return;
520  }
521  }
522  call_.PerformOps(&read_ops_);
523  }
524 
// Requests a write of *msg with the given options. Registers one more
// outstanding callback, then either issues the write batch right away or, if
// StartCall has not run yet, records it in the backlog.
525  void Write(const Request* msg, ::grpc::WriteOptions options) override {
// A write flagged as the last message also carries the client half-close in
// the same batch and gets the buffer hint set on its options.
526  if (options.is_last_message()) {
527  options.set_buffer_hint();
528  write_ops_.ClientSendClose();
529  }
530  // TODO(vjpai): don't assert
// The SendMessagePtr call sits inside the assertion, so its side effect
// depends on GPR_CODEGEN_ASSERT evaluating its argument.
531  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
532  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
// If the call was started corked and no write-side batch has carried the
// initial metadata yet, attach it to this batch and clear the flag.
533  if (GPR_UNLIKELY(corked_write_needed_)) {
534  write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
535  context_->initial_metadata_flags());
536  corked_write_needed_ = false;
537  }
538 
// Same started_/start_mu_ check as in Read(): defer to the backlog if
// StartCall has not run yet.
539  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
540  grpc::internal::MutexLock lock(&start_mu_);
541  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
542  backlog_.write_ops = true;
543  return;
544  }
545  }
546  call_.PerformOps(&write_ops_);
547  }
// Requests the client half-close as its own batch. Registers one more
// outstanding callback, then either issues the batch right away or, if
// StartCall has not run yet, records it in the backlog.
548  void WritesDone() override {
549  writes_done_ops_.ClientSendClose();
// Unlike the start/read/write/finish tags, which the constructor sets up,
// this tag is configured here. Its callback runs the reactor's
// OnWritesDoneDone reaction and then releases one outstanding callback.
550  writes_done_tag_.Set(
551  call_.call(),
552  [this](bool ok) {
553  reactor_->OnWritesDoneDone(ok);
554  MaybeFinish(/*from_reaction=*/true);
555  },
556  &writes_done_ops_, /*can_inline=*/false);
557  writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
558  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
// If the call was started corked and no write-side batch has carried the
// initial metadata yet, attach it to this batch and clear the flag.
559  if (GPR_UNLIKELY(corked_write_needed_)) {
560  writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
561  context_->initial_metadata_flags());
562  corked_write_needed_ = false;
563  }
// Defer to the backlog if StartCall has not run yet (checked lock-free, then
// re-checked under start_mu_).
564  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
565  grpc::internal::MutexLock lock(&start_mu_);
566  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
567  backlog_.writes_done_ops = true;
568  return;
569  }
570  }
571  call_.PerformOps(&writes_done_ops_);
572  }
573 
// A hold is represented as an extra count on callbacks_outstanding_: AddHold
// adds `holds` counts at once.
574  void AddHold(int holds) override {
575  callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
576  }
// RemoveHold releases exactly one count via MaybeFinish. It passes
// from_reaction=false, so if this was the last count MaybeFinish goes through
// InternalScheduleOnDone rather than calling OnDone directly.
577  void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
578 
579  private:
580  friend class ClientCallbackReaderWriterFactory<Request, Response>;
581 
// Constructor (private; the factory class is a friend). Stores the context,
// call and reactor, latches the context's corked flag, binds this stream to
// the reactor, and pre-configures the start, write, read and finish tags and
// op sets so that later operations only need to fill in per-call data.
// NOTE(review): two lines of the signature (listing lines 582 and 584) are
// missing from this listing; from the initializer list they carry the `call`
// and `reactor` parameters -- confirm exact types against the real header.
583  ::grpc::ClientContext* context,
585  : context_(context),
586  call_(call),
587  reactor_(reactor),
588  start_corked_(context_->initial_metadata_corked_),
589  corked_write_needed_(start_corked_) {
590  this->BindReactor(reactor);
591 
592  // Set up the unchanging parts of the start, read, and write tags and ops.
// Each tag callback runs the matching reactor reaction and then calls
// MaybeFinish to release one outstanding callback.
593  start_tag_.Set(
594  call_.call(),
595  [this](bool ok) {
596  reactor_->OnReadInitialMetadataDone(ok);
597  MaybeFinish(/*from_reaction=*/true);
598  },
599  &start_ops_, /*can_inline=*/false);
600  start_ops_.RecvInitialMetadata(context_);
601  start_ops_.set_core_cq_tag(&start_tag_);
602 
603  write_tag_.Set(
604  call_.call(),
605  [this](bool ok) {
606  reactor_->OnWriteDone(ok);
607  MaybeFinish(/*from_reaction=*/true);
608  },
609  &write_ops_, /*can_inline=*/false);
610  write_ops_.set_core_cq_tag(&write_tag_);
611 
612  read_tag_.Set(
613  call_.call(),
614  [this](bool ok) {
615  reactor_->OnReadDone(ok);
616  MaybeFinish(/*from_reaction=*/true);
617  },
618  &read_ops_, /*can_inline=*/false);
619  read_ops_.set_core_cq_tag(&read_tag_);
620 
621  // Also set up the Finish tag and op set.
// The finish callback has no reaction of its own: it only releases its
// outstanding callback. The final status is received into finish_status_,
// which MaybeFinish later hands to OnDone.
622  finish_tag_.Set(
623  call_.call(),
624  [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
625  &finish_ops_,
626  /*can_inline=*/false);
627  finish_ops_.ClientRecvStatus(context_, &finish_status_);
628  finish_ops_.set_core_cq_tag(&finish_tag_);
629  }
630 
631  // MaybeFinish can be called from reactions or from user-initiated operations
632  // like StartCall or RemoveHold. If this is the last operation or hold on this
633  // object, it will invoke the OnDone reaction. If MaybeFinish was called from
634  // a reaction, it can call OnDone directly. If not, it would need to schedule
635  // OnDone onto an executor thread to avoid the possibility of deadlocking with
636  // any locks in the user code that invoked it.
637  void MaybeFinish(bool from_reaction) {
// fetch_sub returns the value before the decrement, so a result of 1 means
// this call released the last outstanding callback/hold.
638  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
639  1, std::memory_order_acq_rel) == 1)) {
// Copy out everything still needed before the explicit destructor call below;
// no member may be touched after that line.
640  ::grpc::Status s = std::move(finish_status_);
641  auto* reactor = reactor_;
642  auto* call = call_.call();
643  this->~ClientCallbackReaderWriterImpl();
// NOTE(review): listing line 644 is missing here; it is presumably where the
// local `call` is used (e.g. to drop a reference on the call) -- confirm.
645  if (GPR_LIKELY(from_reaction)) {
646  reactor->OnDone(s);
647  } else {
648  reactor->InternalScheduleOnDone(std::move(s));
649  }
650  }
651  }
652 
653  ::grpc::ClientContext* const context_;
654  grpc::internal::Call call_;
655  ClientBidiReactor<Request, Response>* const reactor_;
656 
659  start_ops_;
661  const bool start_corked_;
662  bool corked_write_needed_; // no lock needed since only accessed in
663  // Write/WritesDone which cannot be concurrent
664 
667  ::grpc::Status finish_status_;
668 
672  write_ops_;
674 
677  writes_done_ops_;
679 
681  read_ops_;
683 
// Flags recording which operations were requested before StartCall ran.
// Read/Write/WritesDone set the matching flag instead of issuing their batch,
// and StartCall issues the flagged batches.
684  struct StartCallBacklog {
685  bool write_ops = false;
686  bool writes_done_ops = false;
687  bool read_ops = false;
688  };
689  StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;
690 
691  // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
692  std::atomic<intptr_t> callbacks_outstanding_{3};
693  std::atomic_bool started_{false};
694  grpc::internal::Mutex start_mu_;
695 };
696 
// Factory for the bidirectional-streaming implementation object. It is the
// friend named by ClientCallbackReaderWriterImpl, whose constructor is
// private.
// NOTE(review): the last parameter line (listing line 703) and the lines that
// allocate and construct the object (707-710) are missing from this listing;
// only the creation of the call and the trailing `reactor` argument are
// visible.
697 template <class Request, class Response>
698 class ClientCallbackReaderWriterFactory {
699  public:
700  static void Create(::grpc::ChannelInterface* channel,
701  const ::grpc::internal::RpcMethod& method,
702  ::grpc::ClientContext* context,
// The call is created on the channel's callback completion queue.
704  grpc::internal::Call call =
705  channel->CreateCall(method, context, channel->CallbackCQ());
706 
711  reactor);
712  }
713 };
714 
715 template <class Response>
717  public:
718  // always allocated against a call arena, no memory free required
// NOTE(review): the single body line of this overload (listing line 720) is
// missing from this listing. Nothing visible here frees memory.
719  static void operator delete(void* /*ptr*/, std::size_t size) {
721  }
722 
723  // This operator should never be called as the memory should be freed as part
724  // of the arena destruction. It only exists to provide a matching operator
725  // delete to the operator new so that some compilers will not complain (see
726  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
727  // there are no tests catching the compiler warning.
// Placement-delete counterpart; asserts unconditionally if it is ever reached.
728  static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
729 
// Starts the server-streaming RPC: configures and issues the start batch,
// configures the reusable read tag, flushes a read requested before StartCall,
// marks the call as started, then configures and issues the finish batch.
// Unlike the reader-writer and writer implementations, this method does not
// call MaybeFinish itself; callbacks_outstanding_ starts at 2 (start and
// finish).
730  void StartCall() override {
731  // This call initiates two batches, plus any backlog, each with a callback
732  // 1. Send initial metadata + recv initial metadata
// (this class has no cork check; the request message and half-close were
// already added to start_ops_ by the constructor)
733  // 2. Any backlog
734  // 3. Recv trailing metadata
735 
// Start batch callback: run the initial-metadata reaction, then release one
// outstanding callback.
736  start_tag_.Set(
737  call_.call(),
738  [this](bool ok) {
739  reactor_->OnReadInitialMetadataDone(ok);
740  MaybeFinish(/*from_reaction=*/true);
741  },
742  &start_ops_, /*can_inline=*/false);
743  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
744  context_->initial_metadata_flags());
745  start_ops_.RecvInitialMetadata(context_);
746  start_ops_.set_core_cq_tag(&start_tag_);
747  call_.PerformOps(&start_ops_);
748 
749  // Also set up the read tag so it doesn't have to be set up each time
750  read_tag_.Set(
751  call_.call(),
752  [this](bool ok) {
753  reactor_->OnReadDone(ok);
754  MaybeFinish(/*from_reaction=*/true);
755  },
756  &read_ops_, /*can_inline=*/false);
757  read_ops_.set_core_cq_tag(&read_tag_);
758 
759  {
// start_mu_ guards backlog_; started_ is published with release ordering so
// Read() can test it without taking the lock.
760  grpc::internal::MutexLock lock(&start_mu_);
761  if (backlog_.read_ops) {
762  call_.PerformOps(&read_ops_);
763  }
764  started_.store(true, std::memory_order_release);
765  }
766 
// Finish batch: receives the final status into finish_status_; its callback
// only releases its outstanding callback.
767  finish_tag_.Set(
768  call_.call(),
769  [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
770  &finish_ops_, /*can_inline=*/false);
771  finish_ops_.ClientRecvStatus(context_, &finish_status_);
772  finish_ops_.set_core_cq_tag(&finish_tag_);
773  call_.PerformOps(&finish_ops_);
774  }
775 
// Requests a read into *msg. Registers one more outstanding callback, then
// either issues the read batch right away or, if StartCall has not run yet,
// records it in the backlog for StartCall to issue.
776  void Read(Response* msg) override {
777  read_ops_.RecvMessage(msg);
778  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
// started_ is checked once without the lock and, if it was false, again under
// start_mu_ so the backlog flag cannot be set after StartCall has flushed it.
779  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
780  grpc::internal::MutexLock lock(&start_mu_);
781  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
782  backlog_.read_ops = true;
783  return;
784  }
785  }
786  call_.PerformOps(&read_ops_);
787  }
788 
// A hold is represented as an extra count on callbacks_outstanding_: AddHold
// adds `holds` counts at once.
789  void AddHold(int holds) override {
790  callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
791  }
// RemoveHold releases exactly one count via MaybeFinish. It passes
// from_reaction=false, so if this was the last count MaybeFinish goes through
// InternalScheduleOnDone rather than calling OnDone directly.
792  void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
793 
794  private:
795  friend class ClientCallbackReaderFactory<Response>;
796 
// Constructor (private; the factory class is a friend). Stores the context,
// call and reactor, binds this reader to the reactor, and queues the single
// request message plus the client half-close on the start batch. The batch
// itself is issued later by StartCall.
// NOTE(review): two lines of the signature (listing lines 798 and 800) are
// missing from this listing; from the initializer list they carry the `call`
// and `reactor` parameters -- confirm exact types against the real header.
797  template <class Request>
799  ::grpc::ClientContext* context, Request* request,
801  : context_(context), call_(call), reactor_(reactor) {
802  this->BindReactor(reactor);
803  // TODO(vjpai): don't assert
// The SendMessagePtr call sits inside the assertion, so its side effect
// depends on GPR_CODEGEN_ASSERT evaluating its argument.
804  GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
805  start_ops_.ClientSendClose();
806  }
807 
808  // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
// Releases one outstanding callback/hold. If it was the last one, destroys
// this object and delivers the final status to the reactor: directly via
// OnDone when called from a reaction, otherwise via InternalScheduleOnDone.
809  void MaybeFinish(bool from_reaction) {
// fetch_sub returns the value before the decrement, so 1 means "last one".
810  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
811  1, std::memory_order_acq_rel) == 1)) {
// Copy out everything still needed before the explicit destructor call below;
// no member may be touched after that line.
812  ::grpc::Status s = std::move(finish_status_);
813  auto* reactor = reactor_;
814  auto* call = call_.call();
815  this->~ClientCallbackReaderImpl();
// NOTE(review): listing line 816 is missing here; it is presumably where the
// local `call` is used (e.g. to drop a reference on the call) -- confirm.
817  if (GPR_LIKELY(from_reaction)) {
818  reactor->OnDone(s);
819  } else {
820  reactor->InternalScheduleOnDone(std::move(s));
821  }
822  }
823  }
824 
825  ::grpc::ClientContext* const context_;
826  grpc::internal::Call call_;
827  ClientReadReactor<Response>* const reactor_;
828 
833  start_ops_;
835 
838  ::grpc::Status finish_status_;
839 
841  read_ops_;
843 
// Flag recording that a read was requested before StartCall ran. Read() sets
// it instead of issuing the batch, and StartCall issues the flagged batch.
844  struct StartCallBacklog {
845  bool read_ops = false;
846  };
847  StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;
848 
849  // Minimum of 2 callbacks to pre-register for start and finish
850  std::atomic<intptr_t> callbacks_outstanding_{2};
851  std::atomic_bool started_{false};
852  grpc::internal::Mutex start_mu_;
853 };
854 
// Factory for the server-streaming implementation object. It is the friend
// named by ClientCallbackReaderImpl, whose constructor is private.
855 template <class Response>
856 class ClientCallbackReaderFactory {
857  public:
// Creates the call on the channel's callback completion queue and constructs
// a ClientCallbackReaderImpl<Response> for it with the request and reactor.
// Returns nothing: the constructor binds the new object to `reactor`.
858  template <class Request>
859  static void Create(::grpc::ChannelInterface* channel,
860  const ::grpc::internal::RpcMethod& method,
861  ::grpc::ClientContext* context, const Request* request,
862  ClientReadReactor<Response>* reactor) {
863  grpc::internal::Call call =
864  channel->CreateCall(method, context, channel->CallbackCQ());
865 
// NOTE(review): listing lines 866-867 (the start of the placement-new
// expression and the allocation call) are missing. The visible arguments show
// that sizeof(ClientCallbackReaderImpl<Response>) bytes are requested against
// call.call(); presumably this is a call-arena allocation -- confirm.
868  call.call(), sizeof(ClientCallbackReaderImpl<Response>)))
869  ClientCallbackReaderImpl<Response>(call, context, request, reactor);
870  }
871 };
872 
873 template <class Request>
875  public:
876  // always allocated against a call arena, no memory free required
// NOTE(review): the single body line of this overload (listing line 878) is
// missing from this listing. Nothing visible here frees memory.
877  static void operator delete(void* /*ptr*/, std::size_t size) {
879  }
880 
881  // This operator should never be called as the memory should be freed as part
882  // of the arena destruction. It only exists to provide a matching operator
883  // delete to the operator new so that some compilers will not complain (see
884  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
885  // there are no tests catching the compiler warning.
// Placement-delete counterpart; asserts unconditionally if it is ever reached.
886  static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
887 
// Starts the client-streaming RPC: issues the start batch, flushes any
// write/writes-done operations requested before StartCall, issues the finish
// batch, marks the call as started, and finally drops the outstanding-callback
// count that was pre-registered for StartCall itself.
888  void StartCall() override {
889  // This call initiates two batches, plus any backlog, each with a callback
890  // 1. Send initial metadata (unless corked) + recv initial metadata
891  // 2. Any backlog
892  // 3. Recv trailing metadata
893 
// When corked, initial metadata is not sent here; Write()/WritesDone() attach
// it to their own batch while corked_write_needed_ is still set.
894  if (!start_corked_) {
895  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
896  context_->initial_metadata_flags());
897  }
898  call_.PerformOps(&start_ops_);
899 
900  {
// start_mu_ guards backlog_ and serializes this flush against Write/WritesDone
// calls that are recording into the backlog.
901  grpc::internal::MutexLock lock(&start_mu_);
902 
903  if (backlog_.write_ops) {
904  call_.PerformOps(&write_ops_);
905  }
906  if (backlog_.writes_done_ops) {
907  call_.PerformOps(&writes_done_ops_);
908  }
909  call_.PerformOps(&finish_ops_);
910  // The last thing in this critical section is to set started_ so that it
911  // can be used lock-free as well.
912  started_.store(true, std::memory_order_release);
913  }
914  // MaybeFinish outside the lock to make sure that destruction of this object
915  // doesn't take place while holding the lock (which would cause the lock to
916  // be released after destruction)
917  this->MaybeFinish(/*from_reaction=*/false);
918  }
919 
// Requests a write of *msg with the given options. Registers one more
// outstanding callback, then either issues the write batch right away or, if
// StartCall has not run yet, records it in the backlog.
920  void Write(const Request* msg, ::grpc::WriteOptions options) override {
// A write flagged as the last message also carries the client half-close in
// the same batch and gets the buffer hint set on its options.
921  if (GPR_UNLIKELY(options.is_last_message())) {
922  options.set_buffer_hint();
923  write_ops_.ClientSendClose();
924  }
925  // TODO(vjpai): don't assert
// The SendMessagePtr call sits inside the assertion, so its side effect
// depends on GPR_CODEGEN_ASSERT evaluating its argument.
926  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
927  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
928 
// If the call was started corked and no write-side batch has carried the
// initial metadata yet, attach it to this batch and clear the flag.
929  if (GPR_UNLIKELY(corked_write_needed_)) {
930  write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
931  context_->initial_metadata_flags());
932  corked_write_needed_ = false;
933  }
934 
// Defer to the backlog if StartCall has not run yet (checked lock-free, then
// re-checked under start_mu_).
935  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
936  grpc::internal::MutexLock lock(&start_mu_);
937  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
938  backlog_.write_ops = true;
939  return;
940  }
941  }
942  call_.PerformOps(&write_ops_);
943  }
944 
// Requests the client half-close as its own batch. Registers one more
// outstanding callback, then either issues the batch right away or, if
// StartCall has not run yet, records it in the backlog.
945  void WritesDone() override {
946  writes_done_ops_.ClientSendClose();
// Unlike the start/write/finish tags, which the constructor sets up, this tag
// is configured here. Its callback runs the reactor's OnWritesDoneDone
// reaction and then releases one outstanding callback.
947  writes_done_tag_.Set(
948  call_.call(),
949  [this](bool ok) {
950  reactor_->OnWritesDoneDone(ok);
951  MaybeFinish(/*from_reaction=*/true);
952  },
953  &writes_done_ops_, /*can_inline=*/false);
954  writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
955  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
956 
// If the call was started corked and no write-side batch has carried the
// initial metadata yet, attach it to this batch and clear the flag.
957  if (GPR_UNLIKELY(corked_write_needed_)) {
958  writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
959  context_->initial_metadata_flags());
960  corked_write_needed_ = false;
961  }
962 
// Defer to the backlog if StartCall has not run yet (checked lock-free, then
// re-checked under start_mu_).
963  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
964  grpc::internal::MutexLock lock(&start_mu_);
965  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
966  backlog_.writes_done_ops = true;
967  return;
968  }
969  }
970  call_.PerformOps(&writes_done_ops_);
971  }
972 
// A hold is represented as an extra count on callbacks_outstanding_: AddHold
// adds `holds` counts at once.
973  void AddHold(int holds) override {
974  callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
975  }
// RemoveHold releases exactly one count via MaybeFinish. It passes
// from_reaction=false, so if this was the last count MaybeFinish goes through
// InternalScheduleOnDone rather than calling OnDone directly.
976  void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
977 
978  private:
979  friend class ClientCallbackWriterFactory<Request>;
980 
// Constructor (private; the factory class is a friend). Stores the context,
// call and reactor, latches the context's corked flag, binds this writer to
// the reactor, and pre-configures the start, write and finish tags and op
// sets. The single response message is received as part of the finish batch.
// NOTE(review): two lines of the signature (listing lines 982 and 984) are
// missing from this listing; from the initializer list they carry the `call`
// and `reactor` parameters -- confirm exact types against the real header.
981  template <class Response>
983  ::grpc::ClientContext* context, Response* response,
985  : context_(context),
986  call_(call),
987  reactor_(reactor),
988  start_corked_(context_->initial_metadata_corked_),
989  corked_write_needed_(start_corked_) {
990  this->BindReactor(reactor);
991 
992  // Set up the unchanging parts of the start and write tags and ops.
// Each tag callback runs the matching reactor reaction and then calls
// MaybeFinish to release one outstanding callback.
993  start_tag_.Set(
994  call_.call(),
995  [this](bool ok) {
996  reactor_->OnReadInitialMetadataDone(ok);
997  MaybeFinish(/*from_reaction=*/true);
998  },
999  &start_ops_, /*can_inline=*/false);
1000  start_ops_.RecvInitialMetadata(context_);
1001  start_ops_.set_core_cq_tag(&start_tag_);
1002 
1003  write_tag_.Set(
1004  call_.call(),
1005  [this](bool ok) {
1006  reactor_->OnWriteDone(ok);
1007  MaybeFinish(/*from_reaction=*/true);
1008  },
1009  &write_ops_, /*can_inline=*/false);
1010  write_ops_.set_core_cq_tag(&write_tag_);
1011 
1012  // Also set up the Finish tag and op set.
// The finish batch receives the response (a missing response message is
// allowed) together with the final status, which lands in finish_status_.
1013  finish_ops_.RecvMessage(response);
1014  finish_ops_.AllowNoMessage();
1015  finish_tag_.Set(
1016  call_.call(),
1017  [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
1018  &finish_ops_,
1019  /*can_inline=*/false);
1020  finish_ops_.ClientRecvStatus(context_, &finish_status_);
1021  finish_ops_.set_core_cq_tag(&finish_tag_);
1022  }
1023 
1024  // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
// Releases one outstanding callback/hold. If it was the last one, destroys
// this object and delivers the final status to the reactor: directly via
// OnDone when called from a reaction, otherwise via InternalScheduleOnDone.
1025  void MaybeFinish(bool from_reaction) {
// fetch_sub returns the value before the decrement, so 1 means "last one".
1026  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1027  1, std::memory_order_acq_rel) == 1)) {
// Copy out everything still needed before the explicit destructor call below;
// no member may be touched after that line.
1028  ::grpc::Status s = std::move(finish_status_);
1029  auto* reactor = reactor_;
1030  auto* call = call_.call();
1031  this->~ClientCallbackWriterImpl();
// NOTE(review): listing line 1032 is missing here; it is presumably where the
// local `call` is used (e.g. to drop a reference on the call) -- confirm.
1033  if (GPR_LIKELY(from_reaction)) {
1034  reactor->OnDone(s);
1035  } else {
1036  reactor->InternalScheduleOnDone(std::move(s));
1037  }
1038  }
1039  }
1040 
1041  ::grpc::ClientContext* const context_;
1042  grpc::internal::Call call_;
1043  ClientWriteReactor<Request>* const reactor_;
1044 
1047  start_ops_;
1049  const bool start_corked_;
1050  bool corked_write_needed_; // no lock needed since only accessed in
1051  // Write/WritesDone which cannot be concurrent
1052 
1055  finish_ops_;
1057  ::grpc::Status finish_status_;
1058 
1062  write_ops_;
1064 
1067  writes_done_ops_;
1068  grpc::internal::CallbackWithSuccessTag writes_done_tag_;
1069 
// Flags recording which operations were requested before StartCall ran.
// Write/WritesDone set the matching flag instead of issuing their batch, and
// StartCall issues the flagged batches.
1070  struct StartCallBacklog {
1071  bool write_ops = false;
1072  bool writes_done_ops = false;
1073  };
1074  StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;
1075 
1076  // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
1077  std::atomic<intptr_t> callbacks_outstanding_{3};
1078  std::atomic_bool started_{false};
1079  grpc::internal::Mutex start_mu_;
1080 };
1081 
// Factory for the client-streaming implementation object. It is the friend
// named by ClientCallbackWriterImpl, whose constructor is private.
1082 template <class Request>
1083 class ClientCallbackWriterFactory {
1084  public:
// Creates the call on the channel's callback completion queue and constructs
// a ClientCallbackWriterImpl<Request> for it with the response destination
// and reactor. Returns nothing: the constructor binds the new object to
// `reactor`.
1085  template <class Response>
1086  static void Create(::grpc::ChannelInterface* channel,
1087  const ::grpc::internal::RpcMethod& method,
1088  ::grpc::ClientContext* context, Response* response,
1089  ClientWriteReactor<Request>* reactor) {
1090  grpc::internal::Call call =
1091  channel->CreateCall(method, context, channel->CallbackCQ());
1092 
// NOTE(review): listing lines 1093-1094 (the start of the placement-new
// expression and the allocation call) are missing. The visible arguments show
// that sizeof(ClientCallbackWriterImpl<Request>) bytes are requested against
// call.call(); presumably this is a call-arena allocation -- confirm.
1095  call.call(), sizeof(ClientCallbackWriterImpl<Request>)))
1096  ClientCallbackWriterImpl<Request>(call, context, response, reactor);
1097  }
1098 };
1099 
1101  public:
1102  // always allocated against a call arena, no memory free required
// NOTE(review): the single body line of this overload (listing line 1104) is
// missing from this listing. Nothing visible here frees memory.
1103  static void operator delete(void* /*ptr*/, std::size_t size) {
1105  }
1106 
1107  // This operator should never be called as the memory should be freed as part
1108  // of the arena destruction. It only exists to provide a matching operator
1109  // delete to the operator new so that some compilers will not complain (see
1110  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
1111  // there are no tests catching the compiler warning.
// Placement-delete counterpart; asserts unconditionally if it is ever reached.
1112  static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
1113 
// Starts the unary RPC by configuring and issuing its two batches. The
// request message, half-close, and response destination were already added to
// the op sets by the constructor.
1114  void StartCall() override {
1115  // This call initiates two batches, each with a callback
1116  // 1. Send initial metadata + write + writes done + recv initial metadata
1117  // 2. Read message, recv trailing metadata
1118 
// Start batch callback: run the initial-metadata reaction, then release one
// of the two outstanding callbacks.
1119  start_tag_.Set(
1120  call_.call(),
1121  [this](bool ok) {
1122  reactor_->OnReadInitialMetadataDone(ok);
1123  MaybeFinish();
1124  },
1125  &start_ops_, /*can_inline=*/false);
1126  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
1127  context_->initial_metadata_flags());
1128  start_ops_.RecvInitialMetadata(context_);
1129  start_ops_.set_core_cq_tag(&start_tag_);
1130  call_.PerformOps(&start_ops_);
1131 
// Finish batch: receives the final status into finish_status_; its callback
// only releases the other outstanding callback.
1132  finish_tag_.Set(
1133  call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, &finish_ops_,
1134  /*can_inline=*/false);
1135  finish_ops_.ClientRecvStatus(context_, &finish_status_);
1136  finish_ops_.set_core_cq_tag(&finish_tag_);
1137  call_.PerformOps(&finish_ops_);
1138  }
1139 
1140  private:
1142 
// Constructor (private). Stores the context, call and reactor, binds this
// call object to the reactor, queues the request message plus the client
// half-close on the start batch, and registers the response destination on
// the finish batch (a missing response message is allowed). The batches are
// issued later by StartCall.
// NOTE(review): the first line of the signature (listing line 1144) is
// missing from this listing; from the initializer list it carries the `call`
// parameter -- confirm its exact type against the real header.
1143  template <class Request, class Response>
1145  ::grpc::ClientContext* context, Request* request,
1146  Response* response, ClientUnaryReactor* reactor)
1147  : context_(context), call_(call), reactor_(reactor) {
1148  this->BindReactor(reactor);
1149  // TODO(vjpai): don't assert
// The SendMessagePtr call sits inside the assertion, so its side effect
// depends on GPR_CODEGEN_ASSERT evaluating its argument.
1150  GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
1151  start_ops_.ClientSendClose();
1152  finish_ops_.RecvMessage(response);
1153  finish_ops_.AllowNoMessage();
1154  }
1155 
1156  // In the unary case, MaybeFinish is only ever invoked from a
1157  // library-initiated reaction, so it will just directly call OnDone if this is
1158  // the last reaction for this RPC.
1159  void MaybeFinish() {
// fetch_sub returns the value before the decrement, so 1 means this call
// released the last of the two outstanding callbacks.
1160  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1161  1, std::memory_order_acq_rel) == 1)) {
// Copy out everything still needed before the explicit destructor call below;
// no member may be touched after that line.
1162  ::grpc::Status s = std::move(finish_status_);
1163  auto* reactor = reactor_;
1164  auto* call = call_.call();
1165  this->~ClientCallbackUnaryImpl();
// NOTE(review): listing line 1166 is missing here; it is presumably where the
// local `call` is used (e.g. to drop a reference on the call) -- confirm.
1167  reactor->OnDone(s);
1168  }
1169  }
1170 
1171  ::grpc::ClientContext* const context_;
1172  grpc::internal::Call call_;
1173  ClientUnaryReactor* const reactor_;
1174 
1179  start_ops_;
1181 
1184  finish_ops_;
1186  ::grpc::Status finish_status_;
1187 
1188  // This call will have 2 callbacks: start and finish
1189  std::atomic<intptr_t> callbacks_outstanding_{2};
1190 };
1191 
// Factory for the reactor-based unary implementation object.
// NOTE(review): the class-head line (listing line 1192) is missing;
// presumably `class ClientCallbackUnaryFactory {` -- confirm. Listing lines
// 1203 and 1205 inside Create are missing as well.
1193  public:
// Creates the call on the channel's callback completion queue and constructs
// a ClientCallbackUnaryImpl for it. The request and response pointers are
// converted to BaseRequest / BaseResponse (which default to the concrete
// types) before being handed to the implementation. Returns nothing: the
// constructor binds the new object to `reactor`.
1194  template <class Request, class Response, class BaseRequest = Request,
1195  class BaseResponse = Response>
1196  static void Create(::grpc::ChannelInterface* channel,
1197  const ::grpc::internal::RpcMethod& method,
1198  ::grpc::ClientContext* context, const Request* request,
1199  Response* response, ClientUnaryReactor* reactor) {
1200  grpc::internal::Call call =
1201  channel->CreateCall(method, context, channel->CallbackCQ());
1202 
1204 
// The visible arguments show that sizeof(ClientCallbackUnaryImpl) bytes are
// requested against call.call(); presumably this is a call-arena allocation
// used with placement new -- confirm against the missing lines.
1206  call.call(), sizeof(ClientCallbackUnaryImpl)))
1207  ClientCallbackUnaryImpl(call, context,
1208  static_cast<const BaseRequest*>(request),
1209  static_cast<BaseResponse*>(response), reactor);
1210  }
1211 };
1212 
1213 } // namespace internal
1214 
1215 // TODO(vjpai): Remove namespace experimental when de-experimentalized fully.
1216 namespace experimental {
1217 
1218 template <class Response>
1220 
1221 template <class Request>
1223 
1224 template <class Request, class Response>
1227 
1228 template <class Response>
1230 
1231 template <class Request>
1233 
1234 template <class Request, class Response>
1236 
1238 
1239 } // namespace experimental
1240 
1241 } // namespace grpc
1242 #endif // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
grpc::internal::ClientCallbackUnaryImpl
Definition: client_callback.h:1100
grpc::internal::CallbackWithSuccessTag
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming o...
Definition: callback_common.h:137
grpc::ClientReadReactor::OnReadInitialMetadataDone
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback.h:373
grpc::ClientCallbackReaderWriter::Write
virtual void Write(const Request *req, ::grpc::WriteOptions options)=0
grpc::ClientWriteReactor::OnReadInitialMetadataDone
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback.h:409
grpc::ClientWriteReactor::StartWritesDone
void StartWritesDone()
Definition: client_callback.h:399
grpc::internal::CallOpRecvInitialMetadata
Definition: call_op_set.h:721
grpc::ClientCallbackWriter::~ClientCallbackWriter
virtual ~ClientCallbackWriter()
Definition: client_callback.h:181
grpc::internal::ClientCallbackReaderWriterImpl::Read
void Read(Response *msg) override
Definition: client_callback.h:512
grpc::internal::ClientCallbackWriterImpl::AddHold
void AddHold(int holds) override
Definition: client_callback.h:973
grpc::experimental::ClientBidiReactor
::grpc::ClientBidiReactor< Request, Response > ClientBidiReactor
Definition: client_callback.h:1235
grpc::internal::CallOpClientSendClose
Definition: call_op_set.h:619
grpc::ClientBidiReactor::StartWrite
void StartWrite(const Request *req)
Initiate a write operation (or post it for later initiation if StartCall has not yet been invoked).
Definition: client_callback.h:243
grpc::internal::CallOpGenericRecvMessage
Definition: call_op_set.h:526
grpc::ClientBidiReactor::AddHold
void AddHold()
Holds are needed if (and only if) this stream has operations that take place on it after StartCall bu...
Definition: client_callback.h:299
grpc::internal::ClientCallbackReaderWriterImpl
Definition: client_callback.h:461
grpc::internal::ClientReactor::OnDone
virtual void OnDone(const ::grpc::Status &)=0
Called by the library when all operations associated with this RPC have completed and all Holds have ...
grpc::ClientReadReactor::OnReadDone
virtual void OnReadDone(bool)
Definition: client_callback.h:374
grpc
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
grpc::internal::CallOpSet
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:852
status.h
grpc::internal::ClientCallbackReaderImpl::RemoveHold
void RemoveHold() override
Definition: client_callback.h:792
grpc::CoreCodegenInterface::grpc_call_ref
virtual void grpc_call_ref(grpc_call *call)=0
grpc::ClientWriteReactor::StartWriteLast
void StartWriteLast(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback.h:396
grpc::internal::CallOpSendMessage
Definition: call_op_set.h:282
grpc::WriteOptions::set_last_message
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:156
grpc::ClientUnaryReactor
ClientUnaryReactor is a reactor-style interface for a unary RPC.
Definition: client_callback.h:431
grpc::ClientCallbackReaderWriter::Read
virtual void Read(Response *resp)=0
grpc::CoreCodegenInterface::grpc_call_arena_alloc
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
GPR_LIKELY
#define GPR_LIKELY(x)
Definition: port_platform.h:652
config.h
grpc::ClientWriteReactor::StartCall
void StartCall()
Definition: client_callback.h:389
grpc::internal::ClientCallbackReaderWriterImpl::RemoveHold
void RemoveHold() override
Definition: client_callback.h:577
grpc::ClientUnaryReactor::~ClientUnaryReactor
virtual ~ClientUnaryReactor()
Definition: client_callback.h:433
grpc::internal::Call
Straightforward wrapping of the C call object.
Definition: call.h:35
grpc::ClientCallbackReaderWriter::WritesDone
virtual void WritesDone()=0
grpc::internal::ClientCallbackWriterImpl::RemoveHold
void RemoveHold() override
Definition: client_callback.h:976
core_codegen_interface.h
grpc::internal::CallOpSendInitialMetadata
Definition: call_op_set.h:212
grpc::internal::CallbackUnaryCallImpl::CallbackUnaryCallImpl
CallbackUnaryCallImpl(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(::grpc::Status)> on_completion)
Definition: client_callback.h:63
grpc::experimental::ClientUnaryReactor
::grpc::ClientUnaryReactor ClientUnaryReactor
Definition: client_callback.h:1237
grpc::ClientBidiReactor::OnReadInitialMetadataDone
virtual void OnReadInitialMetadataDone(bool)
Notifies the application that a read of initial metadata from the server is done.
Definition: client_callback.h:323
grpc::internal::CallbackWithStatusTag
Definition: callback_common.h:69
grpc::ClientCallbackReaderWriter::BindReactor
void BindReactor(ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback.h:158
grpc::internal::ClientCallbackUnaryFactory::Create
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const Request *request, Response *response, ClientUnaryReactor *reactor)
Definition: client_callback.h:1196
grpc::Status::ok
bool ok() const
Is the status OK?
Definition: status.h:118
grpc::internal::ClientCallbackReaderFactory::Create
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const Request *request, ClientReadReactor< Response > *reactor)
Definition: client_callback.h:859
grpc::ClientCallbackUnary::BindReactor
void BindReactor(ClientUnaryReactor *reactor)
Definition: client_callback.h:446
grpc::ClientBidiReactor::StartRead
void StartRead(Response *resp)
Initiate a read operation (or post it for later initiation if StartCall has not yet been invoked).
Definition: client_callback.h:235
grpc::ClientReadReactor::AddMultipleHolds
void AddMultipleHolds(int holds)
Definition: client_callback.h:366
grpc::internal::ClientCallbackWriterImpl::StartCall
void StartCall() override
Definition: client_callback.h:888
grpc::experimental::ClientReadReactor
::grpc::ClientReadReactor< Response > ClientReadReactor
Definition: client_callback.h:1229
grpc::ClientCallbackUnary::StartCall
virtual void StartCall()=0
grpc::ClientCallbackReaderWriter::AddHold
virtual void AddHold(int holds)=0
grpc::ClientBidiReactor::OnReadDone
virtual void OnReadDone(bool)
Notifies the application that a StartRead operation completed.
Definition: client_callback.h:329
grpc::Status
Did it work? If it didn't, why?
Definition: status.h:31
grpc::ClientReadReactor::~ClientReadReactor
virtual ~ClientReadReactor()
Definition: client_callback.h:360
grpc::ClientCallbackWriter::WritesDone
virtual void WritesDone()=0
grpc::internal::ClientCallbackReaderWriterImpl::Write
void Write(const Request *msg, ::grpc::WriteOptions options) override
Definition: client_callback.h:525
grpc::internal::ClientCallbackWriterFactory
Definition: channel_interface.h:49
grpc::ClientReadReactor::StartCall
void StartCall()
Definition: client_callback.h:362
grpc::ClientBidiReactor::OnWriteDone
virtual void OnWriteDone(bool)
Notifies the application that a StartWrite or StartWriteLast operation completed.
Definition: client_callback.h:336
grpc::ClientContext
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:192
grpc::ClientBidiReactor::OnDone
void OnDone(const ::grpc::Status &) override
Notifies the application that all operations associated with this RPC have completed and all Holds ha...
Definition: client_callback.h:313
GPR_UNLIKELY
#define GPR_UNLIKELY(x)
Definition: port_platform.h:653
grpc::internal::ClientCallbackReaderImpl::StartCall
void StartCall() override
Definition: client_callback.h:730
grpc::internal::ClientCallbackReaderWriterFactory
Definition: channel_interface.h:45
grpc::internal::CallbackUnaryCallImpl
Definition: channel_interface.h:36
grpc::ClientUnaryReactor::OnReadInitialMetadataDone
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback.h:437
grpc::ClientWriteReactor::StartWrite
void StartWrite(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback.h:393
grpc::ClientCallbackWriter::BindReactor
void BindReactor(ClientWriteReactor< Request > *reactor)
Definition: client_callback.h:194
grpc::ClientCallbackReader::RemoveHold
virtual void RemoveHold()=0
grpc::ClientCallbackReaderWriter
Definition: client_callback.h:147
grpc::ClientBidiReactor::StartWritesDone
void StartWritesDone()
Indicate that the RPC will have no more write operations.
Definition: client_callback.h:275
grpc::ClientCallbackReader::Read
virtual void Read(Response *resp)=0
grpc::ClientReadReactor::StartRead
void StartRead(Response *resp)
Definition: client_callback.h:363
grpc::ChannelInterface
Codegen interface for grpc::Channel.
Definition: channel_interface.h:71
grpc::internal::ClientReactor
Definition: client_callback.h:111
grpc::internal::ClientCallbackUnaryFactory
Definition: client_callback.h:1192
grpc::internal::ClientCallbackReaderImpl::Read
void Read(Response *msg) override
Definition: client_callback.h:776
grpc::internal::ClientCallbackWriterImpl
Definition: client_callback.h:874
grpc::ClientWriteReactor
ClientWriteReactor is the interface for a client-streaming RPC.
Definition: client_callback.h:140
grpc::internal::ClientCallbackReaderWriterFactory::Create
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback.h:700
grpc::protobuf::util::Status
::google::protobuf::util::Status Status
Definition: config_protobuf.h:91
grpc::ClientCallbackUnary
Definition: client_callback.h:199
grpc::ClientBidiReactor::AddMultipleHolds
void AddMultipleHolds(int holds)
Definition: client_callback.h:300
grpc::ClientWriteReactor::OnDone
void OnDone(const ::grpc::Status &) override
Definition: client_callback.h:408
grpc::CoreCodegenInterface::grpc_call_unref
virtual void grpc_call_unref(grpc_call *call)=0
grpc::ClientReadReactor::RemoveHold
void RemoveHold()
Definition: client_callback.h:370
grpc::internal::CallbackUnaryCall
void CallbackUnaryCall(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(::grpc::Status)> on_completion)
Perform a callback-based unary call.
Definition: client_callback.h:47
grpc::WriteOptions
Per-message write options.
Definition: call_op_set.h:79
grpc::ClientCallbackWriter::RemoveHold
virtual void RemoveHold()=0
grpc::ClientBidiReactor::OnWritesDoneDone
virtual void OnWritesDoneDone(bool)
Notifies the application that a StartWritesDone operation completed.
Definition: client_callback.h:345
callback_common.h
grpc::ClientCallbackReader::~ClientCallbackReader
virtual ~ClientCallbackReader()
Definition: client_callback.h:166
grpc::internal::CallbackWithSuccessTag::Set
void Set(grpc_call *call, std::function< void(bool)> f, CompletionQueueTag *ops, bool can_inline)
Definition: callback_common.h:165
grpc::internal::ClientCallbackReaderImpl::AddHold
void AddHold(int holds) override
Definition: client_callback.h:789
grpc::ClientCallbackReader::BindReactor
void BindReactor(ClientReadReactor< Response > *reactor)
Definition: client_callback.h:173
grpc::internal::MutexLock
Definition: sync.h:69
grpc::ClientCallbackWriter
Definition: client_callback.h:179
grpc::ClientCallbackWriter::AddHold
virtual void AddHold(int holds)=0
grpc::ClientBidiReactor::RemoveHold
void RemoveHold()
Definition: client_callback.h:304
grpc::ClientCallbackReaderWriter::~ClientCallbackReaderWriter
virtual ~ClientCallbackReaderWriter()
Definition: client_callback.h:149
grpc::ClientCallbackReaderWriter::RemoveHold
virtual void RemoveHold()=0
channel_interface.h
grpc::ClientCallbackWriter::Write
void Write(const Request *req)
Definition: client_callback.h:183
grpc::ClientCallbackReader::StartCall
virtual void StartCall()=0
grpc::ClientWriteReactor::AddHold
void AddHold()
Definition: client_callback.h:401
grpc::internal::Call::PerformOps
void PerformOps(CallOpSetInterface *ops)
Definition: call.h:65
grpc::internal::Call::call
grpc_call * call() const
Definition: call.h:69
grpc::internal::ClientCallbackWriterImpl::Write
void Write(const Request *msg, ::grpc::WriteOptions options) override
Definition: client_callback.h:920
grpc::ClientUnaryReactor::StartCall
void StartCall()
Definition: client_callback.h:435
call.h
grpc::ClientCallbackReaderWriter::StartCall
virtual void StartCall()=0
call_op_set.h
grpc::internal::CallOpClientRecvStatus
Definition: call_op_set.h:769
grpc::WriteOptions::set_buffer_hint
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call_op_set.h:117
grpc::ClientBidiReactor::StartWrite
void StartWrite(const Request *req, ::grpc::WriteOptions options)
Initiate/post a write operation with specified options.
Definition: client_callback.h:253
grpc::CompletionQueue
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue....
Definition: completion_queue.h:102
grpc::ClientBidiReactor
ClientBidiReactor is the interface for a bidirectional streaming RPC.
Definition: client_callback.h:136
grpc::internal::ClientCallbackReaderFactory
Definition: channel_interface.h:47
grpc::internal::ClientCallbackReaderImpl
Definition: client_callback.h:716
grpc::ClientCallbackWriter::StartCall
virtual void StartCall()=0
grpc::internal::ClientCallbackWriterImpl::WritesDone
void WritesDone() override
Definition: client_callback.h:945
grpc::internal::ClientCallbackUnaryImpl::StartCall
void StartCall() override
Definition: client_callback.h:1114
grpc::ClientWriteReactor::StartWrite
void StartWrite(const Request *req)
Definition: client_callback.h:390
grpc::ClientBidiReactor::StartCall
void StartCall()
Activate the RPC and initiate any reads or writes that have been Start'ed before this call.
Definition: client_callback.h:228
grpc::g_core_codegen_interface
CoreCodegenInterface * g_core_codegen_interface
Definition: completion_queue.h:96
GPR_CODEGEN_ASSERT
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:146
grpc::ClientWriteReactor::OnWriteDone
virtual void OnWriteDone(bool)
Definition: client_callback.h:410
grpc::WriteOptions::is_last_message
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:181
grpc::ClientBidiReactor::StartWriteLast
void StartWriteLast(const Request *req, ::grpc::WriteOptions options)
Initiate/post a write operation with specified options and an indication that this is the last write ...
Definition: client_callback.h:266
grpc::internal::CallOpRecvMessage
Definition: byte_buffer.h:52
grpc::internal::Mutex
Definition: sync.h:47
grpc::ClientUnaryReactor::OnDone
void OnDone(const ::grpc::Status &) override
Called by the library when all operations associated with this RPC have completed and all Holds have ...
Definition: client_callback.h:436
grpc::ClientCallbackReader::AddHold
virtual void AddHold(int holds)=0
grpc::internal::ClientCallbackWriterFactory::Create
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, Response *response, ClientWriteReactor< Request > *reactor)
Definition: client_callback.h:1086
grpc::ClientWriteReactor::AddMultipleHolds
void AddMultipleHolds(int holds)
Definition: client_callback.h:402
grpc::ClientWriteReactor::~ClientWriteReactor
virtual ~ClientWriteReactor()
Definition: client_callback.h:387
grpc::internal::ClientReactor::InternalScheduleOnDone
virtual void InternalScheduleOnDone(::grpc::Status s)
InternalScheduleOnDone is not part of the API and is not meant to be overridden.
grpc::internal::ClientCallbackReaderWriterImpl::StartCall
void StartCall() override
Definition: client_callback.h:476
grpc::ClientCallbackReader
Definition: client_callback.h:164
grpc::ClientCallbackUnary::~ClientCallbackUnary
virtual ~ClientCallbackUnary()
Definition: client_callback.h:201
grpc::internal::CallOpSet::set_core_cq_tag
void set_core_cq_tag(void *core_cq_tag)
set_core_cq_tag is used to provide a different core CQ tag than "this".
Definition: call_op_set.h:941
grpc::internal::ClientCallbackReaderWriterImpl::AddHold
void AddHold(int holds) override
Definition: client_callback.h:574
grpc::ClientWriteReactor::RemoveHold
void RemoveHold()
Definition: client_callback.h:406
grpc::ClientReadReactor
ClientReadReactor is the interface for a server-streaming RPC.
Definition: client_callback.h:138
grpc::internal::ClientCallbackReaderWriterImpl::WritesDone
void WritesDone() override
Definition: client_callback.h:548
grpc::ClientBidiReactor::~ClientBidiReactor
virtual ~ClientBidiReactor()
Definition: client_callback.h:222
grpc::ClientReadReactor::AddHold
void AddHold()
Definition: client_callback.h:365
grpc::ClientCallbackWriter::WriteLast
void WriteLast(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback.h:185
GPR_CODEGEN_DEBUG_ASSERT
#define GPR_CODEGEN_DEBUG_ASSERT(x)
Codegen specific version of GPR_DEBUG_ASSERT.
Definition: core_codegen_interface.h:155
grpc::experimental::ClientWriteReactor
::grpc::ClientWriteReactor< Request > ClientWriteReactor
Definition: client_callback.h:1232
grpc::ClientReadReactor::OnDone
void OnDone(const ::grpc::Status &) override
Definition: client_callback.h:372
grpc::ClientWriteReactor::OnWritesDoneDone
virtual void OnWritesDoneDone(bool)
Definition: client_callback.h:411