19 #ifndef GRPCPP_IMPL_CODEGEN_ASYNC_STREAM_H 20 #define GRPCPP_IMPL_CODEGEN_ASYNC_STREAM_H 31 class CompletionQueue;
101 virtual void Read(R* msg,
void* tag) = 0;
119 virtual void Write(
const W& msg,
void* tag) = 0;
133 virtual void Write(
const W& msg,
WriteOptions options,
void* tag) = 0;
176 const ::grpc::internal::RpcMethod& method,
178 bool start,
void* tag) {
194 static void operator delete(
void* ptr, std::size_t size) {
203 static void operator delete(
void*,
void*) { assert(0); }
208 StartCallInternal(tag);
223 meta_ops_.set_output_tag(tag);
224 meta_ops_.RecvInitialMetadata(context_);
225 call_.PerformOps(&meta_ops_);
228 void Read(R* msg,
void* tag)
override {
230 read_ops_.set_output_tag(tag);
231 if (!context_->initial_metadata_received_) {
232 read_ops_.RecvInitialMetadata(context_);
234 read_ops_.RecvMessage(msg);
235 call_.PerformOps(&read_ops_);
245 finish_ops_.set_output_tag(tag);
246 if (!context_->initial_metadata_received_) {
247 finish_ops_.RecvInitialMetadata(context_);
249 finish_ops_.ClientRecvStatus(context_, status);
250 call_.PerformOps(&finish_ops_);
257 const W& request,
bool start,
void* tag)
258 : context_(context), call_(call), started_(start) {
261 init_ops_.ClientSendClose();
263 StartCallInternal(tag);
265 assert(tag ==
nullptr);
269 void StartCallInternal(
void* tag) {
270 init_ops_.SendInitialMetadata(context_->send_initial_metadata_,
271 context_->initial_metadata_flags());
272 init_ops_.set_output_tag(tag);
273 call_.PerformOps(&init_ops_);
303 virtual void WritesDone(
void* tag) = 0;
324 const ::grpc::internal::RpcMethod& method,
326 bool start,
void* tag) {
342 static void operator delete(
void* ptr, std::size_t size) {
351 static void operator delete(
void*,
void*) { assert(0); }
356 StartCallInternal(tag);
370 meta_ops_.set_output_tag(tag);
371 meta_ops_.RecvInitialMetadata(context_);
372 call_.PerformOps(&meta_ops_);
375 void Write(
const W& msg,
void* tag)
override {
377 write_ops_.set_output_tag(tag);
380 call_.PerformOps(&write_ops_);
385 write_ops_.set_output_tag(tag);
388 write_ops_.ClientSendClose();
392 call_.PerformOps(&write_ops_);
397 write_ops_.set_output_tag(tag);
398 write_ops_.ClientSendClose();
399 call_.PerformOps(&write_ops_);
411 finish_ops_.set_output_tag(tag);
412 if (!context_->initial_metadata_received_) {
413 finish_ops_.RecvInitialMetadata(context_);
415 finish_ops_.ClientRecvStatus(context_, status);
416 call_.PerformOps(&finish_ops_);
423 R* response,
bool start,
void* tag)
424 : context_(context), call_(call), started_(start) {
425 finish_ops_.RecvMessage(response);
426 finish_ops_.AllowNoMessage();
428 StartCallInternal(tag);
430 assert(tag ==
nullptr);
434 void StartCallInternal(
void* tag) {
435 write_ops_.SendInitialMetadata(context_->send_initial_metadata_,
436 context_->initial_metadata_flags());
439 if (!context_->initial_metadata_corked_) {
440 write_ops_.set_output_tag(tag);
441 call_.PerformOps(&write_ops_);
463 template <
class W,
class R>
473 virtual void WritesDone(
void* tag) = 0;
477 template <
class W,
class R>
489 const ::grpc::internal::RpcMethod& method,
ClientContext* context,
490 bool start,
void* tag) {
504 template <
class W,
class R>
509 static void operator delete(
void* ptr, std::size_t size) {
518 static void operator delete(
void*,
void*) { assert(0); }
523 StartCallInternal(tag);
537 meta_ops_.set_output_tag(tag);
538 meta_ops_.RecvInitialMetadata(context_);
539 call_.PerformOps(&meta_ops_);
542 void Read(R* msg,
void* tag)
override {
544 read_ops_.set_output_tag(tag);
545 if (!context_->initial_metadata_received_) {
546 read_ops_.RecvInitialMetadata(context_);
548 read_ops_.RecvMessage(msg);
549 call_.PerformOps(&read_ops_);
552 void Write(
const W& msg,
void* tag)
override {
554 write_ops_.set_output_tag(tag);
557 call_.PerformOps(&write_ops_);
562 write_ops_.set_output_tag(tag);
565 write_ops_.ClientSendClose();
569 call_.PerformOps(&write_ops_);
574 write_ops_.set_output_tag(tag);
575 write_ops_.ClientSendClose();
576 call_.PerformOps(&write_ops_);
585 finish_ops_.set_output_tag(tag);
586 if (!context_->initial_metadata_received_) {
587 finish_ops_.RecvInitialMetadata(context_);
589 finish_ops_.ClientRecvStatus(context_, status);
590 call_.PerformOps(&finish_ops_);
596 bool start,
void* tag)
597 : context_(context), call_(call), started_(start) {
599 StartCallInternal(tag);
601 assert(tag ==
nullptr);
605 void StartCallInternal(
void* tag) {
606 write_ops_.SendInitialMetadata(context_->send_initial_metadata_,
607 context_->initial_metadata_flags());
610 if (!context_->initial_metadata_corked_) {
611 write_ops_.set_output_tag(tag);
612 call_.PerformOps(&write_ops_);
633 template <
class W,
class R>
657 virtual void Finish(
const W& msg,
const Status& status,
void* tag) = 0;
677 virtual void FinishWithError(
const Status& status,
void* tag) = 0;
683 template <
class W,
class R>
687 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
697 meta_ops_.set_output_tag(tag);
698 meta_ops_.SendInitialMetadata(ctx_->initial_metadata_,
699 ctx_->initial_metadata_flags());
700 if (ctx_->compression_level_set()) {
701 meta_ops_.set_compression_level(ctx_->compression_level());
703 ctx_->sent_initial_metadata_ =
true;
704 call_.PerformOps(&meta_ops_);
707 void Read(R* msg,
void* tag)
override {
708 read_ops_.set_output_tag(tag);
709 read_ops_.RecvMessage(msg);
710 call_.PerformOps(&read_ops_);
722 finish_ops_.set_output_tag(tag);
723 if (!ctx_->sent_initial_metadata_) {
724 finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
725 ctx_->initial_metadata_flags());
726 if (ctx_->compression_level_set()) {
727 finish_ops_.set_compression_level(ctx_->compression_level());
729 ctx_->sent_initial_metadata_ =
true;
733 finish_ops_.ServerSendStatus(ctx_->trailing_metadata_,
734 finish_ops_.SendMessage(msg));
736 finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
738 call_.PerformOps(&finish_ops_);
749 finish_ops_.set_output_tag(tag);
750 if (!ctx_->sent_initial_metadata_) {
751 finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
752 ctx_->initial_metadata_flags());
753 if (ctx_->compression_level_set()) {
754 finish_ops_.set_compression_level(ctx_->compression_level());
756 ctx_->sent_initial_metadata_ =
true;
758 finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
759 call_.PerformOps(&finish_ops_);
799 virtual void Finish(
const Status& status,
void* tag) = 0;
812 virtual void WriteAndFinish(
const W& msg,
WriteOptions options,
813 const Status& status,
void* tag) = 0;
822 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
834 meta_ops_.set_output_tag(tag);
835 meta_ops_.SendInitialMetadata(ctx_->initial_metadata_,
836 ctx_->initial_metadata_flags());
837 if (ctx_->compression_level_set()) {
838 meta_ops_.set_compression_level(ctx_->compression_level());
840 ctx_->sent_initial_metadata_ =
true;
841 call_.PerformOps(&meta_ops_);
844 void Write(
const W& msg,
void* tag)
override {
845 write_ops_.set_output_tag(tag);
846 EnsureInitialMetadataSent(&write_ops_);
849 call_.PerformOps(&write_ops_);
853 write_ops_.set_output_tag(tag);
858 EnsureInitialMetadataSent(&write_ops_);
861 call_.PerformOps(&write_ops_);
872 void* tag)
override {
873 write_ops_.set_output_tag(tag);
874 EnsureInitialMetadataSent(&write_ops_);
877 write_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
878 call_.PerformOps(&write_ops_);
890 finish_ops_.set_output_tag(tag);
891 EnsureInitialMetadataSent(&finish_ops_);
892 finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
893 call_.PerformOps(&finish_ops_);
900 void EnsureInitialMetadataSent(T* ops) {
901 if (!ctx_->sent_initial_metadata_) {
902 ops->SendInitialMetadata(ctx_->initial_metadata_,
903 ctx_->initial_metadata_flags());
904 if (ctx_->compression_level_set()) {
905 ops->set_compression_level(ctx_->compression_level());
907 ctx_->sent_initial_metadata_ =
true;
925 template <
class W,
class R>
950 virtual void Finish(
const Status& status,
void* tag) = 0;
963 virtual void WriteAndFinish(
const W& msg,
WriteOptions options,
964 const Status& status,
void* tag) = 0;
971 template <
class W,
class R>
976 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
988 meta_ops_.set_output_tag(tag);
989 meta_ops_.SendInitialMetadata(ctx_->initial_metadata_,
990 ctx_->initial_metadata_flags());
991 if (ctx_->compression_level_set()) {
992 meta_ops_.set_compression_level(ctx_->compression_level());
994 ctx_->sent_initial_metadata_ =
true;
995 call_.PerformOps(&meta_ops_);
998 void Read(R* msg,
void* tag)
override {
999 read_ops_.set_output_tag(tag);
1000 read_ops_.RecvMessage(msg);
1001 call_.PerformOps(&read_ops_);
1004 void Write(
const W& msg,
void* tag)
override {
1005 write_ops_.set_output_tag(tag);
1006 EnsureInitialMetadataSent(&write_ops_);
1009 call_.PerformOps(&write_ops_);
1013 write_ops_.set_output_tag(tag);
1017 EnsureInitialMetadataSent(&write_ops_);
1019 call_.PerformOps(&write_ops_);
1031 void* tag)
override {
1032 write_ops_.set_output_tag(tag);
1033 EnsureInitialMetadataSent(&write_ops_);
1036 write_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
1037 call_.PerformOps(&write_ops_);
1049 finish_ops_.set_output_tag(tag);
1050 EnsureInitialMetadataSent(&finish_ops_);
1052 finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
1053 call_.PerformOps(&finish_ops_);
1057 friend class ::grpc::Server;
1062 void EnsureInitialMetadataSent(T* ops) {
1063 if (!ctx_->sent_initial_metadata_) {
1064 ops->SendInitialMetadata(ctx_->initial_metadata_,
1065 ctx_->initial_metadata_flags());
1066 if (ctx_->compression_level_set()) {
1067 ops->set_compression_level(ctx_->compression_level());
1069 ctx_->sent_initial_metadata_ =
true;
1089 #endif // GRPCPP_IMPL_CODEGEN_ASYNC_STREAM_H Async client-side interface for bi-directional streaming, where the outgoing message stream going to ...
Definition: async_stream.h:505
virtual ~AsyncReaderInterface()
Definition: async_stream.h:86
void Read(R *msg, void *tag) override
Read a message of type R into msg.
Definition: async_stream.h:707
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call.h:121
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:138
void Read(R *msg, void *tag) override
Read a message of type R into msg.
Definition: async_stream.h:228
static ClientAsyncWriter< W > * Create(ChannelInterface *channel, CompletionQueue *cq, const ::grpc::internal::RpcMethod &method, ClientContext *context, R *response, bool start, void *tag)
Create a stream object.
Definition: async_stream.h:322
static ClientAsyncReaderWriter< W, R > * Create(ChannelInterface *channel, CompletionQueue *cq, const ::grpc::internal::RpcMethod &method, ClientContext *context, bool start, void *tag)
Create a stream object.
Definition: async_stream.h:487
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call.h:160
Primary implementation of CallOpSetInterface.
Definition: call.h:618
Async server-side API for doing bidirectional streaming RPCs, where the incoming message stream comin...
Definition: async_stream.h:972
void FinishWithError(const Status &status, void *tag) override
See the ServerAsyncReaderInterface.Read method for semantics.
Definition: async_stream.h:747
void Write(const W &msg, WriteOptions options, void *tag) override
Request the writing of msg using WriteOptions options with identifying tag tag.
Definition: async_stream.h:1012
void Finish(Status *status, void *tag) override
See the ClientAsyncStreamingInterface.Finish method for semantics.
Definition: async_stream.h:583
Definition: async_stream.h:634
Definition: service_type.h:39
virtual void StartCall(void *tag)=0
Start the call that was set up by the constructor, but only if the constructor was invoked through th...
void Finish(Status *status, void *tag) override
See the ClientAsyncStreamingInterface.Finish method for semantics.
Definition: async_stream.h:243
grpc_call * call() const
Definition: call.h:688
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:164
Definition: async_stream.h:308
Definition: async_stream.h:164
void Write(const W &msg, void *tag) override
Request the writing of msg with identifying tag tag.
Definition: async_stream.h:844
Async server-side API for doing server streaming RPCs, where the outgoing message stream from the ser...
Definition: async_stream.h:819
static ClientAsyncReader< R > * Create(ChannelInterface *channel, CompletionQueue *cq, const ::grpc::internal::RpcMethod &method, ClientContext *context, const W &request, bool start, void *tag)
Create a stream object.
Definition: async_stream.h:174
void Finish(const W &msg, const Status &status, void *tag) override
See the ServerAsyncReaderInterface.Read method for semantics.
Definition: async_stream.h:721
An interface that yields a sequence of messages of type R.
Definition: async_stream.h:84
void Finish(const Status &status, void *tag) override
See the ServerAsyncReaderWriterInterface.Finish method for semantics.
Definition: async_stream.h:1048
void Write(const W &msg, WriteOptions options, void *tag) override
Request the writing of msg using WriteOptions options with identifying tag tag.
Definition: async_stream.h:560
ServerAsyncReaderWriter(ServerContext *ctx)
Definition: async_stream.h:975
Definition: async_stream.h:158
Definition: async_stream.h:478
void SendInitialMetadata(void *tag) override
See ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
Definition: async_stream.h:985
virtual void Finish(Status *status, void *tag)=0
Indicate that the stream is to be finished and request notification for when the call has been ended...
Async client-side interface for bi-directional streaming, where the client-to-server message stream h...
Definition: async_stream.h:464
An Alarm posts the user provided tag to its associated completion queue upon expiry or cancellation...
Definition: alarm.h:31
void StartCall(void *tag) override
Start the call that was set up by the constructor, but only if the constructor was invoked through th...
Definition: async_stream.h:353
void StartCall(void *tag) override
Start the call that was set up by the constructor, but only if the constructor was invoked through th...
Definition: async_stream.h:205
Codegen interface for grpc::Channel.
Definition: channel_interface.h:57
ServerAsyncWriter(ServerContext *ctx)
Definition: async_stream.h:821
CoreCodegenInterface * g_core_codegen_interface
Definition: call.h:46
virtual void ReadInitialMetadata(void *tag)=0
Request notification of the reading of the initial metadata.
void SendInitialMetadata(void *tag) override
See ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
Definition: async_stream.h:831
void WriteLast(const W &msg, WriteOptions options, void *tag)
Request the writing of msg and coalesce it with the writing of trailing metadata, using WriteOptions ...
Definition: async_stream.h:150
Async API on the client side for doing client-streaming RPCs, where the outgoing message stream going...
Definition: async_stream.h:339
Server-side interface for asynchronous bi-directional streaming.
Definition: async_stream.h:926
Definition: byte_buffer.h:41
void WritesDone(void *tag) override
Signal the client is done with the writes (half-close the client stream).
Definition: async_stream.h:572
A ServerContext allows the person implementing a service handler to:
Definition: server_context.h:97
void SendInitialMetadata(void *tag) override
See ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
Definition: async_stream.h:694
Per-message write options.
Definition: call.h:81
void Write(const W &msg, WriteOptions options, void *tag) override
Request the writing of msg using WriteOptions options with identifying tag tag.
Definition: async_stream.h:852
void Write(const W &msg, void *tag) override
Request the writing of msg with identifying tag tag.
Definition: async_stream.h:375
void WriteAndFinish(const W &msg, WriteOptions options, const Status &status, void *tag) override
See the ServerAsyncReaderWriterInterface.WriteAndFinish method for semantics.
Definition: async_stream.h:1030
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue.h).
Definition: completion_queue.h:95
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
bool ok() const
Is the status OK?
Definition: status.h:118
An interface that can be fed a sequence of messages of type W.
Definition: async_stream.h:106
void ReadInitialMetadata(void *tag) override
See the ClientAsyncStreamingInterface.ReadInitialMetadata method for semantics.
Definition: async_stream.h:219
virtual ~ClientAsyncStreamingInterface()
Definition: async_stream.h:37
void Write(const W &msg, WriteOptions options, void *tag) override
Request the writing of msg using WriteOptions options with identifying tag tag.
Definition: async_stream.h:383
Did it work? If it didn't, why?
Definition: status.h:31
void Finish(const Status &status, void *tag) override
See the ServerAsyncWriterInterface.Finish method for semantics.
Definition: async_stream.h:889
void ReadInitialMetadata(void *tag) override
See the ClientAsyncStreamingInterface.ReadInitialMetadata method for semantics of this method...
Definition: async_stream.h:533
Definition: async_stream.h:777
void Finish(Status *status, void *tag) override
See the ClientAsyncStreamingInterface.Finish method for semantics.
Definition: async_stream.h:409
void WritesDone(void *tag) override
Signal the client is done with the writes (half-close the client stream).
Definition: async_stream.h:395
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call.h:185
void Write(const W &msg, void *tag) override
Request the writing of msg with identifying tag tag.
Definition: async_stream.h:552
Async client-side API for doing server-streaming RPCs, where the incoming message stream coming from ...
Definition: async_stream.h:191
virtual ~AsyncWriterInterface()
Definition: async_stream.h:108
void Read(R *msg, void *tag) override
Read a message of type R into msg.
Definition: async_stream.h:998
void Write(const W &msg, void *tag) override
Request the writing of msg with identifying tag tag.
Definition: async_stream.h:1004
void Read(R *msg, void *tag) override
Read a message of type R into msg.
Definition: async_stream.h:542
void WriteAndFinish(const W &msg, WriteOptions options, const Status &status, void *tag) override
See the ServerAsyncWriterInterface.WriteAndFinish method for semantics.
Definition: async_stream.h:871
void StartCall(void *tag) override
Start the call that was set up by the constructor, but only if the constructor was invoked through th...
Definition: async_stream.h:520
Async server-side API for doing client-streaming RPCs, where the incoming message stream from the cli...
Definition: async_stream.h:684
void ReadInitialMetadata(void *tag) override
See the ClientAsyncStreamingInterface.ReadInitialMetadata method for semantics.
Definition: async_stream.h:366
Common interface for all client side asynchronous streaming.
Definition: async_stream.h:35
Straightforward wrapping of the C call object.
Definition: call.h:668
ServerAsyncReader(ServerContext *ctx)
Definition: async_stream.h:686
Common interface for client side asynchronous writing.
Definition: async_stream.h:295