Go to the documentation of this file.
18 #ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_IMPL_H
19 #define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_IMPL_H
45 template <
class InputMessage,
class OutputMessage>
47 const ::grpc::internal::RpcMethod& method,
49 const InputMessage* request, OutputMessage* result,
52 channel, method, context, request, result, on_completion);
55 template <
class InputMessage,
class OutputMessage>
56 class CallbackUnaryCallImpl {
59 const ::grpc::internal::RpcMethod& method,
61 const InputMessage* request, OutputMessage* result,
79 const size_t alloc_sz =
sizeof(OpSetAndTag);
80 auto*
const alloced =
static_cast<OpSetAndTag*
>(
83 auto* ops =
new (&alloced->opset) FullCallOpSet;
84 auto* tag =
new (&alloced->tag)
93 ops->SendInitialMetadata(&context->send_initial_metadata_,
94 context->initial_metadata_flags());
95 ops->RecvInitialMetadata(context);
96 ops->RecvMessage(result);
97 ops->AllowNoMessage();
98 ops->ClientSendClose();
99 ops->ClientRecvStatus(context, tag->status_ptr());
100 ops->set_core_cq_tag(tag);
130 template <
class Request,
class Response>
132 template <
class Response>
134 template <
class Request>
141 template <
class Request,
class Response>
148 virtual void Read(Response* resp) = 0;
149 virtual void AddHold(
int holds) = 0;
154 reactor->BindStream(
this);
158 template <
class Response>
163 virtual void Read(Response* resp) = 0;
164 virtual void AddHold(
int holds) = 0;
169 reactor->BindReader(
this);
173 template <
class Request>
185 virtual void AddHold(
int holds) = 0;
190 reactor->BindWriter(
this);
214 template <
class Request,
class Response>
249 stream_->Write(req, std::move(options));
297 stream_->AddHold(holds);
352 template <
class Response>
363 reader_->AddHold(holds);
379 template <
class Request>
389 writer_->Write(req, std::move(options));
399 writer_->AddHold(holds);
442 reactor->BindCall(
this);
448 template <
class Request,
class Response>
449 class ClientCallbackReaderWriterFactory;
450 template <
class Response>
451 class ClientCallbackReaderFactory;
452 template <
class Request>
453 class ClientCallbackWriterFactory;
455 template <
class Request,
class Response>
460 static void operator delete(
void* , std::size_t size) {
477 if (!start_corked_) {
478 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
479 context_->initial_metadata_flags());
487 if (backlog_.read_ops) {
490 if (backlog_.write_ops) {
493 if (backlog_.writes_done_ops) {
499 started_.store(
true, std::memory_order_release);
504 this->MaybeFinish(
false);
507 void Read(Response* msg)
override {
508 read_ops_.RecvMessage(msg);
509 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
510 if (
GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
512 if (
GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
513 backlog_.read_ops =
true;
523 write_ops_.ClientSendClose();
527 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
529 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
530 context_->initial_metadata_flags());
531 corked_write_needed_ =
false;
534 if (
GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
536 if (
GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
537 backlog_.write_ops =
true;
544 writes_done_ops_.ClientSendClose();
545 writes_done_tag_.
Set(call_.
call(),
547 reactor_->OnWritesDoneDone(ok);
550 &writes_done_ops_,
false);
552 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
554 writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
555 context_->initial_metadata_flags());
556 corked_write_needed_ =
false;
558 if (
GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
560 if (
GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
561 backlog_.writes_done_ops =
true;
569 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
582 start_corked_(context_->initial_metadata_corked_),
583 corked_write_needed_(start_corked_) {
589 reactor_->OnReadInitialMetadataDone(ok);
593 start_ops_.RecvInitialMetadata(context_);
598 reactor_->OnWriteDone(ok);
606 reactor_->OnReadDone(ok);
615 [
this](
bool ) { MaybeFinish(true); },
618 finish_ops_.ClientRecvStatus(context_, &finish_status_);
628 void MaybeFinish(
bool from_reaction) {
630 1, std::memory_order_acq_rel) == 1)) {
632 auto* reactor = reactor_;
633 auto* call = call_.
call();
634 this->~ClientCallbackReaderWriterImpl();
639 reactor->InternalScheduleOnDone(std::move(s));
646 ClientBidiReactor<Request, Response>*
const reactor_;
652 const bool start_corked_;
653 bool corked_write_needed_;
675 struct StartCallBacklog {
676 bool write_ops =
false;
677 bool writes_done_ops =
false;
678 bool read_ops =
false;
680 StartCallBacklog backlog_ ;
683 std::atomic<intptr_t> callbacks_outstanding_{3};
684 std::atomic_bool started_{
false};
688 template <
class Request,
class Response>
689 class ClientCallbackReaderWriterFactory {
692 const ::grpc::internal::RpcMethod& method,
696 channel->CreateCall(method, context, channel->CallbackCQ());
706 template <
class Response>
710 static void operator delete(
void* , std::size_t size) {
729 reactor_->OnReadInitialMetadataDone(ok);
733 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
734 context_->initial_metadata_flags());
735 start_ops_.RecvInitialMetadata(context_);
742 reactor_->OnReadDone(ok);
750 if (backlog_.read_ops) {
753 started_.store(
true, std::memory_order_release);
758 [
this](
bool ) { MaybeFinish(true); },
759 &finish_ops_,
false);
760 finish_ops_.ClientRecvStatus(context_, &finish_status_);
765 void Read(Response* msg)
override {
766 read_ops_.RecvMessage(msg);
767 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
768 if (
GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
770 if (
GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
771 backlog_.read_ops =
true;
779 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
786 template <
class Request>
790 : context_(context), call_(call), reactor_(reactor) {
794 start_ops_.ClientSendClose();
798 void MaybeFinish(
bool from_reaction) {
800 1, std::memory_order_acq_rel) == 1)) {
802 auto* reactor = reactor_;
803 auto* call = call_.
call();
804 this->~ClientCallbackReaderImpl();
809 reactor->InternalScheduleOnDone(std::move(s));
816 ClientReadReactor<Response>*
const reactor_;
833 struct StartCallBacklog {
834 bool read_ops =
false;
836 StartCallBacklog backlog_ ;
839 std::atomic<intptr_t> callbacks_outstanding_{2};
840 std::atomic_bool started_{
false};
844 template <
class Response>
845 class ClientCallbackReaderFactory {
847 template <
class Request>
849 const ::grpc::internal::RpcMethod& method,
853 channel->CreateCall(method, context, channel->CallbackCQ());
862 template <
class Request>
866 static void operator delete(
void* , std::size_t size) {
883 if (!start_corked_) {
884 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
885 context_->initial_metadata_flags());
892 if (backlog_.write_ops) {
895 if (backlog_.writes_done_ops) {
901 started_.store(
true, std::memory_order_release);
906 this->MaybeFinish(
false);
912 write_ops_.ClientSendClose();
916 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
919 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
920 context_->initial_metadata_flags());
921 corked_write_needed_ =
false;
924 if (
GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
926 if (
GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
927 backlog_.write_ops =
true;
935 writes_done_ops_.ClientSendClose();
936 writes_done_tag_.
Set(call_.
call(),
938 reactor_->OnWritesDoneDone(ok);
941 &writes_done_ops_,
false);
943 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
946 writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
947 context_->initial_metadata_flags());
948 corked_write_needed_ =
false;
951 if (
GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
953 if (
GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
954 backlog_.writes_done_ops =
true;
962 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
969 template <
class Response>
976 start_corked_(context_->initial_metadata_corked_),
977 corked_write_needed_(start_corked_) {
983 reactor_->OnReadInitialMetadataDone(ok);
987 start_ops_.RecvInitialMetadata(context_);
992 reactor_->OnWriteDone(ok);
999 finish_ops_.RecvMessage(response);
1000 finish_ops_.AllowNoMessage();
1003 [
this](
bool ) { MaybeFinish(true); },
1006 finish_ops_.ClientRecvStatus(context_, &finish_status_);
1011 void MaybeFinish(
bool from_reaction) {
1013 1, std::memory_order_acq_rel) == 1)) {
1015 auto* reactor = reactor_;
1016 auto* call = call_.
call();
1017 this->~ClientCallbackWriterImpl();
1022 reactor->InternalScheduleOnDone(std::move(s));
1029 ClientWriteReactor<Request>*
const reactor_;
1035 const bool start_corked_;
1036 bool corked_write_needed_;
1056 struct StartCallBacklog {
1057 bool write_ops =
false;
1058 bool writes_done_ops =
false;
1060 StartCallBacklog backlog_ ;
1063 std::atomic<intptr_t> callbacks_outstanding_{3};
1064 std::atomic_bool started_{
false};
1068 template <
class Request>
1069 class ClientCallbackWriterFactory {
1071 template <
class Response>
1073 const ::grpc::internal::RpcMethod& method,
1077 channel->CreateCall(method, context, channel->CallbackCQ());
1089 static void operator delete(
void* , std::size_t size) {
1107 reactor_->OnReadInitialMetadataDone(ok);
1110 &start_ops_,
false);
1111 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
1112 context_->initial_metadata_flags());
1113 start_ops_.RecvInitialMetadata(context_);
1117 finish_tag_.
Set(call_.
call(), [
this](
bool ) { MaybeFinish(); },
1120 finish_ops_.ClientRecvStatus(context_, &finish_status_);
1128 template <
class Request,
class Response>
1132 : context_(context), call_(call), reactor_(reactor) {
1136 start_ops_.ClientSendClose();
1137 finish_ops_.RecvMessage(response);
1138 finish_ops_.AllowNoMessage();
1144 void MaybeFinish() {
1146 1, std::memory_order_acq_rel) == 1)) {
1148 auto* reactor = reactor_;
1149 auto* call = call_.
call();
1174 std::atomic<intptr_t> callbacks_outstanding_{2};
1179 template <
class Request,
class Response>
1181 const ::grpc::internal::RpcMethod& method,
1185 channel->CreateCall(method, context, channel->CallbackCQ());
1197 #endif // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_IMPL_H
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming o...
Definition: callback_common.h:136
virtual void WritesDone()=0
void StartCall() override
Definition: client_callback_impl.h:877
void RemoveHold() override
Definition: client_callback_impl.h:781
void Write(const Request *msg, ::grpc::WriteOptions options) override
Definition: client_callback_impl.h:909
Definition: channel_interface.h:46
Definition: client_callback_impl.h:174
Definition: call_op_set.h:618
void StartCall()
Definition: client_callback_impl.h:430
Definition: client_callback_impl.h:863
virtual void OnReadDone(bool)
Definition: client_callback_impl.h:369
void Read(Response *msg) override
Definition: client_callback_impl.h:765
Definition: call_op_set.h:525
void Write(const Request *req)
Definition: client_callback_impl.h:178
void StartRead(Response *resp)
Initiate a read operation (or post it for later initiation if StartCall has not yet been invoked).
Definition: client_callback_impl.h:230
This header provides an object that reads bytes directly from a grpc::ByteBuffer, via the ZeroCopyInp...
Definition: alarm.h:24
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:850
virtual void grpc_call_ref(grpc_call *call)=0
Definition: call_op_set.h:287
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:161
virtual void StartCall()=0
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
virtual void RemoveHold()=0
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback_impl.h:432
virtual void OnDone(const ::grpc::Status &)=0
Called by the library when all operations associated with this RPC have completed and all Holds have ...
virtual ~ClientCallbackReaderWriter()
Definition: client_callback_impl.h:144
virtual void WritesDone()=0
void StartWriteLast(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback_impl.h:391
Definition: channel_interface.h:36
virtual ~ClientUnaryReactor()
Definition: client_callback_impl.h:428
Straightforward wrapping of the C call object.
Definition: call.h:35
Definition: client_callback_impl.h:106
Definition: channel_interface.h:50
void CallbackUnaryCall(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(::grpc::Status)> on_completion)
Perform a callback-based unary call TODO(vjpai): Combine as much as possible with the blocking unary ...
Definition: client_callback_impl.h:46
void OnDone(const ::grpc::Status &) override
Notifies the application that all operations associated with this RPC have completed and all Holds ha...
Definition: client_callback_impl.h:308
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const Request *request, ClientReadReactor< Response > *reactor)
Definition: client_callback_impl.h:848
void StartCall() override
Definition: client_callback_impl.h:1100
void BindReactor(ClientWriteReactor< Request > *reactor)
Definition: client_callback_impl.h:189
void RemoveHold()
Definition: client_callback_impl.h:299
virtual void StartCall()=0
Definition: callback_common.h:68
void StartRead(Response *resp)
Definition: client_callback_impl.h:358
bool ok() const
Is the status OK?
Definition: status.h:118
void StartWriteLast(const Request *req, ::grpc::WriteOptions options)
Initiate/post a write operation with specified options and an indication that this is the last write ...
Definition: client_callback_impl.h:261
virtual void StartCall()=0
virtual void StartCall()=0
ClientBidiReactor is the interface for a bidirectional streaming RPC.
Definition: client_callback_impl.h:131
void RemoveHold()
Definition: client_callback_impl.h:365
void StartCall() override
Definition: client_callback_impl.h:721
void AddHold(int holds) override
Definition: client_callback_impl.h:778
Did it work? If it didn't, why?
Definition: status.h:31
virtual void Read(Response *resp)=0
void StartWrite(const Request *req)
Initiate a write operation (or post it for later initiation if StartCall has not yet been invoked).
Definition: client_callback_impl.h:238
ClientReadReactor is the interface for a server-streaming RPC.
Definition: client_callback_impl.h:133
void Write(const Request *msg, ::grpc::WriteOptions options) override
Definition: client_callback_impl.h:520
Definition: client_callback_impl.h:159
virtual void OnReadDone(bool)
Notifies the application that a StartRead operation completed.
Definition: client_callback_impl.h:324
void AddMultipleHolds(int holds)
Definition: client_callback_impl.h:295
Definition: client_callback_impl.h:456
virtual ~ClientBidiReactor()
Definition: client_callback_impl.h:217
Definition: client_callback_impl.h:194
void RemoveHold() override
Definition: client_callback_impl.h:571
virtual void RemoveHold()=0
void OnDone(const ::grpc::Status &) override
Definition: client_callback_impl.h:403
virtual ~ClientReadReactor()
Definition: client_callback_impl.h:355
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:195
ClientUnaryReactor is a reactor-style interface for a unary RPC.
Definition: client_callback_impl.h:426
void OnDone(const ::grpc::Status &) override
Called by the library when all operations associated with this RPC have completed and all Holds have ...
Definition: client_callback_impl.h:431
virtual void Write(const Request *req, ::grpc::WriteOptions options)=0
::grpc_impl::ClientUnaryReactor ClientUnaryReactor
Definition: client_callback.h:71
void AddHold()
Holds are needed if (and only if) this stream has operations that take place on it after StartCall bu...
Definition: client_callback_impl.h:294
::grpc_impl::ClientReadReactor< Response > ClientReadReactor
Definition: client_callback.h:63
Codegen interface for grpc::Channel.
Definition: channel_interface.h:74
void AddHold(int holds) override
Definition: client_callback_impl.h:568
void StartCall()
Definition: client_callback_impl.h:384
virtual void OnWriteDone(bool)
Definition: client_callback_impl.h:405
CallbackUnaryCallImpl(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(::grpc::Status)> on_completion)
Definition: client_callback_impl.h:58
virtual void OnWritesDoneDone(bool)
Notifies the application that a StartWritesDone operation completed.
Definition: client_callback_impl.h:340
void RemoveHold() override
Definition: client_callback_impl.h:964
void StartCall()
Activate the RPC and initiate any reads or writes that have been Start'ed before this call.
Definition: client_callback_impl.h:223
void RemoveHold()
Definition: client_callback_impl.h:401
virtual ~ClientCallbackUnary()
Definition: client_callback_impl.h:196
void StartCall()
Definition: client_callback_impl.h:357
void StartWritesDone()
Definition: client_callback_impl.h:394
virtual void Read(Response *resp)=0
::google::protobuf::util::Status Status
Definition: config_protobuf.h:90
Definition: client_callback_impl.h:707
virtual void AddHold(int holds)=0
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback_impl.h:368
void StartCall() override
Definition: client_callback_impl.h:471
virtual void grpc_call_unref(grpc_call *call)=0
Definition: channel_interface.h:48
void StartWritesDone()
Indicate that the RPC will have no more write operations.
Definition: client_callback_impl.h:270
virtual void RemoveHold()=0
virtual ~ClientWriteReactor()
Definition: client_callback_impl.h:382
Definition: client_callback_impl.h:142
Per-message write options.
Definition: call_op_set.h:79
void AddMultipleHolds(int holds)
Definition: client_callback_impl.h:361
void Set(grpc_call *call, std::function< void(bool)> f, CompletionQueueTag *ops, bool can_inline)
Definition: callback_common.h:164
ClientWriteReactor is the interface for a client-streaming RPC.
Definition: client_callback_impl.h:135
virtual ~ClientCallbackReader()
Definition: client_callback_impl.h:161
void AddMultipleHolds(int holds)
Definition: client_callback_impl.h:397
Definition: client_callback_impl.h:1177
void WriteLast(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback_impl.h:180
void PerformOps(CallOpSetInterface *ops)
Definition: call.h:65
grpc_call * call() const
Definition: call.h:69
void BindReactor(ClientReadReactor< Response > *reactor)
Definition: client_callback_impl.h:168
Definition: call_op_set.h:768
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call_op_set.h:122
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, Response *response, ClientWriteReactor< Request > *reactor)
Definition: client_callback_impl.h:1072
void BindReactor(ClientUnaryReactor *reactor)
Definition: client_callback_impl.h:441
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const Request *request, Response *response, ClientUnaryReactor *reactor)
Definition: client_callback_impl.h:1180
void OnDone(const ::grpc::Status &) override
Definition: client_callback_impl.h:367
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue....
Definition: completion_queue.h:99
void WritesDone() override
Definition: client_callback_impl.h:543
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm_impl.h:33
virtual void OnWritesDoneDone(bool)
Definition: client_callback_impl.h:406
void StartWrite(const Request *req)
Definition: client_callback_impl.h:385
void StartWrite(const Request *req, ::grpc::WriteOptions options)
Initiate/post a write operation with specified options.
Definition: client_callback_impl.h:248
virtual void AddHold(int holds)=0
Definition: client_callback_impl.h:1086
CoreCodegenInterface * g_core_codegen_interface
Definition: completion_queue.h:93
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:146
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:186
void AddHold()
Definition: client_callback_impl.h:396
void WritesDone() override
Definition: client_callback_impl.h:934
void BindReactor(ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback_impl.h:153
void StartWrite(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback_impl.h:388
::grpc_impl::ClientBidiReactor< Request, Response > ClientBidiReactor
Definition: client_callback.h:69
void AddHold()
Definition: client_callback_impl.h:360
Definition: byte_buffer.h:58
void Read(Response *msg) override
Definition: client_callback_impl.h:507
virtual ~ClientCallbackWriter()
Definition: client_callback_impl.h:176
virtual void OnReadInitialMetadataDone(bool)
Notifies the application that a read of initial metadata from the server is done.
Definition: client_callback_impl.h:318
virtual void AddHold(int holds)=0
void set_core_cq_tag(void *core_cq_tag)
set_core_cq_tag is used to provide a different core CQ tag than "this".
Definition: call_op_set.h:939
::grpc_impl::ClientWriteReactor< Request > ClientWriteReactor
Definition: client_callback.h:66
virtual void InternalScheduleOnDone(::grpc::Status s)
InternalScheduleOnDone is not part of the API and is not meant to be overridden.
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback_impl.h:691
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback_impl.h:404
void AddHold(int holds) override
Definition: client_callback_impl.h:961
virtual void OnWriteDone(bool)
Notifies the application that a StartWrite or StartWriteLast operation completed.
Definition: client_callback_impl.h:331
#define GPR_CODEGEN_DEBUG_ASSERT(x)
Codegen specific version of GPR_DEBUG_ASSERT.
Definition: core_codegen_interface.h:155