GRPC C++  1.33.1
client_callback.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
19 #define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
20 #include <atomic>
21 #include <functional>
22 
30 
31 namespace grpc {
32 class Channel;
33 class ClientContext;
34 
35 namespace internal {
36 class RpcMethod;
37 
// Free-function entry point for a callback-based unary RPC.
// NOTE(review): the declarator (listing line 41) and the callee name (listing
// line 46) are absent from this listing. The visible body passes channel,
// method, context, request, result and on_completion, unchanged and in that
// order, to a single call.
40 template <class InputMessage, class OutputMessage>
42  const ::grpc::internal::RpcMethod& method,
43  ::grpc::ClientContext* context,
44  const InputMessage* request, OutputMessage* result,
45  std::function<void(::grpc::Status)> on_completion) {
47  channel, method, context, request, result, on_completion);
48 }
49 
// Issues one complete unary RPC as a single op batch and reports the final
// status through on_completion.
// NOTE(review): several lines of this class are absent from this listing: the
// member-function declarator (listing line 53), the CallOpSet template
// arguments (63-68), the tag member of OpSetAndTag (72) and the allocator call
// (76).
50 template <class InputMessage, class OutputMessage>
51 class CallbackUnaryCallImpl {
52  public:
54  const ::grpc::internal::RpcMethod& method,
55  ::grpc::ClientContext* context,
56  const InputMessage* request, OutputMessage* result,
57  std::function<void(::grpc::Status)> on_completion) {
  // The call is created against the channel's callback completion queue,
  // which is asserted to be non-null.
58  ::grpc::CompletionQueue* cq = channel->CallbackCQ();
59  GPR_CODEGEN_ASSERT(cq != nullptr);
60  grpc::internal::Call call(channel->CreateCall(method, context, cq));
61 
62  using FullCallOpSet = grpc::internal::CallOpSet<
69 
  // The op set and its completion tag share one allocation of
  // sizeof(OpSetAndTag) bytes and are placement-constructed into it.
70  struct OpSetAndTag {
71  FullCallOpSet opset;
73  };
74  const size_t alloc_sz = sizeof(OpSetAndTag);
75  auto* const alloced = static_cast<OpSetAndTag*>(
77  alloc_sz));
78  auto* ops = new (&alloced->opset) FullCallOpSet;
79  auto* tag = new (&alloced->tag)
80  grpc::internal::CallbackWithStatusTag(call.call(), on_completion, ops);
81 
82  // TODO(vjpai): Unify code with sync API as much as possible
  // If attaching the request message fails, that status is delivered through
  // the tag right away and no batch is started.
83  ::grpc::Status s = ops->SendMessagePtr(request);
84  if (!s.ok()) {
85  tag->force_run(s);
86  return;
87  }
  // Fill in the remaining ops of the batch (send/recv initial metadata, recv
  // message with "no message" allowed, send close, recv status into the tag's
  // status slot), attach the tag, and hand the batch to the call.
88  ops->SendInitialMetadata(&context->send_initial_metadata_,
89  context->initial_metadata_flags());
90  ops->RecvInitialMetadata(context);
91  ops->RecvMessage(result);
92  ops->AllowNoMessage();
93  ops->ClientSendClose();
94  ops->ClientRecvStatus(context, tag->status_ptr());
95  ops->set_core_cq_tag(tag);
96  call.PerformOps(ops);
97  }
98 };
99 
100 // Base class for public API classes.
// NOTE(review): the class header (listing line 101) is absent from this
// listing; the reactor classes later in this file derive from
// internal::ClientReactor.
102  public:
  // Pure virtual; each reactor class in this file overrides it with an empty
  // default. The *Impl classes call it from MaybeFinish with the final status
  // once their outstanding-callback count reaches zero.
110  virtual void OnDone(const ::grpc::Status& /*s*/) = 0;
111 
  // Called by MaybeFinish instead of OnDone when it was not entered from a
  // reaction (from_reaction == false). Takes the status by value. Only the
  // declaration is visible here.
119  virtual void InternalScheduleOnDone(::grpc::Status s);
120 };
121 
122 } // namespace internal
123 
124 // Forward declarations
125 template <class Request, class Response>
127 template <class Response>
129 template <class Request>
131 class ClientUnaryReactor;
132 
133 // NOTE: The streaming objects are not actually implemented in the public API.
134 // These interfaces are provided for mocking only. Typical applications
135 // will interact exclusively with the reactors that they define.
// Abstract interface of a bidirectional-streaming call object. ClientBidiReactor
// forwards its StartCall/StartRead/StartWrite/StartWritesDone/hold operations
// to these virtuals.
// NOTE(review): the class header (listing line 137), the destructor (139) and
// the BindReactor declarator (148) are absent from this listing.
136 template <class Request, class Response>
138  public:
140  virtual void StartCall() = 0;
141  virtual void Write(const Request* req, ::grpc::WriteOptions options) = 0;
142  virtual void WritesDone() = 0;
143  virtual void Read(Response* resp) = 0;
144  virtual void AddHold(int holds) = 0;
145  virtual void RemoveHold() = 0;
146 
147  protected:
  // Registers this stream with the reactor so the reactor can forward its
  // operations here.
149  reactor->BindStream(this);
150  }
151 };
152 
// Abstract interface of a server-streaming (read-only) call object.
// ClientReadReactor forwards StartCall/StartRead and the hold operations to
// these virtuals.
// NOTE(review): the class header (listing line 154), the destructor (156) and
// the BindReactor declarator (163) are absent from this listing.
153 template <class Response>
155  public:
157  virtual void StartCall() = 0;
158  virtual void Read(Response* resp) = 0;
159  virtual void AddHold(int holds) = 0;
160  virtual void RemoveHold() = 0;
161 
162  protected:
  // Registers this reader with the reactor so the reactor can forward its
  // operations here.
164  reactor->BindReader(this);
165  }
166 };
167 
// Abstract interface of a client-streaming (write-only) call object.
// ClientWriteReactor forwards StartCall/StartWrite/StartWritesDone and the hold
// operations to these virtuals.
// NOTE(review): the class header (listing line 169), the destructor (171) and
// the BindReactor declarator (184) are absent from this listing.
168 template <class Request>
170  public:
172  virtual void StartCall() = 0;
  // Non-virtual convenience overload: writes with default-constructed options.
173  void Write(const Request* req) { Write(req, ::grpc::WriteOptions()); }
174  virtual void Write(const Request* req, ::grpc::WriteOptions options) = 0;
  // Non-virtual convenience: marks the options as "last message" and writes.
175  void WriteLast(const Request* req, ::grpc::WriteOptions options) {
176  Write(req, options.set_last_message());
177  }
178  virtual void WritesDone() = 0;
179 
180  virtual void AddHold(int holds) = 0;
181  virtual void RemoveHold() = 0;
182 
183  protected:
  // Registers this writer with the reactor so the reactor can forward its
  // operations here.
185  reactor->BindWriter(this);
186  }
187 };
188 
// Abstract interface of a unary call object; ClientUnaryReactor::StartCall
// forwards to StartCall() here.
// NOTE(review): the class header (listing line 189) is absent from this
// listing.
190  public:
191  virtual ~ClientCallbackUnary() {}
192  virtual void StartCall() = 0;
193 
194  protected:
  // Defined out of line after ClientUnaryReactor (see the "Define function
  // out-of-line" comment later in this file).
