19 #ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H 20 #define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H 36 class CompletionQueue;
43 template <
class InputMessage,
class OutputMessage>
46 OutputMessage* result,
47 std::function<
void(
Status)> on_completion) {
49 channel, method, context, request, result, on_completion);
52 template <
class InputMessage,
class OutputMessage>
57 OutputMessage* result,
58 std::function<
void(
Status)> on_completion) {
61 Call call(channel->CreateCall(method, context, cq));
69 call.call(),
sizeof(FullCallOpSet))) FullCallOpSet;
76 Status s = ops->SendMessage(*request);
81 ops->SendInitialMetadata(&context->send_initial_metadata_,
82 context->initial_metadata_flags());
83 ops->RecvInitialMetadata(context);
84 ops->RecvMessage(result);
85 ops->AllowNoMessage();
86 ops->ClientSendClose();
87 ops->ClientRecvStatus(context, tag->status_ptr());
88 ops->set_core_cq_tag(tag);
94 namespace experimental {
97 template <
class Request,
class Response>
99 template <
class Response>
101 template <
class Request>
107 template <
class Request,
class Response>
111 virtual void StartCall() = 0;
112 virtual void Write(
const Request* req,
WriteOptions options) = 0;
113 virtual void WritesDone() = 0;
114 virtual void Read(Response* resp) = 0;
118 reactor->BindStream(
this);
122 template <
class Response>
126 virtual void StartCall() = 0;
127 virtual void Read(Response* resp) = 0;
131 reactor->BindReader(
this);
135 template <
class Request>
139 virtual void StartCall() = 0;
141 virtual void Write(
const Request* req,
WriteOptions options) = 0;
145 virtual void WritesDone() = 0;
149 reactor->BindWriter(
this);
156 template <
class Request,
class Response>
170 stream_->Write(req, std::move(options));
185 template <
class Response>
202 template <
class Request>
214 writer_->Write(req, std::move(options));
232 template <
class Request,
class Response>
233 class ClientCallbackReaderWriterFactory;
234 template <
class Response>
235 class ClientCallbackReaderFactory;
236 template <
class Request>
237 class ClientCallbackWriterFactory;
239 template <
class Request,
class Response>
245 static void operator delete(
void* ptr, std::size_t size) {
254 static void operator delete(
void*,
void*) { assert(0); }
257 if (--callbacks_outstanding_ == 0) {
258 Status s = std::move(finish_status_);
259 auto* reactor = reactor_;
260 auto* call = call_.call();
276 start_tag_.Set(call_.call(),
278 reactor_->OnReadInitialMetadataDone(ok);
282 if (!start_corked_) {
283 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
284 context_->initial_metadata_flags());
286 start_ops_.RecvInitialMetadata(context_);
287 start_ops_.set_core_cq_tag(&start_tag_);
288 call_.PerformOps(&start_ops_);
292 write_tag_.Set(call_.call(),
294 reactor_->OnWriteDone(ok);
298 write_ops_.set_core_cq_tag(&write_tag_);
300 read_tag_.Set(call_.call(),
302 reactor_->OnReadDone(ok);
306 read_ops_.set_core_cq_tag(&read_tag_);
307 if (read_ops_at_start_) {
308 call_.PerformOps(&read_ops_);
311 finish_tag_.Set(call_.call(), [
this](
bool ok) { MaybeFinish(); },
313 finish_ops_.ClientRecvStatus(context_, &finish_status_);
314 finish_ops_.set_core_cq_tag(&finish_tag_);
315 call_.PerformOps(&finish_ops_);
317 if (write_ops_at_start_) {
318 call_.PerformOps(&write_ops_);
321 if (writes_done_ops_at_start_) {
322 call_.PerformOps(&writes_done_ops_);
327 void Read(Response* msg)
override {
328 read_ops_.RecvMessage(msg);
329 callbacks_outstanding_++;
331 call_.PerformOps(&read_ops_);
333 read_ops_at_start_ =
true;
339 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
340 context_->initial_metadata_flags());
341 start_corked_ =
false;
348 write_ops_.ClientSendClose();
350 callbacks_outstanding_++;
352 call_.PerformOps(&write_ops_);
354 write_ops_at_start_ =
true;
359 writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
360 context_->initial_metadata_flags());
361 start_corked_ =
false;
363 writes_done_ops_.ClientSendClose();
364 writes_done_tag_.Set(call_.call(),
366 reactor_->OnWritesDoneDone(ok);
370 writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
371 callbacks_outstanding_++;
373 call_.PerformOps(&writes_done_ops_);
375 writes_done_ops_at_start_ =
true;
388 start_corked_(context_->initial_metadata_corked_) {
389 this->BindReactor(reactor);
407 bool write_ops_at_start_{
false};
411 bool writes_done_ops_at_start_{
false};
415 bool read_ops_at_start_{
false};
418 std::atomic_int callbacks_outstanding_{3};
419 bool started_{
false};
422 template <
class Request,
class Response>
429 Call call = channel->CreateCall(method, context, channel->CallbackCQ());
439 template <
class Response>
444 static void operator delete(
void* ptr, std::size_t size) {
453 static void operator delete(
void*,
void*) { assert(0); }
456 if (--callbacks_outstanding_ == 0) {
457 Status s = std::move(finish_status_);
458 auto* reactor = reactor_;
459 auto* call = call_.call();
474 start_tag_.Set(call_.call(),
476 reactor_->OnReadInitialMetadataDone(ok);
480 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
481 context_->initial_metadata_flags());
482 start_ops_.RecvInitialMetadata(context_);
483 start_ops_.set_core_cq_tag(&start_tag_);
484 call_.PerformOps(&start_ops_);
487 read_tag_.Set(call_.call(),
489 reactor_->OnReadDone(ok);
493 read_ops_.set_core_cq_tag(&read_tag_);
494 if (read_ops_at_start_) {
495 call_.PerformOps(&read_ops_);
498 finish_tag_.Set(call_.call(), [
this](
bool ok) { MaybeFinish(); },
500 finish_ops_.ClientRecvStatus(context_, &finish_status_);
501 finish_ops_.set_core_cq_tag(&finish_tag_);
502 call_.PerformOps(&finish_ops_);
507 void Read(Response* msg)
override {
508 read_ops_.RecvMessage(msg);
509 callbacks_outstanding_++;
511 call_.PerformOps(&read_ops_);
513 read_ops_at_start_ =
true;
520 template <
class Request>
524 : context_(context), call_(call), reactor_(reactor) {
525 this->BindReactor(reactor);
528 start_ops_.ClientSendClose();
546 bool read_ops_at_start_{
false};
549 std::atomic_int callbacks_outstanding_{3};
550 bool started_{
false};
553 template <
class Response>
556 template <
class Request>
561 Call call = channel->CreateCall(method, context, channel->CallbackCQ());
570 template <
class Request>
575 static void operator delete(
void* ptr, std::size_t size) {
584 static void operator delete(
void*,
void*) { assert(0); }
587 if (--callbacks_outstanding_ == 0) {
588 Status s = std::move(finish_status_);
589 auto* reactor = reactor_;
590 auto* call = call_.
call();
605 start_tag_.Set(call_.call(),
607 reactor_->OnReadInitialMetadataDone(ok);
611 if (!start_corked_) {
612 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
613 context_->initial_metadata_flags());
615 start_ops_.RecvInitialMetadata(context_);
616 start_ops_.set_core_cq_tag(&start_tag_);
617 call_.PerformOps(&start_ops_);
621 write_tag_.Set(call_.call(),
623 reactor_->OnWriteDone(ok);
627 write_ops_.set_core_cq_tag(&write_tag_);
629 finish_tag_.Set(call_.call(), [
this](
bool ok) { MaybeFinish(); },
631 finish_ops_.ClientRecvStatus(context_, &finish_status_);
632 finish_ops_.set_core_cq_tag(&finish_tag_);
633 call_.PerformOps(&finish_ops_);
635 if (write_ops_at_start_) {
636 call_.PerformOps(&write_ops_);
639 if (writes_done_ops_at_start_) {
640 call_.PerformOps(&writes_done_ops_);
648 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
649 context_->initial_metadata_flags());
650 start_corked_ =
false;
657 write_ops_.ClientSendClose();
659 callbacks_outstanding_++;
661 call_.PerformOps(&write_ops_);
663 write_ops_at_start_ =
true;
668 writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
669 context_->initial_metadata_flags());
670 start_corked_ =
false;
672 writes_done_ops_.ClientSendClose();
673 writes_done_tag_.Set(call_.call(),
675 reactor_->OnWritesDoneDone(ok);
679 writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
680 callbacks_outstanding_++;
682 call_.PerformOps(&writes_done_ops_);
684 writes_done_ops_at_start_ =
true;
691 template <
class Response>
698 start_corked_(context_->initial_metadata_corked_) {
699 this->BindReactor(reactor);
700 finish_ops_.RecvMessage(response);
701 finish_ops_.AllowNoMessage();
719 bool write_ops_at_start_{
false};
723 bool writes_done_ops_at_start_{
false};
726 std::atomic_int callbacks_outstanding_{3};
727 bool started_{
false};
730 template <
class Request>
733 template <
class Response>
738 Call call = channel->CreateCall(method, context, channel->CallbackCQ());
750 #endif // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
virtual ~ClientCallbackReaderWriter()
Definition: client_callback.h:110
void StartCall() override
Definition: client_callback.h:466
virtual ~ClientWriteReactor()
Definition: client_callback.h:205
virtual void OnReadInitialMetadataDone(bool ok)
Definition: client_callback.h:207
Definition: channel_interface.h:56
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call_op_set.h:125
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:141
virtual void grpc_call_ref(grpc_call *call)=0
virtual ~ClientCallbackReader()
Definition: client_callback.h:125
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream. Client-side: makes Write the equivalent of performing Write and WritesDone in a single step.
Definition: call_op_set.h:164
Definition: client_callback.h:100
static void Create(ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ClientContext *context, Response *response, ::grpc::experimental::ClientWriteReactor< Request > *reactor)
Definition: client_callback.h:734
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:759
virtual ~ClientReadReactor()
Definition: client_callback.h:188
virtual void grpc_call_unref(grpc_call *call)=0
void WritesDone() override
Definition: client_callback.h:666
void StartRead(Response *resp)
Definition: client_callback.h:194
CallbackUnaryCallImpl(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(Status)> on_completion)
Definition: client_callback.h:55
Definition: client_callback.h:108
Definition: channel_interface.h:60
void Write(const Request *req)
Definition: client_callback.h:140
void StartCall() override
Definition: client_callback.h:267
void Read(Response *msg) override
Definition: client_callback.h:327
Definition: channel_interface.h:46
static void Create(ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ClientContext *context, ::grpc::experimental::ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback.h:425
grpc_call * call() const
Definition: call.h:70
void WriteLast(const Request *req, WriteOptions options)
Definition: client_callback.h:142
void StartWritesDone()
Definition: client_callback.h:175
virtual void OnWriteDone(bool ok)
Definition: client_callback.h:208
virtual void OnDone(const Status &s)
Definition: client_callback.h:206
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:174
void Read(Response *msg) override
Definition: client_callback.h:507
static void Create(ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ClientContext *context, const Request *request, ::grpc::experimental::ClientReadReactor< Response > *reactor)
Definition: client_callback.h:557
virtual ~ClientCallbackWriter()
Definition: client_callback.h:138
virtual void OnDone(const Status &s)
Definition: client_callback.h:189
Descriptor of an RPC method.
Definition: rpc_method.h:29
void StartWrite(const Request *req, WriteOptions options)
Definition: client_callback.h:169
Definition: client_callback.h:123
void StartWrite(const Request *req)
Definition: client_callback.h:168
void Write(const Request *msg, WriteOptions options) override
Definition: client_callback.h:646
virtual void OnReadDone(bool ok)
Definition: client_callback.h:191
void StartWriteLast(const Request *req, WriteOptions options)
Definition: client_callback.h:216
Definition: call_op_set.h:293
virtual ~ClientBidiReactor()
Definition: client_callback.h:159
void StartWrite(const Request *req, WriteOptions options)
Definition: client_callback.h:213
void WritesDone() override
Definition: client_callback.h:357
This header provides an object that reads bytes directly from a grpc::ByteBuffer, via the ZeroCopyInputStream interface.
Definition: alarm.h:24
void StartRead(Response *resp)
Definition: client_callback.h:167
Definition: client_callback.h:102
virtual void OnWritesDoneDone(bool ok)
Definition: client_callback.h:164
void StartCall()
Definition: client_callback.h:166
Codegen interface for grpc::Channel.
Definition: channel_interface.h:65
Definition: client_callback.h:240
CoreCodegenInterface * g_core_codegen_interface
Definition: call_op_set.h:50
void MaybeFinish()
Definition: client_callback.h:256
Definition: client_callback.h:98
virtual void OnWritesDoneDone(bool ok)
Definition: client_callback.h:209
Definition: byte_buffer.h:41
Per-message write options.
Definition: call_op_set.h:85
Definition: client_callback.h:571
void MaybeFinish()
Definition: client_callback.h:586
Definition: channel_interface.h:58
void StartWritesDone()
Definition: client_callback.h:219
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue.h).
Definition: completion_queue.h:95
Definition: call_op_set.h:532
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
void Write(const Request *msg, WriteOptions options) override
Definition: client_callback.h:337
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming operations.
Definition: callback_common.h:132
bool ok() const
Is the status OK?
Definition: status.h:118
void StartCall() override
Definition: client_callback.h:597
Definition: client_callback.h:136
void StartCall()
Definition: client_callback.h:193
virtual void OnDone(const Status &s)
Definition: client_callback.h:160
Did it work? If it didn't, why?
Definition: status.h:31
virtual void OnReadDone(bool ok)
Definition: client_callback.h:162
void BindReactor(ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback.h:117
Definition: callback_common.h:68
virtual void OnWriteDone(bool ok)
Definition: client_callback.h:163
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailing metadata.
Definition: call_op_set.h:189
Definition: client_callback.h:440
Definition: call_op_set.h:682
void BindReactor(ClientReadReactor< Response > *reactor)
Definition: client_callback.h:130
void StartWriteLast(const Request *req, WriteOptions options)
Definition: client_callback.h:172
void MaybeFinish()
Definition: client_callback.h:455
virtual void OnReadInitialMetadataDone(bool ok)
Definition: client_callback.h:161
void BindReactor(ClientWriteReactor< Request > *reactor)
Definition: client_callback.h:148
virtual void OnReadInitialMetadataDone(bool ok)
Definition: client_callback.h:190
void StartWrite(const Request *req)
Definition: client_callback.h:212
Straightforward wrapping of the C call object.
Definition: call.h:36
void StartCall()
Definition: client_callback.h:211
void CallbackUnaryCall(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(Status)> on_completion)
Perform a callback-based unary call. TODO(vjpai): Combine as much as possible with the blocking unary call code.
Definition: client_callback.h:44