19 #ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H 20 #define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H 44 template <
class InputMessage,
class OutputMessage>
47 OutputMessage* result,
48 std::function<
void(
Status)> on_completion) {
50 channel, method, context, request, result, on_completion);
53 template <
class InputMessage,
class OutputMessage>
58 OutputMessage* result,
59 std::function<
void(
Status)> on_completion) {
62 Call call(channel->CreateCall(method, context, cq));
70 call.call(),
sizeof(FullCallOpSet))) FullCallOpSet;
77 Status s = ops->SendMessagePtr(request);
82 ops->SendInitialMetadata(&context->send_initial_metadata_,
83 context->initial_metadata_flags());
84 ops->RecvInitialMetadata(context);
85 ops->RecvMessage(result);
86 ops->AllowNoMessage();
87 ops->ClientSendClose();
88 ops->ClientRecvStatus(context, tag->status_ptr());
89 ops->set_core_cq_tag(tag);
95 namespace experimental {
98 template <
class Request,
class Response>
100 template <
class Response>
102 template <
class Request>
109 template <
class Request,
class Response>
113 virtual void StartCall() = 0;
114 virtual void Write(
const Request* req,
WriteOptions options) = 0;
115 virtual void WritesDone() = 0;
116 virtual void Read(Response* resp) = 0;
117 virtual void AddHold(
int holds) = 0;
118 virtual void RemoveHold() = 0;
122 reactor->BindStream(
this);
126 template <
class Response>
130 virtual void StartCall() = 0;
131 virtual void Read(Response* resp) = 0;
132 virtual void AddHold(
int holds) = 0;
133 virtual void RemoveHold() = 0;
137 reactor->BindReader(
this);
141 template <
class Request>
145 virtual void StartCall() = 0;
147 virtual void Write(
const Request* req,
WriteOptions options) = 0;
151 virtual void WritesDone() = 0;
153 virtual void AddHold(
int holds) = 0;
154 virtual void RemoveHold() = 0;
158 reactor->BindWriter(
this);
165 virtual void StartCall() = 0;
182 template <
class Request,
class Response>
215 stream_->Write(req, std::move(options));
308 template <
class Response>
332 template <
class Request>
340 writer_->Write(req, std::move(options));
389 reactor->BindCall(
this);
397 template <
class Request,
class Response>
398 class ClientCallbackReaderWriterFactory;
399 template <
class Response>
400 class ClientCallbackReaderFactory;
401 template <
class Request>
402 class ClientCallbackWriterFactory;
404 template <
class Request,
class Response>
410 static void operator delete(
void* ptr, std::size_t size) {
419 static void operator delete(
void*,
void*) { assert(0); }
422 if (--callbacks_outstanding_ == 0) {
423 Status s = std::move(finish_status_);
424 auto* reactor = reactor_;
425 auto* call = call_.call();
440 start_tag_.Set(call_.call(),
442 reactor_->OnReadInitialMetadataDone(ok);
446 if (!start_corked_) {
447 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
448 context_->initial_metadata_flags());
450 start_ops_.RecvInitialMetadata(context_);
451 start_ops_.set_core_cq_tag(&start_tag_);
452 call_.PerformOps(&start_ops_);
456 write_tag_.Set(call_.call(),
458 reactor_->OnWriteDone(ok);
462 write_ops_.set_core_cq_tag(&write_tag_);
464 read_tag_.Set(call_.call(),
466 reactor_->OnReadDone(ok);
470 read_ops_.set_core_cq_tag(&read_tag_);
471 if (read_ops_at_start_) {
472 call_.PerformOps(&read_ops_);
475 if (write_ops_at_start_) {
476 call_.PerformOps(&write_ops_);
479 if (writes_done_ops_at_start_) {
480 call_.PerformOps(&writes_done_ops_);
483 finish_tag_.Set(call_.call(), [
this](
bool ok) { MaybeFinish(); },
485 finish_ops_.ClientRecvStatus(context_, &finish_status_);
486 finish_ops_.set_core_cq_tag(&finish_tag_);
487 call_.PerformOps(&finish_ops_);
490 void Read(Response* msg)
override {
491 read_ops_.RecvMessage(msg);
492 callbacks_outstanding_++;
494 call_.PerformOps(&read_ops_);
496 read_ops_at_start_ =
true;
502 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
503 context_->initial_metadata_flags());
504 start_corked_ =
false;
509 write_ops_.ClientSendClose();
513 callbacks_outstanding_++;
515 call_.PerformOps(&write_ops_);
517 write_ops_at_start_ =
true;
522 writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
523 context_->initial_metadata_flags());
524 start_corked_ =
false;
526 writes_done_ops_.ClientSendClose();
527 writes_done_tag_.Set(call_.call(),
529 reactor_->OnWritesDoneDone(ok);
533 writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
534 callbacks_outstanding_++;
536 call_.PerformOps(&writes_done_ops_);
538 writes_done_ops_at_start_ =
true;
542 virtual void AddHold(
int holds)
override { callbacks_outstanding_ += holds; }
554 start_corked_(context_->initial_metadata_corked_) {
555 this->BindReactor(reactor);
573 bool write_ops_at_start_{
false};
577 bool writes_done_ops_at_start_{
false};
581 bool read_ops_at_start_{
false};
584 std::atomic_int callbacks_outstanding_{2};
585 bool started_{
false};
588 template <
class Request,
class Response>
595 Call call = channel->CreateCall(method, context, channel->CallbackCQ());
605 template <
class Response>
610 static void operator delete(
void* ptr, std::size_t size) {
619 static void operator delete(
void*,
void*) { assert(0); }
622 if (--callbacks_outstanding_ == 0) {
623 Status s = std::move(finish_status_);
624 auto* reactor = reactor_;
625 auto* call = call_.call();
639 start_tag_.Set(call_.call(),
641 reactor_->OnReadInitialMetadataDone(ok);
645 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
646 context_->initial_metadata_flags());
647 start_ops_.RecvInitialMetadata(context_);
648 start_ops_.set_core_cq_tag(&start_tag_);
649 call_.PerformOps(&start_ops_);
652 read_tag_.Set(call_.call(),
654 reactor_->OnReadDone(ok);
658 read_ops_.set_core_cq_tag(&read_tag_);
659 if (read_ops_at_start_) {
660 call_.PerformOps(&read_ops_);
663 finish_tag_.Set(call_.call(), [
this](
bool ok) { MaybeFinish(); },
665 finish_ops_.ClientRecvStatus(context_, &finish_status_);
666 finish_ops_.set_core_cq_tag(&finish_tag_);
667 call_.PerformOps(&finish_ops_);
670 void Read(Response* msg)
override {
671 read_ops_.RecvMessage(msg);
672 callbacks_outstanding_++;
674 call_.PerformOps(&read_ops_);
676 read_ops_at_start_ =
true;
680 virtual void AddHold(
int holds)
override { callbacks_outstanding_ += holds; }
686 template <
class Request>
690 : context_(context), call_(call), reactor_(reactor) {
691 this->BindReactor(reactor);
694 start_ops_.ClientSendClose();
712 bool read_ops_at_start_{
false};
715 std::atomic_int callbacks_outstanding_{2};
716 bool started_{
false};
719 template <
class Response>
722 template <
class Request>
727 Call call = channel->CreateCall(method, context, channel->CallbackCQ());
736 template <
class Request>
741 static void operator delete(
void* ptr, std::size_t size) {
750 static void operator delete(
void*,
void*) { assert(0); }
753 if (--callbacks_outstanding_ == 0) {
754 Status s = std::move(finish_status_);
755 auto* reactor = reactor_;
756 auto* call = call_.
call();
770 start_tag_.Set(call_.call(),
772 reactor_->OnReadInitialMetadataDone(ok);
776 if (!start_corked_) {
777 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
778 context_->initial_metadata_flags());
780 start_ops_.RecvInitialMetadata(context_);
781 start_ops_.set_core_cq_tag(&start_tag_);
782 call_.PerformOps(&start_ops_);
786 write_tag_.Set(call_.call(),
788 reactor_->OnWriteDone(ok);
792 write_ops_.set_core_cq_tag(&write_tag_);
794 if (write_ops_at_start_) {
795 call_.PerformOps(&write_ops_);
798 if (writes_done_ops_at_start_) {
799 call_.PerformOps(&writes_done_ops_);
802 finish_tag_.Set(call_.call(), [
this](
bool ok) { MaybeFinish(); },
804 finish_ops_.ClientRecvStatus(context_, &finish_status_);
805 finish_ops_.set_core_cq_tag(&finish_tag_);
806 call_.PerformOps(&finish_ops_);
811 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
812 context_->initial_metadata_flags());
813 start_corked_ =
false;
818 write_ops_.ClientSendClose();
822 callbacks_outstanding_++;
824 call_.PerformOps(&write_ops_);
826 write_ops_at_start_ =
true;
831 writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
832 context_->initial_metadata_flags());
833 start_corked_ =
false;
835 writes_done_ops_.ClientSendClose();
836 writes_done_tag_.Set(call_.call(),
838 reactor_->OnWritesDoneDone(ok);
842 writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
843 callbacks_outstanding_++;
845 call_.PerformOps(&writes_done_ops_);
847 writes_done_ops_at_start_ =
true;
851 virtual void AddHold(
int holds)
override { callbacks_outstanding_ += holds; }
857 template <
class Response>
864 start_corked_(context_->initial_metadata_corked_) {
865 this->BindReactor(reactor);
866 finish_ops_.RecvMessage(response);
867 finish_ops_.AllowNoMessage();
885 bool write_ops_at_start_{
false};
889 bool writes_done_ops_at_start_{
false};
892 std::atomic_int callbacks_outstanding_{2};
893 bool started_{
false};
896 template <
class Request>
899 template <
class Response>
904 Call call = channel->CreateCall(method, context, channel->CallbackCQ());
917 static void operator delete(
void* ptr, std::size_t size) {
926 static void operator delete(
void*,
void*) { assert(0); }
934 start_tag_.Set(call_.call(),
936 reactor_->OnReadInitialMetadataDone(ok);
940 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
941 context_->initial_metadata_flags());
942 start_ops_.RecvInitialMetadata(context_);
943 start_ops_.set_core_cq_tag(&start_tag_);
944 call_.PerformOps(&start_ops_);
946 finish_tag_.Set(call_.call(), [
this](
bool ok) { MaybeFinish(); },
948 finish_ops_.ClientRecvStatus(context_, &finish_status_);
949 finish_ops_.set_core_cq_tag(&finish_tag_);
950 call_.PerformOps(&finish_ops_);
954 if (--callbacks_outstanding_ == 0) {
955 Status s = std::move(finish_status_);
956 auto* reactor = reactor_;
957 auto* call = call_.
call();
967 template <
class Request,
class Response>
971 : context_(context), call_(call), reactor_(reactor) {
972 this->BindReactor(reactor);
975 start_ops_.ClientSendClose();
976 finish_ops_.RecvMessage(response);
977 finish_ops_.AllowNoMessage();
994 std::atomic_int callbacks_outstanding_{2};
995 bool started_{
false};
1000 template <
class Request,
class Response>
1002 const ::grpc::internal::RpcMethod& method,
1006 Call call = channel->CreateCall(method, context, channel->CallbackCQ());
1019 #endif // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
virtual void AddHold(int holds) override
Definition: client_callback.h:680
virtual ~ClientCallbackUnary()
Definition: client_callback.h:164
virtual ~ClientCallbackReaderWriter()
Definition: client_callback.h:112
void StartCall() override
Definition: client_callback.h:632
::grpc_impl::ClientContext ClientContext
Definition: client_context.h:26
virtual ~ClientWriteReactor()
Definition: client_callback.h:335
virtual void OnReadInitialMetadataDone(bool ok)
Definition: client_callback.h:352
Definition: channel_interface.h:59
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call_op_set.h:125
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:145
virtual void grpc_call_ref(grpc_call *call)=0
Definition: client_callback.h:913
virtual ~ClientCallbackReader()
Definition: client_callback.h:129
void AddHold()
Holds are needed if (and only if) this stream has operations that take place on it after StartCall bu...
Definition: client_callback.h:259
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:164
virtual void RemoveHold() override
Definition: client_callback.h:543
ClientReadReactor is the interface for a server-streaming RPC.
Definition: client_callback.h:101
static void Create(ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ClientContext *context, Response *response, ::grpc::experimental::ClientWriteReactor< Request > *reactor)
Definition: client_callback.h:900
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:821
virtual ~ClientReadReactor()
Definition: client_callback.h:311
virtual void grpc_call_unref(grpc_call *call)=0
void WritesDone() override
Definition: client_callback.h:829
void StartRead(Response *resp)
Definition: client_callback.h:314
CallbackUnaryCallImpl(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(Status)> on_completion)
Definition: client_callback.h:56
Definition: client_callback.h:110
Definition: channel_interface.h:63
void Write(const Request *req)
Definition: client_callback.h:146
void StartCall() override
Definition: client_callback.h:432
void Read(Response *msg) override
Definition: client_callback.h:490
Definition: channel_interface.h:49
void RemoveHold()
Definition: client_callback.h:261
static void Create(ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ClientContext *context, ::grpc::experimental::ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback.h:591
grpc_call * call() const
Definition: call.h:72
void WriteLast(const Request *req, WriteOptions options)
Definition: client_callback.h:148
void StartWritesDone()
Indicate that the RPC will have no more write operations.
Definition: client_callback.h:236
virtual void OnWriteDone(bool ok)
Definition: client_callback.h:353
virtual void OnDone(const Status &s)
Definition: client_callback.h:351
void Read(Response *msg) override
Definition: client_callback.h:670
static void Create(ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ClientContext *context, const Request *request, ::grpc::experimental::ClientReadReactor< Response > *reactor)
Definition: client_callback.h:723
virtual ~ClientCallbackWriter()
Definition: client_callback.h:144
virtual void RemoveHold() override
Definition: client_callback.h:852
virtual void OnDone(const Status &s)
Definition: client_callback.h:320
Descriptor of an RPC method.
Definition: rpc_method.h:29
void StartWrite(const Request *req, WriteOptions options)
Initiate/post a write operation with specified options.
Definition: client_callback.h:214
Definition: client_callback.h:162
void AddMultipleHolds(int holds)
Definition: client_callback.h:260
virtual void AddHold(int holds) override
Definition: client_callback.h:851
::grpc_impl::Channel Channel
Definition: channel.h:26
Definition: client_callback.h:127
void StartWrite(const Request *req)
Initiate a write operation (or post it for later initiation if StartCall has not yet been invoked)...
Definition: client_callback.h:206
void Write(const Request *msg, WriteOptions options) override
Definition: client_callback.h:809
Definition: client_callback.h:998
virtual void OnReadDone(bool ok)
Definition: client_callback.h:322
void StartWriteLast(const Request *req, WriteOptions options)
Definition: client_callback.h:342
Definition: call_op_set.h:288
virtual ~ClientBidiReactor()
Definition: client_callback.h:185
void StartWrite(const Request *req, WriteOptions options)
Definition: client_callback.h:339
virtual ~ClientUnaryReactor()
Definition: client_callback.h:375
void WritesDone() override
Definition: client_callback.h:520
This header provides an object that reads bytes directly from a grpc::ByteBuffer, via the ZeroCopyInp...
Definition: alarm.h:24
void StartRead(Response *resp)
Initiate a read operation (or post it for later initiation if StartCall has not yet been invoked)...
Definition: client_callback.h:198
ClientWriteReactor is the interface for a client-streaming RPC.
Definition: client_callback.h:103
virtual void OnWritesDoneDone(bool ok)
Notifies the application that a StartWritesDone operation completed.
Definition: client_callback.h:296
void StartCall()
Activate the RPC and initiate any reads or writes that have been Start'ed before this call...
Definition: client_callback.h:191
Codegen interface for grpc::Channel.
Definition: channel_interface.h:69
Definition: client_callback.h:405
virtual void AddHold(int holds) override
Definition: client_callback.h:542
CoreCodegenInterface * g_core_codegen_interface
Definition: call_op_set.h:51
virtual void RemoveHold() override
Definition: client_callback.h:681
void MaybeFinish()
Definition: client_callback.h:421
ClientBidiReactor is the interface for a bidirectional streaming RPC.
Definition: client_callback.h:99
void AddMultipleHolds(int holds)
Definition: client_callback.h:348
virtual void OnWritesDoneDone(bool ok)
Definition: client_callback.h:354
Definition: byte_buffer.h:41
Per-message write options.
Definition: call_op_set.h:85
Definition: client_callback.h:737
void MaybeFinish()
Definition: client_callback.h:752
Definition: channel_interface.h:61
ClientUnaryReactor is a reactor-style interface for a unary RPC.
Definition: client_callback.h:373
void StartWritesDone()
Definition: client_callback.h:345
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm_impl.h:33
Definition: call_op_set.h:594
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
virtual void OnDone(const Status &s)
Definition: client_callback.h:378
void Write(const Request *msg, WriteOptions options) override
Definition: client_callback.h:500
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming o...
Definition: callback_common.h:132
bool ok() const
Is the status OK?
Definition: status.h:118
void StartCall() override
Definition: client_callback.h:763
Definition: client_callback.h:142
void StartCall()
Definition: client_callback.h:313
virtual void OnDone(const Status &s)
Notifies the application that all operations associated with this RPC have completed and provides the...
Definition: client_callback.h:267
void AddMultipleHolds(int holds)
Definition: client_callback.h:317
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue.h).
Definition: completion_queue_impl.h:102
A ClientContext allows the person implementing a service client to:
Definition: client_context_impl.h:178
Did it work? If it didn't, why?
Definition: status.h:31
virtual void OnReadDone(bool ok)
Notifies the application that a StartRead operation completed.
Definition: client_callback.h:282
void BindReactor(ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback.h:121
Definition: callback_common.h:68
virtual void OnWriteDone(bool ok)
Notifies the application that a StartWrite operation completed.
Definition: client_callback.h:288
void AddHold()
Definition: client_callback.h:347
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:189
Definition: client_callback.h:606
void RemoveHold()
Definition: client_callback.h:349
Definition: call_op_set.h:744
void StartCall()
Definition: client_callback.h:377
void BindReactor(ClientReadReactor< Response > *reactor)
Definition: client_callback.h:136
void AddHold()
Definition: client_callback.h:316
void StartWriteLast(const Request *req, WriteOptions options)
Initiate/post a write operation with specified options and an indication that this is the last write ...
Definition: client_callback.h:227
void MaybeFinish()
Definition: client_callback.h:621
virtual void OnReadInitialMetadataDone(bool ok)
Notifies the application that a read of initial metadata from the server is done. ...
Definition: client_callback.h:276
void BindReactor(ClientWriteReactor< Request > *reactor)
Definition: client_callback.h:157
virtual void OnReadInitialMetadataDone(bool ok)
Definition: client_callback.h:379
void RemoveHold()
Definition: client_callback.h:318
virtual void OnReadInitialMetadataDone(bool ok)
Definition: client_callback.h:321
static void Create(ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ClientContext *context, const Request *request, Response *response, ::grpc::experimental::ClientUnaryReactor *reactor)
Definition: client_callback.h:1001
void StartCall() override
Definition: client_callback.h:928
void StartWrite(const Request *req)
Definition: client_callback.h:338
Straightforward wrapping of the C call object.
Definition: call.h:38
void MaybeFinish()
Definition: client_callback.h:953
void StartCall()
Definition: client_callback.h:337
void CallbackUnaryCall(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(Status)> on_completion)
Perform a callback-based unary call TODO(vjpai): Combine as much as possible with the blocking unary ...
Definition: client_callback.h:45