195  void BindReactor(ClientUnaryReactor* reactor);
196 };
197 
198 // The following classes are the reactor interfaces that are to be implemented
199 // by the user. They are passed in to the library as an argument to a call on a
200 // stub (either a codegen-ed call or a generic call). The streaming RPC is
201 // activated by calling StartCall, possibly after initiating StartRead,
202 // StartWrite, or AddHold operations on the streaming object. Note that none of
203 // the classes are pure; all reactions have a default empty reaction so that the
204 // user class only needs to override those reactions that it cares about.
205 // The reactor must be passed to the stub invocation before any of the below
206 // operations can be called.
207 
// User-implemented reactor for a bidirectional-streaming RPC. The Start*/hold
// methods forward to the bound ClientCallbackReaderWriter; the On* reactions
// default to doing nothing and are overridden as needed.
// NOTE(review): the original per-method documentation and the stream_ member
// declaration (listing line 342) are absent from this listing.
209 template <class Request, class Response>
210 class ClientBidiReactor : public internal::ClientReactor {
211  public:
212  virtual ~ClientBidiReactor() {}
213 
  // Forwards to the bound stream's StartCall().
218  void StartCall() { stream_->StartCall(); }
219 
  // Forwards to the bound stream's Read(), with resp as the destination.
225  void StartRead(Response* resp) { stream_->Read(resp); }
226 
  // Initiate a write operation (or post it for later initiation if StartCall
  // has not yet been invoked). Uses default-constructed WriteOptions.
233  void StartWrite(const Request* req) {
234  StartWrite(req, ::grpc::WriteOptions());
235  }
236 
  // As above, with caller-supplied options (moved into the stream's Write()).
243  void StartWrite(const Request* req, ::grpc::WriteOptions options) {
244  stream_->Write(req, std::move(options));
245  }
246 
  // Marks the options as "last message" and then behaves like StartWrite.
256  void StartWriteLast(const Request* req, ::grpc::WriteOptions options) {
257  StartWrite(req, std::move(options.set_last_message()));
258  }
259 
  // Forwards to the bound stream's WritesDone().
265  void StartWritesDone() { stream_->WritesDone(); }
266 
  // Hold management: AddHold() adds one hold, AddMultipleHolds() adds `holds`
  // (debug-asserted to be positive), RemoveHold() removes one. All forward to
  // the bound stream.
289  void AddHold() { AddMultipleHolds(1); }
290  void AddMultipleHolds(int holds) {
291  GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
292  stream_->AddHold(holds);
293  }
294  void RemoveHold() { stream_->RemoveHold(); }
295 
  // Reactions. Each has an empty default body. `ok` is the flag the library's
  // tag callback received for the corresponding batch.
303  void OnDone(const ::grpc::Status& /*s*/) override {}
304 
313  virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
314 
319  virtual void OnReadDone(bool /*ok*/) {}
320 
326  virtual void OnWriteDone(bool /*ok*/) {}
327 
335  virtual void OnWritesDoneDone(bool /*ok*/) {}
336 
337  private:
  // Only the stream class may bind itself; it does so through BindStream().
338  friend class ClientCallbackReaderWriter<Request, Response>;
339  void BindStream(ClientCallbackReaderWriter<Request, Response>* stream) {
340  stream_ = stream;
341  }
343 };
344 
// User-implemented reactor for a server-streaming RPC. The Start*/hold methods
// forward to the bound ClientCallbackReader; the On* reactions default to
// doing nothing.
// NOTE(review): the reader_ member declaration (listing line 369) is absent
// from this listing.
347 template <class Response>
348 class ClientReadReactor : public internal::ClientReactor {
349  public:
350  virtual ~ClientReadReactor() {}
351 
  // Both forward to the bound reader.
352  void StartCall() { reader_->StartCall(); }
353  void StartRead(Response* resp) { reader_->Read(resp); }
354 
  // Hold management: AddHold() adds one hold, AddMultipleHolds() adds `holds`
  // (debug-asserted to be positive), RemoveHold() removes one.
355  void AddHold() { AddMultipleHolds(1); }
356  void AddMultipleHolds(int holds) {
357  GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
358  reader_->AddHold(holds);
359  }
360  void RemoveHold() { reader_->RemoveHold(); }
361 
  // Reactions, each with an empty default body.
362  void OnDone(const ::grpc::Status& /*s*/) override {}
363  virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
364  virtual void OnReadDone(bool /*ok*/) {}
365 
366  private:
  // Only the reader class may bind itself; it does so through BindReader().
367  friend class ClientCallbackReader<Response>;
368  void BindReader(ClientCallbackReader<Response>* reader) { reader_ = reader; }
370 };
371 
// User-implemented reactor for a client-streaming RPC. The Start*/hold methods
// forward to the bound ClientCallbackWriter; the On* reactions default to
// doing nothing.
// NOTE(review): the writer_ member declaration (listing line 407) is absent
// from this listing.
374 template <class Request>
375 class ClientWriteReactor : public internal::ClientReactor {
376  public:
377  virtual ~ClientWriteReactor() {}
378 
  // Forwards to the bound writer's StartCall().
379  void StartCall() { writer_->StartCall(); }
  // Writes with default-constructed options.
380  void StartWrite(const Request* req) {
381  StartWrite(req, ::grpc::WriteOptions());
382  }
  // Writes with caller-supplied options (moved into the writer's Write()).
383  void StartWrite(const Request* req, ::grpc::WriteOptions options) {
384  writer_->Write(req, std::move(options));
385  }
  // Marks the options as "last message" and then behaves like StartWrite.
386  void StartWriteLast(const Request* req, ::grpc::WriteOptions options) {
387  StartWrite(req, std::move(options.set_last_message()));
388  }
  // Forwards to the bound writer's WritesDone().
389  void StartWritesDone() { writer_->WritesDone(); }
390 
  // Hold management: AddHold() adds one hold, AddMultipleHolds() adds `holds`
  // (debug-asserted to be positive), RemoveHold() removes one.
391  void AddHold() { AddMultipleHolds(1); }
392  void AddMultipleHolds(int holds) {
393  GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
394  writer_->AddHold(holds);
395  }
396  void RemoveHold() { writer_->RemoveHold(); }
397 
  // Reactions, each with an empty default body.
398  void OnDone(const ::grpc::Status& /*s*/) override {}
399  virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
400  virtual void OnWriteDone(bool /*ok*/) {}
401  virtual void OnWritesDoneDone(bool /*ok*/) {}
402 
403  private:
  // Only the writer class may bind itself; it does so through BindWriter().
404  friend class ClientCallbackWriter<Request>;
405  void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; }
406 
408 };
409 
// User-implemented reactor for a unary RPC: StartCall() forwards to the bound
// ClientCallbackUnary, and the two reactions default to doing nothing.
// NOTE(review): the class header (listing line 421) is absent from this
// listing.
422  public:
423  virtual ~ClientUnaryReactor() {}
424 
425  void StartCall() { call_->StartCall(); }
426  void OnDone(const ::grpc::Status& /*s*/) override {}
427  virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
428 
429  private:
  // Only ClientCallbackUnary may bind itself; it does so through BindCall().
430  friend class ClientCallbackUnary;
431  void BindCall(ClientCallbackUnary* call) { call_ = call; }
  // Has no initializer here; it is assigned only by BindCall().
