18 #ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_IMPL_H 19 #define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_IMPL_H 45 template <
class InputMessage,
class OutputMessage>
47 const ::grpc::internal::RpcMethod& method,
49 const InputMessage* request, OutputMessage* result,
52 channel, method, context, request, result, on_completion);
55 template <
class InputMessage,
class OutputMessage>
59 const ::grpc::internal::RpcMethod& method,
61 const InputMessage* request, OutputMessage* result,
79 const size_t alloc_sz =
sizeof(OpSetAndTag);
80 auto*
const alloced =
static_cast<OpSetAndTag*
>(
83 auto* ops =
new (&alloced->opset) FullCallOpSet;
84 auto* tag =
new (&alloced->tag)
93 ops->SendInitialMetadata(&context->send_initial_metadata_,
94 context->initial_metadata_flags());
95 ops->RecvInitialMetadata(context);
96 ops->RecvMessage(result);
97 ops->AllowNoMessage();
98 ops->ClientSendClose();
99 ops->ClientRecvStatus(context, tag->status_ptr());
100 ops->set_core_cq_tag(tag);
101 call.PerformOps(ops);
107 template <
class Request,
class Response>
109 template <
class Response>
111 template <
class Request>
118 template <
class Request,
class Response>
122 virtual void StartCall() = 0;
124 virtual void WritesDone() = 0;
125 virtual void Read(Response* resp) = 0;
126 virtual void AddHold(
int holds) = 0;
127 virtual void RemoveHold() = 0;
131 reactor->BindStream(
this);
135 template <
class Response>
139 virtual void StartCall() = 0;
140 virtual void Read(Response* resp) = 0;
141 virtual void AddHold(
int holds) = 0;
142 virtual void RemoveHold() = 0;
146 reactor->BindReader(
this);
150 template <
class Request>
154 virtual void StartCall() = 0;
160 virtual void WritesDone() = 0;
162 virtual void AddHold(
int holds) = 0;
163 virtual void RemoveHold() = 0;
167 reactor->BindWriter(
this);
174 virtual void StartCall() = 0;
191 template <
class Request,
class Response>
226 stream_->Write(req, std::move(options));
319 template <
class Response>
343 template <
class Request>
353 writer_->Write(req, std::move(options));
402 reactor->BindCall(
this);
408 template <
class Request,
class Response>
409 class ClientCallbackReaderWriterFactory;
410 template <
class Response>
411 class ClientCallbackReaderFactory;
412 template <
class Request>
413 class ClientCallbackWriterFactory;
415 template <
class Request,
class Response>
420 static void operator delete(
void* , std::size_t size) {
433 1, std::memory_order_acq_rel) == 1)) {
435 auto* reactor = reactor_;
436 auto* call = call_.call();
451 start_tag_.Set(call_.call(),
453 reactor_->OnReadInitialMetadataDone(ok);
457 if (!start_corked_) {
458 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
459 context_->initial_metadata_flags());
461 start_ops_.RecvInitialMetadata(context_);
462 start_ops_.set_core_cq_tag(&start_tag_);
463 call_.PerformOps(&start_ops_);
467 write_tag_.Set(call_.call(),
469 reactor_->OnWriteDone(ok);
473 write_ops_.set_core_cq_tag(&write_tag_);
475 read_tag_.Set(call_.call(),
477 reactor_->OnReadDone(ok);
481 read_ops_.set_core_cq_tag(&read_tag_);
482 if (read_ops_at_start_) {
483 call_.PerformOps(&read_ops_);
486 if (write_ops_at_start_) {
487 call_.PerformOps(&write_ops_);
490 if (writes_done_ops_at_start_) {
491 call_.PerformOps(&writes_done_ops_);
494 finish_tag_.Set(call_.call(), [
this](
bool ) { MaybeFinish(); },
495 &finish_ops_,
false);
496 finish_ops_.ClientRecvStatus(context_, &finish_status_);
497 finish_ops_.set_core_cq_tag(&finish_tag_);
498 call_.PerformOps(&finish_ops_);
501 void Read(Response* msg)
override {
502 read_ops_.RecvMessage(msg);
503 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
505 call_.PerformOps(&read_ops_);
507 read_ops_at_start_ =
true;
513 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
514 context_->initial_metadata_flags());
515 start_corked_ =
false;
520 write_ops_.ClientSendClose();
524 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
526 call_.PerformOps(&write_ops_);
528 write_ops_at_start_ =
true;
533 writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
534 context_->initial_metadata_flags());
535 start_corked_ =
false;
537 writes_done_ops_.ClientSendClose();
538 writes_done_tag_.Set(call_.call(),
540 reactor_->OnWritesDoneDone(ok);
543 &writes_done_ops_,
false);
544 writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
545 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
547 call_.PerformOps(&writes_done_ops_);
549 writes_done_ops_at_start_ =
true;
554 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
567 start_corked_(context_->initial_metadata_corked_) {
568 this->BindReactor(reactor);
590 bool write_ops_at_start_{
false};
596 bool writes_done_ops_at_start_{
false};
601 bool read_ops_at_start_{
false};
604 std::atomic<intptr_t> callbacks_outstanding_{2};
605 bool started_{
false};
608 template <
class Request,
class Response>
612 const ::grpc::internal::RpcMethod& method,
616 channel->CreateCall(method, context, channel->CallbackCQ());
626 template <
class Response>
630 static void operator delete(
void* , std::size_t size) {
643 1, std::memory_order_acq_rel) == 1)) {
645 auto* reactor = reactor_;
646 auto* call = call_.call();
660 start_tag_.Set(call_.call(),
662 reactor_->OnReadInitialMetadataDone(ok);
666 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
667 context_->initial_metadata_flags());
668 start_ops_.RecvInitialMetadata(context_);
669 start_ops_.set_core_cq_tag(&start_tag_);
670 call_.PerformOps(&start_ops_);
673 read_tag_.Set(call_.call(),
675 reactor_->OnReadDone(ok);
679 read_ops_.set_core_cq_tag(&read_tag_);
680 if (read_ops_at_start_) {
681 call_.PerformOps(&read_ops_);
684 finish_tag_.Set(call_.call(), [
this](
bool ) { MaybeFinish(); },
685 &finish_ops_,
false);
686 finish_ops_.ClientRecvStatus(context_, &finish_status_);
687 finish_ops_.set_core_cq_tag(&finish_tag_);
688 call_.PerformOps(&finish_ops_);
691 void Read(Response* msg)
override {
692 read_ops_.RecvMessage(msg);
693 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
695 call_.PerformOps(&read_ops_);
697 read_ops_at_start_ =
true;
702 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
709 template <
class Request>
714 : context_(context), call_(call), reactor_(reactor) {
715 this->BindReactor(reactor);
718 start_ops_.ClientSendClose();
739 bool read_ops_at_start_{
false};
742 std::atomic<intptr_t> callbacks_outstanding_{2};
743 bool started_{
false};
746 template <
class Response>
749 template <
class Request>
751 const ::grpc::internal::RpcMethod& method,
753 const Request* request,
756 channel->CreateCall(method, context, channel->CallbackCQ());
765 template <
class Request>
769 static void operator delete(
void* , std::size_t size) {
782 1, std::memory_order_acq_rel) == 1)) {
784 auto* reactor = reactor_;
785 auto* call = call_.
call();
799 start_tag_.Set(call_.call(),
801 reactor_->OnReadInitialMetadataDone(ok);
805 if (!start_corked_) {
806 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
807 context_->initial_metadata_flags());
809 start_ops_.RecvInitialMetadata(context_);
810 start_ops_.set_core_cq_tag(&start_tag_);
811 call_.PerformOps(&start_ops_);
815 write_tag_.Set(call_.call(),
817 reactor_->OnWriteDone(ok);
821 write_ops_.set_core_cq_tag(&write_tag_);
823 if (write_ops_at_start_) {
824 call_.PerformOps(&write_ops_);
827 if (writes_done_ops_at_start_) {
828 call_.PerformOps(&writes_done_ops_);
831 finish_tag_.Set(call_.call(), [
this](
bool ) { MaybeFinish(); },
832 &finish_ops_,
false);
833 finish_ops_.ClientRecvStatus(context_, &finish_status_);
834 finish_ops_.set_core_cq_tag(&finish_tag_);
835 call_.PerformOps(&finish_ops_);
840 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
841 context_->initial_metadata_flags());
842 start_corked_ =
false;
847 write_ops_.ClientSendClose();
851 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
853 call_.PerformOps(&write_ops_);
855 write_ops_at_start_ =
true;
860 writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
861 context_->initial_metadata_flags());
862 start_corked_ =
false;
864 writes_done_ops_.ClientSendClose();
865 writes_done_tag_.Set(call_.call(),
867 reactor_->OnWritesDoneDone(ok);
870 &writes_done_ops_,
false);
871 writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
872 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
874 call_.PerformOps(&writes_done_ops_);
876 writes_done_ops_at_start_ =
true;
881 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
888 template <
class Response>
896 start_corked_(context_->initial_metadata_corked_) {
897 this->BindReactor(reactor);
898 finish_ops_.RecvMessage(response);
899 finish_ops_.AllowNoMessage();
923 bool write_ops_at_start_{
false};
929 bool writes_done_ops_at_start_{
false};
932 std::atomic<intptr_t> callbacks_outstanding_{2};
933 bool started_{
false};
936 template <
class Request>
939 template <
class Response>
941 const ::grpc::internal::RpcMethod& method,
945 channel->CreateCall(method, context, channel->CallbackCQ());
957 static void operator delete(
void* , std::size_t size) {
974 start_tag_.Set(call_.call(),
976 reactor_->OnReadInitialMetadataDone(ok);
980 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
981 context_->initial_metadata_flags());
982 start_ops_.RecvInitialMetadata(context_);
983 start_ops_.set_core_cq_tag(&start_tag_);
984 call_.PerformOps(&start_ops_);
986 finish_tag_.Set(call_.call(), [
this](
bool ) { MaybeFinish(); },
987 &finish_ops_,
false);
988 finish_ops_.ClientRecvStatus(context_, &finish_status_);
989 finish_ops_.set_core_cq_tag(&finish_tag_);
990 call_.PerformOps(&finish_ops_);
995 1, std::memory_order_acq_rel) == 1)) {
997 auto* reactor = reactor_;
998 auto* call = call_.
call();
1008 template <
class Request,
class Response>
1012 : context_(context), call_(call), reactor_(reactor) {
1013 this->BindReactor(reactor);
1016 start_ops_.ClientSendClose();
1017 finish_ops_.RecvMessage(response);
1018 finish_ops_.AllowNoMessage();
1039 std::atomic<intptr_t> callbacks_outstanding_{2};
1040 bool started_{
false};
1045 template <
class Request,
class Response>
1047 const ::grpc::internal::RpcMethod& method,
1049 const Request* request, Response* response,
1052 channel->CreateCall(method, context, channel->CallbackCQ());
1064 #endif // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_IMPL_H
CallbackUnaryCallImpl(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc_impl::ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(::grpc::Status)> on_completion)
Definition: client_callback_impl.h:58
void StartWriteLast(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback_impl.h:355
::grpc_impl::ClientContext ClientContext
Definition: client_context.h:26
void Write(const Request *req)
Definition: client_callback_impl.h:155
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call_op_set.h:122
void WriteLast(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback_impl.h:157
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:146
void StartCall() override
Definition: client_callback_impl.h:968
virtual void grpc_call_ref(grpc_call *call)=0
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc_impl::ClientContext *context, Response *response, ClientWriteReactor< Request > *reactor)
Definition: client_callback_impl.h:940
void AddHold()
Definition: client_callback_impl.h:327
void StartRead(Response *resp)
Definition: client_callback_impl.h:325
void AddMultipleHolds(int holds)
Definition: client_callback_impl.h:328
void Read(Response *msg) override
Definition: client_callback_impl.h:501
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream. Client-side: makes Write the equivalent of performing Write, WritesDone in a single step.
Definition: call_op_set.h:161
Definition: client_callback_impl.h:1043
Definition: client_callback_impl.h:119
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:824
Definition: client_callback_impl.h:151
void Read(Response *msg) override
Definition: client_callback_impl.h:691
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback_impl.h:392
Definition: channel_interface.h:38
virtual void grpc_call_unref(grpc_call *call)=0
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc_impl::ClientContext *context, const Request *request, Response *response, ClientUnaryReactor *reactor)
Definition: client_callback_impl.h:1046
virtual ~ClientCallbackReader()
Definition: client_callback_impl.h:138
void Write(const Request *msg, ::grpc::WriteOptions options) override
Definition: client_callback_impl.h:511
Definition: channel_interface.h:50
virtual void OnReadDone(bool)
Definition: client_callback_impl.h:333
virtual void OnDone(const ::grpc::Status &)
Definition: client_callback_impl.h:364
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc_impl::ClientContext *context, ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback_impl.h:611
virtual ~ClientCallbackWriter()
Definition: client_callback_impl.h:153
grpc_call * call() const
Definition: call.h:72
Definition: client_callback_impl.h:766
virtual ~ClientUnaryReactor()
Definition: client_callback_impl.h:388
void AddMultipleHolds(int holds)
Definition: client_callback_impl.h:271
::google::protobuf::util::Status Status
Definition: config_protobuf.h:90
void RemoveHold() override
Definition: client_callback_impl.h:704
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback_impl.h:332
void Write(const Request *msg, ::grpc::WriteOptions options) override
Definition: client_callback_impl.h:838
void AddHold(int holds) override
Definition: client_callback_impl.h:701
ClientUnaryReactor is a reactor-style interface for a unary RPC.
Definition: client_callback_impl.h:386
void StartCall() override
Definition: client_callback_impl.h:792
::grpc_impl::Channel Channel
Definition: channel.h:26
void StartCall()
Definition: client_callback_impl.h:348
void RemoveHold()
Definition: client_callback_impl.h:272
virtual ~ClientCallbackUnary()
Definition: client_callback_impl.h:173
Definition: call_op_set.h:286
void WritesDone() override
Definition: client_callback_impl.h:858
void RemoveHold()
Definition: client_callback_impl.h:362
Definition: client_callback_impl.h:136
This header provides an object that reads bytes directly from a grpc::ByteBuffer, via the ZeroCopyInputStream interface.
Definition: alarm.h:24
void StartWrite(const Request *req, ::grpc::WriteOptions options)
Initiate/post a write operation with specified options.
Definition: client_callback_impl.h:225
ClientBidiReactor is the interface for a bidirectional streaming RPC.
Definition: client_callback_impl.h:108
Codegen interface for grpc::Channel.
Definition: channel_interface.h:74
void AddMultipleHolds(int holds)
Definition: client_callback_impl.h:361
void AddHold(int holds) override
Definition: client_callback_impl.h:880
CoreCodegenInterface * g_core_codegen_interface
Definition: completion_queue_impl.h:90
virtual void OnDone(const ::grpc::Status &)
Notifies the application that all operations associated with this RPC have completed and provides the RPC status outcome.
Definition: client_callback_impl.h:278
void StartWrite(const Request *req)
Initiate a write operation (or post it for later initiation if StartCall has not yet been invoked).
Definition: client_callback_impl.h:215
virtual ~ClientWriteReactor()
Definition: client_callback_impl.h:346
Definition: byte_buffer.h:58
virtual void OnWriteDone(bool)
Definition: client_callback_impl.h:366
virtual void OnWritesDoneDone(bool)
Notifies the application that a StartWritesDone operation completed.
Definition: client_callback_impl.h:307
virtual void OnWritesDoneDone(bool)
Definition: client_callback_impl.h:367
void MaybeFinish()
Definition: client_callback_impl.h:431
Per-message write options.
Definition: call_op_set.h:79
virtual void OnReadInitialMetadataDone(bool)
Notifies the application that a read of initial metadata from the server is done.
Definition: client_callback_impl.h:287
void StartWrite(const Request *req)
Definition: client_callback_impl.h:349
void MaybeFinish()
Definition: client_callback_impl.h:993
virtual ~ClientReadReactor()
Definition: client_callback_impl.h:322
void RemoveHold() override
Definition: client_callback_impl.h:556
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided function on expiry or cancellation.
Definition: alarm_impl.h:33
Definition: call_op_set.h:592
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
void MaybeFinish()
Definition: client_callback_impl.h:641
Definition: channel_interface.h:48
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming operations.
Definition: callback_common.h:136
bool ok() const
Is the status OK?
Definition: status.h:118
void StartWrite(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback_impl.h:352
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc_impl::ClientContext *context, const Request *request, ClientReadReactor< Response > *reactor)
Definition: client_callback_impl.h:750
virtual ~ClientCallbackReaderWriter()
Definition: client_callback_impl.h:121
void StartWriteLast(const Request *req, ::grpc::WriteOptions options)
Initiate/post a write operation with specified options and an indication that this is the last write (like StartWrite and StartWritesDone, merged).
Definition: client_callback_impl.h:238
void StartRead(Response *resp)
Initiate a read operation (or post it for later initiation if StartCall has not yet been invoked).
Definition: client_callback_impl.h:207
void StartWritesDone()
Definition: client_callback_impl.h:358
ClientReadReactor is the interface for a server-streaming RPC.
Definition: client_callback_impl.h:110
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue.h).
Definition: completion_queue_impl.h:100
void StartCall()
Activate the RPC and initiate any reads or writes that have been Start'ed before this call.
Definition: client_callback_impl.h:200
void AddHold(int holds) override
Definition: client_callback_impl.h:553
virtual void OnReadDone(bool)
Notifies the application that a StartRead operation completed.
Definition: client_callback_impl.h:293
A ClientContext allows the person implementing a service client to:
Definition: client_context_impl.h:184
Did it work? If it didn't, why?
Definition: status.h:31
void StartCall() override
Definition: client_callback_impl.h:653
ClientWriteReactor is the interface for a client-streaming RPC.
Definition: client_callback_impl.h:112
Definition: callback_common.h:68
void AddHold()
Holds are needed if (and only if) this stream has operations that take place on it after StartCall but from outside one of the reactions (OnReadDone, etc).
Definition: client_callback_impl.h:270
virtual void OnDone(const ::grpc::Status &)
Definition: client_callback_impl.h:331
void RemoveHold() override
Definition: client_callback_impl.h:883
void RemoveHold()
Definition: client_callback_impl.h:329
Definition: channel_interface.h:52
Definition: call_op_set.h:514
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailing metadata.
Definition: call_op_set.h:186
void WritesDone() override
Definition: client_callback_impl.h:531
Definition: call_op_set.h:742
virtual void OnDone(const ::grpc::Status &)
Definition: client_callback_impl.h:391
void AddHold()
Definition: client_callback_impl.h:360
Definition: client_callback_impl.h:627
void BindReactor(ClientReadReactor< Response > *reactor)
Definition: client_callback_impl.h:145
void StartWritesDone()
Indicate that the RPC will have no more write operations.
Definition: client_callback_impl.h:247
void StartCall() override
Definition: client_callback_impl.h:443
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback_impl.h:365
virtual void OnWriteDone(bool)
Notifies the application that a StartWrite operation completed.
Definition: client_callback_impl.h:299
void StartCall()
Definition: client_callback_impl.h:324
Definition: client_callback_impl.h:171
Definition: client_callback_impl.h:954
virtual ~ClientBidiReactor()
Definition: client_callback_impl.h:194
Definition: client_callback_impl.h:416
Straightforward wrapping of the C call object.
Definition: call.h:38
void CallbackUnaryCall(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc_impl::ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(::grpc::Status)> on_completion)
Perform a callback-based unary call. TODO(vjpai): Combine as much as possible with the blocking unary call code.
Definition: client_callback_impl.h:46
void StartCall()
Definition: client_callback_impl.h:390
void BindReactor(ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback_impl.h:130
void MaybeFinish()
Definition: client_callback_impl.h:780
void BindReactor(ClientWriteReactor< Request > *reactor)
Definition: client_callback_impl.h:166