19 #ifndef GRPCPP_IMPL_CODEGEN_SYNC_STREAM_H 20 #define GRPCPP_IMPL_CODEGEN_SYNC_STREAM_H 77 virtual void SendInitialMetadata() = 0;
88 virtual bool NextMessageSize(uint32_t* sz) = 0;
100 virtual bool Read(R* msg) = 0;
116 virtual bool Write(
const W& msg,
WriteOptions options) = 0;
156 virtual void WaitForInitialMetadata() = 0;
165 const ::grpc::internal::RpcMethod& method,
190 ops.RecvInitialMetadata(context_);
191 call_.PerformOps(&ops);
196 *sz = call_.max_receive_message_size();
209 if (!context_->initial_metadata_received_) {
210 ops.RecvInitialMetadata(context_);
213 call_.PerformOps(&ops);
214 return cq_.Pluck(&ops) && ops.got_message;
225 ops.ClientRecvStatus(context_, &status);
226 call_.PerformOps(&ops);
242 const ::grpc::internal::RpcMethod& method,
248 call_(channel->CreateCall(method, context, &cq_)) {
253 ops.SendInitialMetadata(context->send_initial_metadata_,
254 context->initial_metadata_flags());
257 ops.ClientSendClose();
274 virtual bool WritesDone() = 0;
283 const ::grpc::internal::RpcMethod& method,
307 ops.RecvInitialMetadata(context_);
318 using ::grpc::internal::WriterInterface<W>::Write;
321 ::grpc::internal::CallOpSendMessage,
327 ops.ClientSendClose();
329 if (context_->initial_metadata_corked_) {
330 ops.SendInitialMetadata(context_->send_initial_metadata_,
331 context_->initial_metadata_flags());
332 context_->set_initial_metadata_corked(
false);
334 if (!ops.SendMessage(msg, options).ok()) {
339 return cq_.Pluck(&ops);
344 ops.ClientSendClose();
346 return cq_.Pluck(&ops);
357 if (!context_->initial_metadata_received_) {
358 finish_ops_.RecvInitialMetadata(context_);
360 finish_ops_.ClientRecvStatus(context_, &status);
376 const ::grpc::internal::RpcMethod& method,
382 call_(channel->CreateCall(method, context, &cq_)) {
383 finish_ops_.RecvMessage(response);
384 finish_ops_.AllowNoMessage();
386 if (!context_->initial_metadata_corked_) {
389 ops.SendInitialMetadata(context->send_initial_metadata_,
390 context->initial_metadata_flags());
408 template <
class W,
class R>
417 virtual void WaitForInitialMetadata() = 0;
425 virtual bool WritesDone() = 0;
429 template <
class W,
class R>
434 const ::grpc::internal::RpcMethod& method,
ClientContext* context) {
444 template <
class W,
class R>
458 ops.RecvInitialMetadata(context_);
476 if (!context_->initial_metadata_received_) {
477 ops.RecvInitialMetadata(context_);
479 ops.RecvMessage(msg);
481 return cq_.Pluck(&ops) && ops.got_message;
489 using ::grpc::internal::WriterInterface<W>::Write;
492 ::grpc::internal::CallOpSendMessage,
498 ops.ClientSendClose();
500 if (context_->initial_metadata_corked_) {
501 ops.SendInitialMetadata(context_->send_initial_metadata_,
502 context_->initial_metadata_flags());
503 context_->set_initial_metadata_corked(
false);
505 if (!ops.SendMessage(msg, options).ok()) {
510 return cq_.Pluck(&ops);
515 ops.ClientSendClose();
517 return cq_.Pluck(&ops);
529 if (!context_->initial_metadata_received_) {
530 ops.RecvInitialMetadata(context_);
533 ops.ClientRecvStatus(context_, &status);
550 const ::grpc::internal::RpcMethod& method,
556 call_(channel->CreateCall(method, context, &cq_)) {
557 if (!context_->initial_metadata_corked_) {
560 ops.SendInitialMetadata(context->send_initial_metadata_,
561 context->initial_metadata_flags());
586 ops.SendInitialMetadata(ctx_->initial_metadata_,
587 ctx_->initial_metadata_flags());
588 if (ctx_->compression_level_set()) {
589 ops.set_compression_level(ctx_->compression_level());
591 ctx_->sent_initial_metadata_ =
true;
593 call_->
cq()->Pluck(&ops);
603 ops.RecvMessage(msg);
605 return call_->
cq()->Pluck(&ops) && ops.got_message;
612 template <
class ServiceType,
class RequestType,
class ResponseType>
616 : call_(call), ctx_(ctx) {}
638 ops.SendInitialMetadata(ctx_->initial_metadata_,
639 ctx_->initial_metadata_flags());
640 if (ctx_->compression_level_set()) {
641 ops.set_compression_level(ctx_->compression_level());
643 ctx_->sent_initial_metadata_ =
true;
645 call_->
cq()->Pluck(&ops);
659 if (!ctx_->pending_ops_.SendMessage(msg, options).ok()) {
662 if (!ctx_->sent_initial_metadata_) {
663 ctx_->pending_ops_.SendInitialMetadata(ctx_->initial_metadata_,
664 ctx_->initial_metadata_flags());
665 if (ctx_->compression_level_set()) {
666 ctx_->pending_ops_.set_compression_level(ctx_->compression_level());
668 ctx_->sent_initial_metadata_ =
true;
675 ctx_->has_pending_ops_ =
true;
678 ctx_->has_pending_ops_ =
false;
679 return call_->
cq()->Pluck(&ctx_->pending_ops_);
686 template <
class ServiceType,
class RequestType,
class ResponseType>
690 : call_(call), ctx_(ctx) {}
694 template <
class W,
class R>
701 template <
class W,
class R>
702 class ServerReaderWriterBody final {
705 : call_(call), ctx_(ctx) {}
711 ops.SendInitialMetadata(ctx_->initial_metadata_,
712 ctx_->initial_metadata_flags());
713 if (ctx_->compression_level_set()) {
714 ops.set_compression_level(ctx_->compression_level());
716 ctx_->sent_initial_metadata_ =
true;
718 call_->
cq()->Pluck(&ops);
728 ops.RecvMessage(msg);
730 return call_->
cq()->Pluck(&ops) && ops.got_message;
737 if (!ctx_->pending_ops_.SendMessage(msg, options).ok()) {
740 if (!ctx_->sent_initial_metadata_) {
741 ctx_->pending_ops_.SendInitialMetadata(ctx_->initial_metadata_,
742 ctx_->initial_metadata_flags());
743 if (ctx_->compression_level_set()) {
744 ctx_->pending_ops_.set_compression_level(ctx_->compression_level());
746 ctx_->sent_initial_metadata_ =
true;
753 ctx_->has_pending_ops_ =
true;
756 ctx_->has_pending_ops_ =
false;
757 return call_->
cq()->Pluck(&ctx_->pending_ops_);
771 template <
class W,
class R>
780 return body_.NextMessageSize(sz);
783 bool Read(R* msg)
override {
return body_.Read(msg); }
792 return body_.Write(msg, options);
801 : body_(call, ctx) {}
812 template <
class RequestType,
class ResponseType>
824 return body_.NextMessageSize(sz);
837 bool Read(RequestType* request)
override {
842 return body_.Read(request);
854 if (write_done_ || !read_done_) {
858 return body_.Write(response, options);
869 : body_(call, ctx), read_done_(false), write_done_(false) {}
877 template <
class RequestType,
class ResponseType>
889 return body_.NextMessageSize(sz);
902 bool Read(RequestType* request)
override {
907 return body_.Read(request);
919 return read_done_ && body_.Write(response, options);
929 : body_(call, ctx), read_done_(false) {}
934 #endif // GRPCPP_IMPL_CODEGEN_SYNC_STREAM_H Synchronous (blocking) client-side API for doing server-streaming RPCs, where the stream of messages ...
Definition: channel_interface.h:32
A wrapper class of an application provided server streaming handler.
Definition: byte_buffer.h:47
Client-side interface for streaming writes of message type W.
Definition: sync_stream.h:265
A wrapper class of an application provided bidi-streaming handler.
Definition: completion_queue.h:83
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call.h:123
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:138
bool Read(R *msg) override
Block to read a message and parse to msg.
Definition: sync_stream.h:783
void WriteLast(const W &msg, WriteOptions options)
Write msg and coalesce it with the writing of trailing metadata, using WriteOptions options...
Definition: sync_stream.h:140
Definition: sync_stream.h:161
bool Write(const W &msg, WriteOptions options)
Definition: sync_stream.h:733
Server-side interface for bi-directional streaming.
Definition: sync_stream.h:695
A class to represent a flow-controlled server-side streaming call.
Definition: sync_stream.h:878
ServerReaderWriterBody(Call *call, ServerContext *ctx)
Definition: sync_stream.h:704
Common interface for all synchronous client side streaming.
Definition: sync_stream.h:35
bool NextMessageSize(uint32_t *sz)
Definition: sync_stream.h:721
bool NextMessageSize(uint32_t *sz) override
Get an upper bound on the next message size available for reading on this stream. ...
Definition: sync_stream.h:463
Client-side interface for streaming reads of message of type R.
Definition: sync_stream.h:149
Definition: completion_queue.h:58
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call.h:162
bool Write(const ResponseType &response, WriteOptions options) override
Block to write msg to the stream with WriteOptions options.
Definition: sync_stream.h:853
A class to represent a flow-controlled unary call.
Definition: sync_stream.h:813
Definition: sync_stream.h:430
Primary implementation of CallOpSetInterface.
Definition: call.h:619
bool Read(R *msg) override
See the ReaderInterface.Read method for semantics.
Definition: sync_stream.h:205
void SendInitialMetadata() override
See the ServerStreamingInterface.SendInitialMetadata method for semantics.
Definition: sync_stream.h:582
CompletionQueue * cq() const
Definition: call.h:681
virtual ~ClientStreamingInterface()
Definition: sync_stream.h:37
void SendInitialMetadata() override
Block to send initial metadata to client.
Definition: sync_stream.h:885
bool Write(const W &msg)
Block to write msg to the stream with default write options.
Definition: sync_stream.h:124
bool Read(R *msg)
Definition: sync_stream.h:726
static ClientWriter< W > * Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ClientContext *context, R *response)
Definition: sync_stream.h:282
#define GRPC_CQ_CURRENT_VERSION
Definition: grpc_types.h:651
virtual Status Finish()=0
Block waiting until the stream finishes and a final status of the call is available.
Server-side interface for streaming reads of message of type R.
Definition: sync_stream.h:570
Status Finish() override
See the ClientStreamingInterface.Finish method for semantics.
Definition: sync_stream.h:222
bool Read(R *msg) override
Block to read a message and parse to msg.
Definition: sync_stream.h:601
bool Write(const ResponseType &response, WriteOptions options) override
Block to write msg to the stream with WriteOptions options.
Definition: sync_stream.h:918
bool WritesDone() override
Half close writing from the client.
Definition: sync_stream.h:342
bool Read(R *msg) override
See the ReaderInterface.Read method for semantics.
Definition: sync_stream.h:472
Definition: grpc_types.h:652
An interface that can be fed a sequence of messages of type W.
Definition: sync_stream.h:105
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:162
Synchronous (blocking) server-side API for doing client-streaming RPCs, where the incoming message st...
Definition: completion_queue.h:53
bool NextMessageSize(uint32_t *sz) override
Get an upper bound on the next message size available for reading on this stream. ...
Definition: sync_stream.h:195
bool Write(const W &msg, WriteOptions options) override
Block to write msg to the stream with WriteOptions options.
Definition: sync_stream.h:490
bool NextMessageSize(uint32_t *sz) override
Get an upper bound on the request message size from the client.
Definition: sync_stream.h:888
void SendInitialMetadata() override
See the ServerStreamingInterface.SendInitialMetadata method for semantics.
Definition: sync_stream.h:777
bool NextMessageSize(uint32_t *sz) override
Get an upper bound on the next message size available for reading on this stream. ...
Definition: sync_stream.h:779
void SendInitialMetadata()
Definition: sync_stream.h:707
void SendInitialMetadata() override
See the ServerStreamingInterface.SendInitialMetadata method for semantics.
Definition: sync_stream.h:634
bool NextMessageSize(uint32_t *sz) override
Get an upper bound on the request message size from the client.
Definition: sync_stream.h:823
bool Read(RequestType *request) override
Read a message of type RequestType into request.
Definition: sync_stream.h:837
bool Write(const W &msg, WriteOptions options) override
Block to write msg to the stream with WriteOptions options.
Definition: sync_stream.h:791
Events are popped out by calling grpc_completion_queue_pluck() API ONLY.
Definition: grpc_types.h:648
Server-side interface for streaming writes of message of type W.
Definition: sync_stream.h:621
bool Write(const W &msg, WriteOptions options) override
Block to write msg to the stream with WriteOptions options.
Definition: sync_stream.h:654
An Alarm posts the user provided tag to its associated completion queue upon expiry or cancellation...
Definition: alarm.h:31
A wrapper class of an application provided client streaming handler.
Definition: completion_queue.h:76
An interface that yields a sequence of messages of type R.
Definition: sync_stream.h:82
Codegen interface for grpc::Channel.
Definition: channel_interface.h:55
The completion queue will have an associated pollset and there is no restriction on the type of file ...
Definition: grpc_types.h:628
Status Finish() override
See the ClientStreamingInterface.Finish method for semantics.
Definition: sync_stream.h:355
Definition: byte_buffer.h:41
A ServerContext allows the person implementing a service handler to:
Definition: server_context.h:96
Definition: sync_stream.h:279
Per-message write options.
Definition: call.h:83
void SendInitialMetadata() override
Block to send initial metadata to client.
Definition: sync_stream.h:820
bool NextMessageSize(uint32_t *sz) override
Get an upper bound on the next message size available for reading on this stream. ...
Definition: sync_stream.h:596
Synchronous (blocking) server-side API for doing server-streaming RPCs, where the outgoing message stream coming from the server has messages of type W.
Definition: completion_queue.h:55
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue.h).
Definition: completion_queue.h:94
bool WritesDone() override
Half close writing from the client.
Definition: sync_stream.h:513
Status Finish() override
See the ClientStreamingInterface.Finish method for semantics.
Definition: sync_stream.h:525
void RecvMessage(R *message)
Definition: call.h:328
void WaitForInitialMetadata() override
Block waiting to read initial metadata from the server.
Definition: sync_stream.h:453
int max_receive_message_size() const
Definition: call.h:683
Common interface for all synchronous server side streaming.
Definition: sync_stream.h:67
Synchronous (blocking) server-side API for a bidirectional streaming call, where the incoming message...
Definition: sync_stream.h:772
Client-side interface for bi-directional streaming with client-to-server stream messages of type W an...
Definition: sync_stream.h:409
Did it work? If it didn't, why?
Definition: status.h:31
static ClientReaderWriter< W, R > * Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ClientContext *context)
Definition: sync_stream.h:432
void WaitForInitialMetadata() override
See the ClientStreamingInterface.WaitForInitialMetadata method for semantics.
Definition: sync_stream.h:185
virtual ~WriterInterface()
Definition: sync_stream.h:107
bool Read(RequestType *request) override
Read a message of type RequestType into request.
Definition: sync_stream.h:902
void WaitForInitialMetadata()
See the ClientStreamingInterface.WaitForInitialMetadata method for semantics.
Definition: sync_stream.h:302
void PerformOps(CallOpSetInterface *ops)
Definition: call.h:676
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call.h:187
Synchronous (blocking) client-side API for bi-directional streaming RPCs, where the outgoing message ...
Definition: channel_interface.h:36
virtual ~ServerStreamingInterface()
Definition: sync_stream.h:69
virtual ~ReaderInterface()
Definition: sync_stream.h:84
static ClientReader< R > * Create(ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ClientContext *context, const W &request)
Definition: sync_stream.h:164
Synchronous (blocking) client-side API for doing client-streaming RPCs, where the outgoing message st...
Definition: channel_interface.h:34
Straightforward wrapping of the C call object.
Definition: call.h:660
bool Write(const W &msg, WriteOptions options) override
Block to write msg to the stream with WriteOptions options.
Definition: sync_stream.h:319