432  ClientCallbackUnary* call_;
433 };
434 
435 // Define function out-of-line from class to avoid forward declaration issue
// NOTE(review): the declarator (listing line 436) is absent from this listing;
// the body registers this call object with the reactor via BindCall().
437  reactor->BindCall(this);
438 }
439 
440 namespace internal {
441 
442 // Forward declare factory classes for friendship
443 template <class Request, class Response>
444 class ClientCallbackReaderWriterFactory;
445 template <class Response>
446 class ClientCallbackReaderFactory;
447 template <class Request>
448 class ClientCallbackWriterFactory;
449 
450 template <class Request, class Response>
452  : public ClientCallbackReaderWriter<Request, Response> {
453  public:
454  // always allocated against a call arena, no memory free required
  // NOTE(review): the body statement of this sized overload (listing line 456)
  // is absent from this listing.
455  static void operator delete(void* /*ptr*/, std::size_t size) {
457  }
458 
459  // This operator should never be called as the memory should be freed as part
460  // of the arena destruction. It only exists to provide a matching operator
461  // delete to the operator new so that some compilers will not complain (see
462  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
463  // there are no tests catching the compiler warning.
  // Placement form: asserts unconditionally if it is ever reached.
464  static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
465 
// Starts the RPC. Issues the start batch, then (under start_mu_) issues any
// read / write / writes-done batches that were recorded in backlog_ before the
// call started, issues the finish batch and publishes started_. Finally
// releases the count that was pre-registered for StartCall itself.
466  void StartCall() override {
467  // This call initiates two batches, plus any backlog, each with a callback
468  // 1. Send initial metadata (unless corked) + recv initial metadata
469  // 2. Any read backlog
470  // 3. Any write backlog
471  // 4. Recv trailing metadata
472  if (!start_corked_) {
473  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
474  context_->initial_metadata_flags());
475  }
476 
477  call_.PerformOps(&start_ops_);
478 
479  {
  // Read/Write/WritesDone take the same mutex before touching backlog_, so a
  // concurrent caller either records its batch before this section runs or
  // sees started_ == true and issues the batch itself.
480  grpc::internal::MutexLock lock(&start_mu_);
481 
482  if (backlog_.read_ops) {
483  call_.PerformOps(&read_ops_);
484  }
485  if (backlog_.write_ops) {
486  call_.PerformOps(&write_ops_);
487  }
488  if (backlog_.writes_done_ops) {
489  call_.PerformOps(&writes_done_ops_);
490  }
491  call_.PerformOps(&finish_ops_);
492  // The last thing in this critical section is to set started_ so that it
493  // can be used lock-free as well.
494  started_.store(true, std::memory_order_release);
495  }
496  // MaybeFinish outside the lock to make sure that destruction of this object
497  // doesn't take place while holding the lock (which would cause the lock to
498  // be released after destruction)
499  this->MaybeFinish(/*from_reaction=*/false);
500  }
501 
// Requests one message into *msg. Registers msg with read_ops_ and counts one
// more outstanding callback. If the call has not been started yet, the batch
// is only flagged in backlog_ for StartCall to issue; otherwise it is issued
// here.
502  void Read(Response* msg) override {
503  read_ops_.RecvMessage(msg);
504  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
  // started_ is checked without the lock first; only when it reads false is
  // start_mu_ taken and the flag re-checked before touching backlog_.
505  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
506  grpc::internal::MutexLock lock(&start_mu_);
507  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
508  backlog_.read_ops = true;
509  return;
510  }
511  }
512  call_.PerformOps(&read_ops_);
513  }
514 
// Sends *msg with the given options. If the call has not been started yet, the
// batch is only flagged in backlog_ for StartCall to issue; otherwise it is
// issued here.
515  void Write(const Request* msg, ::grpc::WriteOptions options) override {
  // A "last message" write also carries the client's send-close in the same
  // batch, with the buffer hint set on the options.
516  if (options.is_last_message()) {
517  options.set_buffer_hint();
518  write_ops_.ClientSendClose();
519  }
520  // TODO(vjpai): don't assert
521  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
522  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
  // When initial metadata was corked, StartCall did not add it to the start
  // batch; the first Write or WritesDone attaches it and clears the flag.
523  if (GPR_UNLIKELY(corked_write_needed_)) {
524  write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
525  context_->initial_metadata_flags());
526  corked_write_needed_ = false;
527  }
528 
  // started_ is checked without the lock first; only when it reads false is
  // start_mu_ taken and the flag re-checked before touching backlog_.
529  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
530  grpc::internal::MutexLock lock(&start_mu_);
531  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
532  backlog_.write_ops = true;
533  return;
534  }
535  }
536  call_.PerformOps(&write_ops_);
537  }
// Signals that the client has finished writing. Builds the writes-done batch
// (send-close) and its tag, then either issues it or, if the call has not been
// started yet, flags it in backlog_ for StartCall to issue.
538  void WritesDone() override {
539  writes_done_ops_.ClientSendClose();
  // On completion: invoke the reactor's OnWritesDoneDone reaction, then
  // release this batch's outstanding-callback count.
540  writes_done_tag_.Set(call_.call(),
541  [this](bool ok) {
542  reactor_->OnWritesDoneDone(ok);
543  MaybeFinish(/*from_reaction=*/true);
544  },
545  &writes_done_ops_, /*can_inline=*/false);
546  writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
547  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
  // When initial metadata was corked and no Write has attached it yet, this
  // batch carries it.
548  if (GPR_UNLIKELY(corked_write_needed_)) {
549  writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
550  context_->initial_metadata_flags());
551  corked_write_needed_ = false;
552  }
  // started_ is checked without the lock first; only when it reads false is
  // start_mu_ taken and the flag re-checked before touching backlog_.
553  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
554  grpc::internal::MutexLock lock(&start_mu_);
555  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
556  backlog_.writes_done_ops = true;
557  return;
558  }
559  }
560  call_.PerformOps(&writes_done_ops_);
561  }
562 
// Adds `holds` to the outstanding-callback count, so MaybeFinish will not
// reach zero until a matching number of RemoveHold() calls have been made.
563  void AddHold(int holds) override {
564  callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
565  }
// Releases one hold. This is a user-initiated path, so MaybeFinish is told it
// was not called from a reaction.
566  void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
567 
568  private:
569  friend class ClientCallbackReaderWriterFactory<Request, Response>;
570 
// Constructor (private; the factory is a friend). Stores the context, call and
// reactor, binds this stream to the reactor, and prepares the tags and op sets
// that do not change between operations.
// NOTE(review): the declarator lines (listing lines 571 and 573) are absent
// from this listing.
572  ::grpc::ClientContext* context,
574  : context_(context),
575  call_(call),
576  reactor_(reactor),
  // corked_write_needed_ starts equal to start_corked_; Write/WritesDone clear
  // it once one of them has attached the initial metadata.
577  start_corked_(context_->initial_metadata_corked_),
578  corked_write_needed_(start_corked_) {
579  this->BindReactor(reactor);
580 
581  // Set up the unchanging parts of the start, read, and write tags and ops.
  // Each tag callback invokes the matching reactor reaction and then releases
  // one outstanding-callback count through MaybeFinish.
582  start_tag_.Set(call_.call(),
583  [this](bool ok) {
584  reactor_->OnReadInitialMetadataDone(ok);
585  MaybeFinish(/*from_reaction=*/true);
586  },
587  &start_ops_, /*can_inline=*/false);
588  start_ops_.RecvInitialMetadata(context_);
589  start_ops_.set_core_cq_tag(&start_tag_);
590 
591  write_tag_.Set(call_.call(),
592  [this](bool ok) {
593  reactor_->OnWriteDone(ok);
594  MaybeFinish(/*from_reaction=*/true);
595  },
596  &write_ops_, /*can_inline=*/false);
597  write_ops_.set_core_cq_tag(&write_tag_);
598 
599  read_tag_.Set(call_.call(),
600  [this](bool ok) {
601  reactor_->OnReadDone(ok);
602  MaybeFinish(/*from_reaction=*/true);
603  },
604  &read_ops_, /*can_inline=*/false);
605  read_ops_.set_core_cq_tag(&read_tag_);
606 
607  // Also set up the Finish tag and op set.
  // The finish callback has no reaction of its own. The received status is
  // stored in finish_status_ and handed to OnDone by MaybeFinish.
608  finish_tag_.Set(
609  call_.call(),
610  [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
611  &finish_ops_,
612  /*can_inline=*/false);
613  finish_ops_.ClientRecvStatus(context_, &finish_status_);
614  finish_ops_.set_core_cq_tag(&finish_tag_);
615  }
616 
617  // MaybeFinish can be called from reactions or from user-initiated operations
618  // like StartCall or RemoveHold. If this is the last operation or hold on this
619  // object, it will invoke the OnDone reaction. If MaybeFinish was called from
620  // a reaction, it can call OnDone directly. If not, it would need to schedule
621  // OnDone onto an executor thread to avoid the possibility of deadlocking with
622  // any locks in the user code that invoked it.
623  void MaybeFinish(bool from_reaction) {
  // fetch_sub returns the previous value, so == 1 means this call released
  // the last outstanding callback or hold.
624  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
625  1, std::memory_order_acq_rel) == 1)) {
  // Copy everything still needed into locals before running the destructor,
  // because no member may be touched after it.
  // NOTE(review): the statement that uses `call` (listing line 630) is absent
  // from this listing.
626  ::grpc::Status s = std::move(finish_status_);
627  auto* reactor = reactor_;
628  auto* call = call_.call();
629  this->~ClientCallbackReaderWriterImpl();
631  if (GPR_LIKELY(from_reaction)) {
632  reactor->OnDone(s);
633  } else {
634  reactor->InternalScheduleOnDone(std::move(s));
635  }
636  }
637  }
638 
639  ::grpc::ClientContext* const context_;
640  grpc::internal::Call call_;
641  ClientBidiReactor<Request, Response>* const reactor_;
642 
645  start_ops_;
647  const bool start_corked_;
648  bool corked_write_needed_; // no lock needed since only accessed in
649  // Write/WritesDone which cannot be concurrent
650 
653  ::grpc::Status finish_status_;
654 
658  write_ops_;
660 
663  writes_done_ops_;
665 
667  read_ops_;
669 
670  struct StartCallBacklog {
671  bool write_ops = false;
672  bool writes_done_ops = false;
673  bool read_ops = false;
674  };
675  StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;
676 
677  // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
678  std::atomic<intptr_t> callbacks_outstanding_{3};
679  std::atomic_bool started_{false};
680  grpc::internal::Mutex start_mu_;
681 };
682 
// Factory (a friend of ClientCallbackReaderWriterImpl) that creates the call on
// the channel's callback completion queue and constructs the stream object for
// the given reactor.
// NOTE(review): the reactor parameter (listing line 689) and the
// allocation/placement-new lines (693-696) are absent from this listing.
683 template <class Request, class Response>
684 class ClientCallbackReaderWriterFactory {
685  public:
686  static void Create(::grpc::ChannelInterface* channel,
687  const ::grpc::internal::RpcMethod& method,
688  ::grpc::ClientContext* context,
690  grpc::internal::Call call =
691  channel->CreateCall(method, context, channel->CallbackCQ());
692 
697  reactor);
698  }
699 };
700 
701 template <class Response>
703  public:
704  // always allocated against a call arena, no memory free required
  // NOTE(review): the body statement of this sized overload (listing line 706)
  // is absent from this listing.
