Go to the documentation of this file.
18 #ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
19 #define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
44 template <
class InputMessage,
class OutputMessage,
45 class BaseInputMessage = InputMessage,
46 class BaseOutputMessage = OutputMessage>
48 const ::grpc::internal::RpcMethod& method,
50 const InputMessage* request, OutputMessage* result,
52 static_assert(std::is_base_of<BaseInputMessage, InputMessage>::value,
53 "Invalid input message specification");
54 static_assert(std::is_base_of<BaseOutputMessage, OutputMessage>::value,
55 "Invalid output message specification");
57 channel, method, context, request, result, on_completion);
60 template <
class InputMessage,
class OutputMessage>
61 class CallbackUnaryCallImpl {
64 const ::grpc::internal::RpcMethod& method,
66 const InputMessage* request, OutputMessage* result,
84 const size_t alloc_sz =
sizeof(OpSetAndTag);
85 auto*
const alloced =
static_cast<OpSetAndTag*
>(
88 auto* ops =
new (&alloced->opset) FullCallOpSet;
89 auto* tag =
new (&alloced->tag)
98 ops->SendInitialMetadata(&context->send_initial_metadata_,
99 context->initial_metadata_flags());
100 ops->RecvInitialMetadata(context);
101 ops->RecvMessage(result);
102 ops->AllowNoMessage();
103 ops->ClientSendClose();
104 ops->ClientRecvStatus(context, tag->status_ptr());
105 ops->set_core_cq_tag(tag);
135 template <
class Request,
class Response>
137 template <
class Response>
139 template <
class Request>
146 template <
class Request,
class Response>
153 virtual void Read(Response* resp) = 0;
154 virtual void AddHold(
int holds) = 0;
159 reactor->BindStream(
this);
163 template <
class Response>
168 virtual void Read(Response* resp) = 0;
169 virtual void AddHold(
int holds) = 0;
174 reactor->BindReader(
this);
178 template <
class Request>
190 virtual void AddHold(
int holds) = 0;
195 reactor->BindWriter(
this);
219 template <
class Request,
class Response>
254 stream_->Write(req, options);
302 stream_->AddHold(holds);
357 template <
class Response>
368 reader_->AddHold(holds);
384 template <
class Request>
394 writer_->Write(req, options);
404 writer_->AddHold(holds);
447 reactor->BindCall(
this);
453 template <
class Request,
class Response>
454 class ClientCallbackReaderWriterFactory;
455 template <
class Response>
456 class ClientCallbackReaderFactory;
457 template <
class Request>
458 class ClientCallbackWriterFactory;
460 template <
class Request,
class Response>
465 static void operator delete(
void* , std::size_t size) {
482 if (!start_corked_) {
483 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
484 context_->initial_metadata_flags());
492 if (backlog_.read_ops) {
495 if (backlog_.write_ops) {
498 if (backlog_.writes_done_ops) {
504 started_.store(
true, std::memory_order_release);
509 this->MaybeFinish(
false);
512 void Read(Response* msg)
override {
513 read_ops_.RecvMessage(msg);
514 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
515 if (
GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
517 if (
GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
518 backlog_.read_ops =
true;
528 write_ops_.ClientSendClose();
532 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
534 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
535 context_->initial_metadata_flags());
536 corked_write_needed_ =
false;
539 if (
GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
541 if (
GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
542 backlog_.write_ops =
true;
549 writes_done_ops_.ClientSendClose();
550 writes_done_tag_.
Set(
553 reactor_->OnWritesDoneDone(ok);
556 &writes_done_ops_,
false);
558 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
560 writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
561 context_->initial_metadata_flags());
562 corked_write_needed_ =
false;
564 if (
GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
566 if (
GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
567 backlog_.writes_done_ops =
true;
575 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
588 start_corked_(context_->initial_metadata_corked_),
589 corked_write_needed_(start_corked_) {
596 reactor_->OnReadInitialMetadataDone(ok);
600 start_ops_.RecvInitialMetadata(context_);
606 reactor_->OnWriteDone(ok);
615 reactor_->OnReadDone(ok);
624 [
this](
bool ) { MaybeFinish(true); },
627 finish_ops_.ClientRecvStatus(context_, &finish_status_);
637 void MaybeFinish(
bool from_reaction) {
639 1, std::memory_order_acq_rel) == 1)) {
641 auto* reactor = reactor_;
642 auto* call = call_.
call();
643 this->~ClientCallbackReaderWriterImpl();
648 reactor->InternalScheduleOnDone(std::move(s));
655 ClientBidiReactor<Request, Response>*
const reactor_;
661 const bool start_corked_;
662 bool corked_write_needed_;
684 struct StartCallBacklog {
685 bool write_ops =
false;
686 bool writes_done_ops =
false;
687 bool read_ops =
false;
689 StartCallBacklog backlog_ ;
692 std::atomic<intptr_t> callbacks_outstanding_{3};
693 std::atomic_bool started_{
false};
697 template <
class Request,
class Response>
698 class ClientCallbackReaderWriterFactory {
701 const ::grpc::internal::RpcMethod& method,
705 channel->CreateCall(method, context, channel->CallbackCQ());
715 template <
class Response>
719 static void operator delete(
void* , std::size_t size) {
739 reactor_->OnReadInitialMetadataDone(ok);
743 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
744 context_->initial_metadata_flags());
745 start_ops_.RecvInitialMetadata(context_);
753 reactor_->OnReadDone(ok);
761 if (backlog_.read_ops) {
764 started_.store(
true, std::memory_order_release);
769 [
this](
bool ) { MaybeFinish(true); },
770 &finish_ops_,
false);
771 finish_ops_.ClientRecvStatus(context_, &finish_status_);
776 void Read(Response* msg)
override {
777 read_ops_.RecvMessage(msg);
778 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
779 if (
GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
781 if (
GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
782 backlog_.read_ops =
true;
790 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
797 template <
class Request>
801 : context_(context), call_(call), reactor_(reactor) {
805 start_ops_.ClientSendClose();
809 void MaybeFinish(
bool from_reaction) {
811 1, std::memory_order_acq_rel) == 1)) {
813 auto* reactor = reactor_;
814 auto* call = call_.
call();
815 this->~ClientCallbackReaderImpl();
820 reactor->InternalScheduleOnDone(std::move(s));
827 ClientReadReactor<Response>*
const reactor_;
844 struct StartCallBacklog {
845 bool read_ops =
false;
847 StartCallBacklog backlog_ ;
850 std::atomic<intptr_t> callbacks_outstanding_{2};
851 std::atomic_bool started_{
false};
855 template <
class Response>
856 class ClientCallbackReaderFactory {
858 template <
class Request>
860 const ::grpc::internal::RpcMethod& method,
864 channel->CreateCall(method, context, channel->CallbackCQ());
873 template <
class Request>
877 static void operator delete(
void* , std::size_t size) {
894 if (!start_corked_) {
895 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
896 context_->initial_metadata_flags());
903 if (backlog_.write_ops) {
906 if (backlog_.writes_done_ops) {
912 started_.store(
true, std::memory_order_release);
917 this->MaybeFinish(
false);
923 write_ops_.ClientSendClose();
927 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
930 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
931 context_->initial_metadata_flags());
932 corked_write_needed_ =
false;
935 if (
GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
937 if (
GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
938 backlog_.write_ops =
true;
946 writes_done_ops_.ClientSendClose();
947 writes_done_tag_.
Set(
950 reactor_->OnWritesDoneDone(ok);
953 &writes_done_ops_,
false);
955 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
958 writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
959 context_->initial_metadata_flags());
960 corked_write_needed_ =
false;
963 if (
GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
965 if (
GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
966 backlog_.writes_done_ops =
true;
974 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
981 template <
class Response>
988 start_corked_(context_->initial_metadata_corked_),
989 corked_write_needed_(start_corked_) {
996 reactor_->OnReadInitialMetadataDone(ok);
1000 start_ops_.RecvInitialMetadata(context_);
1006 reactor_->OnWriteDone(ok);
1009 &write_ops_,
false);
1013 finish_ops_.RecvMessage(response);
1014 finish_ops_.AllowNoMessage();
1017 [
this](
bool ) { MaybeFinish(true); },
1020 finish_ops_.ClientRecvStatus(context_, &finish_status_);
1025 void MaybeFinish(
bool from_reaction) {
1027 1, std::memory_order_acq_rel) == 1)) {
1029 auto* reactor = reactor_;
1030 auto* call = call_.
call();
1031 this->~ClientCallbackWriterImpl();
1036 reactor->InternalScheduleOnDone(std::move(s));
1043 ClientWriteReactor<Request>*
const reactor_;
1049 const bool start_corked_;
1050 bool corked_write_needed_;
1070 struct StartCallBacklog {
1071 bool write_ops =
false;
1072 bool writes_done_ops =
false;
1074 StartCallBacklog backlog_ ;
1077 std::atomic<intptr_t> callbacks_outstanding_{3};
1078 std::atomic_bool started_{
false};
1082 template <
class Request>
1083 class ClientCallbackWriterFactory {
1085 template <
class Response>
1087 const ::grpc::internal::RpcMethod& method,
1091 channel->CreateCall(method, context, channel->CallbackCQ());
1103 static void operator delete(
void* , std::size_t size) {
1122 reactor_->OnReadInitialMetadataDone(ok);
1125 &start_ops_,
false);
1126 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
1127 context_->initial_metadata_flags());
1128 start_ops_.RecvInitialMetadata(context_);
1133 call_.
call(), [
this](
bool ) { MaybeFinish(); }, &finish_ops_,
1135 finish_ops_.ClientRecvStatus(context_, &finish_status_);
1143 template <
class Request,
class Response>
1147 : context_(context), call_(call), reactor_(reactor) {
1151 start_ops_.ClientSendClose();
1152 finish_ops_.RecvMessage(response);
1153 finish_ops_.AllowNoMessage();
1159 void MaybeFinish() {
1161 1, std::memory_order_acq_rel) == 1)) {
1163 auto* reactor = reactor_;
1164 auto* call = call_.
call();
1189 std::atomic<intptr_t> callbacks_outstanding_{2};
1194 template <
class Request,
class Response,
class BaseRequest = Request,
1195 class BaseResponse = Response>
1197 const ::grpc::internal::RpcMethod& method,
1201 channel->CreateCall(method, context, channel->CallbackCQ());
1208 static_cast<const BaseRequest*
>(request),
1209 static_cast<BaseResponse*
>(response), reactor);
1216 namespace experimental {
1218 template <
class Response>
1221 template <
class Request>
1224 template <
class Request,
class Response>
1228 template <
class Response>
1231 template <
class Request>
1234 template <
class Request,
class Response>
1242 #endif // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
Definition: client_callback.h:1100
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming operations.
Definition: callback_common.h:137
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback.h:373
virtual void Write(const Request *req, ::grpc::WriteOptions options)=0
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback.h:409
void StartWritesDone()
Definition: client_callback.h:399
virtual ~ClientCallbackWriter()
Definition: client_callback.h:181
void Read(Response *msg) override
Definition: client_callback.h:512
void AddHold(int holds) override
Definition: client_callback.h:973
::grpc::ClientBidiReactor< Request, Response > ClientBidiReactor
Definition: client_callback.h:1235
Definition: call_op_set.h:619
void StartWrite(const Request *req)
Initiate a write operation (or post it for later initiation if StartCall has not yet been invoked).
Definition: client_callback.h:243
Definition: call_op_set.h:526
void AddHold()
Holds are needed if (and only if) this stream has operations that take place on it after StartCall but from outside one of the reactions (OnReadDone, etc).
Definition: client_callback.h:299
Definition: client_callback.h:461
virtual void OnDone(const ::grpc::Status &)=0
Called by the library when all operations associated with this RPC have completed and all Holds have been removed.
virtual void OnReadDone(bool)
Definition: client_callback.h:374
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided function on expiry or cancellation.
Definition: alarm.h:33
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:852
void RemoveHold() override
Definition: client_callback.h:792
virtual void grpc_call_ref(grpc_call *call)=0
void StartWriteLast(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback.h:396
Definition: call_op_set.h:282
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream; client-side: makes Write the equivalent of performing Write and WritesDone in a single step.
Definition: call_op_set.h:156
ClientUnaryReactor is a reactor-style interface for a unary RPC.
Definition: client_callback.h:431
virtual void Read(Response *resp)=0
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
void StartCall()
Definition: client_callback.h:389
void RemoveHold() override
Definition: client_callback.h:577
virtual ~ClientUnaryReactor()
Definition: client_callback.h:433
Straightforward wrapping of the C call object.
Definition: call.h:35
virtual void WritesDone()=0
void RemoveHold() override
Definition: client_callback.h:976
CallbackUnaryCallImpl(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(::grpc::Status)> on_completion)
Definition: client_callback.h:63
::grpc::ClientUnaryReactor ClientUnaryReactor
Definition: client_callback.h:1237
virtual void OnReadInitialMetadataDone(bool)
Notifies the application that a read of initial metadata from the server is done.
Definition: client_callback.h:323
Definition: callback_common.h:69
void BindReactor(ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback.h:158
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const Request *request, Response *response, ClientUnaryReactor *reactor)
Definition: client_callback.h:1196
bool ok() const
Is the status OK?
Definition: status.h:118
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const Request *request, ClientReadReactor< Response > *reactor)
Definition: client_callback.h:859
void BindReactor(ClientUnaryReactor *reactor)
Definition: client_callback.h:446
void StartRead(Response *resp)
Initiate a read operation (or post it for later initiation if StartCall has not yet been invoked).
Definition: client_callback.h:235
void AddMultipleHolds(int holds)
Definition: client_callback.h:366
void StartCall() override
Definition: client_callback.h:888
::grpc::ClientReadReactor< Response > ClientReadReactor
Definition: client_callback.h:1229
virtual void StartCall()=0
virtual void AddHold(int holds)=0
virtual void OnReadDone(bool)
Notifies the application that a StartRead operation completed.
Definition: client_callback.h:329
Did it work? If it didn't, why?
Definition: status.h:31
virtual ~ClientReadReactor()
Definition: client_callback.h:360
virtual void WritesDone()=0
void Write(const Request *msg, ::grpc::WriteOptions options) override
Definition: client_callback.h:525
Definition: channel_interface.h:49
void StartCall()
Definition: client_callback.h:362
virtual void OnWriteDone(bool)
Notifies the application that a StartWrite or StartWriteLast operation completed.
Definition: client_callback.h:336
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:192
void OnDone(const ::grpc::Status &) override
Notifies the application that all operations associated with this RPC have completed and all Holds have been removed.
Definition: client_callback.h:313
void StartCall() override
Definition: client_callback.h:730
Definition: channel_interface.h:45
Definition: channel_interface.h:36
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback.h:437
void StartWrite(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback.h:393
void BindReactor(ClientWriteReactor< Request > *reactor)
Definition: client_callback.h:194
virtual void RemoveHold()=0
Definition: client_callback.h:147
void StartWritesDone()
Indicate that the RPC will have no more write operations.
Definition: client_callback.h:275
virtual void Read(Response *resp)=0
void StartRead(Response *resp)
Definition: client_callback.h:363
Codegen interface for grpc::Channel.
Definition: channel_interface.h:71
Definition: client_callback.h:111
Definition: client_callback.h:1192
void Read(Response *msg) override
Definition: client_callback.h:776
Definition: client_callback.h:874
ClientWriteReactor is the interface for a client-streaming RPC.
Definition: client_callback.h:140
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback.h:700
::google::protobuf::util::Status Status
Definition: config_protobuf.h:91
Definition: client_callback.h:199
void AddMultipleHolds(int holds)
Definition: client_callback.h:300
void OnDone(const ::grpc::Status &) override
Definition: client_callback.h:408
virtual void grpc_call_unref(grpc_call *call)=0
void RemoveHold()
Definition: client_callback.h:370
void CallbackUnaryCall(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(::grpc::Status)> on_completion)
Perform a callback-based unary call.
Definition: client_callback.h:47
Per-message write options.
Definition: call_op_set.h:79
virtual void RemoveHold()=0
virtual void OnWritesDoneDone(bool)
Notifies the application that a StartWritesDone operation completed.
Definition: client_callback.h:345
virtual ~ClientCallbackReader()
Definition: client_callback.h:166
void Set(grpc_call *call, std::function< void(bool)> f, CompletionQueueTag *ops, bool can_inline)
Definition: callback_common.h:165
void AddHold(int holds) override
Definition: client_callback.h:789
void BindReactor(ClientReadReactor< Response > *reactor)
Definition: client_callback.h:173
Definition: client_callback.h:179
virtual void AddHold(int holds)=0
void RemoveHold()
Definition: client_callback.h:304
virtual ~ClientCallbackReaderWriter()
Definition: client_callback.h:149
virtual void RemoveHold()=0
void Write(const Request *req)
Definition: client_callback.h:183
virtual void StartCall()=0
void AddHold()
Definition: client_callback.h:401
void PerformOps(CallOpSetInterface *ops)
Definition: call.h:65
grpc_call * call() const
Definition: call.h:69
void Write(const Request *msg, ::grpc::WriteOptions options) override
Definition: client_callback.h:920
void StartCall()
Definition: client_callback.h:435
virtual void StartCall()=0
Definition: call_op_set.h:769
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call_op_set.h:117
void StartWrite(const Request *req, ::grpc::WriteOptions options)
Initiate/post a write operation with specified options.
Definition: client_callback.h:253
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue.h).
Definition: completion_queue.h:102
ClientBidiReactor is the interface for a bidirectional streaming RPC.
Definition: client_callback.h:136
Definition: channel_interface.h:47
Definition: client_callback.h:716
virtual void StartCall()=0
void WritesDone() override
Definition: client_callback.h:945
void StartCall() override
Definition: client_callback.h:1114
void StartWrite(const Request *req)
Definition: client_callback.h:390
void StartCall()
Activate the RPC and initiate any reads or writes that have been Start'ed before this call.
Definition: client_callback.h:228
CoreCodegenInterface * g_core_codegen_interface
Definition: completion_queue.h:96
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:146
virtual void OnWriteDone(bool)
Definition: client_callback.h:410
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailing metadata.
Definition: call_op_set.h:181
void StartWriteLast(const Request *req, ::grpc::WriteOptions options)
Initiate/post a write operation with specified options and an indication that this is the last write (like StartWrite and StartWritesDone, merged).
Definition: client_callback.h:266
Definition: byte_buffer.h:52
void OnDone(const ::grpc::Status &) override
Called by the library when all operations associated with this RPC have completed and all Holds have been removed.
Definition: client_callback.h:436
virtual void AddHold(int holds)=0
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, Response *response, ClientWriteReactor< Request > *reactor)
Definition: client_callback.h:1086
void AddMultipleHolds(int holds)
Definition: client_callback.h:402
virtual ~ClientWriteReactor()
Definition: client_callback.h:387
virtual void InternalScheduleOnDone(::grpc::Status s)
InternalScheduleOnDone is not part of the API and is not meant to be overridden.
void StartCall() override
Definition: client_callback.h:476
Definition: client_callback.h:164
virtual ~ClientCallbackUnary()
Definition: client_callback.h:201
void set_core_cq_tag(void *core_cq_tag)
set_core_cq_tag is used to provide a different core CQ tag than "this".
Definition: call_op_set.h:941
void AddHold(int holds) override
Definition: client_callback.h:574
void RemoveHold()
Definition: client_callback.h:406
ClientReadReactor is the interface for a server-streaming RPC.
Definition: client_callback.h:138
void WritesDone() override
Definition: client_callback.h:548
virtual ~ClientBidiReactor()
Definition: client_callback.h:222
void AddHold()
Definition: client_callback.h:365
void WriteLast(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback.h:185
#define GPR_CODEGEN_DEBUG_ASSERT(x)
Codegen specific version of GPR_DEBUG_ASSERT.
Definition: core_codegen_interface.h:155
::grpc::ClientWriteReactor< Request > ClientWriteReactor
Definition: client_callback.h:1232
void OnDone(const ::grpc::Status &) override
Definition: client_callback.h:372
virtual void OnWritesDoneDone(bool)
Definition: client_callback.h:411