GRPC C++  1.32.0
client_callback_impl.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_IMPL_H
19 #define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_IMPL_H
20 #include <atomic>
21 #include <functional>
22 
30 
31 namespace grpc {
32 class Channel;
33 class ClientContext;
34 namespace internal {
35 class RpcMethod;
36 } // namespace internal
37 } // namespace grpc
38 
39 namespace grpc_impl {
40 
41 namespace internal {
42 
45 template <class InputMessage, class OutputMessage>
47  const ::grpc::internal::RpcMethod& method,
48  ::grpc::ClientContext* context,
49  const InputMessage* request, OutputMessage* result,
50  std::function<void(::grpc::Status)> on_completion) {
52  channel, method, context, request, result, on_completion);
53 }
54 
55 template <class InputMessage, class OutputMessage>
56 class CallbackUnaryCallImpl {
57  public:
59  const ::grpc::internal::RpcMethod& method,
60  ::grpc::ClientContext* context,
61  const InputMessage* request, OutputMessage* result,
62  std::function<void(::grpc::Status)> on_completion) {
63  ::grpc::CompletionQueue* cq = channel->CallbackCQ();
64  GPR_CODEGEN_ASSERT(cq != nullptr);
65  grpc::internal::Call call(channel->CreateCall(method, context, cq));
66 
67  using FullCallOpSet = grpc::internal::CallOpSet<
74 
75  struct OpSetAndTag {
76  FullCallOpSet opset;
78  };
79  const size_t alloc_sz = sizeof(OpSetAndTag);
80  auto* const alloced = static_cast<OpSetAndTag*>(
82  alloc_sz));
83  auto* ops = new (&alloced->opset) FullCallOpSet;
84  auto* tag = new (&alloced->tag)
85  grpc::internal::CallbackWithStatusTag(call.call(), on_completion, ops);
86 
87  // TODO(vjpai): Unify code with sync API as much as possible
88  ::grpc::Status s = ops->SendMessagePtr(request);
89  if (!s.ok()) {
90  tag->force_run(s);
91  return;
92  }
93  ops->SendInitialMetadata(&context->send_initial_metadata_,
94  context->initial_metadata_flags());
95  ops->RecvInitialMetadata(context);
96  ops->RecvMessage(result);
97  ops->AllowNoMessage();
98  ops->ClientSendClose();
99  ops->ClientRecvStatus(context, tag->status_ptr());
100  ops->set_core_cq_tag(tag);
101  call.PerformOps(ops);
102  }
103 };
104 
105 // Base class for public API classes.
107  public:
115  virtual void OnDone(const ::grpc::Status& /*s*/) = 0;
116 
124  virtual void InternalScheduleOnDone(::grpc::Status s);
125 };
126 
127 } // namespace internal
128 
129 // Forward declarations
130 template <class Request, class Response>
132 template <class Response>
134 template <class Request>
136 class ClientUnaryReactor;
137 
138 // NOTE: The streaming objects are not actually implemented in the public API.
139 // These interfaces are provided for mocking only. Typical applications
140 // will interact exclusively with the reactors that they define.
141 template <class Request, class Response>
143  public:
145  virtual void StartCall() = 0;
146  virtual void Write(const Request* req, ::grpc::WriteOptions options) = 0;
147  virtual void WritesDone() = 0;
148  virtual void Read(Response* resp) = 0;
149  virtual void AddHold(int holds) = 0;
150  virtual void RemoveHold() = 0;
151 
152  protected:
154  reactor->BindStream(this);
155  }
156 };
157 
158 template <class Response>
160  public:
162  virtual void StartCall() = 0;
163  virtual void Read(Response* resp) = 0;
164  virtual void AddHold(int holds) = 0;
165  virtual void RemoveHold() = 0;
166 
167  protected:
169  reactor->BindReader(this);
170  }
171 };
172 
173 template <class Request>
175  public:
177  virtual void StartCall() = 0;
178  void Write(const Request* req) { Write(req, ::grpc::WriteOptions()); }
179  virtual void Write(const Request* req, ::grpc::WriteOptions options) = 0;
180  void WriteLast(const Request* req, ::grpc::WriteOptions options) {
181  Write(req, options.set_last_message());
182  }
183  virtual void WritesDone() = 0;
184 
185  virtual void AddHold(int holds) = 0;
186  virtual void RemoveHold() = 0;
187 
188  protected:
190  reactor->BindWriter(this);
191  }
192 };
193 
195  public:
196  virtual ~ClientCallbackUnary() {}
197  virtual void StartCall() = 0;
198 
199  protected:
200  void BindReactor(ClientUnaryReactor* reactor);
201 };
202 
203 // The following classes are the reactor interfaces that are to be implemented
204 // by the user. They are passed in to the library as an argument to a call on a
205 // stub (either a codegen-ed call or a generic call). The streaming RPC is
206 // activated by calling StartCall, possibly after initiating StartRead,
207 // StartWrite, or AddHold operations on the streaming object. Note that none of
208 // the classes are pure; every reaction has an empty default implementation so
209 // that the user class only needs to override the reactions it cares about.
210 // The reactor must be passed to the stub invocation before any of the below
211 // operations can be called.
212 
214 template <class Request, class Response>
215 class ClientBidiReactor : public internal::ClientReactor {
216  public:
217  virtual ~ClientBidiReactor() {}
218 
223  void StartCall() { stream_->StartCall(); }
224 
230  void StartRead(Response* resp) { stream_->Read(resp); }
231 
238  void StartWrite(const Request* req) {
239  StartWrite(req, ::grpc::WriteOptions());
240  }
241 
248  void StartWrite(const Request* req, ::grpc::WriteOptions options) {
249  stream_->Write(req, std::move(options));
250  }
251 
261  void StartWriteLast(const Request* req, ::grpc::WriteOptions options) {
262  StartWrite(req, std::move(options.set_last_message()));
263  }
264 
270  void StartWritesDone() { stream_->WritesDone(); }
271 
294  void AddHold() { AddMultipleHolds(1); }
295  void AddMultipleHolds(int holds) {
296  GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
297  stream_->AddHold(holds);
298  }
299  void RemoveHold() { stream_->RemoveHold(); }
300 
308  void OnDone(const ::grpc::Status& /*s*/) override {}
309 
318  virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
319 
324  virtual void OnReadDone(bool /*ok*/) {}
325 
331  virtual void OnWriteDone(bool /*ok*/) {}
332 
340  virtual void OnWritesDoneDone(bool /*ok*/) {}
341 
342  private:
343  friend class ClientCallbackReaderWriter<Request, Response>;
344  void BindStream(ClientCallbackReaderWriter<Request, Response>* stream) {
345  stream_ = stream;
346  }
348 };
349 
352 template <class Response>
353 class ClientReadReactor : public internal::ClientReactor {
354  public:
355  virtual ~ClientReadReactor() {}
356 
357  void StartCall() { reader_->StartCall(); }
358  void StartRead(Response* resp) { reader_->Read(resp); }
359 
360  void AddHold() { AddMultipleHolds(1); }
361  void AddMultipleHolds(int holds) {
362  GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
363  reader_->AddHold(holds);
364  }
365  void RemoveHold() { reader_->RemoveHold(); }
366 
367  void OnDone(const ::grpc::Status& /*s*/) override {}
368  virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
369  virtual void OnReadDone(bool /*ok*/) {}
370 
371  private:
372  friend class ClientCallbackReader<Response>;
373  void BindReader(ClientCallbackReader<Response>* reader) { reader_ = reader; }
375 };
376 
379 template <class Request>
380 class ClientWriteReactor : public internal::ClientReactor {
381  public:
382  virtual ~ClientWriteReactor() {}
383 
384  void StartCall() { writer_->StartCall(); }
385  void StartWrite(const Request* req) {
386  StartWrite(req, ::grpc::WriteOptions());
387  }
388  void StartWrite(const Request* req, ::grpc::WriteOptions options) {
389  writer_->Write(req, std::move(options));
390  }
391  void StartWriteLast(const Request* req, ::grpc::WriteOptions options) {
392  StartWrite(req, std::move(options.set_last_message()));
393  }
394  void StartWritesDone() { writer_->WritesDone(); }
395 
396  void AddHold() { AddMultipleHolds(1); }
397  void AddMultipleHolds(int holds) {
398  GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
399  writer_->AddHold(holds);
400  }
401  void RemoveHold() { writer_->RemoveHold(); }
402 
403  void OnDone(const ::grpc::Status& /*s*/) override {}
404  virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
405  virtual void OnWriteDone(bool /*ok*/) {}
406  virtual void OnWritesDoneDone(bool /*ok*/) {}
407 
408  private:
409  friend class ClientCallbackWriter<Request>;
410  void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; }
411 
413 };
414 
427  public:
428  virtual ~ClientUnaryReactor() {}
429 
430  void StartCall() { call_->StartCall(); }
431  void OnDone(const ::grpc::Status& /*s*/) override {}
432  virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
433 
434  private:
435  friend class ClientCallbackUnary;
436  void BindCall(ClientCallbackUnary* call) { call_ = call; }
437  ClientCallbackUnary* call_;
438 };
439 
440 // Define function out-of-line from class to avoid forward declaration issue
442  reactor->BindCall(this);
443 }
444 
445 namespace internal {
446 
447 // Forward declare factory classes for friendship
448 template <class Request, class Response>
449 class ClientCallbackReaderWriterFactory;
450 template <class Response>
451 class ClientCallbackReaderFactory;
452 template <class Request>
453 class ClientCallbackWriterFactory;
454 
455 template <class Request, class Response>
457  : public ClientCallbackReaderWriter<Request, Response> {
458  public:
459  // always allocated against a call arena, no memory free required
460  static void operator delete(void* /*ptr*/, std::size_t size) {
462  }
463 
464  // This operator should never be called as the memory should be freed as part
465  // of the arena destruction. It only exists to provide a matching operator
466  // delete to the operator new so that some compilers will not complain (see
467  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
468  // there are no tests catching the compiler warning.
469  static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
470 
471  void StartCall() override {
472  // This call initiates the batches below, each with its own callback:
473  // 1. Send initial metadata (unless corked) + recv initial metadata
474  // 2. Any read backlog
475  // 3. Any write backlog, then any writes-done backlog
476  // 4. Recv status via finish_ops_ (always issued, corked or not)
477  if (!start_corked_) {  // when corked, Write/WritesDone attach the initial metadata instead
478  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
479  context_->initial_metadata_flags());
480  }
481 
482  call_.PerformOps(&start_ops_);
483 
484  {
485  grpc::internal::MutexLock lock(&start_mu_);  // Read/Write/WritesDone record their backlog flags under this same mutex
486 
487  if (backlog_.read_ops) {  // a Read() arrived before StartCall
488  call_.PerformOps(&read_ops_);
489  }
490  if (backlog_.write_ops) {  // a Write() arrived before StartCall
491  call_.PerformOps(&write_ops_);
492  }
493  if (backlog_.writes_done_ops) {  // a WritesDone() arrived before StartCall
494  call_.PerformOps(&writes_done_ops_);
495  }
496  call_.PerformOps(&finish_ops_);
497  // The last thing in this critical section is to set started_ so that it
498  // can be used lock-free as well.
499  started_.store(true, std::memory_order_release);
500  }
501  // MaybeFinish outside the lock to make sure that destruction of this object
502  // doesn't take place while holding the lock (which would cause the lock to
503  // be released after destruction). This releases the count that
504  this->MaybeFinish(/*from_reaction=*/false);  // callbacks_outstanding_ pre-registers for StartCall itself.
505  }
506 
507  void Read(Response* msg) override {  // Arms read_ops_ to receive into *msg; issued now, or deferred to StartCall.
508  read_ops_.RecvMessage(msg);
509  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);  // released by MaybeFinish in the read_tag_ callback
510  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {  // lock-free check; pairs with the release store in StartCall
511  grpc::internal::MutexLock lock(&start_mu_);
512  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {  // re-check: StartCall sets started_ while holding start_mu_
513  backlog_.read_ops = true;  // StartCall issues read_ops_ when it sees this flag
514  return;
515  }
516  }
517  call_.PerformOps(&read_ops_);  // call already started: issue the read immediately
518  }
519 
520  void Write(const Request* msg, ::grpc::WriteOptions options) override {  // Queues *msg on write_ops_; issued now, or deferred to StartCall.
521  if (options.is_last_message()) {  // last message: add the client close to this same write batch
522  options.set_buffer_hint();
523  write_ops_.ClientSendClose();
524  }
525  // TODO(vjpai): don't assert (a non-OK SendMessagePtr status trips the assert below)
526  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
527  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);  // released by MaybeFinish in the write_tag_ callback
528  if (GPR_UNLIKELY(corked_write_needed_)) {  // corked start: StartCall did not send initial metadata, so attach it here once
529  write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
530  context_->initial_metadata_flags());
531  corked_write_needed_ = false;
532  }
533 
534  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {  // lock-free check; pairs with the release store in StartCall
535  grpc::internal::MutexLock lock(&start_mu_);
536  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {  // re-check: StartCall sets started_ while holding start_mu_
537  backlog_.write_ops = true;  // StartCall issues write_ops_ when it sees this flag
538  return;
539  }
540  }
541  call_.PerformOps(&write_ops_);  // call already started: issue the write immediately
542  }
543  void WritesDone() override {  // Queues a client close on writes_done_ops_; issued now, or deferred to StartCall.
544  writes_done_ops_.ClientSendClose();
545  writes_done_tag_.Set(call_.call(),  // unlike the start/read/write tags, this tag is set up here rather than in the constructor
546  [this](bool ok) {
547  reactor_->OnWritesDoneDone(ok);
548  MaybeFinish(/*from_reaction=*/true);
549  },
550  &writes_done_ops_, /*can_inline=*/false);
551  writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
552  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);  // released by MaybeFinish in the callback above
553  if (GPR_UNLIKELY(corked_write_needed_)) {  // corked start and no Write() yet: attach the initial metadata to this batch
554  writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
555  context_->initial_metadata_flags());
556  corked_write_needed_ = false;
557  }
558  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {  // lock-free check; pairs with the release store in StartCall
559  grpc::internal::MutexLock lock(&start_mu_);
560  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {  // re-check: StartCall sets started_ while holding start_mu_
561  backlog_.writes_done_ops = true;  // StartCall issues writes_done_ops_ when it sees this flag
562  return;
563  }
564  }
565  call_.PerformOps(&writes_done_ops_);  // call already started: issue the close immediately
566  }
567 
568  void AddHold(int holds) override {  // Adds `holds` to the outstanding count that MaybeFinish must drain before OnDone.
569  callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
570  }
571  void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }  // Releases one hold; not a reaction, so a resulting OnDone goes through InternalScheduleOnDone.
572 
573  private:
574  friend class ClientCallbackReaderWriterFactory<Request, Response>;
575 
577  ::grpc::ClientContext* context,
579  : context_(context),
580  call_(call),
581  reactor_(reactor),
582  start_corked_(context_->initial_metadata_corked_),
583  corked_write_needed_(start_corked_) {
584  this->BindReactor(reactor);
585 
586  // Set up the unchanging parts of the start, read, and write tags and ops.
587  start_tag_.Set(call_.call(),
588  [this](bool ok) {
589  reactor_->OnReadInitialMetadataDone(ok);
590  MaybeFinish(/*from_reaction=*/true);
591  },
592  &start_ops_, /*can_inline=*/false);
593  start_ops_.RecvInitialMetadata(context_);
594  start_ops_.set_core_cq_tag(&start_tag_);
595 
596  write_tag_.Set(call_.call(),
597  [this](bool ok) {
598  reactor_->OnWriteDone(ok);
599  MaybeFinish(/*from_reaction=*/true);
600  },
601  &write_ops_, /*can_inline=*/false);
602  write_ops_.set_core_cq_tag(&write_tag_);
603 
604  read_tag_.Set(call_.call(),
605  [this](bool ok) {
606  reactor_->OnReadDone(ok);
607  MaybeFinish(/*from_reaction=*/true);
608  },
609  &read_ops_, /*can_inline=*/false);
610  read_ops_.set_core_cq_tag(&read_tag_);
611 
612  // Also set up the Finish tag and op set.
613  finish_tag_.Set(
614  call_.call(),
615  [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
616  &finish_ops_,
617  /*can_inline=*/false);
618  finish_ops_.ClientRecvStatus(context_, &finish_status_);
619  finish_ops_.set_core_cq_tag(&finish_tag_);
620  }
621 
622  // MaybeFinish can be called from reactions or from user-initiated operations
623  // like StartCall or RemoveHold. If this is the last operation or hold on this
624  // object, it will invoke the OnDone reaction. If MaybeFinish was called from
625  // a reaction, it can call OnDone directly. If not, it would need to schedule
626  // OnDone onto an executor thread to avoid the possibility of deadlocking with
627  // any locks in the user code that invoked it.
628  void MaybeFinish(bool from_reaction) {
629  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
630  1, std::memory_order_acq_rel) == 1)) {
631  ::grpc::Status s = std::move(finish_status_);
632  auto* reactor = reactor_;
633  auto* call = call_.call();
634  this->~ClientCallbackReaderWriterImpl();
636  if (GPR_LIKELY(from_reaction)) {
637  reactor->OnDone(s);
638  } else {
639  reactor->InternalScheduleOnDone(std::move(s));
640  }
641  }
642  }
643 
644  ::grpc::ClientContext* const context_;
645  grpc::internal::Call call_;
646  ClientBidiReactor<Request, Response>* const reactor_;
647 
650  start_ops_;
652  const bool start_corked_;
653  bool corked_write_needed_; // no lock needed since only accessed in
654  // Write/WritesDone which cannot be concurrent
655 
658  ::grpc::Status finish_status_;
659 
663  write_ops_;
665 
668  writes_done_ops_;
670 
672  read_ops_;
674 
675  struct StartCallBacklog {
676  bool write_ops = false;
677  bool writes_done_ops = false;
678  bool read_ops = false;
679  };
680  StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;
681 
682  // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
683  std::atomic<intptr_t> callbacks_outstanding_{3};
684  std::atomic_bool started_{false};
685  grpc::internal::Mutex start_mu_;
686 };
687 
688 template <class Request, class Response>
689 class ClientCallbackReaderWriterFactory {
690  public:
691  static void Create(::grpc::ChannelInterface* channel,
692  const ::grpc::internal::RpcMethod& method,
693  ::grpc::ClientContext* context,
695  grpc::internal::Call call =
696  channel->CreateCall(method, context, channel->CallbackCQ());
697 
702  reactor);
703  }
704 };
705 
706 template <class Response>
708  public:
709  // always allocated against a call arena, no memory free required
710  static void operator delete(void* /*ptr*/, std::size_t size) {
712  }
713 
714  // This operator should never be called as the memory should be freed as part
715  // of the arena destruction. It only exists to provide a matching operator
716  // delete to the operator new so that some compilers will not complain (see
717  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
718  // there are no tests catching the compiler warning.
719  static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
720 
721  void StartCall() override {
722  // This call initiates two batches, plus any backlog, each with a callback
723  // 1. Send initial metadata (unconditionally in this class) + recv initial metadata
724  // 2. Any read backlog
725  // 3. Recv status via finish_ops_
726 
727  start_tag_.Set(call_.call(),
728  [this](bool ok) {
729  reactor_->OnReadInitialMetadataDone(ok);
730  MaybeFinish(/*from_reaction=*/true);
731  },
732  &start_ops_, /*can_inline=*/false);
733  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
734  context_->initial_metadata_flags());
735  start_ops_.RecvInitialMetadata(context_);
736  start_ops_.set_core_cq_tag(&start_tag_);
737  call_.PerformOps(&start_ops_);  // start_ops_ also carries the request message and client close queued by the constructor
738 
739  // Also set up the read tag so it doesn't have to be set up each time
740  read_tag_.Set(call_.call(),
741  [this](bool ok) {
742  reactor_->OnReadDone(ok);
743  MaybeFinish(/*from_reaction=*/true);
744  },
745  &read_ops_, /*can_inline=*/false);
746  read_ops_.set_core_cq_tag(&read_tag_);
747 
748  {
749  grpc::internal::MutexLock lock(&start_mu_);  // Read() records its backlog flag under this same mutex
750  if (backlog_.read_ops) {  // a Read() arrived before StartCall
751  call_.PerformOps(&read_ops_);
752  }
753  started_.store(true, std::memory_order_release);  // from here on Read() issues its batch directly
754  }
755 
756  finish_tag_.Set(
757  call_.call(),
758  [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
759  &finish_ops_, /*can_inline=*/false);
760  finish_ops_.ClientRecvStatus(context_, &finish_status_);
761  finish_ops_.set_core_cq_tag(&finish_tag_);
762  call_.PerformOps(&finish_ops_);  // no MaybeFinish here: this class pre-registers only the start and finish callbacks
763  }
764 
765  void Read(Response* msg) override {  // Arms read_ops_ to receive into *msg; issued now, or deferred to StartCall.
766  read_ops_.RecvMessage(msg);
767  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);  // released by MaybeFinish in the read_tag_ callback
768  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {  // lock-free check; pairs with the release store in StartCall
769  grpc::internal::MutexLock lock(&start_mu_);
770  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {  // re-check: StartCall sets started_ while holding start_mu_
771  backlog_.read_ops = true;  // StartCall issues read_ops_ when it sees this flag
772  return;
773  }
774  }
775  call_.PerformOps(&read_ops_);  // call already started: issue the read immediately
776  }
777 
778  void AddHold(int holds) override {  // Adds `holds` to the outstanding count that MaybeFinish must drain before OnDone.
779  callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
780  }
781  void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }  // Releases one hold; not a reaction, so a resulting OnDone goes through InternalScheduleOnDone.
782 
783  private:
784  friend class ClientCallbackReaderFactory<Response>;
785 
786  template <class Request>
788  ::grpc::ClientContext* context, Request* request,
790  : context_(context), call_(call), reactor_(reactor) {
791  this->BindReactor(reactor);
792  // TODO(vjpai): don't assert
793  GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
794  start_ops_.ClientSendClose();
795  }
796 
797  // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
798  void MaybeFinish(bool from_reaction) {
799  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
800  1, std::memory_order_acq_rel) == 1)) {
801  ::grpc::Status s = std::move(finish_status_);
802  auto* reactor = reactor_;
803  auto* call = call_.call();
804  this->~ClientCallbackReaderImpl();
806  if (GPR_LIKELY(from_reaction)) {
807  reactor->OnDone(s);
808  } else {
809  reactor->InternalScheduleOnDone(std::move(s));
810  }
811  }
812  }
813 
814  ::grpc::ClientContext* const context_;
815  grpc::internal::Call call_;
816  ClientReadReactor<Response>* const reactor_;
817 
822  start_ops_;
824 
827  ::grpc::Status finish_status_;
828 
830  read_ops_;
832 
833  struct StartCallBacklog {
834  bool read_ops = false;
835  };
836  StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;
837 
838  // Minimum of 2 callbacks to pre-register for start and finish
839  std::atomic<intptr_t> callbacks_outstanding_{2};
840  std::atomic_bool started_{false};
841  grpc::internal::Mutex start_mu_;
842 };
843 
844 template <class Response>
845 class ClientCallbackReaderFactory {
846  public:
847  template <class Request>
848  static void Create(::grpc::ChannelInterface* channel,
849  const ::grpc::internal::RpcMethod& method,
850  ::grpc::ClientContext* context, const Request* request,
851  ClientReadReactor<Response>* reactor) {
852  grpc::internal::Call call =
853  channel->CreateCall(method, context, channel->CallbackCQ());
854 
857  call.call(), sizeof(ClientCallbackReaderImpl<Response>)))
858  ClientCallbackReaderImpl<Response>(call, context, request, reactor);
859  }
860 };
861 
862 template <class Request>
864  public:
865  // always allocated against a call arena, no memory free required
866  static void operator delete(void* /*ptr*/, std::size_t size) {
868  }
869 
870  // This operator should never be called as the memory should be freed as part
871  // of the arena destruction. It only exists to provide a matching operator
872  // delete to the operator new so that some compilers will not complain (see
873  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
874  // there are no tests catching the compiler warning.
875  static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
876 
877  void StartCall() override {
878  // This call initiates two batches, plus any backlog, each with a callback
879  // 1. Send initial metadata (unless corked) + recv initial metadata
880  // 2. Any write backlog, then any writes-done backlog
881  // 3. Recv response message + status via finish_ops_ (set up in the constructor)
882 
883  if (!start_corked_) {  // when corked, Write/WritesDone attach the initial metadata instead
884  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
885  context_->initial_metadata_flags());
886  }
887  call_.PerformOps(&start_ops_);
888 
889  {
890  grpc::internal::MutexLock lock(&start_mu_);  // Write/WritesDone record their backlog flags under this same mutex
891 
892  if (backlog_.write_ops) {  // a Write() arrived before StartCall
893  call_.PerformOps(&write_ops_);
894  }
895  if (backlog_.writes_done_ops) {  // a WritesDone() arrived before StartCall
896  call_.PerformOps(&writes_done_ops_);
897  }
898  call_.PerformOps(&finish_ops_);
899  // The last thing in this critical section is to set started_ so that it
900  // can be used lock-free as well.
901  started_.store(true, std::memory_order_release);
902  }
903  // MaybeFinish outside the lock to make sure that destruction of this object
904  // doesn't take place while holding the lock (which would cause the lock to
905  // be released after destruction). This releases the count that
906  this->MaybeFinish(/*from_reaction=*/false);  // callbacks_outstanding_ pre-registers for StartCall itself.
907  }
908 
909  void Write(const Request* msg, ::grpc::WriteOptions options) override {  // Queues *msg on write_ops_; issued now, or deferred to StartCall.
910  if (GPR_UNLIKELY(options.is_last_message())) {  // last message: add the client close to this same write batch
911  options.set_buffer_hint();
912  write_ops_.ClientSendClose();
913  }
914  // TODO(vjpai): don't assert (a non-OK SendMessagePtr status trips the assert below)
915  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
916  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);  // released by MaybeFinish in the write_tag_ callback
917 
918  if (GPR_UNLIKELY(corked_write_needed_)) {  // corked start: StartCall did not send initial metadata, so attach it here once
919  write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
920  context_->initial_metadata_flags());
921  corked_write_needed_ = false;
922  }
923 
924  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {  // lock-free check; pairs with the release store in StartCall
925  grpc::internal::MutexLock lock(&start_mu_);
926  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {  // re-check: StartCall sets started_ while holding start_mu_
927  backlog_.write_ops = true;  // StartCall issues write_ops_ when it sees this flag
928  return;
929  }
930  }
931  call_.PerformOps(&write_ops_);  // call already started: issue the write immediately
932  }
933 
934  void WritesDone() override {  // Queues a client close on writes_done_ops_; issued now, or deferred to StartCall.
935  writes_done_ops_.ClientSendClose();
936  writes_done_tag_.Set(call_.call(),  // unlike the start/write tags, this tag is set up here rather than in the constructor
937  [this](bool ok) {
938  reactor_->OnWritesDoneDone(ok);
939  MaybeFinish(/*from_reaction=*/true);
940  },
941  &writes_done_ops_, /*can_inline=*/false);
942  writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
943  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);  // released by MaybeFinish in the callback above
944 
945  if (GPR_UNLIKELY(corked_write_needed_)) {  // corked start and no Write() yet: attach the initial metadata to this batch
946  writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
947  context_->initial_metadata_flags());
948  corked_write_needed_ = false;
949  }
950 
951  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {  // lock-free check; pairs with the release store in StartCall
952  grpc::internal::MutexLock lock(&start_mu_);
953  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {  // re-check: StartCall sets started_ while holding start_mu_
954  backlog_.writes_done_ops = true;  // StartCall issues writes_done_ops_ when it sees this flag
955  return;
956  }
957  }
958  call_.PerformOps(&writes_done_ops_);  // call already started: issue the close immediately
959  }
960 
961  void AddHold(int holds) override {  // Adds `holds` to the outstanding count that MaybeFinish must drain before OnDone.
962  callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
963  }
964  void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }  // Releases one hold; not a reaction, so a resulting OnDone goes through InternalScheduleOnDone.
965 
966  private:
967  friend class ClientCallbackWriterFactory<Request>;
968 
969  template <class Response>
971  ::grpc::ClientContext* context, Response* response,
973  : context_(context),
974  call_(call),
975  reactor_(reactor),
976  start_corked_(context_->initial_metadata_corked_),
977  corked_write_needed_(start_corked_) {
978  this->BindReactor(reactor);
979 
980  // Set up the unchanging parts of the start and write tags and ops.
981  start_tag_.Set(call_.call(),
982  [this](bool ok) {
983  reactor_->OnReadInitialMetadataDone(ok);
984  MaybeFinish(/*from_reaction=*/true);
985  },
986  &start_ops_, /*can_inline=*/false);
987  start_ops_.RecvInitialMetadata(context_);
988  start_ops_.set_core_cq_tag(&start_tag_);
989 
990  write_tag_.Set(call_.call(),
991  [this](bool ok) {
992  reactor_->OnWriteDone(ok);
993  MaybeFinish(/*from_reaction=*/true);
994  },
995  &write_ops_, /*can_inline=*/false);
996  write_ops_.set_core_cq_tag(&write_tag_);
997 
998  // Also set up the Finish tag and op set.
999  finish_ops_.RecvMessage(response);
1000  finish_ops_.AllowNoMessage();
1001  finish_tag_.Set(
1002  call_.call(),
1003  [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
1004  &finish_ops_,
1005  /*can_inline=*/false);
1006  finish_ops_.ClientRecvStatus(context_, &finish_status_);
1007  finish_ops_.set_core_cq_tag(&finish_tag_);
1008  }
1009 
1010  // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
1011  void MaybeFinish(bool from_reaction) {
1012  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1013  1, std::memory_order_acq_rel) == 1)) {
1014  ::grpc::Status s = std::move(finish_status_);
1015  auto* reactor = reactor_;
1016  auto* call = call_.call();
1017  this->~ClientCallbackWriterImpl();
1019  if (GPR_LIKELY(from_reaction)) {
1020  reactor->OnDone(s);
1021  } else {
1022  reactor->InternalScheduleOnDone(std::move(s));
1023  }
1024  }
1025  }
1026 
1027  ::grpc::ClientContext* const context_;
1028  grpc::internal::Call call_;
1029  ClientWriteReactor<Request>* const reactor_;
1030 
1033  start_ops_;
1035  const bool start_corked_;
1036  bool corked_write_needed_; // no lock needed since only accessed in
1037  // Write/WritesDone which cannot be concurrent
1038 
1041  finish_ops_;
1043  ::grpc::Status finish_status_;
1044 
1048  write_ops_;
1050 
1053  writes_done_ops_;
1054  grpc::internal::CallbackWithSuccessTag writes_done_tag_;
1055 
1056  struct StartCallBacklog {
1057  bool write_ops = false;
1058  bool writes_done_ops = false;
1059  };
1060  StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;
1061 
1062  // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
1063  std::atomic<intptr_t> callbacks_outstanding_{3};
1064  std::atomic_bool started_{false};
1065  grpc::internal::Mutex start_mu_;
1066 };
1067 
1068 template <class Request>
1069 class ClientCallbackWriterFactory {
1070  public:
1071  template <class Response>
1072  static void Create(::grpc::ChannelInterface* channel,
1073  const ::grpc::internal::RpcMethod& method,
1074  ::grpc::ClientContext* context, Response* response,
1075  ClientWriteReactor<Request>* reactor) {
1076  grpc::internal::Call call =
1077  channel->CreateCall(method, context, channel->CallbackCQ());
1078 
1081  call.call(), sizeof(ClientCallbackWriterImpl<Request>)))
1082  ClientCallbackWriterImpl<Request>(call, context, response, reactor);
1083  }
1084 };
1085 
1087  public:
1088  // always allocated against a call arena, no memory free required
1089  static void operator delete(void* /*ptr*/, std::size_t size) {
1091  }
1092 
1093  // This operator should never be called as the memory should be freed as part
1094  // of the arena destruction. It only exists to provide a matching operator
1095  // delete to the operator new so that some compilers will not complain (see
1096  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
1097  // there are no tests catching the compiler warning.
1098  static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
1099 
1100  void StartCall() override {
1101  // This call initiates two batches, each with a callback
1102  // 1. start_ops_: send initial metadata + request message + client close (the last two queued by the constructor) + recv initial metadata
1103  // 2. finish_ops_: read the response message (queued by the constructor) + recv status
1104 
1105  start_tag_.Set(call_.call(),
1106  [this](bool ok) {
1107  reactor_->OnReadInitialMetadataDone(ok);
1108  MaybeFinish();
1109  },
1110  &start_ops_, /*can_inline=*/false);
1111  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
1112  context_->initial_metadata_flags());
1113  start_ops_.RecvInitialMetadata(context_);
1114  start_ops_.set_core_cq_tag(&start_tag_);
1115  call_.PerformOps(&start_ops_);
1116 
1117  finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); },  // no reactor hook here; OnDone fires from MaybeFinish once both callbacks have run
1118  &finish_ops_,
1119  /*can_inline=*/false);
1120  finish_ops_.ClientRecvStatus(context_, &finish_status_);  // the status lands in finish_status_, which MaybeFinish hands to OnDone
1121  finish_ops_.set_core_cq_tag(&finish_tag_);
1122  call_.PerformOps(&finish_ops_);
1123  }
1124 
1125  private:
1127 
1128  template <class Request, class Response>
1130  ::grpc::ClientContext* context, Request* request,
1131  Response* response, ClientUnaryReactor* reactor)
1132  : context_(context), call_(call), reactor_(reactor) {
1133  this->BindReactor(reactor);
1134  // TODO(vjpai): don't assert
1135  GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
1136  start_ops_.ClientSendClose();
1137  finish_ops_.RecvMessage(response);
1138  finish_ops_.AllowNoMessage();
1139  }
1140 
1141  // In the unary case, MaybeFinish is only ever invoked from a
1142  // library-initiated reaction, so it will just directly call OnDone if this is
1143  // the last reaction for this RPC.
1144  void MaybeFinish() {
1145  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1146  1, std::memory_order_acq_rel) == 1)) {
1147  ::grpc::Status s = std::move(finish_status_);
1148  auto* reactor = reactor_;
1149  auto* call = call_.call();
1150  this->~ClientCallbackUnaryImpl();
1152  reactor->OnDone(s);
1153  }
1154  }
1155 
1156  ::grpc::ClientContext* const context_;
1157  grpc::internal::Call call_;
1158  ClientUnaryReactor* const reactor_;
1159 
1164  start_ops_;
1166 
1169  finish_ops_;
1171  ::grpc::Status finish_status_;
1172 
1173  // This call will have 2 callbacks: start and finish
1174  std::atomic<intptr_t> callbacks_outstanding_{2};
1175 };
1176 
1178  public:
1179  template <class Request, class Response>
1180  static void Create(::grpc::ChannelInterface* channel,
1181  const ::grpc::internal::RpcMethod& method,
1182  ::grpc::ClientContext* context, const Request* request,
1183  Response* response, ClientUnaryReactor* reactor) {
1184  grpc::internal::Call call =
1185  channel->CreateCall(method, context, channel->CallbackCQ());
1186 
1188 
1190  call.call(), sizeof(ClientCallbackUnaryImpl)))
1191  ClientCallbackUnaryImpl(call, context, request, response, reactor);
1192  }
1193 };
1194 
1195 } // namespace internal
1196 } // namespace grpc_impl
1197 #endif // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_IMPL_H
grpc::internal::CallbackWithSuccessTag
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming o...
Definition: callback_common.h:136
grpc_impl::ClientCallbackWriter::WritesDone
virtual void WritesDone()=0
grpc_impl::internal::ClientCallbackWriterImpl::StartCall
void StartCall() override
Definition: client_callback_impl.h:877
grpc::internal::CallOpRecvInitialMetadata
Definition: call_op_set.h:720
grpc_impl::internal::ClientCallbackReaderImpl::RemoveHold
void RemoveHold() override
Definition: client_callback_impl.h:781
grpc_impl::internal::ClientCallbackWriterImpl::Write
void Write(const Request *msg, ::grpc::WriteOptions options) override
Definition: client_callback_impl.h:909
grpc_impl::internal::ClientCallbackReaderWriterFactory
Definition: channel_interface.h:46
grpc_impl::ClientCallbackWriter
Definition: client_callback_impl.h:174
grpc::internal::CallOpClientSendClose
Definition: call_op_set.h:618
grpc_impl::ClientUnaryReactor::StartCall
void StartCall()
Definition: client_callback_impl.h:430
grpc_impl::internal::ClientCallbackWriterImpl
Definition: client_callback_impl.h:863
grpc_impl::ClientReadReactor::OnReadDone
virtual void OnReadDone(bool)
Definition: client_callback_impl.h:369
grpc_impl::internal::ClientCallbackReaderImpl::Read
void Read(Response *msg) override
Definition: client_callback_impl.h:765
grpc::internal::CallOpGenericRecvMessage
Definition: call_op_set.h:525
grpc_impl::ClientCallbackWriter::Write
void Write(const Request *req)
Definition: client_callback_impl.h:178
grpc_impl::ClientBidiReactor::StartRead
void StartRead(Response *resp)
Initiate a read operation (or post it for later initiation if StartCall has not yet been invoked).
Definition: client_callback_impl.h:230
grpc
This header provides an object that reads bytes directly from a grpc::ByteBuffer, via the ZeroCopyInputStream interface.
Definition: alarm.h:24
grpc::internal::CallOpSet
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:850
status.h
grpc::CoreCodegenInterface::grpc_call_ref
virtual void grpc_call_ref(grpc_call *call)=0
grpc::internal::CallOpSendMessage
Definition: call_op_set.h:287
grpc::WriteOptions::set_last_message
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:161
grpc_impl::ClientCallbackUnary::StartCall
virtual void StartCall()=0
grpc::CoreCodegenInterface::grpc_call_arena_alloc
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
grpc_impl::ClientCallbackReaderWriter::RemoveHold
virtual void RemoveHold()=0
GPR_LIKELY
#define GPR_LIKELY(x)
Definition: port_platform.h:685
grpc_impl::ClientUnaryReactor::OnReadInitialMetadataDone
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback_impl.h:432
grpc_impl::internal::ClientReactor::OnDone
virtual void OnDone(const ::grpc::Status &)=0
Called by the library when all operations associated with this RPC have completed and all Holds have been removed.
grpc_impl::ClientCallbackReaderWriter::~ClientCallbackReaderWriter
virtual ~ClientCallbackReaderWriter()
Definition: client_callback_impl.h:144
config.h
grpc_impl::ClientCallbackReaderWriter::WritesDone
virtual void WritesDone()=0
grpc_impl::ClientWriteReactor::StartWriteLast
void StartWriteLast(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback_impl.h:391
grpc_impl::internal::CallbackUnaryCallImpl
Definition: channel_interface.h:36
grpc_impl::ClientUnaryReactor::~ClientUnaryReactor
virtual ~ClientUnaryReactor()
Definition: client_callback_impl.h:428
grpc::internal::Call
Straightforward wrapping of the C call object.
Definition: call.h:35
grpc_impl::internal::ClientReactor
Definition: client_callback_impl.h:106
grpc_impl::internal::ClientCallbackWriterFactory
Definition: channel_interface.h:50
grpc_impl::internal::CallbackUnaryCall
void CallbackUnaryCall(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(::grpc::Status)> on_completion)
Perform a callback-based unary call TODO(vjpai): Combine as much as possible with the blocking unary ...
Definition: client_callback_impl.h:46
grpc_impl::ClientBidiReactor::OnDone
void OnDone(const ::grpc::Status &) override
Notifies the application that all operations associated with this RPC have completed and all Holds have been removed.
Definition: client_callback_impl.h:308
core_codegen_interface.h
grpc_impl::internal::ClientCallbackReaderFactory::Create
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const Request *request, ClientReadReactor< Response > *reactor)
Definition: client_callback_impl.h:848
grpc_impl::internal::ClientCallbackUnaryImpl::StartCall
void StartCall() override
Definition: client_callback_impl.h:1100
grpc::internal::CallOpSendInitialMetadata
Definition: call_op_set.h:217
grpc_impl::ClientCallbackWriter::BindReactor
void BindReactor(ClientWriteReactor< Request > *reactor)
Definition: client_callback_impl.h:189
grpc_impl::ClientBidiReactor::RemoveHold
void RemoveHold()
Definition: client_callback_impl.h:299
grpc_impl::ClientCallbackReaderWriter::StartCall
virtual void StartCall()=0
grpc::internal::CallbackWithStatusTag
Definition: callback_common.h:68
grpc_impl::ClientReadReactor::StartRead
void StartRead(Response *resp)
Definition: client_callback_impl.h:358
grpc::Status::ok
bool ok() const
Is the status OK?
Definition: status.h:118
grpc_impl::ClientBidiReactor::StartWriteLast
void StartWriteLast(const Request *req, ::grpc::WriteOptions options)
Initiate/post a write operation with specified options and an indication that this is the last write ...
Definition: client_callback_impl.h:261
grpc_impl::ClientCallbackReader::StartCall
virtual void StartCall()=0
grpc_impl::ClientCallbackWriter::StartCall
virtual void StartCall()=0
grpc_impl::ClientBidiReactor
ClientBidiReactor is the interface for a bidirectional streaming RPC.
Definition: client_callback_impl.h:131
grpc_impl::ClientReadReactor::RemoveHold
void RemoveHold()
Definition: client_callback_impl.h:365
grpc_impl::internal::ClientCallbackReaderImpl::StartCall
void StartCall() override
Definition: client_callback_impl.h:721
grpc_impl::internal::ClientCallbackReaderImpl::AddHold
void AddHold(int holds) override
Definition: client_callback_impl.h:778
grpc::Status
Did it work? If it didn't, why?
Definition: status.h:31
grpc_impl::ClientCallbackReader::Read
virtual void Read(Response *resp)=0
grpc_impl::ClientBidiReactor::StartWrite
void StartWrite(const Request *req)
Initiate a write operation (or post it for later initiation if StartCall has not yet been invoked).
Definition: client_callback_impl.h:238
grpc_impl::ClientReadReactor
ClientReadReactor is the interface for a server-streaming RPC.
Definition: client_callback_impl.h:133
grpc_impl::internal::ClientCallbackReaderWriterImpl::Write
void Write(const Request *msg, ::grpc::WriteOptions options) override
Definition: client_callback_impl.h:520
grpc_impl::ClientCallbackReader
Definition: client_callback_impl.h:159
grpc_impl::ClientBidiReactor::OnReadDone
virtual void OnReadDone(bool)
Notifies the application that a StartRead operation completed.
Definition: client_callback_impl.h:324
grpc_impl::ClientBidiReactor::AddMultipleHolds
void AddMultipleHolds(int holds)
Definition: client_callback_impl.h:295
grpc_impl::internal::ClientCallbackReaderWriterImpl
Definition: client_callback_impl.h:456
grpc_impl::ClientBidiReactor::~ClientBidiReactor
virtual ~ClientBidiReactor()
Definition: client_callback_impl.h:217
grpc_impl::ClientCallbackUnary
Definition: client_callback_impl.h:194
grpc_impl::internal::ClientCallbackReaderWriterImpl::RemoveHold
void RemoveHold() override
Definition: client_callback_impl.h:571
grpc_impl::ClientCallbackWriter::RemoveHold
virtual void RemoveHold()=0
grpc_impl::ClientWriteReactor::OnDone
void OnDone(const ::grpc::Status &) override
Definition: client_callback_impl.h:403
grpc_impl::ClientReadReactor::~ClientReadReactor
virtual ~ClientReadReactor()
Definition: client_callback_impl.h:355
grpc::ClientContext
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:195
grpc_impl::ClientUnaryReactor
ClientUnaryReactor is a reactor-style interface for a unary RPC.
Definition: client_callback_impl.h:426
grpc_impl::ClientUnaryReactor::OnDone
void OnDone(const ::grpc::Status &) override
Called by the library when all operations associated with this RPC have completed and all Holds have been removed.
Definition: client_callback_impl.h:431
GPR_UNLIKELY
#define GPR_UNLIKELY(x)
Definition: port_platform.h:686
grpc_impl::ClientCallbackReaderWriter::Write
virtual void Write(const Request *req, ::grpc::WriteOptions options)=0
grpc::experimental::ClientUnaryReactor
::grpc_impl::ClientUnaryReactor ClientUnaryReactor
Definition: client_callback.h:71
grpc_impl::ClientBidiReactor::AddHold
void AddHold()
Holds are needed if (and only if) this stream has operations that take place on it after StartCall bu...
Definition: client_callback_impl.h:294
grpc::experimental::ClientReadReactor
::grpc_impl::ClientReadReactor< Response > ClientReadReactor
Definition: client_callback.h:63
grpc::ChannelInterface
Codegen interface for grpc::Channel.
Definition: channel_interface.h:74
grpc_impl::internal::ClientCallbackReaderWriterImpl::AddHold
void AddHold(int holds) override
Definition: client_callback_impl.h:568
grpc_impl::ClientWriteReactor::StartCall
void StartCall()
Definition: client_callback_impl.h:384
grpc_impl::ClientWriteReactor::OnWriteDone
virtual void OnWriteDone(bool)
Definition: client_callback_impl.h:405
grpc_impl::internal::CallbackUnaryCallImpl::CallbackUnaryCallImpl
CallbackUnaryCallImpl(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(::grpc::Status)> on_completion)
Definition: client_callback_impl.h:58
grpc_impl::ClientBidiReactor::OnWritesDoneDone
virtual void OnWritesDoneDone(bool)
Notifies the application that a StartWritesDone operation completed.
Definition: client_callback_impl.h:340
grpc_impl::internal::ClientCallbackWriterImpl::RemoveHold
void RemoveHold() override
Definition: client_callback_impl.h:964
grpc_impl::ClientBidiReactor::StartCall
void StartCall()
Activate the RPC and initiate any reads or writes that have been Start'ed before this call.
Definition: client_callback_impl.h:223
grpc_impl::ClientWriteReactor::RemoveHold
void RemoveHold()
Definition: client_callback_impl.h:401
grpc_impl::ClientCallbackUnary::~ClientCallbackUnary
virtual ~ClientCallbackUnary()
Definition: client_callback_impl.h:196
grpc_impl::ClientReadReactor::StartCall
void StartCall()
Definition: client_callback_impl.h:357
grpc_impl::ClientWriteReactor::StartWritesDone
void StartWritesDone()
Definition: client_callback_impl.h:394
grpc_impl::ClientCallbackReaderWriter::Read
virtual void Read(Response *resp)=0
grpc::protobuf::util::Status
::google::protobuf::util::Status Status
Definition: config_protobuf.h:90
grpc_impl::internal::ClientCallbackReaderImpl
Definition: client_callback_impl.h:707
grpc_impl::ClientCallbackReader::AddHold
virtual void AddHold(int holds)=0
grpc_impl::ClientReadReactor::OnReadInitialMetadataDone
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback_impl.h:368
grpc_impl::internal::ClientCallbackReaderWriterImpl::StartCall
void StartCall() override
Definition: client_callback_impl.h:471
grpc::CoreCodegenInterface::grpc_call_unref
virtual void grpc_call_unref(grpc_call *call)=0
grpc_impl::internal::ClientCallbackReaderFactory
Definition: channel_interface.h:48
grpc_impl::ClientBidiReactor::StartWritesDone
void StartWritesDone()
Indicate that the RPC will have no more write operations.
Definition: client_callback_impl.h:270
grpc_impl::ClientCallbackReader::RemoveHold
virtual void RemoveHold()=0
grpc_impl::ClientWriteReactor::~ClientWriteReactor
virtual ~ClientWriteReactor()
Definition: client_callback_impl.h:382
grpc_impl::ClientCallbackReaderWriter
Definition: client_callback_impl.h:142
grpc::WriteOptions
Per-message write options.
Definition: call_op_set.h:79
grpc_impl::ClientReadReactor::AddMultipleHolds
void AddMultipleHolds(int holds)
Definition: client_callback_impl.h:361
callback_common.h
grpc::internal::CallbackWithSuccessTag::Set
void Set(grpc_call *call, std::function< void(bool)> f, CompletionQueueTag *ops, bool can_inline)
Definition: callback_common.h:164
grpc_impl::ClientWriteReactor
ClientWriteReactor is the interface for a client-streaming RPC.
Definition: client_callback_impl.h:135
grpc::internal::MutexLock
Definition: sync.h:69
grpc_impl::ClientCallbackReader::~ClientCallbackReader
virtual ~ClientCallbackReader()
Definition: client_callback_impl.h:161
channel_interface.h
grpc_impl::ClientWriteReactor::AddMultipleHolds
void AddMultipleHolds(int holds)
Definition: client_callback_impl.h:397
grpc_impl::internal::ClientCallbackUnaryFactory
Definition: client_callback_impl.h:1177
grpc_impl::ClientCallbackWriter::WriteLast
void WriteLast(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback_impl.h:180
grpc::internal::Call::PerformOps
void PerformOps(CallOpSetInterface *ops)
Definition: call.h:65
grpc::internal::Call::call
grpc_call * call() const
Definition: call.h:69
grpc_impl::ClientCallbackReader::BindReactor
void BindReactor(ClientReadReactor< Response > *reactor)
Definition: client_callback_impl.h:168
call.h
call_op_set.h
grpc::internal::CallOpClientRecvStatus
Definition: call_op_set.h:768
grpc::WriteOptions::set_buffer_hint
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call_op_set.h:122
grpc_impl::internal::ClientCallbackWriterFactory::Create
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, Response *response, ClientWriteReactor< Request > *reactor)
Definition: client_callback_impl.h:1072
grpc_impl::ClientCallbackUnary::BindReactor
void BindReactor(ClientUnaryReactor *reactor)
Definition: client_callback_impl.h:441
grpc_impl::internal::ClientCallbackUnaryFactory::Create
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const Request *request, Response *response, ClientUnaryReactor *reactor)
Definition: client_callback_impl.h:1180
grpc_impl::ClientReadReactor::OnDone
void OnDone(const ::grpc::Status &) override
Definition: client_callback_impl.h:367
grpc::CompletionQueue
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue.h).
Definition: completion_queue.h:99
grpc_impl::internal::ClientCallbackReaderWriterImpl::WritesDone
void WritesDone() override
Definition: client_callback_impl.h:543
grpc_impl
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm_impl.h:33
grpc_impl::ClientWriteReactor::OnWritesDoneDone
virtual void OnWritesDoneDone(bool)
Definition: client_callback_impl.h:406
grpc_impl::ClientWriteReactor::StartWrite
void StartWrite(const Request *req)
Definition: client_callback_impl.h:385
grpc_impl::ClientBidiReactor::StartWrite
void StartWrite(const Request *req, ::grpc::WriteOptions options)
Initiate/post a write operation with specified options.
Definition: client_callback_impl.h:248
grpc_impl::ClientCallbackReaderWriter::AddHold
virtual void AddHold(int holds)=0
grpc_impl::internal::ClientCallbackUnaryImpl
Definition: client_callback_impl.h:1086
grpc::g_core_codegen_interface
CoreCodegenInterface * g_core_codegen_interface
Definition: completion_queue.h:93
GPR_CODEGEN_ASSERT
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:146
grpc::WriteOptions::is_last_message
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailing metadata.
Definition: call_op_set.h:186
grpc_impl::ClientWriteReactor::AddHold
void AddHold()
Definition: client_callback_impl.h:396
grpc_impl::internal::ClientCallbackWriterImpl::WritesDone
void WritesDone() override
Definition: client_callback_impl.h:934
grpc_impl::ClientCallbackReaderWriter::BindReactor
void BindReactor(ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback_impl.h:153
grpc_impl::ClientWriteReactor::StartWrite
void StartWrite(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback_impl.h:388
grpc::experimental::ClientBidiReactor
::grpc_impl::ClientBidiReactor< Request, Response > ClientBidiReactor
Definition: client_callback.h:69
grpc_impl::ClientReadReactor::AddHold
void AddHold()
Definition: client_callback_impl.h:360
grpc::internal::CallOpRecvMessage
Definition: byte_buffer.h:58
grpc::internal::Mutex
Definition: sync.h:47
grpc_impl::internal::ClientCallbackReaderWriterImpl::Read
void Read(Response *msg) override
Definition: client_callback_impl.h:507
grpc_impl::ClientCallbackWriter::~ClientCallbackWriter
virtual ~ClientCallbackWriter()
Definition: client_callback_impl.h:176
grpc_impl::ClientBidiReactor::OnReadInitialMetadataDone
virtual void OnReadInitialMetadataDone(bool)
Notifies the application that a read of initial metadata from the server is done.
Definition: client_callback_impl.h:318
grpc_impl::ClientCallbackWriter::AddHold
virtual void AddHold(int holds)=0
grpc::internal::CallOpSet::set_core_cq_tag
void set_core_cq_tag(void *core_cq_tag)
set_core_cq_tag is used to provide a different core CQ tag than "this".
Definition: call_op_set.h:939
grpc::experimental::ClientWriteReactor
::grpc_impl::ClientWriteReactor< Request > ClientWriteReactor
Definition: client_callback.h:66
grpc_impl::internal::ClientReactor::InternalScheduleOnDone
virtual void InternalScheduleOnDone(::grpc::Status s)
InternalScheduleOnDone is not part of the API and is not meant to be overridden.
grpc_impl::internal::ClientCallbackReaderWriterFactory::Create
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback_impl.h:691
grpc_impl::ClientWriteReactor::OnReadInitialMetadataDone
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback_impl.h:404
grpc_impl::internal::ClientCallbackWriterImpl::AddHold
void AddHold(int holds) override
Definition: client_callback_impl.h:961
grpc_impl::ClientBidiReactor::OnWriteDone
virtual void OnWriteDone(bool)
Notifies the application that a StartWrite or StartWriteLast operation completed.
Definition: client_callback_impl.h:331
GPR_CODEGEN_DEBUG_ASSERT
#define GPR_CODEGEN_DEBUG_ASSERT(x)
Codegen specific version of GPR_DEBUG_ASSERT.
Definition: core_codegen_interface.h:155