19 #ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H 20 #define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H 36 class CompletionQueue;
43 template <
class InputMessage,
class OutputMessage>
46 OutputMessage* result,
47 std::function<
void(
Status)> on_completion) {
49 channel, method, context, request, result, on_completion);
52 template <
class InputMessage,
class OutputMessage>
57 OutputMessage* result,
58 std::function<
void(
Status)> on_completion) {
61 Call call(channel->CreateCall(method, context, cq));
69 call.call(),
sizeof(FullCallOpSet))) FullCallOpSet;
76 Status s = ops->SendMessagePtr(request);
81 ops->SendInitialMetadata(&context->send_initial_metadata_,
82 context->initial_metadata_flags());
83 ops->RecvInitialMetadata(context);
84 ops->RecvMessage(result);
85 ops->AllowNoMessage();
86 ops->ClientSendClose();
87 ops->ClientRecvStatus(context, tag->status_ptr());
88 ops->set_core_cq_tag(tag);
94 namespace experimental {
97 template <
class Request,
class Response>
99 template <
class Response>
101 template <
class Request>
107 template <
class Request,
class Response>
111 virtual void StartCall() = 0;
112 virtual void Write(
const Request* req,
WriteOptions options) = 0;
113 virtual void WritesDone() = 0;
114 virtual void Read(Response* resp) = 0;
115 virtual void AddHold(
int holds) = 0;
116 virtual void RemoveHold() = 0;
120 reactor->BindStream(
this);
124 template <
class Response>
128 virtual void StartCall() = 0;
129 virtual void Read(Response* resp) = 0;
130 virtual void AddHold(
int holds) = 0;
131 virtual void RemoveHold() = 0;
135 reactor->BindReader(
this);
139 template <
class Request>
143 virtual void StartCall() = 0;
145 virtual void Write(
const Request* req,
WriteOptions options) = 0;
149 virtual void WritesDone() = 0;
151 virtual void AddHold(
int holds) = 0;
152 virtual void RemoveHold() = 0;
156 reactor->BindWriter(
this);
169 template <
class Request,
class Response>
202 stream_->Write(req, std::move(options));
295 template <
class Response>
319 template <
class Request>
327 writer_->Write(req, std::move(options));
354 template <
class Request,
class Response>
355 class ClientCallbackReaderWriterFactory;
356 template <
class Response>
357 class ClientCallbackReaderFactory;
358 template <
class Request>
359 class ClientCallbackWriterFactory;
361 template <
class Request,
class Response>
367 static void operator delete(
void* ptr, std::size_t size) {
376 static void operator delete(
void*,
void*) { assert(0); }
379 if (--callbacks_outstanding_ == 0) {
380 Status s = std::move(finish_status_);
381 auto* reactor = reactor_;
382 auto* call = call_.call();
397 start_tag_.Set(call_.call(),
399 reactor_->OnReadInitialMetadataDone(ok);
403 if (!start_corked_) {
404 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
405 context_->initial_metadata_flags());
407 start_ops_.RecvInitialMetadata(context_);
408 start_ops_.set_core_cq_tag(&start_tag_);
409 call_.PerformOps(&start_ops_);
413 write_tag_.Set(call_.call(),
415 reactor_->OnWriteDone(ok);
419 write_ops_.set_core_cq_tag(&write_tag_);
421 read_tag_.Set(call_.call(),
423 reactor_->OnReadDone(ok);
427 read_ops_.set_core_cq_tag(&read_tag_);
428 if (read_ops_at_start_) {
429 call_.PerformOps(&read_ops_);
432 if (write_ops_at_start_) {
433 call_.PerformOps(&write_ops_);
436 if (writes_done_ops_at_start_) {
437 call_.PerformOps(&writes_done_ops_);
440 finish_tag_.Set(call_.call(), [
this](
bool ok) { MaybeFinish(); },
442 finish_ops_.ClientRecvStatus(context_, &finish_status_);
443 finish_ops_.set_core_cq_tag(&finish_tag_);
444 call_.PerformOps(&finish_ops_);
447 void Read(Response* msg)
override {
448 read_ops_.RecvMessage(msg);
449 callbacks_outstanding_++;
451 call_.PerformOps(&read_ops_);
453 read_ops_at_start_ =
true;
459 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
460 context_->initial_metadata_flags());
461 start_corked_ =
false;
466 write_ops_.ClientSendClose();
470 callbacks_outstanding_++;
472 call_.PerformOps(&write_ops_);
474 write_ops_at_start_ =
true;
479 writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
480 context_->initial_metadata_flags());
481 start_corked_ =
false;
483 writes_done_ops_.ClientSendClose();
484 writes_done_tag_.Set(call_.call(),
486 reactor_->OnWritesDoneDone(ok);
490 writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
491 callbacks_outstanding_++;
493 call_.PerformOps(&writes_done_ops_);
495 writes_done_ops_at_start_ =
true;
499 virtual void AddHold(
int holds)
override { callbacks_outstanding_ += holds; }
511 start_corked_(context_->initial_metadata_corked_) {
512 this->BindReactor(reactor);
530 bool write_ops_at_start_{
false};
534 bool writes_done_ops_at_start_{
false};
538 bool read_ops_at_start_{
false};
541 std::atomic_int callbacks_outstanding_{2};
542 bool started_{
false};
545 template <
class Request,
class Response>
552 Call call = channel->CreateCall(method, context, channel->CallbackCQ());
562 template <
class Response>
567 static void operator delete(
void* ptr, std::size_t size) {
576 static void operator delete(
void*,
void*) { assert(0); }
579 if (--callbacks_outstanding_ == 0) {
580 Status s = std::move(finish_status_);
581 auto* reactor = reactor_;
582 auto* call = call_.call();
596 start_tag_.Set(call_.call(),
598 reactor_->OnReadInitialMetadataDone(ok);
602 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
603 context_->initial_metadata_flags());
604 start_ops_.RecvInitialMetadata(context_);
605 start_ops_.set_core_cq_tag(&start_tag_);
606 call_.PerformOps(&start_ops_);
609 read_tag_.Set(call_.call(),
611 reactor_->OnReadDone(ok);
615 read_ops_.set_core_cq_tag(&read_tag_);
616 if (read_ops_at_start_) {
617 call_.PerformOps(&read_ops_);
620 finish_tag_.Set(call_.call(), [
this](
bool ok) { MaybeFinish(); },
622 finish_ops_.ClientRecvStatus(context_, &finish_status_);
623 finish_ops_.set_core_cq_tag(&finish_tag_);
624 call_.PerformOps(&finish_ops_);
627 void Read(Response* msg)
override {
628 read_ops_.RecvMessage(msg);
629 callbacks_outstanding_++;
631 call_.PerformOps(&read_ops_);
633 read_ops_at_start_ =
true;
637 virtual void AddHold(
int holds)
override { callbacks_outstanding_ += holds; }
643 template <
class Request>
647 : context_(context), call_(call), reactor_(reactor) {
648 this->BindReactor(reactor);
651 start_ops_.ClientSendClose();
669 bool read_ops_at_start_{
false};
672 std::atomic_int callbacks_outstanding_{2};
673 bool started_{
false};
676 template <
class Response>
679 template <
class Request>
684 Call call = channel->CreateCall(method, context, channel->CallbackCQ());
693 template <
class Request>
698 static void operator delete(
void* ptr, std::size_t size) {
707 static void operator delete(
void*,
void*) { assert(0); }
710 if (--callbacks_outstanding_ == 0) {
711 Status s = std::move(finish_status_);
712 auto* reactor = reactor_;
713 auto* call = call_.
call();
727 start_tag_.Set(call_.call(),
729 reactor_->OnReadInitialMetadataDone(ok);
733 if (!start_corked_) {
734 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
735 context_->initial_metadata_flags());
737 start_ops_.RecvInitialMetadata(context_);
738 start_ops_.set_core_cq_tag(&start_tag_);
739 call_.PerformOps(&start_ops_);
743 write_tag_.Set(call_.call(),
745 reactor_->OnWriteDone(ok);
749 write_ops_.set_core_cq_tag(&write_tag_);
751 if (write_ops_at_start_) {
752 call_.PerformOps(&write_ops_);
755 if (writes_done_ops_at_start_) {
756 call_.PerformOps(&writes_done_ops_);
759 finish_tag_.Set(call_.call(), [
this](
bool ok) { MaybeFinish(); },
761 finish_ops_.ClientRecvStatus(context_, &finish_status_);
762 finish_ops_.set_core_cq_tag(&finish_tag_);
763 call_.PerformOps(&finish_ops_);
768 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
769 context_->initial_metadata_flags());
770 start_corked_ =
false;
775 write_ops_.ClientSendClose();
779 callbacks_outstanding_++;
781 call_.PerformOps(&write_ops_);
783 write_ops_at_start_ =
true;
788 writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
789 context_->initial_metadata_flags());
790 start_corked_ =
false;
792 writes_done_ops_.ClientSendClose();
793 writes_done_tag_.Set(call_.call(),
795 reactor_->OnWritesDoneDone(ok);
799 writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
800 callbacks_outstanding_++;
802 call_.PerformOps(&writes_done_ops_);
804 writes_done_ops_at_start_ =
true;
808 virtual void AddHold(
int holds)
override { callbacks_outstanding_ += holds; }
814 template <
class Response>
821 start_corked_(context_->initial_metadata_corked_) {
822 this->BindReactor(reactor);
823 finish_ops_.RecvMessage(response);
824 finish_ops_.AllowNoMessage();
842 bool write_ops_at_start_{
false};
846 bool writes_done_ops_at_start_{
false};
849 std::atomic_int callbacks_outstanding_{2};
850 bool started_{
false};
853 template <
class Request>
856 template <
class Response>
861 Call call = channel->CreateCall(method, context, channel->CallbackCQ());
873 #endif // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
virtual void AddHold(int holds) override
Definition: client_callback.h:637
virtual ~ClientCallbackReaderWriter()
Definition: client_callback.h:110
void StartCall() override
Definition: client_callback.h:589
virtual ~ClientWriteReactor()
Definition: client_callback.h:322
virtual void OnReadInitialMetadataDone(bool ok)
Definition: client_callback.h:339
Definition: channel_interface.h:56
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call_op_set.h:126
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:144
virtual void grpc_call_ref(grpc_call *call)=0
virtual ~ClientCallbackReader()
Definition: client_callback.h:127
void AddHold()
Holds are needed if (and only if) this stream has operations that take place on it after StartCall bu...
Definition: client_callback.h:246
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:165
virtual void RemoveHold() override
Definition: client_callback.h:500
ClientReadReactor is the interface for a server-streaming RPC.
Definition: client_callback.h:100
static void Create(ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ClientContext *context, Response *response, ::grpc::experimental::ClientWriteReactor< Request > *reactor)
Definition: client_callback.h:857
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:828
virtual ~ClientReadReactor()
Definition: client_callback.h:298
virtual void grpc_call_unref(grpc_call *call)=0
void WritesDone() override
Definition: client_callback.h:786
void StartRead(Response *resp)
Definition: client_callback.h:301
CallbackUnaryCallImpl(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(Status)> on_completion)
Definition: client_callback.h:55
Definition: client_callback.h:108
Definition: channel_interface.h:60
void Write(const Request *req)
Definition: client_callback.h:144
void StartCall() override
Definition: client_callback.h:389
void Read(Response *msg) override
Definition: client_callback.h:447
Definition: channel_interface.h:46
void RemoveHold()
Definition: client_callback.h:248
static void Create(ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ClientContext *context, ::grpc::experimental::ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback.h:548
grpc_call * call() const
Definition: call.h:70
void WriteLast(const Request *req, WriteOptions options)
Definition: client_callback.h:146
void StartWritesDone()
Indicate that the RPC will have no more write operations.
Definition: client_callback.h:223
virtual void OnWriteDone(bool ok)
Definition: client_callback.h:340
virtual void OnDone(const Status &s)
Definition: client_callback.h:338
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:174
void Read(Response *msg) override
Definition: client_callback.h:627
static void Create(ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ClientContext *context, const Request *request, ::grpc::experimental::ClientReadReactor< Response > *reactor)
Definition: client_callback.h:680
virtual ~ClientCallbackWriter()
Definition: client_callback.h:142
virtual void RemoveHold() override
Definition: client_callback.h:809
virtual void OnDone(const Status &s)
Definition: client_callback.h:307
Descriptor of an RPC method.
Definition: rpc_method.h:29
void StartWrite(const Request *req, WriteOptions options)
Initiate/post a write operation with specified options.
Definition: client_callback.h:201
void AddMultipleHolds(int holds)
Definition: client_callback.h:247
virtual void AddHold(int holds) override
Definition: client_callback.h:808
Definition: client_callback.h:125
void StartWrite(const Request *req)
Initiate a write operation (or post it for later initiation if StartCall has not yet been invoked)...
Definition: client_callback.h:193
void Write(const Request *msg, WriteOptions options) override
Definition: client_callback.h:766
virtual void OnReadDone(bool ok)
Definition: client_callback.h:309
void StartWriteLast(const Request *req, WriteOptions options)
Definition: client_callback.h:329
Definition: call_op_set.h:294
virtual ~ClientBidiReactor()
Definition: client_callback.h:172
void StartWrite(const Request *req, WriteOptions options)
Definition: client_callback.h:326
void WritesDone() override
Definition: client_callback.h:477
This header provides an object that reads bytes directly from a grpc::ByteBuffer, via the ZeroCopyInp...
Definition: alarm.h:24
void StartRead(Response *resp)
Initiate a read operation (or post it for later initiation if StartCall has not yet been invoked)...
Definition: client_callback.h:185
ClientWriteReactor is the interface for a client-streaming RPC.
Definition: client_callback.h:102
virtual void OnWritesDoneDone(bool ok)
Notifies the application that a StartWritesDone operation completed.
Definition: client_callback.h:283
void StartCall()
Activate the RPC and initiate any reads or writes that have been Start'ed before this call...
Definition: client_callback.h:178
Codegen interface for grpc::Channel.
Definition: channel_interface.h:65
Definition: client_callback.h:362
virtual void AddHold(int holds) override
Definition: client_callback.h:499
CoreCodegenInterface * g_core_codegen_interface
Definition: call_op_set.h:51
virtual void RemoveHold() override
Definition: client_callback.h:638
void MaybeFinish()
Definition: client_callback.h:378
ClientBidiReactor is the interface for a bidirectional streaming RPC.
Definition: client_callback.h:98
void AddMultipleHolds(int holds)
Definition: client_callback.h:335
virtual void OnWritesDoneDone(bool ok)
Definition: client_callback.h:341
Definition: byte_buffer.h:41
Per-message write options.
Definition: call_op_set.h:86
Definition: client_callback.h:694
void MaybeFinish()
Definition: client_callback.h:709
Definition: channel_interface.h:58
void StartWritesDone()
Definition: client_callback.h:332
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue.h).
Definition: completion_queue.h:97
Definition: call_op_set.h:601
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
void Write(const Request *msg, WriteOptions options) override
Definition: client_callback.h:457
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming o...
Definition: callback_common.h:132
bool ok() const
Is the status OK?
Definition: status.h:118
void StartCall() override
Definition: client_callback.h:720
Definition: client_callback.h:140
void StartCall()
Definition: client_callback.h:300
virtual void OnDone(const Status &s)
Notifies the application that all operations associated with this RPC have completed and provides the...
Definition: client_callback.h:254
void AddMultipleHolds(int holds)
Definition: client_callback.h:304
Did it work? If it didn't, why?
Definition: status.h:31
virtual void OnReadDone(bool ok)
Notifies the application that a StartRead operation completed.
Definition: client_callback.h:269
void BindReactor(ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback.h:119
Definition: callback_common.h:68
virtual void OnWriteDone(bool ok)
Notifies the application that a StartWrite operation completed.
Definition: client_callback.h:275
void AddHold()
Definition: client_callback.h:334
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:190
Definition: client_callback.h:563
void RemoveHold()
Definition: client_callback.h:336
Definition: call_op_set.h:751
void BindReactor(ClientReadReactor< Response > *reactor)
Definition: client_callback.h:134
void AddHold()
Definition: client_callback.h:303
void StartWriteLast(const Request *req, WriteOptions options)
Initiate/post a write operation with specified options and an indication that this is the last write ...
Definition: client_callback.h:214
void MaybeFinish()
Definition: client_callback.h:578
virtual void OnReadInitialMetadataDone(bool ok)
Notifies the application that a read of initial metadata from the server is done. ...
Definition: client_callback.h:263
void BindReactor(ClientWriteReactor< Request > *reactor)
Definition: client_callback.h:155
void RemoveHold()
Definition: client_callback.h:305
virtual void OnReadInitialMetadataDone(bool ok)
Definition: client_callback.h:308
void StartWrite(const Request *req)
Definition: client_callback.h:325
Straightforward wrapping of the C call object.
Definition: call.h:36
void StartCall()
Definition: client_callback.h:324
void CallbackUnaryCall(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(Status)> on_completion)
Perform a callback-based unary call TODO(vjpai): Combine as much as possible with the blocking unary ...
Definition: client_callback.h:44