19 #ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H 20 #define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H 45 template <
class InputMessage,
class OutputMessage>
48 OutputMessage* result,
49 std::function<
void(
Status)> on_completion) {
51 channel, method, context, request, result, on_completion);
54 template <
class InputMessage,
class OutputMessage>
59 OutputMessage* result,
60 std::function<
void(
Status)> on_completion) {
63 Call call(channel->CreateCall(method, context, cq));
71 call.call(),
sizeof(FullCallOpSet))) FullCallOpSet;
78 Status s = ops->SendMessagePtr(request);
83 ops->SendInitialMetadata(&context->send_initial_metadata_,
84 context->initial_metadata_flags());
85 ops->RecvInitialMetadata(context);
86 ops->RecvMessage(result);
87 ops->AllowNoMessage();
88 ops->ClientSendClose();
89 ops->ClientRecvStatus(context, tag->status_ptr());
90 ops->set_core_cq_tag(tag);
96 namespace experimental {
99 template <
class Request,
class Response>
101 template <
class Response>
103 template <
class Request>
110 template <
class Request,
class Response>
114 virtual void StartCall() = 0;
115 virtual void Write(
const Request* req,
WriteOptions options) = 0;
116 virtual void WritesDone() = 0;
117 virtual void Read(Response* resp) = 0;
118 virtual void AddHold(
int holds) = 0;
119 virtual void RemoveHold() = 0;
123 reactor->BindStream(
this);
127 template <
class Response>
131 virtual void StartCall() = 0;
132 virtual void Read(Response* resp) = 0;
133 virtual void AddHold(
int holds) = 0;
134 virtual void RemoveHold() = 0;
138 reactor->BindReader(
this);
142 template <
class Request>
146 virtual void StartCall() = 0;
148 virtual void Write(
const Request* req,
WriteOptions options) = 0;
152 virtual void WritesDone() = 0;
154 virtual void AddHold(
int holds) = 0;
155 virtual void RemoveHold() = 0;
159 reactor->BindWriter(
this);
166 virtual void StartCall() = 0;
183 template <
class Request,
class Response>
216 stream_->Write(req, std::move(options));
309 template <
class Response>
333 template <
class Request>
341 writer_->Write(req, std::move(options));
390 reactor->BindCall(
this);
398 template <
class Request,
class Response>
399 class ClientCallbackReaderWriterFactory;
400 template <
class Response>
401 class ClientCallbackReaderFactory;
402 template <
class Request>
403 class ClientCallbackWriterFactory;
405 template <
class Request,
class Response>
411 static void operator delete(
void* ptr, std::size_t size) {
420 static void operator delete(
void*,
void*) { assert(0); }
423 if (--callbacks_outstanding_ == 0) {
424 Status s = std::move(finish_status_);
425 auto* reactor = reactor_;
426 auto* call = call_.call();
441 start_tag_.Set(call_.call(),
443 reactor_->OnReadInitialMetadataDone(ok);
447 if (!start_corked_) {
448 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
449 context_->initial_metadata_flags());
451 start_ops_.RecvInitialMetadata(context_);
452 start_ops_.set_core_cq_tag(&start_tag_);
453 call_.PerformOps(&start_ops_);
457 write_tag_.Set(call_.call(),
459 reactor_->OnWriteDone(ok);
463 write_ops_.set_core_cq_tag(&write_tag_);
465 read_tag_.Set(call_.call(),
467 reactor_->OnReadDone(ok);
471 read_ops_.set_core_cq_tag(&read_tag_);
472 if (read_ops_at_start_) {
473 call_.PerformOps(&read_ops_);
476 if (write_ops_at_start_) {
477 call_.PerformOps(&write_ops_);
480 if (writes_done_ops_at_start_) {
481 call_.PerformOps(&writes_done_ops_);
484 finish_tag_.Set(call_.call(), [
this](
bool ok) { MaybeFinish(); },
486 finish_ops_.ClientRecvStatus(context_, &finish_status_);
487 finish_ops_.set_core_cq_tag(&finish_tag_);
488 call_.PerformOps(&finish_ops_);
491 void Read(Response* msg)
override {
492 read_ops_.RecvMessage(msg);
493 callbacks_outstanding_++;
495 call_.PerformOps(&read_ops_);
497 read_ops_at_start_ =
true;
503 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
504 context_->initial_metadata_flags());
505 start_corked_ =
false;
510 write_ops_.ClientSendClose();
514 callbacks_outstanding_++;
516 call_.PerformOps(&write_ops_);
518 write_ops_at_start_ =
true;
523 writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
524 context_->initial_metadata_flags());
525 start_corked_ =
false;
527 writes_done_ops_.ClientSendClose();
528 writes_done_tag_.Set(call_.call(),
530 reactor_->OnWritesDoneDone(ok);
534 writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
535 callbacks_outstanding_++;
537 call_.PerformOps(&writes_done_ops_);
539 writes_done_ops_at_start_ =
true;
543 virtual void AddHold(
int holds)
override { callbacks_outstanding_ += holds; }
555 start_corked_(context_->initial_metadata_corked_) {
556 this->BindReactor(reactor);
574 bool write_ops_at_start_{
false};
578 bool writes_done_ops_at_start_{
false};
582 bool read_ops_at_start_{
false};
585 std::atomic_int callbacks_outstanding_{2};
586 bool started_{
false};
589 template <
class Request,
class Response>
596 Call call = channel->CreateCall(method, context, channel->CallbackCQ());
606 template <
class Response>
611 static void operator delete(
void* ptr, std::size_t size) {
620 static void operator delete(
void*,
void*) { assert(0); }
623 if (--callbacks_outstanding_ == 0) {
624 Status s = std::move(finish_status_);
625 auto* reactor = reactor_;
626 auto* call = call_.call();
640 start_tag_.Set(call_.call(),
642 reactor_->OnReadInitialMetadataDone(ok);
646 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
647 context_->initial_metadata_flags());
648 start_ops_.RecvInitialMetadata(context_);
649 start_ops_.set_core_cq_tag(&start_tag_);
650 call_.PerformOps(&start_ops_);
653 read_tag_.Set(call_.call(),
655 reactor_->OnReadDone(ok);
659 read_ops_.set_core_cq_tag(&read_tag_);
660 if (read_ops_at_start_) {
661 call_.PerformOps(&read_ops_);
664 finish_tag_.Set(call_.call(), [
this](
bool ok) { MaybeFinish(); },
666 finish_ops_.ClientRecvStatus(context_, &finish_status_);
667 finish_ops_.set_core_cq_tag(&finish_tag_);
668 call_.PerformOps(&finish_ops_);
671 void Read(Response* msg)
override {
672 read_ops_.RecvMessage(msg);
673 callbacks_outstanding_++;
675 call_.PerformOps(&read_ops_);
677 read_ops_at_start_ =
true;
681 virtual void AddHold(
int holds)
override { callbacks_outstanding_ += holds; }
687 template <
class Request>
691 : context_(context), call_(call), reactor_(reactor) {
692 this->BindReactor(reactor);
695 start_ops_.ClientSendClose();
713 bool read_ops_at_start_{
false};
716 std::atomic_int callbacks_outstanding_{2};
717 bool started_{
false};
720 template <
class Response>
723 template <
class Request>
728 Call call = channel->CreateCall(method, context, channel->CallbackCQ());
737 template <
class Request>
742 static void operator delete(
void* ptr, std::size_t size) {
751 static void operator delete(
void*,
void*) { assert(0); }
754 if (--callbacks_outstanding_ == 0) {
755 Status s = std::move(finish_status_);
756 auto* reactor = reactor_;
757 auto* call = call_.
call();
771 start_tag_.Set(call_.call(),
773 reactor_->OnReadInitialMetadataDone(ok);
777 if (!start_corked_) {
778 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
779 context_->initial_metadata_flags());
781 start_ops_.RecvInitialMetadata(context_);
782 start_ops_.set_core_cq_tag(&start_tag_);
783 call_.PerformOps(&start_ops_);
787 write_tag_.Set(call_.call(),
789 reactor_->OnWriteDone(ok);
793 write_ops_.set_core_cq_tag(&write_tag_);
795 if (write_ops_at_start_) {
796 call_.PerformOps(&write_ops_);
799 if (writes_done_ops_at_start_) {
800 call_.PerformOps(&writes_done_ops_);
803 finish_tag_.Set(call_.call(), [
this](
bool ok) { MaybeFinish(); },
805 finish_ops_.ClientRecvStatus(context_, &finish_status_);
806 finish_ops_.set_core_cq_tag(&finish_tag_);
807 call_.PerformOps(&finish_ops_);
812 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
813 context_->initial_metadata_flags());
814 start_corked_ =
false;
819 write_ops_.ClientSendClose();
823 callbacks_outstanding_++;
825 call_.PerformOps(&write_ops_);
827 write_ops_at_start_ =
true;
832 writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
833 context_->initial_metadata_flags());
834 start_corked_ =
false;
836 writes_done_ops_.ClientSendClose();
837 writes_done_tag_.Set(call_.call(),
839 reactor_->OnWritesDoneDone(ok);
843 writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
844 callbacks_outstanding_++;
846 call_.PerformOps(&writes_done_ops_);
848 writes_done_ops_at_start_ =
true;
852 virtual void AddHold(
int holds)
override { callbacks_outstanding_ += holds; }
858 template <
class Response>
865 start_corked_(context_->initial_metadata_corked_) {
866 this->BindReactor(reactor);
867 finish_ops_.RecvMessage(response);
868 finish_ops_.AllowNoMessage();
886 bool write_ops_at_start_{
false};
890 bool writes_done_ops_at_start_{
false};
893 std::atomic_int callbacks_outstanding_{2};
894 bool started_{
false};
897 template <
class Request>
900 template <
class Response>
905 Call call = channel->CreateCall(method, context, channel->CallbackCQ());
918 static void operator delete(
void* ptr, std::size_t size) {
927 static void operator delete(
void*,
void*) { assert(0); }
935 start_tag_.Set(call_.call(),
937 reactor_->OnReadInitialMetadataDone(ok);
941 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
942 context_->initial_metadata_flags());
943 start_ops_.RecvInitialMetadata(context_);
944 start_ops_.set_core_cq_tag(&start_tag_);
945 call_.PerformOps(&start_ops_);
947 finish_tag_.Set(call_.call(), [
this](
bool ok) { MaybeFinish(); },
949 finish_ops_.ClientRecvStatus(context_, &finish_status_);
950 finish_ops_.set_core_cq_tag(&finish_tag_);
951 call_.PerformOps(&finish_ops_);
955 if (--callbacks_outstanding_ == 0) {
956 Status s = std::move(finish_status_);
957 auto* reactor = reactor_;
958 auto* call = call_.
call();
968 template <
class Request,
class Response>
972 : context_(context), call_(call), reactor_(reactor) {
973 this->BindReactor(reactor);
976 start_ops_.ClientSendClose();
977 finish_ops_.RecvMessage(response);
978 finish_ops_.AllowNoMessage();
995 std::atomic_int callbacks_outstanding_{2};
996 bool started_{
false};
1001 template <
class Request,
class Response>
1003 const ::grpc::internal::RpcMethod& method,
1007 Call call = channel->CreateCall(method, context, channel->CallbackCQ());
1020 #endif // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
virtual void AddHold(int holds) override
Definition: client_callback.h:681
virtual ~ClientCallbackUnary()
Definition: client_callback.h:165
virtual ~ClientCallbackReaderWriter()
Definition: client_callback.h:113
void StartCall() override
Definition: client_callback.h:633
virtual ~ClientWriteReactor()
Definition: client_callback.h:336
virtual void OnReadInitialMetadataDone(bool ok)
Definition: client_callback.h:353
Definition: channel_interface.h:59
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call_op_set.h:125
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:145
virtual void grpc_call_ref(grpc_call *call)=0
Definition: client_callback.h:914
virtual ~ClientCallbackReader()
Definition: client_callback.h:130
void AddHold()
Holds are needed if (and only if) this stream has operations that take place on it after StartCall bu...
Definition: client_callback.h:260
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:164
virtual void RemoveHold() override
Definition: client_callback.h:544
ClientReadReactor is the interface for a server-streaming RPC.
Definition: client_callback.h:102
static void Create(ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ClientContext *context, Response *response, ::grpc::experimental::ClientWriteReactor< Request > *reactor)
Definition: client_callback.h:901
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:826
virtual ~ClientReadReactor()
Definition: client_callback.h:312
virtual void grpc_call_unref(grpc_call *call)=0
void WritesDone() override
Definition: client_callback.h:830
void StartRead(Response *resp)
Definition: client_callback.h:315
CallbackUnaryCallImpl(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(Status)> on_completion)
Definition: client_callback.h:57
Definition: client_callback.h:111
Definition: channel_interface.h:63
void Write(const Request *req)
Definition: client_callback.h:147
void StartCall() override
Definition: client_callback.h:433
void Read(Response *msg) override
Definition: client_callback.h:491
Definition: channel_interface.h:49
void RemoveHold()
Definition: client_callback.h:262
static void Create(ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ClientContext *context, ::grpc::experimental::ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback.h:592
grpc_call * call() const
Definition: call.h:72
void WriteLast(const Request *req, WriteOptions options)
Definition: client_callback.h:149
void StartWritesDone()
Indicate that the RPC will have no more write operations.
Definition: client_callback.h:237
virtual void OnWriteDone(bool ok)
Definition: client_callback.h:354
virtual void OnDone(const Status &s)
Definition: client_callback.h:352
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:179
void Read(Response *msg) override
Definition: client_callback.h:671
static void Create(ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ClientContext *context, const Request *request, ::grpc::experimental::ClientReadReactor< Response > *reactor)
Definition: client_callback.h:724
virtual ~ClientCallbackWriter()
Definition: client_callback.h:145
virtual void RemoveHold() override
Definition: client_callback.h:853
virtual void OnDone(const Status &s)
Definition: client_callback.h:321
Descriptor of an RPC method.
Definition: rpc_method.h:29
void StartWrite(const Request *req, WriteOptions options)
Initiate/post a write operation with specified options.
Definition: client_callback.h:215
Definition: client_callback.h:163
void AddMultipleHolds(int holds)
Definition: client_callback.h:261
virtual void AddHold(int holds) override
Definition: client_callback.h:852
::grpc_impl::Channel Channel
Definition: channel.h:26
Definition: client_callback.h:128
void StartWrite(const Request *req)
Initiate a write operation (or post it for later initiation if StartCall has not yet been invoked)...
Definition: client_callback.h:207
void Write(const Request *msg, WriteOptions options) override
Definition: client_callback.h:810
Definition: client_callback.h:999
virtual void OnReadDone(bool ok)
Definition: client_callback.h:323
void StartWriteLast(const Request *req, WriteOptions options)
Definition: client_callback.h:343
Definition: call_op_set.h:293
virtual ~ClientBidiReactor()
Definition: client_callback.h:186
void StartWrite(const Request *req, WriteOptions options)
Definition: client_callback.h:340
virtual ~ClientUnaryReactor()
Definition: client_callback.h:376
void WritesDone() override
Definition: client_callback.h:521
This header provides an object that reads bytes directly from a grpc::ByteBuffer, via the ZeroCopyInp...
Definition: alarm.h:24
void StartRead(Response *resp)
Initiate a read operation (or post it for later initiation if StartCall has not yet been invoked)...
Definition: client_callback.h:199
ClientWriteReactor is the interface for a client-streaming RPC.
Definition: client_callback.h:104
virtual void OnWritesDoneDone(bool ok)
Notifies the application that a StartWritesDone operation completed.
Definition: client_callback.h:297
void StartCall()
Activate the RPC and initiate any reads or writes that have been Start'ed before this call...
Definition: client_callback.h:192
Codegen interface for grpc::Channel.
Definition: channel_interface.h:69
Definition: client_callback.h:406
virtual void AddHold(int holds) override
Definition: client_callback.h:543
CoreCodegenInterface * g_core_codegen_interface
Definition: call_op_set.h:51
virtual void RemoveHold() override
Definition: client_callback.h:682
void MaybeFinish()
Definition: client_callback.h:422
ClientBidiReactor is the interface for a bidirectional streaming RPC.
Definition: client_callback.h:100
void AddMultipleHolds(int holds)
Definition: client_callback.h:349
virtual void OnWritesDoneDone(bool ok)
Definition: client_callback.h:355
Definition: byte_buffer.h:41
Per-message write options.
Definition: call_op_set.h:85
Definition: client_callback.h:738
void MaybeFinish()
Definition: client_callback.h:753
Definition: channel_interface.h:61
ClientUnaryReactor is a reactor-style interface for a unary RPC.
Definition: client_callback.h:374
void StartWritesDone()
Definition: client_callback.h:346
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm_impl.h:33
Definition: call_op_set.h:599
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
virtual void OnDone(const Status &s)
Definition: client_callback.h:379
void Write(const Request *msg, WriteOptions options) override
Definition: client_callback.h:501
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming o...
Definition: callback_common.h:132
bool ok() const
Is the status OK?
Definition: status.h:118
void StartCall() override
Definition: client_callback.h:764
Definition: client_callback.h:143
void StartCall()
Definition: client_callback.h:314
virtual void OnDone(const Status &s)
Notifies the application that all operations associated with this RPC have completed and provides the...
Definition: client_callback.h:268
void AddMultipleHolds(int holds)
Definition: client_callback.h:318
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue.h).
Definition: completion_queue_impl.h:103
Did it work? If it didn't, why?
Definition: status.h:31
virtual void OnReadDone(bool ok)
Notifies the application that a StartRead operation completed.
Definition: client_callback.h:283
void BindReactor(ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback.h:122
Definition: callback_common.h:68
virtual void OnWriteDone(bool ok)
Notifies the application that a StartWrite operation completed.
Definition: client_callback.h:289
void AddHold()
Definition: client_callback.h:348
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:189
Definition: client_callback.h:607
void RemoveHold()
Definition: client_callback.h:350
Definition: call_op_set.h:749
void StartCall()
Definition: client_callback.h:378
void BindReactor(ClientReadReactor< Response > *reactor)
Definition: client_callback.h:137
void AddHold()
Definition: client_callback.h:317
void StartWriteLast(const Request *req, WriteOptions options)
Initiate/post a write operation with specified options and an indication that this is the last write ...
Definition: client_callback.h:228
void MaybeFinish()
Definition: client_callback.h:622
virtual void OnReadInitialMetadataDone(bool ok)
Notifies the application that a read of initial metadata from the server is done. ...
Definition: client_callback.h:277
void BindReactor(ClientWriteReactor< Request > *reactor)
Definition: client_callback.h:158
virtual void OnReadInitialMetadataDone(bool ok)
Definition: client_callback.h:380
void RemoveHold()
Definition: client_callback.h:319
virtual void OnReadInitialMetadataDone(bool ok)
Definition: client_callback.h:322
static void Create(ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ClientContext *context, const Request *request, Response *response, ::grpc::experimental::ClientUnaryReactor *reactor)
Definition: client_callback.h:1002
void StartCall() override
Definition: client_callback.h:929
void StartWrite(const Request *req)
Definition: client_callback.h:339
Straightforward wrapping of the C call object.
Definition: call.h:38
void MaybeFinish()
Definition: client_callback.h:954
void StartCall()
Definition: client_callback.h:338
void CallbackUnaryCall(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(Status)> on_completion)
Perform a callback-based unary call TODO(vjpai): Combine as much as possible with the blocking unary ...
Definition: client_callback.h:46