705  static void operator delete(void* /*ptr*/, std::size_t size) {
707  }
708 
709  // This operator should never be called as the memory should be freed as part
710  // of the arena destruction. It only exists to provide a matching operator
711  // delete to the operator new so that some compilers will not complain (see
712  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
713  // there are no tests catching the compiler warning.
  // Placement form: asserts unconditionally if it is ever reached.
714  static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
715 
// Starts the RPC. Unlike the bidi and writer implementations in this file, the
// tags are configured here rather than in the constructor, initial metadata is
// always added to the start batch, and there is no trailing MaybeFinish call
// (callbacks_outstanding_ starts at 2: start and finish).
716  void StartCall() override {
717  // This call initiates two batches, plus any backlog, each with a callback
718  // 1. Send initial metadata (unless corked) + recv initial metadata
719  // 2. Any backlog
720  // 3. Recv trailing metadata
721 
  // The constructor already added the request message and send-close to
  // start_ops_; this adds the metadata ops and issues the batch.
722  start_tag_.Set(call_.call(),
723  [this](bool ok) {
724  reactor_->OnReadInitialMetadataDone(ok);
725  MaybeFinish(/*from_reaction=*/true);
726  },
727  &start_ops_, /*can_inline=*/false);
728  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
729  context_->initial_metadata_flags());
730  start_ops_.RecvInitialMetadata(context_);
731  start_ops_.set_core_cq_tag(&start_tag_);
732  call_.PerformOps(&start_ops_);
733 
734  // Also set up the read tag so it doesn't have to be set up each time
735  read_tag_.Set(call_.call(),
736  [this](bool ok) {
737  reactor_->OnReadDone(ok);
738  MaybeFinish(/*from_reaction=*/true);
739  },
740  &read_ops_, /*can_inline=*/false);
741  read_ops_.set_core_cq_tag(&read_tag_);
742 
743  {
  // Issue a read that was requested before the call started, then publish
  // started_ so later Read() calls issue their batch directly.
744  grpc::internal::MutexLock lock(&start_mu_);
745  if (backlog_.read_ops) {
746  call_.PerformOps(&read_ops_);
747  }
748  started_.store(true, std::memory_order_release);
749  }
750 
  // The finish batch is configured and issued after the lock is released.
751  finish_tag_.Set(
752  call_.call(),
753  [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
754  &finish_ops_, /*can_inline=*/false);
755  finish_ops_.ClientRecvStatus(context_, &finish_status_);
756  finish_ops_.set_core_cq_tag(&finish_tag_);
757  call_.PerformOps(&finish_ops_);
758  }
759 
// Requests one message into *msg. Registers msg with read_ops_ and counts one
// more outstanding callback. If the call has not been started yet, the batch
// is only flagged in backlog_ for StartCall to issue; otherwise it is issued
// here.
760  void Read(Response* msg) override {
761  read_ops_.RecvMessage(msg);
762  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
  // started_ is checked without the lock first; only when it reads false is
  // start_mu_ taken and the flag re-checked before touching backlog_.
763  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
764  grpc::internal::MutexLock lock(&start_mu_);
765  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
766  backlog_.read_ops = true;
767  return;
768  }
769  }
770  call_.PerformOps(&read_ops_);
771  }
772 
// Adds `holds` to the outstanding-callback count, so MaybeFinish will not
// reach zero until a matching number of RemoveHold() calls have been made.
773  void AddHold(int holds) override {
774  callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
775  }
// Releases one hold. This is a user-initiated path, so MaybeFinish is told it
// was not called from a reaction.
776  void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
777 
778  private:
779  friend class ClientCallbackReaderFactory<Response>;
780 
// Constructor (private; the factory is a friend). Stores the context, call and
// reactor, binds this reader to the reactor, and pre-loads the start batch
// with the request message and the client's send-close. The rest of the start
// batch is filled in by StartCall.
// NOTE(review): the declarator lines (listing lines 782 and 784) are absent
// from this listing.
781  template <class Request>
783  ::grpc::ClientContext* context, Request* request,
785  : context_(context), call_(call), reactor_(reactor) {
786  this->BindReactor(reactor);
787  // TODO(vjpai): don't assert
788  GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
789  start_ops_.ClientSendClose();
790  }
791 
792  // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
  // Releases one outstanding callback or hold. On the last one it destroys
  // this object and delivers the final status: directly via OnDone when called
  // from a reaction, otherwise via InternalScheduleOnDone.
793  void MaybeFinish(bool from_reaction) {
  // fetch_sub returns the previous value, so == 1 means this was the last one.
794  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
795  1, std::memory_order_acq_rel) == 1)) {
  // Copy everything still needed into locals before running the destructor.
  // NOTE(review): the statement that uses `call` (listing line 800) is absent
  // from this listing.
796  ::grpc::Status s = std::move(finish_status_);
797  auto* reactor = reactor_;
798  auto* call = call_.call();
799  this->~ClientCallbackReaderImpl();
801  if (GPR_LIKELY(from_reaction)) {
802  reactor->OnDone(s);
803  } else {
804  reactor->InternalScheduleOnDone(std::move(s));
805  }
806  }
807  }
808 
809  ::grpc::ClientContext* const context_;
810  grpc::internal::Call call_;
811  ClientReadReactor<Response>* const reactor_;
812 
817  start_ops_;
819 
822  ::grpc::Status finish_status_;
823 
825  read_ops_;
827 
828  struct StartCallBacklog {
829  bool read_ops = false;
830  };
831  StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;
832 
833  // Minimum of 2 callbacks to pre-register for start and finish
834  std::atomic<intptr_t> callbacks_outstanding_{2};
835  std::atomic_bool started_{false};
836  grpc::internal::Mutex start_mu_;
837 };
838 
// Factory (a friend of ClientCallbackReaderImpl) that creates the call on the
// channel's callback completion queue and constructs a
// ClientCallbackReaderImpl<Response> in storage of
// sizeof(ClientCallbackReaderImpl<Response>) bytes obtained for that call.
// NOTE(review): the lines before the size argument (listing lines 850-851) are
// absent from this listing, so the allocator being called is not visible.
839 template <class Response>
840 class ClientCallbackReaderFactory {
841  public:
842  template <class Request>
843  static void Create(::grpc::ChannelInterface* channel,
844  const ::grpc::internal::RpcMethod& method,
845  ::grpc::ClientContext* context, const Request* request,
846  ClientReadReactor<Response>* reactor) {
847  grpc::internal::Call call =
848  channel->CreateCall(method, context, channel->CallbackCQ());
849 
852  call.call(), sizeof(ClientCallbackReaderImpl<Response>)))
853  ClientCallbackReaderImpl<Response>(call, context, request, reactor);
854  }
855 };
856 
857 template <class Request>
859  public:
860  // always allocated against a call arena, no memory free required
  // NOTE(review): the body statement of this sized overload (listing line 862)
  // is absent from this listing.
