GRPC C++  1.31.1
client_callback_impl.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_IMPL_H
19 #define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_IMPL_H
20 #include <atomic>
21 #include <functional>
22 
30 
31 namespace grpc {
32 namespace internal {
33 class RpcMethod;
34 } // namespace internal
35 } // namespace grpc
36 
37 namespace grpc_impl {
38 class Channel;
39 class ClientContext;
40 
41 namespace internal {
42 
45 template <class InputMessage, class OutputMessage>
47  const ::grpc::internal::RpcMethod& method,
48  ::grpc_impl::ClientContext* context,
49  const InputMessage* request, OutputMessage* result,
50  std::function<void(::grpc::Status)> on_completion) {
52  channel, method, context, request, result, on_completion);
53 }
54 
55 template <class InputMessage, class OutputMessage>
56 class CallbackUnaryCallImpl {
57  public:
59  const ::grpc::internal::RpcMethod& method,
60  ::grpc_impl::ClientContext* context,
61  const InputMessage* request, OutputMessage* result,
62  std::function<void(::grpc::Status)> on_completion) {
63  ::grpc_impl::CompletionQueue* cq = channel->CallbackCQ();
64  GPR_CODEGEN_ASSERT(cq != nullptr);
65  grpc::internal::Call call(channel->CreateCall(method, context, cq));
66 
67  using FullCallOpSet = grpc::internal::CallOpSet<
74 
75  struct OpSetAndTag {
76  FullCallOpSet opset;
78  };
79  const size_t alloc_sz = sizeof(OpSetAndTag);
80  auto* const alloced = static_cast<OpSetAndTag*>(
82  alloc_sz));
83  auto* ops = new (&alloced->opset) FullCallOpSet;
84  auto* tag = new (&alloced->tag)
85  grpc::internal::CallbackWithStatusTag(call.call(), on_completion, ops);
86 
87  // TODO(vjpai): Unify code with sync API as much as possible
88  ::grpc::Status s = ops->SendMessagePtr(request);
89  if (!s.ok()) {
90  tag->force_run(s);
91  return;
92  }
93  ops->SendInitialMetadata(&context->send_initial_metadata_,
94  context->initial_metadata_flags());
95  ops->RecvInitialMetadata(context);
96  ops->RecvMessage(result);
97  ops->AllowNoMessage();
98  ops->ClientSendClose();
99  ops->ClientRecvStatus(context, tag->status_ptr());
100  ops->set_core_cq_tag(tag);
101  call.PerformOps(ops);
102  }
103 };
104 
105 // Base class for public API classes.
107  public:
115  virtual void OnDone(const ::grpc::Status& /*s*/) = 0;
116 
124  virtual void InternalScheduleOnDone(::grpc::Status s);
125 };
126 
127 } // namespace internal
128 
129 // Forward declarations
130 template <class Request, class Response>
132 template <class Response>
134 template <class Request>
136 class ClientUnaryReactor;
137 
138 // NOTE: The streaming objects are not actually implemented in the public API.
139 // These interfaces are provided for mocking only. Typical applications
140 // will interact exclusively with the reactors that they define.
141 template <class Request, class Response>
143  public:
145  virtual void StartCall() = 0;
146  virtual void Write(const Request* req, ::grpc::WriteOptions options) = 0;
147  virtual void WritesDone() = 0;
148  virtual void Read(Response* resp) = 0;
149  virtual void AddHold(int holds) = 0;
150  virtual void RemoveHold() = 0;
151 
152  protected:
154  reactor->BindStream(this);
155  }
156 };
157 
158 template <class Response>
160  public:
162  virtual void StartCall() = 0;
163  virtual void Read(Response* resp) = 0;
164  virtual void AddHold(int holds) = 0;
165  virtual void RemoveHold() = 0;
166 
167  protected:
169  reactor->BindReader(this);
170  }
171 };
172 
173 template <class Request>
175  public:
177  virtual void StartCall() = 0;
178  void Write(const Request* req) { Write(req, ::grpc::WriteOptions()); }
179  virtual void Write(const Request* req, ::grpc::WriteOptions options) = 0;
180  void WriteLast(const Request* req, ::grpc::WriteOptions options) {
181  Write(req, options.set_last_message());
182  }
183  virtual void WritesDone() = 0;
184 
185  virtual void AddHold(int holds) = 0;
186  virtual void RemoveHold() = 0;
187 
188  protected:
190  reactor->BindWriter(this);
191  }
192 };
193 
195  public:
196  virtual ~ClientCallbackUnary() {}
197  virtual void StartCall() = 0;
198 
199  protected:
200  void BindReactor(ClientUnaryReactor* reactor);
201 };
202 
203 // The following classes are the reactor interfaces that are to be implemented
204 // by the user. They are passed in to the library as an argument to a call on a
205 // stub (either a codegen-ed call or a generic call). The streaming RPC is
206 // activated by calling StartCall, possibly after initiating StartRead,
207 // StartWrite, or AddHold operations on the streaming object. Note that none of
208 // the classes are pure; all reactions have a default empty reaction so that the
209 // user class only needs to override those classes that it cares about.
210 // The reactor must be passed to the stub invocation before any of the below
211 // operations can be called.
212 
214 template <class Request, class Response>
215 class ClientBidiReactor : public internal::ClientReactor {
216  public:
217  virtual ~ClientBidiReactor() {}
218 
223  void StartCall() { stream_->StartCall(); }
224 
230  void StartRead(Response* resp) { stream_->Read(resp); }
231 
238  void StartWrite(const Request* req) {
239  StartWrite(req, ::grpc::WriteOptions());
240  }
241 
248  void StartWrite(const Request* req, ::grpc::WriteOptions options) {
249  stream_->Write(req, std::move(options));
250  }
251 
261  void StartWriteLast(const Request* req, ::grpc::WriteOptions options) {
262  StartWrite(req, std::move(options.set_last_message()));
263  }
264 
270  void StartWritesDone() { stream_->WritesDone(); }
271 
294  void AddHold() { AddMultipleHolds(1); }
295  void AddMultipleHolds(int holds) {
296  GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
297  stream_->AddHold(holds);
298  }
299  void RemoveHold() { stream_->RemoveHold(); }
300 
308  void OnDone(const ::grpc::Status& /*s*/) override {}
309 
318  virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
319 
324  virtual void OnReadDone(bool /*ok*/) {}
325 
331  virtual void OnWriteDone(bool /*ok*/) {}
332 
340  virtual void OnWritesDoneDone(bool /*ok*/) {}
341 
342  private:
343  friend class ClientCallbackReaderWriter<Request, Response>;
344  void BindStream(ClientCallbackReaderWriter<Request, Response>* stream) {
345  stream_ = stream;
346  }
348 };
349 
352 template <class Response>
353 class ClientReadReactor : public internal::ClientReactor {
354  public:
355  virtual ~ClientReadReactor() {}
356 
357  void StartCall() { reader_->StartCall(); }
358  void StartRead(Response* resp) { reader_->Read(resp); }
359 
360  void AddHold() { AddMultipleHolds(1); }
361  void AddMultipleHolds(int holds) {
362  GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
363  reader_->AddHold(holds);
364  }
365  void RemoveHold() { reader_->RemoveHold(); }
366 
367  void OnDone(const ::grpc::Status& /*s*/) override {}
368  virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
369  virtual void OnReadDone(bool /*ok*/) {}
370 
371  private:
372  friend class ClientCallbackReader<Response>;
373  void BindReader(ClientCallbackReader<Response>* reader) { reader_ = reader; }
375 };
376 
379 template <class Request>
380 class ClientWriteReactor : public internal::ClientReactor {
381  public:
382  virtual ~ClientWriteReactor() {}
383 
384  void StartCall() { writer_->StartCall(); }
385  void StartWrite(const Request* req) {
386  StartWrite(req, ::grpc::WriteOptions());
387  }
388  void StartWrite(const Request* req, ::grpc::WriteOptions options) {
389  writer_->Write(req, std::move(options));
390  }
391  void StartWriteLast(const Request* req, ::grpc::WriteOptions options) {
392  StartWrite(req, std::move(options.set_last_message()));
393  }
394  void StartWritesDone() { writer_->WritesDone(); }
395 
396  void AddHold() { AddMultipleHolds(1); }
397  void AddMultipleHolds(int holds) {
398  GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
399  writer_->AddHold(holds);
400  }
401  void RemoveHold() { writer_->RemoveHold(); }
402 
403  void OnDone(const ::grpc::Status& /*s*/) override {}
404  virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
405  virtual void OnWriteDone(bool /*ok*/) {}
406  virtual void OnWritesDoneDone(bool /*ok*/) {}
407 
408  private:
409  friend class ClientCallbackWriter<Request>;
410  void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; }
411 
413 };
414 
427  public:
428  virtual ~ClientUnaryReactor() {}
429 
430  void StartCall() { call_->StartCall(); }
431  void OnDone(const ::grpc::Status& /*s*/) override {}
432  virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
433 
434  private:
435  friend class ClientCallbackUnary;
436  void BindCall(ClientCallbackUnary* call) { call_ = call; }
437  ClientCallbackUnary* call_;
438 };
439 
440 // Define function out-of-line from class to avoid forward declaration issue
442  reactor->BindCall(this);
443 }
444 
445 namespace internal {
446 
447 // Forward declare factory classes for friendship
448 template <class Request, class Response>
449 class ClientCallbackReaderWriterFactory;
450 template <class Response>
451 class ClientCallbackReaderFactory;
452 template <class Request>
453 class ClientCallbackWriterFactory;
454 
455 template <class Request, class Response>
457  : public ClientCallbackReaderWriter<Request, Response> {
458  public:
459  // always allocated against a call arena, no memory free required
460  static void operator delete(void* /*ptr*/, std::size_t size) {
462  }
463 
464  // This operator should never be called as the memory should be freed as part
465  // of the arena destruction. It only exists to provide a matching operator
466  // delete to the operator new so that some compilers will not complain (see
467  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
468  // there are no tests catching the compiler warning.
469  static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
470 
471  void StartCall() override {
472  // This call initiates two batches, plus any backlog, each with a callback
473  // 1. Send initial metadata (unless corked) + recv initial metadata
474  // 2. Any read backlog
475  // 3. Any write backlog
476  // 4. Recv trailing metadata (unless corked)
477  if (!start_corked_) {
478  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
479  context_->initial_metadata_flags());
480  }
481 
482  call_.PerformOps(&start_ops_);
483 
484  {
485  grpc::internal::MutexLock lock(&start_mu_);
486 
487  if (backlog_.read_ops) {
488  call_.PerformOps(&read_ops_);
489  }
490  if (backlog_.write_ops) {
491  call_.PerformOps(&write_ops_);
492  }
493  if (backlog_.writes_done_ops) {
494  call_.PerformOps(&writes_done_ops_);
495  }
496  call_.PerformOps(&finish_ops_);
497  // The last thing in this critical section is to set started_ so that it
498  // can be used lock-free as well.
499  started_.store(true, std::memory_order_release);
500  }
501  // MaybeFinish outside the lock to make sure that destruction of this object
502  // doesn't take place while holding the lock (which would cause the lock to
503  // be released after destruction)
504  this->MaybeFinish(/*from_reaction=*/false);
505  }
506 
507  void Read(Response* msg) override {
508  read_ops_.RecvMessage(msg);
509  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
510  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
511  grpc::internal::MutexLock lock(&start_mu_);
512  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
513  backlog_.read_ops = true;
514  return;
515  }
516  }
517  call_.PerformOps(&read_ops_);
518  }
519 
520  void Write(const Request* msg, ::grpc::WriteOptions options) override {
521  if (options.is_last_message()) {
522  options.set_buffer_hint();
523  write_ops_.ClientSendClose();
524  }
525  // TODO(vjpai): don't assert
526  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
527  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
528  if (GPR_UNLIKELY(corked_write_needed_)) {
529  write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
530  context_->initial_metadata_flags());
531  corked_write_needed_ = false;
532  }
533 
534  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
535  grpc::internal::MutexLock lock(&start_mu_);
536  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
537  backlog_.write_ops = true;
538  return;
539  }
540  }
541  call_.PerformOps(&write_ops_);
542  }
543  void WritesDone() override {
544  writes_done_ops_.ClientSendClose();
545  writes_done_tag_.Set(call_.call(),
546  [this](bool ok) {
547  reactor_->OnWritesDoneDone(ok);
548  MaybeFinish(/*from_reaction=*/true);
549  },
550  &writes_done_ops_, /*can_inline=*/false);
551  writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
552  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
553  if (GPR_UNLIKELY(corked_write_needed_)) {
554  writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
555  context_->initial_metadata_flags());
556  corked_write_needed_ = false;
557  }
558  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
559  grpc::internal::MutexLock lock(&start_mu_);
560  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
561  backlog_.writes_done_ops = true;
562  return;
563  }
564  }
565  call_.PerformOps(&writes_done_ops_);
566  }
567 
568  void AddHold(int holds) override {
569  callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
570  }
571  void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
572 
573  private:
574  friend class ClientCallbackReaderWriterFactory<Request, Response>;
575 
577  ::grpc_impl::ClientContext* context,
579  : context_(context),
580  call_(call),
581  reactor_(reactor),
582  start_corked_(context_->initial_metadata_corked_),
583  corked_write_needed_(start_corked_) {
584  this->BindReactor(reactor);
585 
586  // Set up the unchanging parts of the start, read, and write tags and ops.
587  start_tag_.Set(call_.call(),
588  [this](bool ok) {
589  reactor_->OnReadInitialMetadataDone(ok);
590  MaybeFinish(/*from_reaction=*/true);
591  },
592  &start_ops_, /*can_inline=*/false);
593  start_ops_.RecvInitialMetadata(context_);
594  start_ops_.set_core_cq_tag(&start_tag_);
595 
596  write_tag_.Set(call_.call(),
597  [this](bool ok) {
598  reactor_->OnWriteDone(ok);
599  MaybeFinish(/*from_reaction=*/true);
600  },
601  &write_ops_, /*can_inline=*/false);
602  write_ops_.set_core_cq_tag(&write_tag_);
603 
604  read_tag_.Set(call_.call(),
605  [this](bool ok) {
606  reactor_->OnReadDone(ok);
607  MaybeFinish(/*from_reaction=*/true);
608  },
609  &read_ops_, /*can_inline=*/false);
610  read_ops_.set_core_cq_tag(&read_tag_);
611 
612  // Also set up the Finish tag and op set.
613  finish_tag_.Set(
614  call_.call(),
615  [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
616  &finish_ops_,
617  /*can_inline=*/false);
618  finish_ops_.ClientRecvStatus(context_, &finish_status_);
619  finish_ops_.set_core_cq_tag(&finish_tag_);
620  }
621 
622  // MaybeFinish can be called from reactions or from user-initiated operations
623  // like StartCall or RemoveHold. If this is the last operation or hold on this
624  // object, it will invoke the OnDone reaction. If MaybeFinish was called from
625  // a reaction, it can call OnDone directly. If not, it would need to schedule
626  // OnDone onto an executor thread to avoid the possibility of deadlocking with
627  // any locks in the user code that invoked it.
628  void MaybeFinish(bool from_reaction) {
629  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
630  1, std::memory_order_acq_rel) == 1)) {
631  ::grpc::Status s = std::move(finish_status_);
632  auto* reactor = reactor_;
633  auto* call = call_.call();
634  this->~ClientCallbackReaderWriterImpl();
636  if (GPR_LIKELY(from_reaction)) {
637  reactor->OnDone(s);
638  } else {
639  reactor->InternalScheduleOnDone(std::move(s));
640  }
641  }
642  }
643 
644  ::grpc_impl::ClientContext* const context_;
645  grpc::internal::Call call_;
646  ClientBidiReactor<Request, Response>* const reactor_;
647 
650  start_ops_;
652  const bool start_corked_;
653  bool corked_write_needed_; // no lock needed since only accessed in
654  // Write/WritesDone which cannot be concurrent
655 
658  ::grpc::Status finish_status_;
659 
663  write_ops_;
665 
668  writes_done_ops_;
670 
672  read_ops_;
674 
675  struct StartCallBacklog {
676  bool write_ops = false;
677  bool writes_done_ops = false;
678  bool read_ops = false;
679  };
680  StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;
681 
682  // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
683  std::atomic<intptr_t> callbacks_outstanding_{3};
684  std::atomic_bool started_{false};
685  grpc::internal::Mutex start_mu_;
686 };
687 
688 template <class Request, class Response>
689 class ClientCallbackReaderWriterFactory {
690  public:
691  static void Create(::grpc::ChannelInterface* channel,
692  const ::grpc::internal::RpcMethod& method,
693  ::grpc_impl::ClientContext* context,
695  grpc::internal::Call call =
696  channel->CreateCall(method, context, channel->CallbackCQ());
697 
702  reactor);
703  }
704 };
705 
706 template <class Response>
708  public:
709  // always allocated against a call arena, no memory free required
710  static void operator delete(void* /*ptr*/, std::size_t size) {
712  }
713 
714  // This operator should never be called as the memory should be freed as part
715  // of the arena destruction. It only exists to provide a matching operator
716  // delete to the operator new so that some compilers will not complain (see
717  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
718  // there are no tests catching the compiler warning.
719  static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
720 
721  void StartCall() override {
722  // This call initiates two batches, plus any backlog, each with a callback
723  // 1. Send initial metadata (unless corked) + recv initial metadata
724  // 2. Any backlog
725  // 3. Recv trailing metadata
726 
727  start_tag_.Set(call_.call(),
728  [this](bool ok) {
729  reactor_->OnReadInitialMetadataDone(ok);
730  MaybeFinish(/*from_reaction=*/true);
731  },
732  &start_ops_, /*can_inline=*/false);
733  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
734  context_->initial_metadata_flags());
735  start_ops_.RecvInitialMetadata(context_);
736  start_ops_.set_core_cq_tag(&start_tag_);
737  call_.PerformOps(&start_ops_);
738 
739  // Also set up the read tag so it doesn't have to be set up each time
740  read_tag_.Set(call_.call(),
741  [this](bool ok) {
742  reactor_->OnReadDone(ok);
743  MaybeFinish(/*from_reaction=*/true);
744  },
745  &read_ops_, /*can_inline=*/false);
746  read_ops_.set_core_cq_tag(&read_tag_);
747 
748  {
749  grpc::internal::MutexLock lock(&start_mu_);
750  if (backlog_.read_ops) {
751  call_.PerformOps(&read_ops_);
752  }
753  started_.store(true, std::memory_order_release);
754  }
755 
756  finish_tag_.Set(
757  call_.call(),
758  [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
759  &finish_ops_, /*can_inline=*/false);
760  finish_ops_.ClientRecvStatus(context_, &finish_status_);
761  finish_ops_.set_core_cq_tag(&finish_tag_);
762  call_.PerformOps(&finish_ops_);
763  }
764 
765  void Read(Response* msg) override {
766  read_ops_.RecvMessage(msg);
767  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
768  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
769  grpc::internal::MutexLock lock(&start_mu_);
770  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
771  backlog_.read_ops = true;
772  return;
773  }
774  }
775  call_.PerformOps(&read_ops_);
776  }
777 
778  void AddHold(int holds) override {
779  callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
780  }
781  void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
782 
783  private:
784  friend class ClientCallbackReaderFactory<Response>;
785 
786  template <class Request>
788  ::grpc_impl::ClientContext* context,
789  Request* request,
791  : context_(context), call_(call), reactor_(reactor) {
792  this->BindReactor(reactor);
793  // TODO(vjpai): don't assert
794  GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
795  start_ops_.ClientSendClose();
796  }
797 
798  // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
799  void MaybeFinish(bool from_reaction) {
800  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
801  1, std::memory_order_acq_rel) == 1)) {
802  ::grpc::Status s = std::move(finish_status_);
803  auto* reactor = reactor_;
804  auto* call = call_.call();
805  this->~ClientCallbackReaderImpl();
807  if (GPR_LIKELY(from_reaction)) {
808  reactor->OnDone(s);
809  } else {
810  reactor->InternalScheduleOnDone(std::move(s));
811  }
812  }
813  }
814 
815  ::grpc_impl::ClientContext* const context_;
816  grpc::internal::Call call_;
817  ClientReadReactor<Response>* const reactor_;
818 
823  start_ops_;
825 
828  ::grpc::Status finish_status_;
829 
831  read_ops_;
833 
834  struct StartCallBacklog {
835  bool read_ops = false;
836  };
837  StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;
838 
839  // Minimum of 2 callbacks to pre-register for start and finish
840  std::atomic<intptr_t> callbacks_outstanding_{2};
841  std::atomic_bool started_{false};
842  grpc::internal::Mutex start_mu_;
843 };
844 
845 template <class Response>
846 class ClientCallbackReaderFactory {
847  public:
848  template <class Request>
849  static void Create(::grpc::ChannelInterface* channel,
850  const ::grpc::internal::RpcMethod& method,
851  ::grpc_impl::ClientContext* context,
852  const Request* request,
853  ClientReadReactor<Response>* reactor) {
854  grpc::internal::Call call =
855  channel->CreateCall(method, context, channel->CallbackCQ());
856 
859  call.call(), sizeof(ClientCallbackReaderImpl<Response>)))
860  ClientCallbackReaderImpl<Response>(call, context, request, reactor);
861  }
862 };
863 
864 template <class Request>
866  public:
867  // always allocated against a call arena, no memory free required
868  static void operator delete(void* /*ptr*/, std::size_t size) {
870  }
871 
872  // This operator should never be called as the memory should be freed as part
873  // of the arena destruction. It only exists to provide a matching operator
874  // delete to the operator new so that some compilers will not complain (see
875  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
876  // there are no tests catching the compiler warning.
877  static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
878 
879  void StartCall() override {
880  // This call initiates two batches, plus any backlog, each with a callback
881  // 1. Send initial metadata (unless corked) + recv initial metadata
882  // 2. Any backlog
883  // 3. Recv trailing metadata
884 
885  if (!start_corked_) {
886  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
887  context_->initial_metadata_flags());
888  }
889  call_.PerformOps(&start_ops_);
890 
891  {
892  grpc::internal::MutexLock lock(&start_mu_);
893 
894  if (backlog_.write_ops) {
895  call_.PerformOps(&write_ops_);
896  }
897  if (backlog_.writes_done_ops) {
898  call_.PerformOps(&writes_done_ops_);
899  }
900  call_.PerformOps(&finish_ops_);
901  // The last thing in this critical section is to set started_ so that it
902  // can be used lock-free as well.
903  started_.store(true, std::memory_order_release);
904  }
905  // MaybeFinish outside the lock to make sure that destruction of this object
906  // doesn't take place while holding the lock (which would cause the lock to
907  // be released after destruction)
908  this->MaybeFinish(/*from_reaction=*/false);
909  }
910 
911  void Write(const Request* msg, ::grpc::WriteOptions options) override {
912  if (GPR_UNLIKELY(options.is_last_message())) {
913  options.set_buffer_hint();
914  write_ops_.ClientSendClose();
915  }
916  // TODO(vjpai): don't assert
917  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
918  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
919 
920  if (GPR_UNLIKELY(corked_write_needed_)) {
921  write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
922  context_->initial_metadata_flags());
923  corked_write_needed_ = false;
924  }
925 
926  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
927  grpc::internal::MutexLock lock(&start_mu_);
928  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
929  backlog_.write_ops = true;
930  return;
931  }
932  }
933  call_.PerformOps(&write_ops_);
934  }
935 
936  void WritesDone() override {
937  writes_done_ops_.ClientSendClose();
938  writes_done_tag_.Set(call_.call(),
939  [this](bool ok) {
940  reactor_->OnWritesDoneDone(ok);
941  MaybeFinish(/*from_reaction=*/true);
942  },
943  &writes_done_ops_, /*can_inline=*/false);
944  writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
945  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
946 
947  if (GPR_UNLIKELY(corked_write_needed_)) {
948  writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
949  context_->initial_metadata_flags());
950  corked_write_needed_ = false;
951  }
952 
953  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
954  grpc::internal::MutexLock lock(&start_mu_);
955  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
956  backlog_.writes_done_ops = true;
957  return;
958  }
959  }
960  call_.PerformOps(&writes_done_ops_);
961  }
962 
963  void AddHold(int holds) override {
964  callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
965  }
966  void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
967 
968  private:
969  friend class ClientCallbackWriterFactory<Request>;
970 
971  template <class Response>
973  ::grpc_impl::ClientContext* context,
974  Response* response,
976  : context_(context),
977  call_(call),
978  reactor_(reactor),
979  start_corked_(context_->initial_metadata_corked_),
980  corked_write_needed_(start_corked_) {
981  this->BindReactor(reactor);
982 
983  // Set up the unchanging parts of the start and write tags and ops.
984  start_tag_.Set(call_.call(),
985  [this](bool ok) {
986  reactor_->OnReadInitialMetadataDone(ok);
987  MaybeFinish(/*from_reaction=*/true);
988  },
989  &start_ops_, /*can_inline=*/false);
990  start_ops_.RecvInitialMetadata(context_);
991  start_ops_.set_core_cq_tag(&start_tag_);
992 
993  write_tag_.Set(call_.call(),
994  [this](bool ok) {
995  reactor_->OnWriteDone(ok);
996  MaybeFinish(/*from_reaction=*/true);
997  },
998  &write_ops_, /*can_inline=*/false);
999  write_ops_.set_core_cq_tag(&write_tag_);
1000 
1001  // Also set up the Finish tag and op set.
1002  finish_ops_.RecvMessage(response);
1003  finish_ops_.AllowNoMessage();
1004  finish_tag_.Set(
1005  call_.call(),
1006  [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
1007  &finish_ops_,
1008  /*can_inline=*/false);
1009  finish_ops_.ClientRecvStatus(context_, &finish_status_);
1010  finish_ops_.set_core_cq_tag(&finish_tag_);
1011  }
1012 
1013  // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
1014  void MaybeFinish(bool from_reaction) {
1015  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1016  1, std::memory_order_acq_rel) == 1)) {
1017  ::grpc::Status s = std::move(finish_status_);
1018  auto* reactor = reactor_;
1019  auto* call = call_.call();
1020  this->~ClientCallbackWriterImpl();
1022  if (GPR_LIKELY(from_reaction)) {
1023  reactor->OnDone(s);
1024  } else {
1025  reactor->InternalScheduleOnDone(std::move(s));
1026  }
1027  }
1028  }
1029 
1030  ::grpc_impl::ClientContext* const context_;
1031  grpc::internal::Call call_;
1032  ClientWriteReactor<Request>* const reactor_;
1033 
1036  start_ops_;
1038  const bool start_corked_;
1039  bool corked_write_needed_; // no lock needed since only accessed in
1040  // Write/WritesDone which cannot be concurrent
1041 
1044  finish_ops_;
1046  ::grpc::Status finish_status_;
1047 
1051  write_ops_;
1053 
1056  writes_done_ops_;
1057  grpc::internal::CallbackWithSuccessTag writes_done_tag_;
1058 
1059  struct StartCallBacklog {
1060  bool write_ops = false;
1061  bool writes_done_ops = false;
1062  };
1063  StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;
1064 
1065  // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
1066  std::atomic<intptr_t> callbacks_outstanding_{3};
1067  std::atomic_bool started_{false};
1068  grpc::internal::Mutex start_mu_;
1069 };
1070 
1071 template <class Request>
1072 class ClientCallbackWriterFactory {
1073  public:
1074  template <class Response>
1075  static void Create(::grpc::ChannelInterface* channel,
1076  const ::grpc::internal::RpcMethod& method,
1077  ::grpc_impl::ClientContext* context, Response* response,
1078  ClientWriteReactor<Request>* reactor) {
1079  grpc::internal::Call call =
1080  channel->CreateCall(method, context, channel->CallbackCQ());
1081 
1084  call.call(), sizeof(ClientCallbackWriterImpl<Request>)))
1085  ClientCallbackWriterImpl<Request>(call, context, response, reactor);
1086  }
1087 };
1088 
1090  public:
1091  // always allocated against a call arena, no memory free required
1092  static void operator delete(void* /*ptr*/, std::size_t size) {
1094  }
1095 
1096  // This operator should never be called as the memory should be freed as part
1097  // of the arena destruction. It only exists to provide a matching operator
1098  // delete to the operator new so that some compilers will not complain (see
1099  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
1100  // there are no tests catching the compiler warning.
1101  static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
1102 
1103  void StartCall() override {
1104  // This call initiates two batches, each with a callback
1105  // 1. Send initial metadata + write + writes done + recv initial metadata
1106  // 2. Read message, recv trailing metadata
1107 
1108  start_tag_.Set(call_.call(),
1109  [this](bool ok) {
1110  reactor_->OnReadInitialMetadataDone(ok);
1111  MaybeFinish();
1112  },
1113  &start_ops_, /*can_inline=*/false);
1114  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
1115  context_->initial_metadata_flags());
1116  start_ops_.RecvInitialMetadata(context_);
1117  start_ops_.set_core_cq_tag(&start_tag_);
1118  call_.PerformOps(&start_ops_);
1119 
1120  finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); },
1121  &finish_ops_,
1122  /*can_inline=*/false);
1123  finish_ops_.ClientRecvStatus(context_, &finish_status_);
1124  finish_ops_.set_core_cq_tag(&finish_tag_);
1125  call_.PerformOps(&finish_ops_);
1126  }
1127 
1128  private:
1130 
1131  template <class Request, class Response>
1133  ::grpc_impl::ClientContext* context, Request* request,
1134  Response* response, ClientUnaryReactor* reactor)
1135  : context_(context), call_(call), reactor_(reactor) {
1136  this->BindReactor(reactor);
1137  // TODO(vjpai): don't assert
1138  GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
1139  start_ops_.ClientSendClose();
1140  finish_ops_.RecvMessage(response);
1141  finish_ops_.AllowNoMessage();
1142  }
1143 
1144  // In the unary case, MaybeFinish is only ever invoked from a
1145  // library-initiated reaction, so it will just directly call OnDone if this is
1146  // the last reaction for this RPC.
1147  void MaybeFinish() {
1148  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1149  1, std::memory_order_acq_rel) == 1)) {
1150  ::grpc::Status s = std::move(finish_status_);
1151  auto* reactor = reactor_;
1152  auto* call = call_.call();
1153  this->~ClientCallbackUnaryImpl();
1155  reactor->OnDone(s);
1156  }
1157  }
1158 
1159  ::grpc_impl::ClientContext* const context_;
1160  grpc::internal::Call call_;
1161  ClientUnaryReactor* const reactor_;
1162 
1167  start_ops_;
1169 
1172  finish_ops_;
1174  ::grpc::Status finish_status_;
1175 
1176  // This call will have 2 callbacks: start and finish
1177  std::atomic<intptr_t> callbacks_outstanding_{2};
1178 };
1179 
1181  public:
1182  template <class Request, class Response>
1183  static void Create(::grpc::ChannelInterface* channel,
1184  const ::grpc::internal::RpcMethod& method,
1185  ::grpc_impl::ClientContext* context,
1186  const Request* request, Response* response,
1187  ClientUnaryReactor* reactor) {
1188  grpc::internal::Call call =
1189  channel->CreateCall(method, context, channel->CallbackCQ());
1190 
1192 
1194  call.call(), sizeof(ClientCallbackUnaryImpl)))
1195  ClientCallbackUnaryImpl(call, context, request, response, reactor);
1196  }
1197 };
1198 
1199 } // namespace internal
1200 } // namespace grpc_impl
1201 #endif // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_IMPL_H
grpc::internal::CallbackWithSuccessTag
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming o...
Definition: callback_common.h:136
grpc_impl::ClientCallbackWriter::WritesDone
virtual void WritesDone()=0
grpc_impl::internal::ClientCallbackWriterImpl::StartCall
void StartCall() override
Definition: client_callback_impl.h:879
grpc::internal::CallOpRecvInitialMetadata
Definition: call_op_set.h:720
grpc_impl::internal::ClientCallbackReaderImpl::RemoveHold
void RemoveHold() override
Definition: client_callback_impl.h:781
grpc_impl::internal::ClientCallbackWriterImpl::Write
void Write(const Request *msg, ::grpc::WriteOptions options) override
Definition: client_callback_impl.h:911
grpc_impl::internal::ClientCallbackReaderWriterFactory
Definition: channel_interface.h:48
grpc_impl::ClientCallbackWriter
Definition: client_callback_impl.h:174
grpc::internal::CallOpClientSendClose
Definition: call_op_set.h:618
grpc_impl::ClientUnaryReactor::StartCall
void StartCall()
Definition: client_callback_impl.h:430
grpc_impl::internal::ClientCallbackWriterImpl
Definition: client_callback_impl.h:865
grpc_impl::ClientReadReactor::OnReadDone
virtual void OnReadDone(bool)
Definition: client_callback_impl.h:369
grpc_impl::internal::ClientCallbackReaderImpl::Read
void Read(Response *msg) override
Definition: client_callback_impl.h:765
grpc::internal::CallOpGenericRecvMessage
Definition: call_op_set.h:525
grpc_impl::ClientCallbackWriter::Write
void Write(const Request *req)
Definition: client_callback_impl.h:178
grpc_impl::ClientBidiReactor::StartRead
void StartRead(Response *resp)
Initiate a read operation (or post it for later initiation if StartCall has not yet been invoked).
Definition: client_callback_impl.h:230
grpc
This header provides an object that reads bytes directly from a grpc::ByteBuffer, via the ZeroCopyInp...
Definition: alarm.h:24
grpc::internal::CallOpSet
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:850
status.h
grpc::CoreCodegenInterface::grpc_call_ref
virtual void grpc_call_ref(grpc_call *call)=0
grpc::internal::CallOpSendMessage
Definition: call_op_set.h:287
grpc::WriteOptions::set_last_message
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:161
grpc_impl::ClientCallbackUnary::StartCall
virtual void StartCall()=0
grpc::CoreCodegenInterface::grpc_call_arena_alloc
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
grpc_impl::ClientCallbackReaderWriter::RemoveHold
virtual void RemoveHold()=0
GPR_LIKELY
#define GPR_LIKELY(x)
Definition: port_platform.h:680
grpc_impl::ClientUnaryReactor::OnReadInitialMetadataDone
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback_impl.h:432
grpc_impl::internal::ClientReactor::OnDone
virtual void OnDone(const ::grpc::Status &)=0
Called by the library when all operations associated with this RPC have completed and all Holds have ...
grpc_impl::ClientCallbackReaderWriter::~ClientCallbackReaderWriter
virtual ~ClientCallbackReaderWriter()
Definition: client_callback_impl.h:144
config.h
grpc_impl::ClientCallbackReaderWriter::WritesDone
virtual void WritesDone()=0
grpc_impl::ClientWriteReactor::StartWriteLast
void StartWriteLast(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback_impl.h:391
grpc_impl::internal::CallbackUnaryCallImpl
Definition: channel_interface.h:38
grpc_impl::ClientUnaryReactor::~ClientUnaryReactor
virtual ~ClientUnaryReactor()
Definition: client_callback_impl.h:428
grpc::internal::Call
Straightforward wrapping of the C call object.
Definition: call.h:38
grpc_impl::internal::ClientReactor
Definition: client_callback_impl.h:106
grpc_impl::internal::ClientCallbackWriterFactory
Definition: channel_interface.h:52
grpc_impl::ClientBidiReactor::OnDone
void OnDone(const ::grpc::Status &) override
Notifies the application that all operations associated with this RPC have completed and all Holds ha...
Definition: client_callback_impl.h:308
core_codegen_interface.h
grpc_impl::internal::ClientCallbackUnaryImpl::StartCall
void StartCall() override
Definition: client_callback_impl.h:1103
grpc::internal::CallOpSendInitialMetadata
Definition: call_op_set.h:217
grpc_impl::ClientCallbackWriter::BindReactor
void BindReactor(ClientWriteReactor< Request > *reactor)
Definition: client_callback_impl.h:189
grpc_impl::ClientBidiReactor::RemoveHold
void RemoveHold()
Definition: client_callback_impl.h:299
grpc_impl::ClientCallbackReaderWriter::StartCall
virtual void StartCall()=0
grpc::internal::CallbackWithStatusTag
Definition: callback_common.h:68
grpc_impl::ClientReadReactor::StartRead
void StartRead(Response *resp)
Definition: client_callback_impl.h:358
grpc::Status::ok
bool ok() const
Is the status OK?
Definition: status.h:118
grpc_impl::ClientBidiReactor::StartWriteLast
void StartWriteLast(const Request *req, ::grpc::WriteOptions options)
Initiate/post a write operation with specified options and an indication that this is the last write ...
Definition: client_callback_impl.h:261
grpc_impl::internal::ClientCallbackWriterFactory::Create
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc_impl::ClientContext *context, Response *response, ClientWriteReactor< Request > *reactor)
Definition: client_callback_impl.h:1075
grpc_impl::ClientCallbackReader::StartCall
virtual void StartCall()=0
grpc_impl::ClientCallbackWriter::StartCall
virtual void StartCall()=0
grpc_impl::ClientBidiReactor
ClientBidiReactor is the interface for a bidirectional streaming RPC.
Definition: client_callback_impl.h:131
grpc_impl::ClientReadReactor::RemoveHold
void RemoveHold()
Definition: client_callback_impl.h:365
grpc_impl::internal::ClientCallbackReaderImpl::StartCall
void StartCall() override
Definition: client_callback_impl.h:721
grpc_impl::internal::ClientCallbackReaderImpl::AddHold
void AddHold(int holds) override
Definition: client_callback_impl.h:778
grpc::Status
Did it work? If it didn't, why?
Definition: status.h:31
grpc_impl::ClientCallbackReader::Read
virtual void Read(Response *resp)=0
grpc_impl::ClientBidiReactor::StartWrite
void StartWrite(const Request *req)
Initiate a write operation (or post it for later initiation if StartCall has not yet been invoked).
Definition: client_callback_impl.h:238
grpc_impl::ClientReadReactor
ClientReadReactor is the interface for a server-streaming RPC.
Definition: client_callback_impl.h:133
grpc_impl::internal::ClientCallbackReaderWriterImpl::Write
void Write(const Request *msg, ::grpc::WriteOptions options) override
Definition: client_callback_impl.h:520
grpc_impl::ClientCallbackReader
Definition: client_callback_impl.h:159
grpc_impl::ClientBidiReactor::OnReadDone
virtual void OnReadDone(bool)
Notifies the application that a StartRead operation completed.
Definition: client_callback_impl.h:324
grpc_impl::ClientBidiReactor::AddMultipleHolds
void AddMultipleHolds(int holds)
Definition: client_callback_impl.h:295
grpc_impl::internal::ClientCallbackReaderWriterImpl
Definition: client_callback_impl.h:456
grpc_impl::ClientBidiReactor::~ClientBidiReactor
virtual ~ClientBidiReactor()
Definition: client_callback_impl.h:217
grpc_impl::ClientCallbackUnary
Definition: client_callback_impl.h:194
grpc_impl::internal::ClientCallbackReaderWriterImpl::RemoveHold
void RemoveHold() override
Definition: client_callback_impl.h:571
grpc_impl::ClientCallbackWriter::RemoveHold
virtual void RemoveHold()=0
grpc_impl::ClientWriteReactor::OnDone
void OnDone(const ::grpc::Status &) override
Definition: client_callback_impl.h:403
grpc_impl::ClientReadReactor::~ClientReadReactor
virtual ~ClientReadReactor()
Definition: client_callback_impl.h:355
grpc_impl::ClientUnaryReactor
ClientUnaryReactor is a reactor-style interface for a unary RPC.
Definition: client_callback_impl.h:426
grpc_impl::ClientUnaryReactor::OnDone
void OnDone(const ::grpc::Status &) override
Called by the library when all operations associated with this RPC have completed and all Holds have ...
Definition: client_callback_impl.h:431
GPR_UNLIKELY
#define GPR_UNLIKELY(x)
Definition: port_platform.h:681
grpc_impl::ClientCallbackReaderWriter::Write
virtual void Write(const Request *req, ::grpc::WriteOptions options)=0
grpc::experimental::ClientUnaryReactor
::grpc_impl::ClientUnaryReactor ClientUnaryReactor
Definition: client_callback.h:71
grpc_impl::ClientBidiReactor::AddHold
void AddHold()
Holds are needed if (and only if) this stream has operations that take place on it after StartCall bu...
Definition: client_callback_impl.h:294
grpc::experimental::ClientReadReactor
::grpc_impl::ClientReadReactor< Response > ClientReadReactor
Definition: client_callback.h:63
grpc::ChannelInterface
Codegen interface for grpc::Channel.
Definition: channel_interface.h:74
grpc_impl::internal::ClientCallbackReaderWriterImpl::AddHold
void AddHold(int holds) override
Definition: client_callback_impl.h:568
grpc_impl::ClientWriteReactor::StartCall
void StartCall()
Definition: client_callback_impl.h:384
grpc_impl::ClientWriteReactor::OnWriteDone
virtual void OnWriteDone(bool)
Definition: client_callback_impl.h:405
grpc_impl::ClientBidiReactor::OnWritesDoneDone
virtual void OnWritesDoneDone(bool)
Notifies the application that a StartWritesDone operation completed.
Definition: client_callback_impl.h:340
grpc_impl::internal::ClientCallbackWriterImpl::RemoveHold
void RemoveHold() override
Definition: client_callback_impl.h:966
grpc_impl::ClientBidiReactor::StartCall
void StartCall()
Activate the RPC and initiate any reads or writes that have been Start'ed before this call.
Definition: client_callback_impl.h:223
grpc_impl::ClientWriteReactor::RemoveHold
void RemoveHold()
Definition: client_callback_impl.h:401
grpc_impl::ClientCallbackUnary::~ClientCallbackUnary
virtual ~ClientCallbackUnary()
Definition: client_callback_impl.h:196
grpc_impl::ClientReadReactor::StartCall
void StartCall()
Definition: client_callback_impl.h:357
grpc_impl::ClientWriteReactor::StartWritesDone
void StartWritesDone()
Definition: client_callback_impl.h:394
grpc_impl::ClientCallbackReaderWriter::Read
virtual void Read(Response *resp)=0
grpc::protobuf::util::Status
::google::protobuf::util::Status Status
Definition: config_protobuf.h:90
grpc_impl::internal::ClientCallbackReaderImpl
Definition: client_callback_impl.h:707
grpc_impl::internal::ClientCallbackUnaryFactory::Create
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc_impl::ClientContext *context, const Request *request, Response *response, ClientUnaryReactor *reactor)
Definition: client_callback_impl.h:1183
grpc_impl::ClientCallbackReader::AddHold
virtual void AddHold(int holds)=0
grpc_impl::ClientReadReactor::OnReadInitialMetadataDone
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback_impl.h:368
grpc_impl::internal::ClientCallbackReaderWriterImpl::StartCall
void StartCall() override
Definition: client_callback_impl.h:471
grpc::CoreCodegenInterface::grpc_call_unref
virtual void grpc_call_unref(grpc_call *call)=0
grpc_impl::internal::ClientCallbackReaderFactory
Definition: channel_interface.h:50
grpc_impl::ClientBidiReactor::StartWritesDone
void StartWritesDone()
Indicate that the RPC will have no more write operations.
Definition: client_callback_impl.h:270
grpc_impl::ClientCallbackReader::RemoveHold
virtual void RemoveHold()=0
grpc_impl::ClientWriteReactor::~ClientWriteReactor
virtual ~ClientWriteReactor()
Definition: client_callback_impl.h:382
grpc_impl::ClientCallbackReaderWriter
Definition: client_callback_impl.h:142
grpc::WriteOptions
Per-message write options.
Definition: call_op_set.h:79
grpc_impl::ClientReadReactor::AddMultipleHolds
void AddMultipleHolds(int holds)
Definition: client_callback_impl.h:361
callback_common.h
grpc::internal::CallbackWithSuccessTag::Set
void Set(grpc_call *call, std::function< void(bool)> f, CompletionQueueTag *ops, bool can_inline)
Definition: callback_common.h:164
grpc_impl::ClientWriteReactor
ClientWriteReactor is the interface for a client-streaming RPC.
Definition: client_callback_impl.h:135
grpc::internal::MutexLock
Definition: sync.h:69
grpc_impl::ClientCallbackReader::~ClientCallbackReader
virtual ~ClientCallbackReader()
Definition: client_callback_impl.h:161
grpc_impl::CompletionQueue
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue....
Definition: completion_queue_impl.h:103
channel_interface.h
grpc_impl::internal::ClientCallbackReaderFactory::Create
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc_impl::ClientContext *context, const Request *request, ClientReadReactor< Response > *reactor)
Definition: client_callback_impl.h:849
grpc_impl::ClientWriteReactor::AddMultipleHolds
void AddMultipleHolds(int holds)
Definition: client_callback_impl.h:397
grpc_impl::internal::ClientCallbackUnaryFactory
Definition: client_callback_impl.h:1180
grpc_impl::internal::ClientCallbackReaderWriterFactory::Create
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc_impl::ClientContext *context, ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback_impl.h:691
grpc_impl::ClientCallbackWriter::WriteLast
void WriteLast(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback_impl.h:180
grpc::internal::Call::PerformOps
void PerformOps(CallOpSetInterface *ops)
Definition: call.h:68
grpc::internal::Call::call
grpc_call * call() const
Definition: call.h:72
grpc_impl::ClientCallbackReader::BindReactor
void BindReactor(ClientReadReactor< Response > *reactor)
Definition: client_callback_impl.h:168
call.h
call_op_set.h
grpc::internal::CallOpClientRecvStatus
Definition: call_op_set.h:768
grpc::WriteOptions::set_buffer_hint
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call_op_set.h:122
grpc_impl::ClientCallbackUnary::BindReactor
void BindReactor(ClientUnaryReactor *reactor)
Definition: client_callback_impl.h:441
grpc_impl::internal::CallbackUnaryCallImpl::CallbackUnaryCallImpl
CallbackUnaryCallImpl(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc_impl::ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(::grpc::Status)> on_completion)
Definition: client_callback_impl.h:58
grpc_impl::ClientReadReactor::OnDone
void OnDone(const ::grpc::Status &) override
Definition: client_callback_impl.h:367
grpc_impl::internal::ClientCallbackReaderWriterImpl::WritesDone
void WritesDone() override
Definition: client_callback_impl.h:543
grpc_impl
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm_impl.h:33
grpc_impl::ClientWriteReactor::OnWritesDoneDone
virtual void OnWritesDoneDone(bool)
Definition: client_callback_impl.h:406
grpc_impl::ClientWriteReactor::StartWrite
void StartWrite(const Request *req)
Definition: client_callback_impl.h:385
grpc_impl::ClientBidiReactor::StartWrite
void StartWrite(const Request *req, ::grpc::WriteOptions options)
Initiate/post a write operation with specified options.
Definition: client_callback_impl.h:248
grpc_impl::ClientCallbackReaderWriter::AddHold
virtual void AddHold(int holds)=0
grpc_impl::internal::ClientCallbackUnaryImpl
Definition: client_callback_impl.h:1089
grpc::g_core_codegen_interface
CoreCodegenInterface * g_core_codegen_interface
Definition: completion_queue_impl.h:93
GPR_CODEGEN_ASSERT
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:146
grpc_impl::internal::CallbackUnaryCall
void CallbackUnaryCall(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc_impl::ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(::grpc::Status)> on_completion)
Perform a callback-based unary call TODO(vjpai): Combine as much as possible with the blocking unary ...
Definition: client_callback_impl.h:46
grpc::WriteOptions::is_last_message
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:186
grpc_impl::ClientContext
A ClientContext allows the person implementing a service client to:
Definition: client_context_impl.h:184
grpc_impl::ClientWriteReactor::AddHold
void AddHold()
Definition: client_callback_impl.h:396
grpc_impl::internal::ClientCallbackWriterImpl::WritesDone
void WritesDone() override
Definition: client_callback_impl.h:936
grpc_impl::ClientCallbackReaderWriter::BindReactor
void BindReactor(ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback_impl.h:153
grpc_impl::ClientWriteReactor::StartWrite
void StartWrite(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback_impl.h:388
grpc::experimental::ClientBidiReactor
::grpc_impl::ClientBidiReactor< Request, Response > ClientBidiReactor
Definition: client_callback.h:69
grpc_impl::ClientReadReactor::AddHold
void AddHold()
Definition: client_callback_impl.h:360
grpc::internal::CallOpRecvMessage
Definition: byte_buffer.h:58
grpc::ClientContext
::grpc_impl::ClientContext ClientContext
Definition: client_context.h:26
grpc::internal::Mutex
Definition: sync.h:47
grpc_impl::internal::ClientCallbackReaderWriterImpl::Read
void Read(Response *msg) override
Definition: client_callback_impl.h:507
grpc_impl::ClientCallbackWriter::~ClientCallbackWriter
virtual ~ClientCallbackWriter()
Definition: client_callback_impl.h:176
grpc_impl::ClientBidiReactor::OnReadInitialMetadataDone
virtual void OnReadInitialMetadataDone(bool)
Notifies the application that a read of initial metadata from the server is done.
Definition: client_callback_impl.h:318
grpc_impl::ClientCallbackWriter::AddHold
virtual void AddHold(int holds)=0
grpc::internal::CallOpSet::set_core_cq_tag
void set_core_cq_tag(void *core_cq_tag)
set_core_cq_tag is used to provide a different core CQ tag than "this".
Definition: call_op_set.h:939
grpc::experimental::ClientWriteReactor
::grpc_impl::ClientWriteReactor< Request > ClientWriteReactor
Definition: client_callback.h:66
grpc_impl::internal::ClientReactor::InternalScheduleOnDone
virtual void InternalScheduleOnDone(::grpc::Status s)
InternalScheduleOnDone is not part of the API and is not meant to be overridden.
grpc_impl::ClientWriteReactor::OnReadInitialMetadataDone
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback_impl.h:404
grpc_impl::internal::ClientCallbackWriterImpl::AddHold
void AddHold(int holds) override
Definition: client_callback_impl.h:963
grpc::Channel
::grpc_impl::Channel Channel
Definition: channel.h:26
grpc_impl::ClientBidiReactor::OnWriteDone
virtual void OnWriteDone(bool)
Notifies the application that a StartWrite or StartWriteLast operation completed.
Definition: client_callback_impl.h:331
GPR_CODEGEN_DEBUG_ASSERT
#define GPR_CODEGEN_DEBUG_ASSERT(x)
Codegen specific version of GPR_DEBUG_ASSERT.
Definition: core_codegen_interface.h:155