18 #ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_IMPL_H 19 #define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_IMPL_H 45 template <
class InputMessage,
class OutputMessage>
47 const ::grpc::internal::RpcMethod& method,
49 const InputMessage* request, OutputMessage* result,
52 channel, method, context, request, result, on_completion);
55 template <
class InputMessage,
class OutputMessage>
59 const ::grpc::internal::RpcMethod& method,
61 const InputMessage* request, OutputMessage* result,
76 call.call(),
sizeof(FullCallOpSet))) FullCallOpSet;
88 ops->SendInitialMetadata(&context->send_initial_metadata_,
89 context->initial_metadata_flags());
90 ops->RecvInitialMetadata(context);
91 ops->RecvMessage(result);
92 ops->AllowNoMessage();
93 ops->ClientSendClose();
94 ops->ClientRecvStatus(context, tag->status_ptr());
95 ops->set_core_cq_tag(tag);
101 namespace experimental {
104 template <
class Request,
class Response>
106 template <
class Response>
108 template <
class Request>
115 template <
class Request,
class Response>
119 virtual void StartCall() = 0;
121 virtual void WritesDone() = 0;
122 virtual void Read(Response* resp) = 0;
123 virtual void AddHold(
int holds) = 0;
124 virtual void RemoveHold() = 0;
128 reactor->BindStream(
this);
132 template <
class Response>
136 virtual void StartCall() = 0;
137 virtual void Read(Response* resp) = 0;
138 virtual void AddHold(
int holds) = 0;
139 virtual void RemoveHold() = 0;
143 reactor->BindReader(
this);
147 template <
class Request>
151 virtual void StartCall() = 0;
157 virtual void WritesDone() = 0;
159 virtual void AddHold(
int holds) = 0;
160 virtual void RemoveHold() = 0;
164 reactor->BindWriter(
this);
171 virtual void StartCall() = 0;
188 template <
class Request,
class Response>
223 stream_->Write(req, std::move(options));
316 template <
class Response>
340 template <
class Request>
350 writer_->Write(req, std::move(options));
399 reactor->BindCall(
this);
407 template <
class Request,
class Response>
408 class ClientCallbackReaderWriterFactory;
409 template <
class Response>
410 class ClientCallbackReaderFactory;
411 template <
class Request>
412 class ClientCallbackWriterFactory;
414 template <
class Request,
class Response>
419 static void operator delete(
void* ptr, std::size_t size) {
428 static void operator delete(
void*,
void*) { assert(0); }
432 1, std::memory_order_acq_rel) == 1)) {
434 auto* reactor = reactor_;
435 auto* call = call_.call();
450 start_tag_.Set(call_.call(),
452 reactor_->OnReadInitialMetadataDone(ok);
456 if (!start_corked_) {
457 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
458 context_->initial_metadata_flags());
460 start_ops_.RecvInitialMetadata(context_);
461 start_ops_.set_core_cq_tag(&start_tag_);
462 call_.PerformOps(&start_ops_);
466 write_tag_.Set(call_.call(),
468 reactor_->OnWriteDone(ok);
472 write_ops_.set_core_cq_tag(&write_tag_);
474 read_tag_.Set(call_.call(),
476 reactor_->OnReadDone(ok);
480 read_ops_.set_core_cq_tag(&read_tag_);
481 if (read_ops_at_start_) {
482 call_.PerformOps(&read_ops_);
485 if (write_ops_at_start_) {
486 call_.PerformOps(&write_ops_);
489 if (writes_done_ops_at_start_) {
490 call_.PerformOps(&writes_done_ops_);
493 finish_tag_.Set(call_.call(), [
this](
bool ok) { MaybeFinish(); },
495 finish_ops_.ClientRecvStatus(context_, &finish_status_);
496 finish_ops_.set_core_cq_tag(&finish_tag_);
497 call_.PerformOps(&finish_ops_);
500 void Read(Response* msg)
override {
501 read_ops_.RecvMessage(msg);
502 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
504 call_.PerformOps(&read_ops_);
506 read_ops_at_start_ =
true;
512 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
513 context_->initial_metadata_flags());
514 start_corked_ =
false;
519 write_ops_.ClientSendClose();
523 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
525 call_.PerformOps(&write_ops_);
527 write_ops_at_start_ =
true;
532 writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
533 context_->initial_metadata_flags());
534 start_corked_ =
false;
536 writes_done_ops_.ClientSendClose();
537 writes_done_tag_.Set(call_.call(),
539 reactor_->OnWritesDoneDone(ok);
543 writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
544 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
546 call_.PerformOps(&writes_done_ops_);
548 writes_done_ops_at_start_ =
true;
553 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
566 start_corked_(context_->initial_metadata_corked_) {
567 this->BindReactor(reactor);
589 bool write_ops_at_start_{
false};
595 bool writes_done_ops_at_start_{
false};
600 bool read_ops_at_start_{
false};
603 std::atomic<intptr_t> callbacks_outstanding_{2};
604 bool started_{
false};
607 template <
class Request,
class Response>
612 const ::grpc::internal::RpcMethod& method,
616 channel->CreateCall(method, context, channel->CallbackCQ());
626 template <
class Response>
631 static void operator delete(
void* ptr, std::size_t size) {
640 static void operator delete(
void*,
void*) { assert(0); }
644 1, std::memory_order_acq_rel) == 1)) {
646 auto* reactor = reactor_;
647 auto* call = call_.call();
661 start_tag_.Set(call_.call(),
663 reactor_->OnReadInitialMetadataDone(ok);
667 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
668 context_->initial_metadata_flags());
669 start_ops_.RecvInitialMetadata(context_);
670 start_ops_.set_core_cq_tag(&start_tag_);
671 call_.PerformOps(&start_ops_);
674 read_tag_.Set(call_.call(),
676 reactor_->OnReadDone(ok);
680 read_ops_.set_core_cq_tag(&read_tag_);
681 if (read_ops_at_start_) {
682 call_.PerformOps(&read_ops_);
685 finish_tag_.Set(call_.call(), [
this](
bool ok) { MaybeFinish(); },
687 finish_ops_.ClientRecvStatus(context_, &finish_status_);
688 finish_ops_.set_core_cq_tag(&finish_tag_);
689 call_.PerformOps(&finish_ops_);
692 void Read(Response* msg)
override {
693 read_ops_.RecvMessage(msg);
694 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
696 call_.PerformOps(&read_ops_);
698 read_ops_at_start_ =
true;
703 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
710 template <
class Request>
715 : context_(context), call_(call), reactor_(reactor) {
716 this->BindReactor(reactor);
719 start_ops_.ClientSendClose();
740 bool read_ops_at_start_{
false};
743 std::atomic<intptr_t> callbacks_outstanding_{2};
744 bool started_{
false};
747 template <
class Response>
750 template <
class Request>
752 const ::grpc::internal::RpcMethod& method,
754 const Request* request,
757 channel->CreateCall(method, context, channel->CallbackCQ());
766 template <
class Request>
771 static void operator delete(
void* ptr, std::size_t size) {
780 static void operator delete(
void*,
void*) { assert(0); }
784 1, std::memory_order_acq_rel) == 1)) {
786 auto* reactor = reactor_;
787 auto* call = call_.
call();
801 start_tag_.Set(call_.call(),
803 reactor_->OnReadInitialMetadataDone(ok);
807 if (!start_corked_) {
808 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
809 context_->initial_metadata_flags());
811 start_ops_.RecvInitialMetadata(context_);
812 start_ops_.set_core_cq_tag(&start_tag_);
813 call_.PerformOps(&start_ops_);
817 write_tag_.Set(call_.call(),
819 reactor_->OnWriteDone(ok);
823 write_ops_.set_core_cq_tag(&write_tag_);
825 if (write_ops_at_start_) {
826 call_.PerformOps(&write_ops_);
829 if (writes_done_ops_at_start_) {
830 call_.PerformOps(&writes_done_ops_);
833 finish_tag_.Set(call_.call(), [
this](
bool ok) { MaybeFinish(); },
835 finish_ops_.ClientRecvStatus(context_, &finish_status_);
836 finish_ops_.set_core_cq_tag(&finish_tag_);
837 call_.PerformOps(&finish_ops_);
842 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
843 context_->initial_metadata_flags());
844 start_corked_ =
false;
849 write_ops_.ClientSendClose();
853 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
855 call_.PerformOps(&write_ops_);
857 write_ops_at_start_ =
true;
862 writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
863 context_->initial_metadata_flags());
864 start_corked_ =
false;
866 writes_done_ops_.ClientSendClose();
867 writes_done_tag_.Set(call_.call(),
869 reactor_->OnWritesDoneDone(ok);
873 writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
874 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
876 call_.PerformOps(&writes_done_ops_);
878 writes_done_ops_at_start_ =
true;
883 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
890 template <
class Response>
898 start_corked_(context_->initial_metadata_corked_) {
899 this->BindReactor(reactor);
900 finish_ops_.RecvMessage(response);
901 finish_ops_.AllowNoMessage();
925 bool write_ops_at_start_{
false};
931 bool writes_done_ops_at_start_{
false};
934 std::atomic<intptr_t> callbacks_outstanding_{2};
935 bool started_{
false};
938 template <
class Request>
941 template <
class Response>
943 const ::grpc::internal::RpcMethod& method,
947 channel->CreateCall(method, context, channel->CallbackCQ());
959 static void operator delete(
void* ptr, std::size_t size) {
968 static void operator delete(
void*,
void*) { assert(0); }
976 start_tag_.Set(call_.call(),
978 reactor_->OnReadInitialMetadataDone(ok);
982 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
983 context_->initial_metadata_flags());
984 start_ops_.RecvInitialMetadata(context_);
985 start_ops_.set_core_cq_tag(&start_tag_);
986 call_.PerformOps(&start_ops_);
988 finish_tag_.Set(call_.call(), [
this](
bool ok) { MaybeFinish(); },
990 finish_ops_.ClientRecvStatus(context_, &finish_status_);
991 finish_ops_.set_core_cq_tag(&finish_tag_);
992 call_.PerformOps(&finish_ops_);
997 1, std::memory_order_acq_rel) == 1)) {
999 auto* reactor = reactor_;
1000 auto* call = call_.
call();
1010 template <
class Request,
class Response>
1015 : context_(context), call_(call), reactor_(reactor) {
1016 this->BindReactor(reactor);
1019 start_ops_.ClientSendClose();
1020 finish_ops_.RecvMessage(response);
1021 finish_ops_.AllowNoMessage();
1042 std::atomic<intptr_t> callbacks_outstanding_{2};
1043 bool started_{
false};
1048 template <
class Request,
class Response>
1050 const ::grpc::internal::RpcMethod& method,
1052 const Request* request, Response* response,
1055 channel->CreateCall(method, context, channel->CallbackCQ());
1067 #endif // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_IMPL_H
virtual void OnReadDone(bool ok)
Notifies the application that a StartRead operation completed.
Definition: client_callback_impl.h:290
CallbackUnaryCallImpl(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc_impl::ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(::grpc::Status)> on_completion)
Definition: client_callback_impl.h:58
::grpc_impl::ClientContext ClientContext
Definition: client_context.h:26
void BindReactor(ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback_impl.h:127
void Write(const Request *req)
Definition: client_callback_impl.h:152
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call_op_set.h:125
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:145
void BindReactor(ClientWriteReactor< Request > *reactor)
Definition: client_callback_impl.h:163
void StartCall() override
Definition: client_callback_impl.h:970
virtual void grpc_call_ref(grpc_call *call)=0
virtual void OnReadInitialMetadataDone(bool ok)
Definition: client_callback_impl.h:329
void StartWrite(const Request *req)
Initiate a write operation (or post it for later initiation if StartCall has not yet been invoked)...
Definition: client_callback_impl.h:212
void StartWrite(const Request *req)
Definition: client_callback_impl.h:346
virtual ~ClientUnaryReactor()
Definition: client_callback_impl.h:385
void Read(Response *msg) override
Definition: client_callback_impl.h:500
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:164
void RemoveHold()
Definition: client_callback_impl.h:326
Definition: client_callback_impl.h:1046
virtual void OnReadDone(bool ok)
Definition: client_callback_impl.h:330
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:821
void Read(Response *msg) override
Definition: client_callback_impl.h:692
void StartWriteLast(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback_impl.h:352
virtual void OnWriteDone(bool ok)
Notifies the application that a StartWrite operation completed.
Definition: client_callback_impl.h:296
Definition: channel_interface.h:38
ClientBidiReactor is the interface for a bidirectional streaming RPC.
Definition: client_callback_impl.h:105
virtual void grpc_call_unref(grpc_call *call)=0
Definition: client_callback_impl.h:148
void Write(const Request *msg, ::grpc::WriteOptions options) override
Definition: client_callback_impl.h:510
virtual ~ClientReadReactor()
Definition: client_callback_impl.h:319
void BindReactor(ClientReadReactor< Response > *reactor)
Definition: client_callback_impl.h:142
Definition: channel_interface.h:50
ClientUnaryReactor is a reactor-style interface for a unary RPC.
Definition: client_callback_impl.h:383
grpc_call * call() const
Definition: call.h:72
virtual ~ClientCallbackReader()
Definition: client_callback_impl.h:135
Definition: client_callback_impl.h:133
void StartCall()
Definition: client_callback_impl.h:345
Definition: client_callback_impl.h:767
::google::protobuf::util::Status Status
Definition: config_protobuf.h:96
Definition: client_callback_impl.h:168
void AddHold()
Definition: client_callback_impl.h:357
void StartWrite(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback_impl.h:349
void RemoveHold() override
Definition: client_callback_impl.h:705
void Write(const Request *msg, ::grpc::WriteOptions options) override
Definition: client_callback_impl.h:840
void AddHold(int holds) override
Definition: client_callback_impl.h:702
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc_impl::ClientContext *context, const Request *request, Response *response, experimental::ClientUnaryReactor *reactor)
Definition: client_callback_impl.h:1049
virtual ~ClientCallbackWriter()
Definition: client_callback_impl.h:150
void StartCall() override
Definition: client_callback_impl.h:794
void StartWrite(const Request *req, ::grpc::WriteOptions options)
Initiate/post a write operation with specified options.
Definition: client_callback_impl.h:222
ClientReadReactor is the interface for a server-streaming RPC.
Definition: client_callback_impl.h:107
::grpc_impl::Channel Channel
Definition: channel.h:26
virtual void OnDone(const ::grpc::Status &s)
Definition: client_callback_impl.h:328
void AddMultipleHolds(int holds)
Definition: client_callback_impl.h:358
virtual void OnReadInitialMetadataDone(bool ok)
Definition: client_callback_impl.h:389
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc_impl::ClientContext *context, const Request *request, experimental::ClientReadReactor< Response > *reactor)
Definition: client_callback_impl.h:751
virtual ~ClientWriteReactor()
Definition: client_callback_impl.h:343
virtual void OnReadInitialMetadataDone(bool ok)
Definition: client_callback_impl.h:362
Definition: call_op_set.h:288
void WritesDone() override
Definition: client_callback_impl.h:860
virtual void OnDone(const ::grpc::Status &s)
Notifies the application that all operations associated with this RPC have completed and provides the...
Definition: client_callback_impl.h:275
ClientWriteReactor is the interface for a client-streaming RPC.
Definition: client_callback_impl.h:109
This header provides an object that reads bytes directly from a grpc::ByteBuffer, via the ZeroCopyInp...
Definition: alarm.h:24
void RemoveHold()
Definition: client_callback_impl.h:359
Codegen interface for grpc::Channel.
Definition: channel_interface.h:70
void StartCall()
Definition: client_callback_impl.h:387
void AddHold(int holds) override
Definition: client_callback_impl.h:882
CoreCodegenInterface * g_core_codegen_interface
Definition: completion_queue_impl.h:91
virtual void OnDone(const ::grpc::Status &s)
Definition: client_callback_impl.h:361
void StartRead(Response *resp)
Initiate a read operation (or post it for later initiation if StartCall has not yet been invoked)...
Definition: client_callback_impl.h:204
Definition: byte_buffer.h:52
void AddMultipleHolds(int holds)
Definition: client_callback_impl.h:325
void MaybeFinish()
Definition: client_callback_impl.h:430
Per-message write options.
Definition: call_op_set.h:85
void MaybeFinish()
Definition: client_callback_impl.h:995
virtual void OnWriteDone(bool ok)
Definition: client_callback_impl.h:363
void WriteLast(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback_impl.h:154
virtual void OnWritesDoneDone(bool ok)
Notifies the application that a StartWritesDone operation completed.
Definition: client_callback_impl.h:304
void RemoveHold() override
Definition: client_callback_impl.h:555
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm_impl.h:33
Definition: call_op_set.h:594
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
void MaybeFinish()
Definition: client_callback_impl.h:642
Definition: channel_interface.h:48
virtual ~ClientCallbackReaderWriter()
Definition: client_callback_impl.h:118
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming o...
Definition: callback_common.h:132
virtual ~ClientCallbackUnary()
Definition: client_callback_impl.h:170
bool ok() const
Is the status OK?
Definition: status.h:118
virtual void OnReadInitialMetadataDone(bool ok)
Notifies the application that a read of initial metadata from the server is done. ...
Definition: client_callback_impl.h:284
void StartWritesDone()
Indicate that the RPC will have no more write operations.
Definition: client_callback_impl.h:244
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue.h).
Definition: completion_queue_impl.h:101
void AddMultipleHolds(int holds)
Definition: client_callback_impl.h:268
void AddHold(int holds) override
Definition: client_callback_impl.h:552
void AddHold()
Definition: client_callback_impl.h:324
void StartCall()
Activate the RPC and initiate any reads or writes that have been Start'ed before this call...
Definition: client_callback_impl.h:197
A ClientContext allows the person implementing a service client to:
Definition: client_context_impl.h:180
void StartWriteLast(const Request *req, ::grpc::WriteOptions options)
Initiate/post a write operation with specified options and an indication that this is the last write ...
Definition: client_callback_impl.h:235
Did it work? If it didn't, why?
Definition: status.h:31
void StartRead(Response *resp)
Definition: client_callback_impl.h:322
void StartCall() override
Definition: client_callback_impl.h:654
Definition: callback_common.h:68
void RemoveHold()
Definition: client_callback_impl.h:269
void RemoveHold() override
Definition: client_callback_impl.h:885
Definition: channel_interface.h:52
Definition: call_op_set.h:516
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:189
void StartWritesDone()
Definition: client_callback_impl.h:355
void WritesDone() override
Definition: client_callback_impl.h:530
Definition: call_op_set.h:744
virtual void OnDone(const ::grpc::Status &s)
Definition: client_callback_impl.h:388
Definition: client_callback_impl.h:627
virtual void OnWritesDoneDone(bool ok)
Definition: client_callback_impl.h:364
void StartCall()
Definition: client_callback_impl.h:321
void StartCall() override
Definition: client_callback_impl.h:442
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc_impl::ClientContext *context, experimental::ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback_impl.h:610
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc_impl::ClientContext *context, Response *response, experimental::ClientWriteReactor< Request > *reactor)
Definition: client_callback_impl.h:942
void AddHold()
Holds are needed if (and only if) this stream has operations that take place on it after StartCall bu...
Definition: client_callback_impl.h:267
virtual ~ClientBidiReactor()
Definition: client_callback_impl.h:191
Definition: client_callback_impl.h:956
Definition: client_callback_impl.h:415
Straightforward wrapping of the C call object.
Definition: call.h:38
void CallbackUnaryCall(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc_impl::ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(::grpc::Status)> on_completion)
Perform a callback-based unary call TODO(vjpai): Combine as much as possible with the blocking unary ...
Definition: client_callback_impl.h:46
Definition: client_callback_impl.h:116
void MaybeFinish()
Definition: client_callback_impl.h:782