861  static void operator delete(void* /*ptr*/, std::size_t size) {
863  }
864 
865  // This operator should never be called as the memory should be freed as part
866  // of the arena destruction. It only exists to provide a matching operator
867  // delete to the operator new so that some compilers will not complain (see
868  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
869  // there are no tests catching the compiler warning.
  // Placement form: asserts unconditionally if it is ever reached.
870  static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
871 
// Starts the RPC. Issues the start batch, then (under start_mu_) issues any
// write / writes-done batches that were recorded in backlog_ before the call
// started, issues the finish batch and publishes started_. Finally releases
// the count that was pre-registered for StartCall itself.
872  void StartCall() override {
873  // This call initiates two batches, plus any backlog, each with a callback
874  // 1. Send initial metadata (unless corked) + recv initial metadata
875  // 2. Any backlog
876  // 3. Recv trailing metadata
877 
878  if (!start_corked_) {
879  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
880  context_->initial_metadata_flags());
881  }
882  call_.PerformOps(&start_ops_);
883 
884  {
  // Write/WritesDone take the same mutex before touching backlog_, so a
  // concurrent caller either records its batch before this section runs or
  // sees started_ == true and issues the batch itself.
885  grpc::internal::MutexLock lock(&start_mu_);
886 
887  if (backlog_.write_ops) {
888  call_.PerformOps(&write_ops_);
889  }
890  if (backlog_.writes_done_ops) {
891  call_.PerformOps(&writes_done_ops_);
892  }
893  call_.PerformOps(&finish_ops_);
894  // The last thing in this critical section is to set started_ so that it
895  // can be used lock-free as well.
896  started_.store(true, std::memory_order_release);
897  }
898  // MaybeFinish outside the lock to make sure that destruction of this object
899  // doesn't take place while holding the lock (which would cause the lock to
900  // be released after destruction)
901  this->MaybeFinish(/*from_reaction=*/false);
902  }
903 
// Sends *msg with the given options. If the call has not been started yet, the
// batch is only flagged in backlog_ for StartCall to issue; otherwise it is
// issued here.
904  void Write(const Request* msg, ::grpc::WriteOptions options) override {
  // A "last message" write also carries the client's send-close in the same
  // batch, with the buffer hint set on the options.
905  if (GPR_UNLIKELY(options.is_last_message())) {
906  options.set_buffer_hint();
907  write_ops_.ClientSendClose();
908  }
909  // TODO(vjpai): don't assert
910  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
911  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
912 
  // When initial metadata was corked, StartCall did not add it to the start
  // batch; the first Write or WritesDone attaches it and clears the flag.
913  if (GPR_UNLIKELY(corked_write_needed_)) {
914  write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
915  context_->initial_metadata_flags());
916  corked_write_needed_ = false;
917  }
918 
  // started_ is checked without the lock first; only when it reads false is
  // start_mu_ taken and the flag re-checked before touching backlog_.
919  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
920  grpc::internal::MutexLock lock(&start_mu_);
921  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
922  backlog_.write_ops = true;
923  return;
924  }
925  }
926  call_.PerformOps(&write_ops_);
927  }
928 
// Signals that the client has finished writing. Builds the writes-done batch
// (send-close) and its tag, then either issues it or, if the call has not been
// started yet, flags it in backlog_ for StartCall to issue.
929  void WritesDone() override {
930  writes_done_ops_.ClientSendClose();
  // On completion: invoke the reactor's OnWritesDoneDone reaction, then
  // release this batch's outstanding-callback count.
931  writes_done_tag_.Set(call_.call(),
932  [this](bool ok) {
933  reactor_->OnWritesDoneDone(ok);
934  MaybeFinish(/*from_reaction=*/true);
935  },
936  &writes_done_ops_, /*can_inline=*/false);
937  writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
938  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
939 
  // When initial metadata was corked and no Write has attached it yet, this
  // batch carries it.
940  if (GPR_UNLIKELY(corked_write_needed_)) {
941  writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
942  context_->initial_metadata_flags());
943  corked_write_needed_ = false;
944  }
945 
  // started_ is checked without the lock first; only when it reads false is
  // start_mu_ taken and the flag re-checked before touching backlog_.
946  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
947  grpc::internal::MutexLock lock(&start_mu_);
948  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
949  backlog_.writes_done_ops = true;
950  return;
951  }
952  }
953  call_.PerformOps(&writes_done_ops_);
954  }
955 
// Adds `holds` to the outstanding-callback count, so MaybeFinish will not
// reach zero until a matching number of RemoveHold() calls have been made.
956  void AddHold(int holds) override {
957  callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
958  }
// Releases one hold. This is a user-initiated path, so MaybeFinish is told it
// was not called from a reaction.
959  void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
960 
961  private:
962  friend class ClientCallbackWriterFactory<Request>;
963 
// Constructor (private; the factory is a friend). Stores the context, call and
// reactor, binds this writer to the reactor, and prepares the tags and op sets
// that do not change between operations.
// NOTE(review): the declarator lines (listing lines 965 and 967) are absent
// from this listing.
964  template <class Response>
966  ::grpc::ClientContext* context, Response* response,
968  : context_(context),
969  call_(call),
970  reactor_(reactor),
  // corked_write_needed_ starts equal to start_corked_; Write/WritesDone clear
  // it once one of them has attached the initial metadata.
971  start_corked_(context_->initial_metadata_corked_),
972  corked_write_needed_(start_corked_) {
973  this->BindReactor(reactor);
974 
975  // Set up the unchanging parts of the start and write tags and ops.
  // Each tag callback invokes the matching reactor reaction and then releases
  // one outstanding-callback count through MaybeFinish.
976  start_tag_.Set(call_.call(),
977  [this](bool ok) {
978  reactor_->OnReadInitialMetadataDone(ok);
979  MaybeFinish(/*from_reaction=*/true);
980  },
981  &start_ops_, /*can_inline=*/false);
982  start_ops_.RecvInitialMetadata(context_);
983  start_ops_.set_core_cq_tag(&start_tag_);
984 
985  write_tag_.Set(call_.call(),
986  [this](bool ok) {
987  reactor_->OnWriteDone(ok);
988  MaybeFinish(/*from_reaction=*/true);
989  },
990  &write_ops_, /*can_inline=*/false);
991  write_ops_.set_core_cq_tag(&write_tag_);
992 
993  // Also set up the Finish tag and op set.
  // The single response message is received in the finish batch (a missing
  // message is allowed) together with the final status.
994  finish_ops_.RecvMessage(response);
995  finish_ops_.AllowNoMessage();
996  finish_tag_.Set(
997  call_.call(),
998  [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
999  &finish_ops_,
1000  /*can_inline=*/false);
1001  finish_ops_.ClientRecvStatus(context_, &finish_status_);
1002  finish_ops_.set_core_cq_tag(&finish_tag_);
1003  }
1004 
1005  // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
  // Releases one outstanding callback or hold. On the last one it destroys
  // this object and delivers the final status: directly via OnDone when called
  // from a reaction, otherwise via InternalScheduleOnDone.
1006  void MaybeFinish(bool from_reaction) {
  // fetch_sub returns the previous value, so == 1 means this was the last one.
1007  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1008  1, std::memory_order_acq_rel) == 1)) {
  // Copy everything still needed into locals before running the destructor.
  // NOTE(review): the statement that uses `call` (listing line 1013) is absent
  // from this listing.
1009  ::grpc::Status s = std::move(finish_status_);
1010  auto* reactor = reactor_;
1011  auto* call = call_.call();
1012  this->~ClientCallbackWriterImpl();
1014  if (GPR_LIKELY(from_reaction)) {
1015  reactor->OnDone(s);
1016  } else {
1017  reactor->InternalScheduleOnDone(std::move(s));
1018  }
1019  }
1020  }
1021 
1022  ::grpc::ClientContext* const context_;
1023  grpc::internal::Call call_;
1024  ClientWriteReactor<Request>* const reactor_;
1025 
1028  start_ops_;
1030  const bool start_corked_;
1031  bool corked_write_needed_; // no lock needed since only accessed in
1032  // Write/WritesDone which cannot be concurrent
1033 
1036  finish_ops_;
1038  ::grpc::Status finish_status_;
1039 
1043  write_ops_;
1045 
1048  writes_done_ops_;
1049  grpc::internal::CallbackWithSuccessTag writes_done_tag_;
1050 
1051  struct StartCallBacklog {
1052  bool write_ops = false;
1053  bool writes_done_ops = false;
1054  };
1055  StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;
1056 
1057  // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
1058  std::atomic<intptr_t> callbacks_outstanding_{3};
1059  std::atomic_bool started_{false};
1060  grpc::internal::Mutex start_mu_;
1061 };
1062 
// Factory (a friend of ClientCallbackWriterImpl) that creates the call on the
// channel's callback completion queue and constructs a
// ClientCallbackWriterImpl<Request> in storage of
// sizeof(ClientCallbackWriterImpl<Request>) bytes obtained for that call.
// NOTE(review): the lines before the size argument (listing lines 1074-1075)
// are absent from this listing, so the allocator being called is not visible.
1063 template <class Request>
1064 class ClientCallbackWriterFactory {
1065  public:
1066  template <class Response>
1067  static void Create(::grpc::ChannelInterface* channel,
1068  const ::grpc::internal::RpcMethod& method,
1069  ::grpc::ClientContext* context, Response* response,
1070  ClientWriteReactor<Request>* reactor) {
1071  grpc::internal::Call call =
1072  channel->CreateCall(method, context, channel->CallbackCQ());
1073 
1076  call.call(), sizeof(ClientCallbackWriterImpl<Request>)))
1077  ClientCallbackWriterImpl<Request>(call, context, response, reactor);
1078  }
1079 };
1080 
1082  public:
1083  // always allocated against a call arena, no memory free required
  // NOTE(review): the body statement of this sized overload (listing line
  // 1085) is absent from this listing.
