Go to the documentation of this file.
18 #ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
19 #define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
40 template <
class InputMessage,
class OutputMessage>
42 const ::grpc::internal::RpcMethod& method,
44 const InputMessage* request, OutputMessage* result,
47 channel, method, context, request, result, on_completion);
50 template <
class InputMessage,
class OutputMessage>
51 class CallbackUnaryCallImpl {
54 const ::grpc::internal::RpcMethod& method,
56 const InputMessage* request, OutputMessage* result,
74 const size_t alloc_sz =
sizeof(OpSetAndTag);
75 auto*
const alloced =
static_cast<OpSetAndTag*
>(
78 auto* ops =
new (&alloced->opset) FullCallOpSet;
79 auto* tag =
new (&alloced->tag)
88 ops->SendInitialMetadata(&context->send_initial_metadata_,
89 context->initial_metadata_flags());
90 ops->RecvInitialMetadata(context);
91 ops->RecvMessage(result);
92 ops->AllowNoMessage();
93 ops->ClientSendClose();
94 ops->ClientRecvStatus(context, tag->status_ptr());
95 ops->set_core_cq_tag(tag);
125 template <
class Request,
class Response>
127 template <
class Response>
129 template <
class Request>
136 template <
class Request,
class Response>
143 virtual void Read(Response* resp) = 0;
144 virtual void AddHold(
int holds) = 0;
149 reactor->BindStream(
this);
153 template <
class Response>
158 virtual void Read(Response* resp) = 0;
159 virtual void AddHold(
int holds) = 0;
164 reactor->BindReader(
this);
168 template <
class Request>
180 virtual void AddHold(
int holds) = 0;
185 reactor->BindWriter(
this);
209 template <
class Request,
class Response>
244 stream_->Write(req, std::move(options));
292 stream_->AddHold(holds);
347 template <
class Response>
358 reader_->AddHold(holds);
374 template <
class Request>
384 writer_->Write(req, std::move(options));
394 writer_->AddHold(holds);
437 reactor->BindCall(
this);
443 template <
class Request,
class Response>
444 class ClientCallbackReaderWriterFactory;
445 template <
class Response>
446 class ClientCallbackReaderFactory;
447 template <
class Request>
448 class ClientCallbackWriterFactory;
450 template <
class Request,
class Response>
455 static void operator delete(
void* , std::size_t size) {
472 if (!start_corked_) {
473 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
474 context_->initial_metadata_flags());
482 if (backlog_.read_ops) {
485 if (backlog_.write_ops) {
488 if (backlog_.writes_done_ops) {
494 started_.store(
true, std::memory_order_release);
499 this->MaybeFinish(
false);
502 void Read(Response* msg)
override {
503 read_ops_.RecvMessage(msg);
504 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
505 if (
GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
507 if (
GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
508 backlog_.read_ops =
true;
518 write_ops_.ClientSendClose();
522 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
524 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
525 context_->initial_metadata_flags());
526 corked_write_needed_ =
false;
529 if (
GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
531 if (
GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
532 backlog_.write_ops =
true;
539 writes_done_ops_.ClientSendClose();
540 writes_done_tag_.
Set(call_.
call(),
542 reactor_->OnWritesDoneDone(ok);
545 &writes_done_ops_,
false);
547 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
549 writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
550 context_->initial_metadata_flags());
551 corked_write_needed_ =
false;
553 if (
GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
555 if (
GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
556 backlog_.writes_done_ops =
true;
564 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
577 start_corked_(context_->initial_metadata_corked_),
578 corked_write_needed_(start_corked_) {
584 reactor_->OnReadInitialMetadataDone(ok);
588 start_ops_.RecvInitialMetadata(context_);
593 reactor_->OnWriteDone(ok);
601 reactor_->OnReadDone(ok);
610 [
this](
bool ) { MaybeFinish(true); },
613 finish_ops_.ClientRecvStatus(context_, &finish_status_);
623 void MaybeFinish(
bool from_reaction) {
625 1, std::memory_order_acq_rel) == 1)) {
627 auto* reactor = reactor_;
628 auto* call = call_.
call();
629 this->~ClientCallbackReaderWriterImpl();
634 reactor->InternalScheduleOnDone(std::move(s));
641 ClientBidiReactor<Request, Response>*
const reactor_;
647 const bool start_corked_;
648 bool corked_write_needed_;
670 struct StartCallBacklog {
671 bool write_ops =
false;
672 bool writes_done_ops =
false;
673 bool read_ops =
false;
675 StartCallBacklog backlog_ ;
678 std::atomic<intptr_t> callbacks_outstanding_{3};
679 std::atomic_bool started_{
false};
683 template <
class Request,
class Response>
684 class ClientCallbackReaderWriterFactory {
687 const ::grpc::internal::RpcMethod& method,
691 channel->CreateCall(method, context, channel->CallbackCQ());
701 template <
class Response>
705 static void operator delete(
void* , std::size_t size) {
724 reactor_->OnReadInitialMetadataDone(ok);
728 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
729 context_->initial_metadata_flags());
730 start_ops_.RecvInitialMetadata(context_);
737 reactor_->OnReadDone(ok);
745 if (backlog_.read_ops) {
748 started_.store(
true, std::memory_order_release);
753 [
this](
bool ) { MaybeFinish(true); },
754 &finish_ops_,
false);
755 finish_ops_.ClientRecvStatus(context_, &finish_status_);
760 void Read(Response* msg)
override {
761 read_ops_.RecvMessage(msg);
762 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
763 if (
GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
765 if (
GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
766 backlog_.read_ops =
true;
774 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
781 template <
class Request>
785 : context_(context), call_(call), reactor_(reactor) {
789 start_ops_.ClientSendClose();
793 void MaybeFinish(
bool from_reaction) {
795 1, std::memory_order_acq_rel) == 1)) {
797 auto* reactor = reactor_;
798 auto* call = call_.
call();
799 this->~ClientCallbackReaderImpl();
804 reactor->InternalScheduleOnDone(std::move(s));
811 ClientReadReactor<Response>*
const reactor_;
828 struct StartCallBacklog {
829 bool read_ops =
false;
831 StartCallBacklog backlog_ ;
834 std::atomic<intptr_t> callbacks_outstanding_{2};
835 std::atomic_bool started_{
false};
839 template <
class Response>
840 class ClientCallbackReaderFactory {
842 template <
class Request>
844 const ::grpc::internal::RpcMethod& method,
848 channel->CreateCall(method, context, channel->CallbackCQ());
857 template <
class Request>
861 static void operator delete(
void* , std::size_t size) {
878 if (!start_corked_) {
879 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
880 context_->initial_metadata_flags());
887 if (backlog_.write_ops) {
890 if (backlog_.writes_done_ops) {
896 started_.store(
true, std::memory_order_release);
901 this->MaybeFinish(
false);
907 write_ops_.ClientSendClose();
911 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
914 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
915 context_->initial_metadata_flags());
916 corked_write_needed_ =
false;
919 if (
GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
921 if (
GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
922 backlog_.write_ops =
true;
930 writes_done_ops_.ClientSendClose();
931 writes_done_tag_.
Set(call_.
call(),
933 reactor_->OnWritesDoneDone(ok);
936 &writes_done_ops_,
false);
938 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
941 writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
942 context_->initial_metadata_flags());
943 corked_write_needed_ =
false;
946 if (
GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
948 if (
GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
949 backlog_.writes_done_ops =
true;
957 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
964 template <
class Response>
971 start_corked_(context_->initial_metadata_corked_),
972 corked_write_needed_(start_corked_) {
978 reactor_->OnReadInitialMetadataDone(ok);
982 start_ops_.RecvInitialMetadata(context_);
987 reactor_->OnWriteDone(ok);
994 finish_ops_.RecvMessage(response);
995 finish_ops_.AllowNoMessage();
998 [
this](
bool ) { MaybeFinish(true); },
1001 finish_ops_.ClientRecvStatus(context_, &finish_status_);
1006 void MaybeFinish(
bool from_reaction) {
1008 1, std::memory_order_acq_rel) == 1)) {
1010 auto* reactor = reactor_;
1011 auto* call = call_.
call();
1012 this->~ClientCallbackWriterImpl();
1017 reactor->InternalScheduleOnDone(std::move(s));
1024 ClientWriteReactor<Request>*
const reactor_;
1030 const bool start_corked_;
1031 bool corked_write_needed_;
1051 struct StartCallBacklog {
1052 bool write_ops =
false;
1053 bool writes_done_ops =
false;
1055 StartCallBacklog backlog_ ;
1058 std::atomic<intptr_t> callbacks_outstanding_{3};
1059 std::atomic_bool started_{
false};
1063 template <
class Request>
1064 class ClientCallbackWriterFactory {
1066 template <
class Response>
1068 const ::grpc::internal::RpcMethod& method,
1072 channel->CreateCall(method, context, channel->CallbackCQ());
1084 static void operator delete(
void* , std::size_t size) {
1102 reactor_->OnReadInitialMetadataDone(ok);
1105 &start_ops_,
false);
1106 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
1107 context_->initial_metadata_flags());
1108 start_ops_.RecvInitialMetadata(context_);
1112 finish_tag_.
Set(call_.
call(), [
this](
bool ) { MaybeFinish(); },
1115 finish_ops_.ClientRecvStatus(context_, &finish_status_);
1123 template <
class Request,
class Response>
1127 : context_(context), call_(call), reactor_(reactor) {
1131 start_ops_.ClientSendClose();
1132 finish_ops_.RecvMessage(response);
1133 finish_ops_.AllowNoMessage();
1139 void MaybeFinish() {
1141 1, std::memory_order_acq_rel) == 1)) {
1143 auto* reactor = reactor_;
1144 auto* call = call_.
call();
1169 std::atomic<intptr_t> callbacks_outstanding_{2};
1174 template <
class Request,
class Response>
1176 const ::grpc::internal::RpcMethod& method,
1180 channel->CreateCall(method, context, channel->CallbackCQ());
1193 namespace experimental {
1195 template <
class Response>
1198 template <
class Request>
1201 template <
class Request,
class Response>
1205 template <
class Response>
1208 template <
class Request>
1211 template <
class Request,
class Response>
1219 #endif // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
Definition: client_callback.h:1081
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming operations.
Definition: callback_common.h:136
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback.h:363
virtual void Write(const Request *req, ::grpc::WriteOptions options)=0
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback.h:399
void StartWritesDone()
Definition: client_callback.h:389
virtual ~ClientCallbackWriter()
Definition: client_callback.h:171
void Read(Response *msg) override
Definition: client_callback.h:502
void AddHold(int holds) override
Definition: client_callback.h:956
::grpc::ClientBidiReactor< Request, Response > ClientBidiReactor
Definition: client_callback.h:1212
Definition: call_op_set.h:618
void StartWrite(const Request *req)
Initiate a write operation (or post it for later initiation if StartCall has not yet been invoked).
Definition: client_callback.h:233
Definition: call_op_set.h:525
void AddHold()
Holds are needed if (and only if) this stream has operations that take place on it after StartCall but from outside one of the reactions (OnReadDone, etc).
Definition: client_callback.h:289
Definition: client_callback.h:451
virtual void OnDone(const ::grpc::Status &)=0
Called by the library when all operations associated with this RPC have completed and all Holds have been removed.
virtual void OnReadDone(bool)
Definition: client_callback.h:364
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided function on expiry or cancellation.
Definition: alarm.h:33
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:850
void RemoveHold() override
Definition: client_callback.h:776
virtual void grpc_call_ref(grpc_call *call)=0
void StartWriteLast(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback.h:386
Definition: call_op_set.h:287
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream. Client-side: makes Write the equivalent of performing Write, WritesDone in a single step.
Definition: call_op_set.h:161
ClientUnaryReactor is a reactor-style interface for a unary RPC.
Definition: client_callback.h:421
void CallbackUnaryCall(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(::grpc::Status)> on_completion)
Perform a callback-based unary call. TODO(vjpai): Combine as much as possible with the blocking unary call code.
Definition: client_callback.h:41
virtual void Read(Response *resp)=0
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
void StartCall()
Definition: client_callback.h:379
void RemoveHold() override
Definition: client_callback.h:566
virtual ~ClientUnaryReactor()
Definition: client_callback.h:423
Straightforward wrapping of the C call object.
Definition: call.h:35
virtual void WritesDone()=0
void RemoveHold() override
Definition: client_callback.h:959
CallbackUnaryCallImpl(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(::grpc::Status)> on_completion)
Definition: client_callback.h:53
::grpc::ClientUnaryReactor ClientUnaryReactor
Definition: client_callback.h:1214
virtual void OnReadInitialMetadataDone(bool)
Notifies the application that a read of initial metadata from the server is done.
Definition: client_callback.h:313
Definition: callback_common.h:68
void BindReactor(ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback.h:148
bool ok() const
Is the status OK?
Definition: status.h:118
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const Request *request, ClientReadReactor< Response > *reactor)
Definition: client_callback.h:843
void BindReactor(ClientUnaryReactor *reactor)
Definition: client_callback.h:436
void StartRead(Response *resp)
Initiate a read operation (or post it for later initiation if StartCall has not yet been invoked).
Definition: client_callback.h:225
void AddMultipleHolds(int holds)
Definition: client_callback.h:356
void StartCall() override
Definition: client_callback.h:872
::grpc::ClientReadReactor< Response > ClientReadReactor
Definition: client_callback.h:1206
virtual void StartCall()=0
virtual void AddHold(int holds)=0
virtual void OnReadDone(bool)
Notifies the application that a StartRead operation completed.
Definition: client_callback.h:319
Did it work? If it didn't, why?
Definition: status.h:31
virtual ~ClientReadReactor()
Definition: client_callback.h:350
virtual void WritesDone()=0
void Write(const Request *msg, ::grpc::WriteOptions options) override
Definition: client_callback.h:515
Definition: channel_interface.h:50
void StartCall()
Definition: client_callback.h:352
virtual void OnWriteDone(bool)
Notifies the application that a StartWrite or StartWriteLast operation completed.
Definition: client_callback.h:326
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:191
void OnDone(const ::grpc::Status &) override
Notifies the application that all operations associated with this RPC have completed and all Holds have been removed.
Definition: client_callback.h:303
void StartCall() override
Definition: client_callback.h:716
Definition: channel_interface.h:46
Definition: channel_interface.h:36
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback.h:427
void StartWrite(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback.h:383
void BindReactor(ClientWriteReactor< Request > *reactor)
Definition: client_callback.h:184
virtual void RemoveHold()=0
Definition: client_callback.h:137
void StartWritesDone()
Indicate that the RPC will have no more write operations.
Definition: client_callback.h:265
virtual void Read(Response *resp)=0
void StartRead(Response *resp)
Definition: client_callback.h:353
Codegen interface for grpc::Channel.
Definition: channel_interface.h:72
Definition: client_callback.h:101
Definition: client_callback.h:1172
void Read(Response *msg) override
Definition: client_callback.h:760
Definition: client_callback.h:858
ClientWriteReactor is the interface for a client-streaming RPC.
Definition: client_callback.h:130
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback.h:686
::google::protobuf::util::Status Status
Definition: config_protobuf.h:90
Definition: client_callback.h:189
void AddMultipleHolds(int holds)
Definition: client_callback.h:290
void OnDone(const ::grpc::Status &) override
Definition: client_callback.h:398
virtual void grpc_call_unref(grpc_call *call)=0
void RemoveHold()
Definition: client_callback.h:360
Per-message write options.
Definition: call_op_set.h:79
virtual void RemoveHold()=0
virtual void OnWritesDoneDone(bool)
Notifies the application that a StartWritesDone operation completed.
Definition: client_callback.h:335
virtual ~ClientCallbackReader()
Definition: client_callback.h:156
void Set(grpc_call *call, std::function< void(bool)> f, CompletionQueueTag *ops, bool can_inline)
Definition: callback_common.h:164
void AddHold(int holds) override
Definition: client_callback.h:773
void BindReactor(ClientReadReactor< Response > *reactor)
Definition: client_callback.h:163
Definition: client_callback.h:169
virtual void AddHold(int holds)=0
void RemoveHold()
Definition: client_callback.h:294
virtual ~ClientCallbackReaderWriter()
Definition: client_callback.h:139
virtual void RemoveHold()=0
void Write(const Request *req)
Definition: client_callback.h:173
virtual void StartCall()=0
void AddHold()
Definition: client_callback.h:391
void PerformOps(CallOpSetInterface *ops)
Definition: call.h:65
grpc_call * call() const
Definition: call.h:69
void Write(const Request *msg, ::grpc::WriteOptions options) override
Definition: client_callback.h:904
void StartCall()
Definition: client_callback.h:425
virtual void StartCall()=0
Definition: call_op_set.h:768
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call_op_set.h:122
void StartWrite(const Request *req, ::grpc::WriteOptions options)
Initiate/post a write operation with specified options.
Definition: client_callback.h:243
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue.h).
Definition: completion_queue.h:96
ClientBidiReactor is the interface for a bidirectional streaming RPC.
Definition: client_callback.h:126
Definition: channel_interface.h:48
Definition: client_callback.h:702
virtual void StartCall()=0
void WritesDone() override
Definition: client_callback.h:929
void StartCall() override
Definition: client_callback.h:1095
void StartWrite(const Request *req)
Definition: client_callback.h:380
void StartCall()
Activate the RPC and initiate any reads or writes that have been Start'ed before this call.
Definition: client_callback.h:218
CoreCodegenInterface * g_core_codegen_interface
Definition: completion_queue.h:90
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:146
virtual void OnWriteDone(bool)
Definition: client_callback.h:400
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailing metadata.
Definition: call_op_set.h:186
void StartWriteLast(const Request *req, ::grpc::WriteOptions options)
Initiate/post a write operation with specified options and an indication that this is the last write (like StartWrite and StartWritesDone, merged).
Definition: client_callback.h:256
Definition: byte_buffer.h:51
void OnDone(const ::grpc::Status &) override
Called by the library when all operations associated with this RPC have completed and all Holds have been removed.
Definition: client_callback.h:426
virtual void AddHold(int holds)=0
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, Response *response, ClientWriteReactor< Request > *reactor)
Definition: client_callback.h:1067
void AddMultipleHolds(int holds)
Definition: client_callback.h:392
virtual ~ClientWriteReactor()
Definition: client_callback.h:377
virtual void InternalScheduleOnDone(::grpc::Status s)
InternalScheduleOnDone is not part of the API and is not meant to be overridden.
void StartCall() override
Definition: client_callback.h:466
Definition: client_callback.h:154
virtual ~ClientCallbackUnary()
Definition: client_callback.h:191
void set_core_cq_tag(void *core_cq_tag)
set_core_cq_tag is used to provide a different core CQ tag than "this".
Definition: call_op_set.h:939
void AddHold(int holds) override
Definition: client_callback.h:563
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const Request *request, Response *response, ClientUnaryReactor *reactor)
Definition: client_callback.h:1175
void RemoveHold()
Definition: client_callback.h:396
ClientReadReactor is the interface for a server-streaming RPC.
Definition: client_callback.h:128
void WritesDone() override
Definition: client_callback.h:538
virtual ~ClientBidiReactor()
Definition: client_callback.h:212
void AddHold()
Definition: client_callback.h:355
void WriteLast(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback.h:175
#define GPR_CODEGEN_DEBUG_ASSERT(x)
Codegen specific version of GPR_DEBUG_ASSERT.
Definition: core_codegen_interface.h:155
::grpc::ClientWriteReactor< Request > ClientWriteReactor
Definition: client_callback.h:1209
void OnDone(const ::grpc::Status &) override
Definition: client_callback.h:362
virtual void OnWritesDoneDone(bool)
Definition: client_callback.h:401