19 #ifndef GRPCPP_IMPL_CODEGEN_ASYNC_STREAM_H 20 #define GRPCPP_IMPL_CODEGEN_ASYNC_STREAM_H 31 class CompletionQueue;
/// Read a message of type R into \a msg, identifying the request with \a tag.
///
/// Pure virtual. Each override visible in this listing registers \a tag as
/// the output tag of its read op set, adds RecvMessage(msg) to that set, and
/// submits it with call_.PerformOps().
///
/// \param[out] msg Where the received message is stored.
/// \param[in] tag Opaque identifier attached to this read request.
101 virtual void Read(R* msg,
void* tag) = 0;
/// Request the writing of \a msg with identifying tag \a tag.
///
/// Pure virtual. The overrides visible in this listing set \a tag as the
/// output tag of write_ops_ and submit that op set via call_.PerformOps().
///
/// \param[in] msg The message to be written.
/// \param[in] tag Opaque identifier attached to this write request.
122 virtual void Write(
const W& msg,
void* tag) = 0;
/// Request the writing of \a msg using WriteOptions \a options with
/// identifying tag \a tag.
///
/// Pure virtual. The overrides visible in this listing set \a tag as the
/// output tag of write_ops_ and submit that op set via call_.PerformOps().
/// NOTE(review): the client-side overrides also call ClientSendClose() on the
/// same op set; the guarding condition is not visible in this listing --
/// presumably options.is_last_message(); confirm.
///
/// \param[in] msg The message to be written.
/// \param[in] options Per-message write options.
/// \param[in] tag Opaque identifier attached to this write request.
139 virtual void Write(
const W& msg,
WriteOptions options,
void* tag) = 0;
185 const ::grpc::internal::RpcMethod& method,
187 bool start,
void* tag) {
203 static void operator delete(
void* ptr, std::size_t size) {
/// Two-pointer overload of operator delete. Its whole body is assert(0), so
/// reaching it is treated as a programming error (the assertion fires in
/// builds where assertions are enabled).
/// NOTE(review): presumably declared only as the counterpart of a placement
/// operator new used by this class -- the allocation code is not visible in
/// this listing; confirm.
212 static void operator delete(
void*,
void*) { assert(0); }
217 StartCallInternal(tag);
232 meta_ops_.set_output_tag(tag);
233 meta_ops_.RecvInitialMetadata(context_);
234 call_.PerformOps(&meta_ops_);
237 void Read(R* msg,
void* tag)
override {
239 read_ops_.set_output_tag(tag);
240 if (!context_->initial_metadata_received_) {
241 read_ops_.RecvInitialMetadata(context_);
243 read_ops_.RecvMessage(msg);
244 call_.PerformOps(&read_ops_);
254 finish_ops_.set_output_tag(tag);
255 if (!context_->initial_metadata_received_) {
256 finish_ops_.RecvInitialMetadata(context_);
258 finish_ops_.ClientRecvStatus(context_, status);
259 call_.PerformOps(&finish_ops_);
266 const W& request,
bool start,
void* tag)
267 : context_(context), call_(call), started_(start) {
270 init_ops_.ClientSendClose();
272 StartCallInternal(tag);
274 assert(tag ==
nullptr);
278 void StartCallInternal(
void* tag) {
279 init_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
280 context_->initial_metadata_flags());
281 init_ops_.set_output_tag(tag);
282 call_.PerformOps(&init_ops_);
/// Signal the client is done with the writes (half-close the client stream).
///
/// Pure virtual. The override visible in this listing sets \a tag as the
/// output tag of write_ops_, adds ClientSendClose(), and submits the op set
/// via call_.PerformOps().
///
/// \param[in] tag Opaque identifier attached to this request.
312 virtual void WritesDone(
void* tag) = 0;
333 const ::grpc::internal::RpcMethod& method,
335 bool start,
void* tag) {
351 static void operator delete(
void* ptr, std::size_t size) {
/// Two-pointer overload of operator delete. Its whole body is assert(0), so
/// reaching it is treated as a programming error (the assertion fires in
/// builds where assertions are enabled).
/// NOTE(review): presumably declared only as the counterpart of a placement
/// operator new used by this class -- the allocation code is not visible in
/// this listing; confirm.
360 static void operator delete(
void*,
void*) { assert(0); }
365 StartCallInternal(tag);
379 meta_ops_.set_output_tag(tag);
380 meta_ops_.RecvInitialMetadata(context_);
381 call_.PerformOps(&meta_ops_);
384 void Write(
const W& msg,
void* tag)
override {
386 write_ops_.set_output_tag(tag);
389 call_.PerformOps(&write_ops_);
394 write_ops_.set_output_tag(tag);
397 write_ops_.ClientSendClose();
401 call_.PerformOps(&write_ops_);
406 write_ops_.set_output_tag(tag);
407 write_ops_.ClientSendClose();
408 call_.PerformOps(&write_ops_);
420 finish_ops_.set_output_tag(tag);
421 if (!context_->initial_metadata_received_) {
422 finish_ops_.RecvInitialMetadata(context_);
424 finish_ops_.ClientRecvStatus(context_, status);
425 call_.PerformOps(&finish_ops_);
432 R* response,
bool start,
void* tag)
433 : context_(context), call_(call), started_(start) {
434 finish_ops_.RecvMessage(response);
435 finish_ops_.AllowNoMessage();
437 StartCallInternal(tag);
439 assert(tag ==
nullptr);
443 void StartCallInternal(
void* tag) {
444 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
445 context_->initial_metadata_flags());
448 if (!context_->initial_metadata_corked_) {
449 write_ops_.set_output_tag(tag);
450 call_.PerformOps(&write_ops_);
472 template <
class W,
class R>
/// Signal the client is done with the writes (half-close the client stream).
///
/// Pure virtual. The override visible in this listing sets \a tag as the
/// output tag of write_ops_, adds ClientSendClose(), and submits the op set
/// via call_.PerformOps().
///
/// \param[in] tag Opaque identifier attached to this request.
482 virtual void WritesDone(
void* tag) = 0;
486 template <
class W,
class R>
498 const ::grpc::internal::RpcMethod& method,
ClientContext* context,
499 bool start,
void* tag) {
513 template <
class W,
class R>
518 static void operator delete(
void* ptr, std::size_t size) {
/// Two-pointer overload of operator delete. Its whole body is assert(0), so
/// reaching it is treated as a programming error (the assertion fires in
/// builds where assertions are enabled).
/// NOTE(review): presumably declared only as the counterpart of a placement
/// operator new used by this class -- the allocation code is not visible in
/// this listing; confirm.
527 static void operator delete(
void*,
void*) { assert(0); }
532 StartCallInternal(tag);
546 meta_ops_.set_output_tag(tag);
547 meta_ops_.RecvInitialMetadata(context_);
548 call_.PerformOps(&meta_ops_);
551 void Read(R* msg,
void* tag)
override {
553 read_ops_.set_output_tag(tag);
554 if (!context_->initial_metadata_received_) {
555 read_ops_.RecvInitialMetadata(context_);
557 read_ops_.RecvMessage(msg);
558 call_.PerformOps(&read_ops_);
561 void Write(
const W& msg,
void* tag)
override {
563 write_ops_.set_output_tag(tag);
566 call_.PerformOps(&write_ops_);
571 write_ops_.set_output_tag(tag);
574 write_ops_.ClientSendClose();
578 call_.PerformOps(&write_ops_);
583 write_ops_.set_output_tag(tag);
584 write_ops_.ClientSendClose();
585 call_.PerformOps(&write_ops_);
594 finish_ops_.set_output_tag(tag);
595 if (!context_->initial_metadata_received_) {
596 finish_ops_.RecvInitialMetadata(context_);
598 finish_ops_.ClientRecvStatus(context_, status);
599 call_.PerformOps(&finish_ops_);
605 bool start,
void* tag)
606 : context_(context), call_(call), started_(start) {
608 StartCallInternal(tag);
610 assert(tag ==
nullptr);
614 void StartCallInternal(
void* tag) {
615 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
616 context_->initial_metadata_flags());
619 if (!context_->initial_metadata_corked_) {
620 write_ops_.set_output_tag(tag);
621 call_.PerformOps(&write_ops_);
642 template <
class W,
class R>
/// Finish the call with response \a msg and final \a status, identifying the
/// request with \a tag.
///
/// Pure virtual. The override visible in this listing sets \a tag as the
/// output tag of finish_ops_, adds SendInitialMetadata (plus the compression
/// level, when one is set) if ctx_->sent_initial_metadata_ is still false,
/// issues ServerSendStatus with ctx_->trailing_metadata_, and submits the op
/// set via call_.PerformOps().
/// NOTE(review): that override has two ServerSendStatus calls -- one passing
/// the result of finish_ops_.SendMessage(msg), one passing \a status. The
/// condition selecting between them is not visible in this listing;
/// presumably status.ok() -- confirm.
///
/// \param[in] msg The response message.
/// \param[in] status The final status of the call.
/// \param[in] tag Opaque identifier attached to this request.
669 virtual void Finish(
const W& msg,
const Status& status,
void* tag) = 0;
/// Finish the call by sending \a status, identifying the request with
/// \a tag.
///
/// Pure virtual. The override visible in this listing sets \a tag as the
/// output tag of finish_ops_, adds SendInitialMetadata (plus the compression
/// level, when one is set) if ctx_->sent_initial_metadata_ is still false,
/// calls ServerSendStatus(&ctx_->trailing_metadata_, status), and submits the
/// op set via call_.PerformOps(). No SendMessage call appears in the visible
/// lines of that override.
///
/// \param[in] status The final status of the call.
/// \param[in] tag Opaque identifier attached to this request.
692 virtual void FinishWithError(
const Status& status,
void* tag) = 0;
698 template <
class W,
class R>
702 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
712 meta_ops_.set_output_tag(tag);
713 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
714 ctx_->initial_metadata_flags());
715 if (ctx_->compression_level_set()) {
716 meta_ops_.set_compression_level(ctx_->compression_level());
718 ctx_->sent_initial_metadata_ =
true;
719 call_.PerformOps(&meta_ops_);
722 void Read(R* msg,
void* tag)
override {
723 read_ops_.set_output_tag(tag);
724 read_ops_.RecvMessage(msg);
725 call_.PerformOps(&read_ops_);
740 finish_ops_.set_output_tag(tag);
741 if (!ctx_->sent_initial_metadata_) {
742 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
743 ctx_->initial_metadata_flags());
744 if (ctx_->compression_level_set()) {
745 finish_ops_.set_compression_level(ctx_->compression_level());
747 ctx_->sent_initial_metadata_ =
true;
751 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
752 finish_ops_.SendMessage(msg));
754 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
756 call_.PerformOps(&finish_ops_);
770 finish_ops_.set_output_tag(tag);
771 if (!ctx_->sent_initial_metadata_) {
772 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
773 ctx_->initial_metadata_flags());
774 if (ctx_->compression_level_set()) {
775 finish_ops_.set_compression_level(ctx_->compression_level());
777 ctx_->sent_initial_metadata_ =
true;
779 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
780 call_.PerformOps(&finish_ops_);
/// Finish the call by sending \a status, identifying the request with
/// \a tag.
///
/// Pure virtual. The override visible in this listing sets \a tag as the
/// output tag of finish_ops_, calls EnsureInitialMetadataSent(&finish_ops_),
/// then ServerSendStatus(&ctx_->trailing_metadata_, status), and submits the
/// op set via call_.PerformOps().
///
/// \param[in] status The final status of the call.
/// \param[in] tag Opaque identifier attached to this request.
823 virtual void Finish(
const Status& status,
void* tag) = 0;
/// Request the writing of \a msg and finish the call with \a status,
/// identifying the request with \a tag.
///
/// Pure virtual. The override visible in this listing uses a single op set
/// (write_ops_): it sets \a tag as the output tag, calls
/// EnsureInitialMetadataSent(&write_ops_), adds
/// ServerSendStatus(&ctx_->trailing_metadata_, status), and submits once via
/// call_.PerformOps().
/// NOTE(review): the lines that attach \a msg and \a options to the op set
/// are not visible in this listing -- presumably a SendMessage call; confirm.
///
/// \param[in] msg The message to be written.
/// \param[in] options Per-message write options.
/// \param[in] status The final status of the call.
/// \param[in] tag Opaque identifier attached to this request.
839 virtual void WriteAndFinish(
const W& msg,
WriteOptions options,
840 const Status& status,
void* tag) = 0;
849 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
861 meta_ops_.set_output_tag(tag);
862 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
863 ctx_->initial_metadata_flags());
864 if (ctx_->compression_level_set()) {
865 meta_ops_.set_compression_level(ctx_->compression_level());
867 ctx_->sent_initial_metadata_ =
true;
868 call_.PerformOps(&meta_ops_);
871 void Write(
const W& msg,
void* tag)
override {
872 write_ops_.set_output_tag(tag);
873 EnsureInitialMetadataSent(&write_ops_);
876 call_.PerformOps(&write_ops_);
880 write_ops_.set_output_tag(tag);
885 EnsureInitialMetadataSent(&write_ops_);
888 call_.PerformOps(&write_ops_);
902 void* tag)
override {
903 write_ops_.set_output_tag(tag);
904 EnsureInitialMetadataSent(&write_ops_);
907 write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
908 call_.PerformOps(&write_ops_);
923 finish_ops_.set_output_tag(tag);
924 EnsureInitialMetadataSent(&finish_ops_);
925 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
926 call_.PerformOps(&finish_ops_);
933 void EnsureInitialMetadataSent(T* ops) {
934 if (!ctx_->sent_initial_metadata_) {
935 ops->SendInitialMetadata(&ctx_->initial_metadata_,
936 ctx_->initial_metadata_flags());
937 if (ctx_->compression_level_set()) {
938 ops->set_compression_level(ctx_->compression_level());
940 ctx_->sent_initial_metadata_ =
true;
958 template <
class W,
class R>
/// Finish the call by sending \a status, identifying the request with
/// \a tag.
///
/// Pure virtual. The override visible in this listing sets \a tag as the
/// output tag of finish_ops_, calls EnsureInitialMetadataSent(&finish_ops_),
/// then ServerSendStatus(&ctx_->trailing_metadata_, status), and submits the
/// op set via call_.PerformOps().
///
/// \param[in] status The final status of the call.
/// \param[in] tag Opaque identifier attached to this request.
986 virtual void Finish(
const Status& status,
void* tag) = 0;
/// Request the writing of \a msg and finish the call with \a status,
/// identifying the request with \a tag.
///
/// Pure virtual. The override visible in this listing uses a single op set
/// (write_ops_): it sets \a tag as the output tag, calls
/// EnsureInitialMetadataSent(&write_ops_), adds
/// ServerSendStatus(&ctx_->trailing_metadata_, status), and submits once via
/// call_.PerformOps().
/// NOTE(review): the lines that attach \a msg and \a options to the op set
/// are not visible in this listing -- presumably a SendMessage call; confirm.
///
/// \param[in] msg The message to be written.
/// \param[in] options Per-message write options.
/// \param[in] status The final status of the call.
/// \param[in] tag Opaque identifier attached to this request.
1002 virtual void WriteAndFinish(
const W& msg,
WriteOptions options,
1003 const Status& status,
void* tag) = 0;
1010 template <
class W,
class R>
1015 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
1027 meta_ops_.set_output_tag(tag);
1028 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
1029 ctx_->initial_metadata_flags());
1030 if (ctx_->compression_level_set()) {
1031 meta_ops_.set_compression_level(ctx_->compression_level());
1033 ctx_->sent_initial_metadata_ =
true;
1034 call_.PerformOps(&meta_ops_);
1037 void Read(R* msg,
void* tag)
override {
1038 read_ops_.set_output_tag(tag);
1039 read_ops_.RecvMessage(msg);
1040 call_.PerformOps(&read_ops_);
1043 void Write(
const W& msg,
void* tag)
override {
1044 write_ops_.set_output_tag(tag);
1045 EnsureInitialMetadataSent(&write_ops_);
1048 call_.PerformOps(&write_ops_);
1052 write_ops_.set_output_tag(tag);
1056 EnsureInitialMetadataSent(&write_ops_);
1058 call_.PerformOps(&write_ops_);
1073 void* tag)
override {
1074 write_ops_.set_output_tag(tag);
1075 EnsureInitialMetadataSent(&write_ops_);
1078 write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
1079 call_.PerformOps(&write_ops_);
1094 finish_ops_.set_output_tag(tag);
1095 EnsureInitialMetadataSent(&finish_ops_);
1097 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
1098 call_.PerformOps(&finish_ops_);
1102 friend class ::grpc::Server;
1107 void EnsureInitialMetadataSent(T* ops) {
1108 if (!ctx_->sent_initial_metadata_) {
1109 ops->SendInitialMetadata(&ctx_->initial_metadata_,
1110 ctx_->initial_metadata_flags());
1111 if (ctx_->compression_level_set()) {
1112 ops->set_compression_level(ctx_->compression_level());
1114 ctx_->sent_initial_metadata_ =
true;
1134 #endif // GRPCPP_IMPL_CODEGEN_ASYNC_STREAM_H
Async client-side interface for bi-directional streaming, where the outgoing message stream going to ...
Definition: async_stream.h:514
virtual ~AsyncReaderInterface()
Definition: async_stream.h:86
void Read(R *msg, void *tag) override
Read a message of type R into msg.
Definition: async_stream.h:722
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call_op_set.h:125
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:141
void Read(R *msg, void *tag) override
Read a message of type R into msg.
Definition: async_stream.h:237
static ClientAsyncWriter< W > * Create(ChannelInterface *channel, CompletionQueue *cq, const ::grpc::internal::RpcMethod &method, ClientContext *context, R *response, bool start, void *tag)
Create a stream object.
Definition: async_stream.h:331
static ClientAsyncReaderWriter< W, R > * Create(ChannelInterface *channel, CompletionQueue *cq, const ::grpc::internal::RpcMethod &method, ClientContext *context, bool start, void *tag)
Create a stream object.
Definition: async_stream.h:496
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:164
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:827
Async server-side API for doing bidirectional streaming RPCs, where the incoming message stream comin...
Definition: async_stream.h:1011
void FinishWithError(const Status &status, void *tag) override
See the ServerAsyncReaderInterface.FinishWithError method for semantics.
Definition: async_stream.h:768
void Write(const W &msg, WriteOptions options, void *tag) override
Request the writing of msg using WriteOptions options with identifying tag tag.
Definition: async_stream.h:1051
void Finish(Status *status, void *tag) override
See the ClientAsyncStreamingInterface.Finish method for semantics.
Definition: async_stream.h:592
Definition: async_stream.h:643
Definition: service_type.h:39
virtual void StartCall(void *tag)=0
Start the call that was set up by the constructor, but only if the constructor was invoked through th...
void Finish(Status *status, void *tag) override
See the ClientAsyncStreamingInterface.Finish method for semantics.
Definition: async_stream.h:252
grpc_call * call() const
Definition: call.h:70
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:174
Definition: async_stream.h:317
Definition: async_stream.h:173
void Write(const W &msg, void *tag) override
Request the writing of msg with identifying tag tag.
Definition: async_stream.h:871
Async server-side API for doing server streaming RPCs, where the outgoing message stream from the ser...
Definition: async_stream.h:846
static ClientAsyncReader< R > * Create(ChannelInterface *channel, CompletionQueue *cq, const ::grpc::internal::RpcMethod &method, ClientContext *context, const W &request, bool start, void *tag)
Create a stream object.
Definition: async_stream.h:183
void Finish(const W &msg, const Status &status, void *tag) override
See the ServerAsyncReaderInterface.Finish method for semantics.
Definition: async_stream.h:739
An interface that yields a sequence of messages of type R.
Definition: async_stream.h:84
void Finish(const Status &status, void *tag) override
See the ServerAsyncReaderWriterInterface.Finish method for semantics.
Definition: async_stream.h:1093
Definition: call_op_set.h:635
void Write(const W &msg, WriteOptions options, void *tag) override
Request the writing of msg using WriteOptions options with identifying tag tag.
Definition: async_stream.h:569
ServerAsyncReaderWriter(ServerContext *ctx)
Definition: async_stream.h:1014
Definition: async_stream.h:167
Definition: async_stream.h:487
Definition: call_op_set.h:293
void SendInitialMetadata(void *tag) override
See ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
Definition: async_stream.h:1024
virtual void Finish(Status *status, void *tag)=0
Indicate that the stream is to be finished and request notification for when the call has been ended...
Async client-side interface for bi-directional streaming, where the client-to-server message stream h...
Definition: async_stream.h:473
This header provides an object that reads bytes directly from a grpc::ByteBuffer, via the ZeroCopyInp...
Definition: alarm.h:24
void StartCall(void *tag) override
Start the call that was set up by the constructor, but only if the constructor was invoked through th...
Definition: async_stream.h:362
void StartCall(void *tag) override
Start the call that was set up by the constructor, but only if the constructor was invoked through th...
Definition: async_stream.h:214
Codegen interface for grpc::Channel.
Definition: channel_interface.h:65
ServerAsyncWriter(ServerContext *ctx)
Definition: async_stream.h:848
CoreCodegenInterface * g_core_codegen_interface
Definition: call_op_set.h:50
virtual void ReadInitialMetadata(void *tag)=0
Request notification of the reading of the initial metadata.
void SendInitialMetadata(void *tag) override
See ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
Definition: async_stream.h:858
void WriteLast(const W &msg, WriteOptions options, void *tag)
Request the writing of msg and coalesce it with the writing of trailing metadata, using WriteOptions ...
Definition: async_stream.h:159
Async API on the client side for doing client-streaming RPCs, where the outgoing message stream going...
Definition: async_stream.h:348
Server-side interface for asynchronous bi-directional streaming.
Definition: async_stream.h:959
Definition: byte_buffer.h:41
void WritesDone(void *tag) override
Signal the client is done with the writes (half-close the client stream).
Definition: async_stream.h:581
A ServerContext allows the person implementing a service handler to:
Definition: server_context.h:109
void SendInitialMetadata(void *tag) override
See ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
Definition: async_stream.h:709
Per-message write options.
Definition: call_op_set.h:85
void Write(const W &msg, WriteOptions options, void *tag) override
Request the writing of msg using WriteOptions options with identifying tag tag.
Definition: async_stream.h:879
void Write(const W &msg, void *tag) override
Request the writing of msg with identifying tag tag.
Definition: async_stream.h:384
void WriteAndFinish(const W &msg, WriteOptions options, const Status &status, void *tag) override
See the ServerAsyncReaderWriterInterface.WriteAndFinish method for semantics.
Definition: async_stream.h:1072
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue.h).
Definition: completion_queue.h:95
Definition: call_op_set.h:600
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
bool ok() const
Is the status OK?
Definition: status.h:118
An interface that can be fed a sequence of messages of type W.
Definition: async_stream.h:106
void ReadInitialMetadata(void *tag) override
See the ClientAsyncStreamingInterface.ReadInitialMetadata method for semantics.
Definition: async_stream.h:228
virtual ~ClientAsyncStreamingInterface()
Definition: async_stream.h:37
void Write(const W &msg, WriteOptions options, void *tag) override
Request the writing of msg using WriteOptions options with identifying tag tag.
Definition: async_stream.h:392
Did it work? If it didn't, why?
Definition: status.h:31
void Finish(const Status &status, void *tag) override
See the ServerAsyncWriterInterface.Finish method for semantics.
Definition: async_stream.h:922
void ReadInitialMetadata(void *tag) override
See the ClientAsyncStreamingInterface.ReadInitialMetadata method for semantics of this method...
Definition: async_stream.h:542
Definition: async_stream.h:798
void Finish(Status *status, void *tag) override
See the ClientAsyncStreamingInterface.Finish method for semantics.
Definition: async_stream.h:418
void WritesDone(void *tag) override
Signal the client is done with the writes (half-close the client stream).
Definition: async_stream.h:404
Definition: call_op_set.h:522
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:189
void Write(const W &msg, void *tag) override
Request the writing of msg with identifying tag tag.
Definition: async_stream.h:561
Definition: call_op_set.h:750
Async client-side API for doing server-streaming RPCs, where the incoming message stream coming from ...
Definition: async_stream.h:200
virtual ~AsyncWriterInterface()
Definition: async_stream.h:108
void Read(R *msg, void *tag) override
Read a message of type R into msg.
Definition: async_stream.h:1037
void Write(const W &msg, void *tag) override
Request the writing of msg with identifying tag tag.
Definition: async_stream.h:1043
void Read(R *msg, void *tag) override
Read a message of type R into msg.
Definition: async_stream.h:551
void WriteAndFinish(const W &msg, WriteOptions options, const Status &status, void *tag) override
See the ServerAsyncWriterInterface.WriteAndFinish method for semantics.
Definition: async_stream.h:901
void StartCall(void *tag) override
Start the call that was set up by the constructor, but only if the constructor was invoked through th...
Definition: async_stream.h:529
Async server-side API for doing client-streaming RPCs, where the incoming message stream from the cli...
Definition: async_stream.h:699
void ReadInitialMetadata(void *tag) override
See the ClientAsyncStreamingInterface.ReadInitialMetadata method for semantics.
Definition: async_stream.h:375
Common interface for all client side asynchronous streaming.
Definition: async_stream.h:35
Straightforward wrapping of the C call object.
Definition: call.h:36
ServerAsyncReader(ServerContext *ctx)
Definition: async_stream.h:701
Common interface for client side asynchronous writing.
Definition: async_stream.h:304