1084  static void operator delete(void* /*ptr*/, std::size_t size) {
1086  }
1087 
1088  // This operator should never be called as the memory should be freed as part
1089  // of the arena destruction. It only exists to provide a matching operator
1090  // delete to the operator new so that some compilers will not complain (see
1091  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
1092  // there are no tests catching the compiler warning.
  // Placement form: asserts unconditionally if it is ever reached.
1093  static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
1094 
// Starts the unary RPC by configuring and issuing its two batches. There is no
// backlog and no lock here: the request message, send-close and response
// receive ops were already added by the constructor.
1095  void StartCall() override {
1096  // This call initiates two batches, each with a callback
1097  // 1. Send initial metadata + write + writes done + recv initial metadata
1098  // 2. Read message, recv trailing metadata
1099 
  // Start batch: on completion, run the reactor's OnReadInitialMetadataDone
  // reaction and release one of the two pre-registered callbacks.
1100  start_tag_.Set(call_.call(),
1101  [this](bool ok) {
1102  reactor_->OnReadInitialMetadataDone(ok);
1103  MaybeFinish();
1104  },
1105  &start_ops_, /*can_inline=*/false);
1106  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
1107  context_->initial_metadata_flags());
1108  start_ops_.RecvInitialMetadata(context_);
1109  start_ops_.set_core_cq_tag(&start_tag_);
1110  call_.PerformOps(&start_ops_);
1111 
  // Finish batch: receives the status into finish_status_. Its completion has
  // no reaction of its own; it only releases the second callback.
1112  finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); },
1113  &finish_ops_,
1114  /*can_inline=*/false);
1115  finish_ops_.ClientRecvStatus(context_, &finish_status_);
1116  finish_ops_.set_core_cq_tag(&finish_tag_);
1117  call_.PerformOps(&finish_ops_);
1118  }
1119 
1120  private:
1122 
// Constructor (private). Stores the context, call and reactor, binds this call
// object to the reactor, and pre-loads the op sets: request message plus
// send-close in the start batch, response receive (a missing message is
// allowed) in the finish batch.
// NOTE(review): the declarator line (listing line 1124) is absent from this
// listing.
1123  template <class Request, class Response>
1125  ::grpc::ClientContext* context, Request* request,
1126  Response* response, ClientUnaryReactor* reactor)
1127  : context_(context), call_(call), reactor_(reactor) {
1128  this->BindReactor(reactor);
1129  // TODO(vjpai): don't assert
1130  GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
1131  start_ops_.ClientSendClose();
1132  finish_ops_.RecvMessage(response);
1133  finish_ops_.AllowNoMessage();
1134  }
1135 
1136  // In the unary case, MaybeFinish is only ever invoked from a
1137  // library-initiated reaction, so it will just directly call OnDone if this is
1138  // the last reaction for this RPC.
1139  void MaybeFinish() {
  // fetch_sub returns the previous value, so == 1 means this call released
  // the last of the two pre-registered callbacks.
1140  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1141  1, std::memory_order_acq_rel) == 1)) {
  // Copy everything still needed into locals before running the destructor.
  // NOTE(review): the statement that uses `call` (listing line 1146) is absent
  // from this listing.
1142  ::grpc::Status s = std::move(finish_status_);
1143  auto* reactor = reactor_;
1144  auto* call = call_.call();
1145  this->~ClientCallbackUnaryImpl();
1147  reactor->OnDone(s);
1148  }
1149  }
1150 
1151  ::grpc::ClientContext* const context_;
1152  grpc::internal::Call call_;
1153  ClientUnaryReactor* const reactor_;
1154 
1159  start_ops_;
1161 
1164  finish_ops_;
1166  ::grpc::Status finish_status_;
1167 
1168  // This call will have 2 callbacks: start and finish
1169  std::atomic<intptr_t> callbacks_outstanding_{2};
1170 };
1171 
// Factory that creates the call on the channel's callback completion queue and
// constructs a ClientCallbackUnaryImpl in storage of
// sizeof(ClientCallbackUnaryImpl) bytes obtained for that call.
// NOTE(review): the class header (listing line 1172) and the lines before the
// size argument (1182, 1184) are absent from this listing, so the allocator
// being called is not visible.
1173  public:
1174  template <class Request, class Response>
1175  static void Create(::grpc::ChannelInterface* channel,
1176  const ::grpc::internal::RpcMethod& method,
1177  ::grpc::ClientContext* context, const Request* request,
1178  Response* response, ClientUnaryReactor* reactor) {
1179  grpc::internal::Call call =
1180  channel->CreateCall(method, context, channel->CallbackCQ());
1181 
1183 
1185  call.call(), sizeof(ClientCallbackUnaryImpl)))
1186  ClientCallbackUnaryImpl(call, context, request, response, reactor);
1187  }
1188 };
1189 
1190 } // namespace internal
1191 
1192 // TODO(vjpai): Remove namespace experimental when de-experimentalized fully.
1193 namespace experimental {
1194 
1195 template <class Response>
1197 
1198 template <class Request>
1200 
1201 template <class Request, class Response>
1204 
1205 template <class Response>
1207 
1208 template <class Request>
1210 
1211 template <class Request, class Response>
1213 
1215 
1216 } // namespace experimental
1217 
1218 } // namespace grpc
1219 #endif // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
grpc::internal::ClientCallbackUnaryImpl
Definition: client_callback.h:1081
grpc::internal::CallbackWithSuccessTag
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming o...
Definition: callback_common.h:136
grpc::ClientReadReactor::OnReadInitialMetadataDone
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback.h:363
grpc::ClientCallbackReaderWriter::Write
virtual void Write(const Request *req, ::grpc::WriteOptions options)=0
grpc::ClientWriteReactor::OnReadInitialMetadataDone
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback.h:399
grpc::ClientWriteReactor::StartWritesDone
void StartWritesDone()
Definition: client_callback.h:389
grpc::internal::CallOpRecvInitialMetadata
Definition: call_op_set.h:720
grpc::ClientCallbackWriter::~ClientCallbackWriter
virtual ~ClientCallbackWriter()
Definition: client_callback.h:171
grpc::internal::ClientCallbackReaderWriterImpl::Read
void Read(Response *msg) override
Definition: client_callback.h:502
grpc::internal::ClientCallbackWriterImpl::AddHold
void AddHold(int holds) override
Definition: client_callback.h:956
grpc::experimental::ClientBidiReactor
::grpc::ClientBidiReactor< Request, Response > ClientBidiReactor
Definition: client_callback.h:1212
grpc::internal::CallOpClientSendClose
Definition: call_op_set.h:618
grpc::ClientBidiReactor::StartWrite
void StartWrite(const Request *req)
Initiate a write operation (or post it for later initiation if StartCall has not yet been invoked).
Definition: client_callback.h:233
grpc::internal::CallOpGenericRecvMessage
Definition: call_op_set.h:525
grpc::ClientBidiReactor::AddHold
void AddHold()
Holds are needed if (and only if) this stream has operations that take place on it after StartCall bu...
Definition: client_callback.h:289
grpc::internal::ClientCallbackReaderWriterImpl
Definition: client_callback.h:451
grpc::internal::ClientReactor::OnDone
virtual void OnDone(const ::grpc::Status &)=0
Called by the library when all operations associated with this RPC have completed and all Holds have ...
grpc::ClientReadReactor::OnReadDone
virtual void OnReadDone(bool)
Definition: client_callback.h:364
grpc
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
grpc::internal::CallOpSet
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:850
status.h
grpc::internal::ClientCallbackReaderImpl::RemoveHold
void RemoveHold() override
Definition: client_callback.h:776
grpc::CoreCodegenInterface::grpc_call_ref
virtual void grpc_call_ref(grpc_call *call)=0
grpc::ClientWriteReactor::StartWriteLast
void StartWriteLast(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback.h:386
grpc::internal::CallOpSendMessage
Definition: call_op_set.h:287
grpc::WriteOptions::set_last_message
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:161
grpc::ClientUnaryReactor
ClientUnaryReactor is a reactor-style interface for a unary RPC.
Definition: client_callback.h:421
grpc::internal::CallbackUnaryCall
void CallbackUnaryCall(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(::grpc::Status)> on_completion)
Perform a callback-based unary call. TODO(vjpai): Combine as much as possible with the blocking unary ...
Definition: client_callback.h:41
grpc::ClientCallbackReaderWriter::Read
virtual void Read(Response *resp)=0
grpc::CoreCodegenInterface::grpc_call_arena_alloc
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
GPR_LIKELY
#define GPR_LIKELY(x)
Definition: port_platform.h:685
config.h
grpc::ClientWriteReactor::StartCall
void StartCall()
Definition: client_callback.h:379
grpc::internal::ClientCallbackReaderWriterImpl::RemoveHold
void RemoveHold() override
Definition: client_callback.h:566
grpc::ClientUnaryReactor::~ClientUnaryReactor
virtual ~ClientUnaryReactor()
Definition: client_callback.h:423
grpc::internal::Call
Straightforward wrapping of the C call object.
Definition: call.h:35
grpc::ClientCallbackReaderWriter::WritesDone
virtual void WritesDone()=0
grpc::internal::ClientCallbackWriterImpl::RemoveHold
void RemoveHold() override
Definition: client_callback.h:959
core_codegen_interface.h
grpc::internal::CallOpSendInitialMetadata
Definition: call_op_set.h:217
grpc::internal::CallbackUnaryCallImpl::CallbackUnaryCallImpl
CallbackUnaryCallImpl(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(::grpc::Status)> on_completion)
Definition: client_callback.h:53
grpc::experimental::ClientUnaryReactor
::grpc::ClientUnaryReactor ClientUnaryReactor
Definition: client_callback.h:1214
grpc::ClientBidiReactor::OnReadInitialMetadataDone
virtual void OnReadInitialMetadataDone(bool)
Notifies the application that a read of initial metadata from the server is done.
Definition: client_callback.h:313
grpc::internal::CallbackWithStatusTag
Definition: callback_common.h:68
grpc::ClientCallbackReaderWriter::BindReactor
void BindReactor(ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback.h:148
grpc::Status::ok
bool ok() const
Is the status OK?
Definition: status.h:118
grpc::internal::ClientCallbackReaderFactory::Create
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const Request *request, ClientReadReactor< Response > *reactor)
Definition: client_callback.h:843
grpc::ClientCallbackUnary::BindReactor
void BindReactor(ClientUnaryReactor *reactor)
Definition: client_callback.h:436
grpc::ClientBidiReactor::StartRead
void StartRead(Response *resp)
Initiate a read operation (or post it for later initiation if StartCall has not yet been invoked).
Definition: client_callback.h:225
grpc::ClientReadReactor::AddMultipleHolds
void AddMultipleHolds(int holds)
Definition: client_callback.h:356
grpc::internal::ClientCallbackWriterImpl::StartCall
void StartCall() override
Definition: client_callback.h:872
grpc::experimental::ClientReadReactor
::grpc::ClientReadReactor< Response > ClientReadReactor
Definition: client_callback.h:1206
grpc::ClientCallbackUnary::StartCall
virtual void StartCall()=0
grpc::ClientCallbackReaderWriter::AddHold
virtual void AddHold(int holds)=0
grpc::ClientBidiReactor::OnReadDone
virtual void OnReadDone(bool)
Notifies the application that a StartRead operation completed.
Definition: client_callback.h:319
grpc::Status
Did it work? If it didn't, why?
Definition: status.h:31
grpc::ClientReadReactor::~ClientReadReactor
virtual ~ClientReadReactor()
Definition: client_callback.h:350
grpc::ClientCallbackWriter::WritesDone
virtual void WritesDone()=0
grpc::internal::ClientCallbackReaderWriterImpl::Write
void Write(const Request *msg, ::grpc::WriteOptions options) override
Definition: client_callback.h:515
grpc::internal::ClientCallbackWriterFactory
Definition: channel_interface.h:50
grpc::ClientReadReactor::StartCall
void StartCall()
Definition: client_callback.h:352
grpc::ClientBidiReactor::OnWriteDone
virtual void OnWriteDone(bool)
Notifies the application that a StartWrite or StartWriteLast operation completed.
Definition: client_callback.h:326
grpc::ClientContext
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:191
grpc::ClientBidiReactor::OnDone
void OnDone(const ::grpc::Status &) override
Notifies the application that all operations associated with this RPC have completed and all Holds ha...
Definition: client_callback.h:303
GPR_UNLIKELY
#define GPR_UNLIKELY(x)
Definition: port_platform.h:686
grpc::internal::ClientCallbackReaderImpl::StartCall
void StartCall() override
Definition: client_callback.h:716
grpc::internal::ClientCallbackReaderWriterFactory
Definition: channel_interface.h:46
grpc::internal::CallbackUnaryCallImpl
Definition: channel_interface.h:36
grpc::ClientUnaryReactor::OnReadInitialMetadataDone
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback.h:427
grpc::ClientWriteReactor::StartWrite
void StartWrite(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback.h:383
grpc::ClientCallbackWriter::BindReactor
void BindReactor(ClientWriteReactor< Request > *reactor)
Definition: client_callback.h:184
grpc::ClientCallbackReader::RemoveHold
virtual void RemoveHold()=0
grpc::ClientCallbackReaderWriter
Definition: client_callback.h:137
grpc::ClientBidiReactor::StartWritesDone
void StartWritesDone()
Indicate that the RPC will have no more write operations.
Definition: client_callback.h:265
grpc::ClientCallbackReader::Read
virtual void Read(Response *resp)=0
grpc::ClientReadReactor::StartRead
void StartRead(Response *resp)
Definition: client_callback.h:353
grpc::ChannelInterface
Codegen interface for grpc::Channel.
Definition: channel_interface.h:72
grpc::internal::ClientReactor
Definition: client_callback.h:101
grpc::internal::ClientCallbackUnaryFactory
Definition: client_callback.h:1172
grpc::internal::ClientCallbackReaderImpl::Read
void Read(Response *msg) override
Definition: client_callback.h:760
grpc::internal::ClientCallbackWriterImpl
Definition: client_callback.h:858
grpc::ClientWriteReactor
ClientWriteReactor is the interface for a client-streaming RPC.
Definition: client_callback.h:130
grpc::internal::ClientCallbackReaderWriterFactory::Create
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback.h:686
grpc::protobuf::util::Status
::google::protobuf::util::Status Status
Definition: config_protobuf.h:90
grpc::ClientCallbackUnary
Definition: client_callback.h:189
grpc::ClientBidiReactor::AddMultipleHolds
void AddMultipleHolds(int holds)
Definition: client_callback.h:290
grpc::ClientWriteReactor::OnDone
void OnDone(const ::grpc::Status &) override
Definition: client_callback.h:398
grpc::CoreCodegenInterface::grpc_call_unref
virtual void grpc_call_unref(grpc_call *call)=0
grpc::ClientReadReactor::RemoveHold
void RemoveHold()
Definition: client_callback.h:360
grpc::WriteOptions
Per-message write options.
Definition: call_op_set.h:79
grpc::ClientCallbackWriter::RemoveHold
virtual void RemoveHold()=0
grpc::ClientBidiReactor::OnWritesDoneDone
virtual void OnWritesDoneDone(bool)
Notifies the application that a StartWritesDone operation completed.
Definition: client_callback.h:335
callback_common.h
grpc::ClientCallbackReader::~ClientCallbackReader
virtual ~ClientCallbackReader()
Definition: client_callback.h:156
grpc::internal::CallbackWithSuccessTag::Set
void Set(grpc_call *call, std::function< void(bool)> f, CompletionQueueTag *ops, bool can_inline)
Definition: callback_common.h:164
grpc::internal::ClientCallbackReaderImpl::AddHold
void AddHold(int holds) override
Definition: client_callback.h:773
grpc::ClientCallbackReader::BindReactor
void BindReactor(ClientReadReactor< Response > *reactor)
Definition: client_callback.h:163
grpc::internal::MutexLock
Definition: sync.h:69
grpc::ClientCallbackWriter
Definition: client_callback.h:169
grpc::ClientCallbackWriter::AddHold
virtual void AddHold(int holds)=0
grpc::ClientBidiReactor::RemoveHold
void RemoveHold()
Definition: client_callback.h:294
grpc::ClientCallbackReaderWriter::~ClientCallbackReaderWriter
virtual ~ClientCallbackReaderWriter()
Definition: client_callback.h:139
grpc::ClientCallbackReaderWriter::RemoveHold
virtual void RemoveHold()=0
channel_interface.h
grpc::ClientCallbackWriter::Write
void Write(const Request *req)
Definition: client_callback.h:173
grpc::ClientCallbackReader::StartCall
virtual void StartCall()=0
grpc::ClientWriteReactor::AddHold
void AddHold()
Definition: client_callback.h:391
grpc::internal::Call::PerformOps
void PerformOps(CallOpSetInterface *ops)
Definition: call.h:65
grpc::internal::Call::call
grpc_call * call() const
Definition: call.h:69
grpc::internal::ClientCallbackWriterImpl::Write
void Write(const Request *msg, ::grpc::WriteOptions options) override
Definition: client_callback.h:904
grpc::ClientUnaryReactor::StartCall
void StartCall()
Definition: client_callback.h:425
call.h
grpc::ClientCallbackReaderWriter::StartCall
virtual void StartCall()=0
call_op_set.h
grpc::internal::CallOpClientRecvStatus
Definition: call_op_set.h:768
grpc::WriteOptions::set_buffer_hint
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call_op_set.h:122
grpc::ClientBidiReactor::StartWrite
void StartWrite(const Request *req, ::grpc::WriteOptions options)
Initiate/post a write operation with specified options.
Definition: client_callback.h:243
grpc::CompletionQueue
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue....
Definition: completion_queue.h:96
grpc::ClientBidiReactor
ClientBidiReactor is the interface for a bidirectional streaming RPC.
Definition: client_callback.h:126
grpc::internal::ClientCallbackReaderFactory
Definition: channel_interface.h:48
grpc::internal::ClientCallbackReaderImpl
Definition: client_callback.h:702
grpc::ClientCallbackWriter::StartCall
virtual void StartCall()=0
grpc::internal::ClientCallbackWriterImpl::WritesDone
void WritesDone() override
Definition: client_callback.h:929
grpc::internal::ClientCallbackUnaryImpl::StartCall
void StartCall() override
Definition: client_callback.h:1095
grpc::ClientWriteReactor::StartWrite
void StartWrite(const Request *req)
Definition: client_callback.h:380
grpc::ClientBidiReactor::StartCall
void StartCall()
Activate the RPC and initiate any reads or writes that have been Start'ed before this call.
Definition: client_callback.h:218
grpc::g_core_codegen_interface
CoreCodegenInterface * g_core_codegen_interface
Definition: completion_queue.h:90
GPR_CODEGEN_ASSERT
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:146
grpc::ClientWriteReactor::OnWriteDone
virtual void OnWriteDone(bool)
Definition: client_callback.h:400
grpc::WriteOptions::is_last_message
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:186
grpc::ClientBidiReactor::StartWriteLast
void StartWriteLast(const Request *req, ::grpc::WriteOptions options)
Initiate/post a write operation with specified options and an indication that this is the last write ...
Definition: client_callback.h:256
grpc::internal::CallOpRecvMessage
Definition: byte_buffer.h:51
grpc::internal::Mutex
Definition: sync.h:47
grpc::ClientUnaryReactor::OnDone
void OnDone(const ::grpc::Status &) override
Called by the library when all operations associated with this RPC have completed and all Holds have ...
Definition: client_callback.h:426
grpc::ClientCallbackReader::AddHold
virtual void AddHold(int holds)=0
grpc::internal::ClientCallbackWriterFactory::Create
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, Response *response, ClientWriteReactor< Request > *reactor)
Definition: client_callback.h:1067
grpc::ClientWriteReactor::AddMultipleHolds
void AddMultipleHolds(int holds)
Definition: client_callback.h:392
grpc::ClientWriteReactor::~ClientWriteReactor
virtual ~ClientWriteReactor()
Definition: client_callback.h:377
grpc::internal::ClientReactor::InternalScheduleOnDone
virtual void InternalScheduleOnDone(::grpc::Status s)
InternalScheduleOnDone is not part of the API and is not meant to be overridden.
grpc::internal::ClientCallbackReaderWriterImpl::StartCall
void StartCall() override
Definition: client_callback.h:466
grpc::ClientCallbackReader
Definition: client_callback.h:154
grpc::ClientCallbackUnary::~ClientCallbackUnary
virtual ~ClientCallbackUnary()
Definition: client_callback.h:191
grpc::internal::CallOpSet::set_core_cq_tag
void set_core_cq_tag(void *core_cq_tag)
set_core_cq_tag is used to provide a different core CQ tag than "this".
Definition: call_op_set.h:939
grpc::internal::ClientCallbackReaderWriterImpl::AddHold
void AddHold(int holds) override
Definition: client_callback.h:563
grpc::internal::ClientCallbackUnaryFactory::Create
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const Request *request, Response *response, ClientUnaryReactor *reactor)
Definition: client_callback.h:1175
grpc::ClientWriteReactor::RemoveHold
void RemoveHold()
Definition: client_callback.h:396
grpc::ClientReadReactor
ClientReadReactor is the interface for a server-streaming RPC.
Definition: client_callback.h:128
grpc::internal::ClientCallbackReaderWriterImpl::WritesDone
void WritesDone() override
Definition: client_callback.h:538
grpc::ClientBidiReactor::~ClientBidiReactor
virtual ~ClientBidiReactor()
Definition: client_callback.h:212
grpc::ClientReadReactor::AddHold
void AddHold()
Definition: client_callback.h:355
grpc::ClientCallbackWriter::WriteLast
void WriteLast(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback.h:175
GPR_CODEGEN_DEBUG_ASSERT
#define GPR_CODEGEN_DEBUG_ASSERT(x)
Codegen specific version of GPR_DEBUG_ASSERT.
Definition: core_codegen_interface.h:155
grpc::experimental::ClientWriteReactor
::grpc::ClientWriteReactor< Request > ClientWriteReactor
Definition: client_callback.h:1209
grpc::ClientReadReactor::OnDone
void OnDone(const ::grpc::Status &) override
Definition: client_callback.h:362
grpc::ClientWriteReactor::OnWritesDoneDone
virtual void OnWritesDoneDone(bool)
Definition: client_callback.h:401