Go to the documentation of this file.
18 #ifndef GRPCPP_IMPL_CODEGEN_SYNC_STREAM_H
19 #define GRPCPP_IMPL_CODEGEN_SYNC_STREAM_H
99 virtual bool Read(R* msg) = 0;
164 const ::grpc::internal::RpcMethod& method,
176 class ClientReader final :
public ClientReaderInterface<R> {
190 ops.RecvInitialMetadata(context_);
197 *sz = (result > 0) ? result : UINT32_MAX;
210 if (!context_->initial_metadata_received_) {
211 ops.RecvInitialMetadata(context_);
215 return cq_.Pluck(&ops) && ops.got_message;
226 ops.ClientRecvStatus(context_, &status);
243 const ::grpc::internal::RpcMethod& method,
249 call_(channel->CreateCall(method, context, &cq_)) {
254 ops.SendInitialMetadata(&context->send_initial_metadata_,
255 context->initial_metadata_flags());
258 ops.ClientSendClose();
284 const ::grpc::internal::RpcMethod& method,
295 class ClientWriter :
public ClientWriterInterface<W> {
308 ops.RecvInitialMetadata(context_);
328 ops.ClientSendClose();
330 if (context_->initial_metadata_corked_) {
331 ops.SendInitialMetadata(&context_->send_initial_metadata_,
332 context_->initial_metadata_flags());
333 context_->set_initial_metadata_corked(
false);
335 if (!ops.SendMessagePtr(&msg, options).ok()) {
340 return cq_.Pluck(&ops);
345 ops.ClientSendClose();
347 return cq_.Pluck(&ops);
358 if (!context_->initial_metadata_received_) {
359 finish_ops_.RecvInitialMetadata(context_);
361 finish_ops_.ClientRecvStatus(context_, &status);
377 const ::grpc::internal::RpcMethod& method,
383 call_(channel->CreateCall(method, context, &cq_)) {
384 finish_ops_.RecvMessage(response);
385 finish_ops_.AllowNoMessage();
387 if (!context_->initial_metadata_corked_) {
390 ops.SendInitialMetadata(&context->send_initial_metadata_,
391 context->initial_metadata_flags());
409 template <
class W,
class R>
430 template <
class W,
class R>
435 const ::grpc::internal::RpcMethod& method,
446 template <
class W,
class R>
447 class ClientReaderWriter final :
public ClientReaderWriterInterface<W, R> {
460 ops.RecvInitialMetadata(context_);
467 *sz = (result > 0) ? result : UINT32_MAX;
479 if (!context_->initial_metadata_received_) {
480 ops.RecvInitialMetadata(context_);
482 ops.RecvMessage(msg);
484 return cq_.Pluck(&ops) && ops.got_message;
501 ops.ClientSendClose();
503 if (context_->initial_metadata_corked_) {
504 ops.SendInitialMetadata(&context_->send_initial_metadata_,
505 context_->initial_metadata_flags());
506 context_->set_initial_metadata_corked(
false);
508 if (!ops.SendMessagePtr(&msg, options).ok()) {
513 return cq_.Pluck(&ops);
518 ops.ClientSendClose();
520 return cq_.Pluck(&ops);
532 if (!context_->initial_metadata_received_) {
533 ops.RecvInitialMetadata(context_);
536 ops.ClientRecvStatus(context_, &status);
553 const ::grpc::internal::RpcMethod& method,
559 call_(channel->CreateCall(method, context, &cq_)) {
560 if (!context_->initial_metadata_corked_) {
563 ops.SendInitialMetadata(&context->send_initial_metadata_,
564 context->initial_metadata_flags());
590 ops.SendInitialMetadata(&ctx_->initial_metadata_,
591 ctx_->initial_metadata_flags());
595 ctx_->sent_initial_metadata_ =
true;
597 call_->
cq()->Pluck(&ops);
602 *sz = (result > 0) ? result : UINT32_MAX;
608 ops.RecvMessage(msg);
610 return call_->
cq()->Pluck(&ops) && ops.got_message;
617 template <
class ServiceType,
class RequestType,
class ResponseType>
621 : call_(call), ctx_(ctx) {}
644 ops.SendInitialMetadata(&ctx_->initial_metadata_,
645 ctx_->initial_metadata_flags());
649 ctx_->sent_initial_metadata_ =
true;
651 call_->
cq()->Pluck(&ops);
665 if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) {
668 if (!ctx_->sent_initial_metadata_) {
669 ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
670 ctx_->initial_metadata_flags());
674 ctx_->sent_initial_metadata_ =
true;
681 ctx_->has_pending_ops_ =
true;
684 ctx_->has_pending_ops_ =
false;
685 return call_->
cq()->Pluck(&ctx_->pending_ops_);
692 template <
class ServiceType,
class RequestType,
class ResponseType>
696 : call_(call), ctx_(ctx) {}
700 template <
class W,
class R>
707 template <
class W,
class R>
708 class ServerReaderWriterBody final {
711 : call_(call), ctx_(ctx) {}
717 ops.SendInitialMetadata(&ctx_->initial_metadata_,
718 ctx_->initial_metadata_flags());
722 ctx_->sent_initial_metadata_ =
true;
724 call_->
cq()->Pluck(&ops);
729 *sz = (result > 0) ? result : UINT32_MAX;
735 ops.RecvMessage(msg);
737 return call_->
cq()->Pluck(&ops) && ops.got_message;
744 if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) {
747 if (!ctx_->sent_initial_metadata_) {
748 ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
749 ctx_->initial_metadata_flags());
753 ctx_->sent_initial_metadata_ =
true;
760 ctx_->has_pending_ops_ =
true;
763 ctx_->has_pending_ops_ =
false;
764 return call_->
cq()->Pluck(&ctx_->pending_ops_);
778 template <
class W,
class R>
787 return body_.NextMessageSize(sz);
790 bool Read(R* msg)
override {
return body_.Read(msg); }
799 return body_.Write(msg, options);
808 : body_(call, ctx) {}
819 template <
class RequestType,
class ResponseType>
844 bool Read(RequestType* request)
override {
849 return body_.
Read(request);
860 bool Write(
const ResponseType& response,
862 if (write_done_ || !read_done_) {
866 return body_.
Write(response, options);
877 : body_(call, ctx), read_done_(false), write_done_(false) {}
885 template <
class RequestType,
class ResponseType>
910 bool Read(RequestType* request)
override {
915 return body_.
Read(request);
926 bool Write(
const ResponseType& response,
928 return read_done_ && body_.
Write(response, options);
938 : body_(call, ctx), read_done_(false) {}
943 #endif // GRPCPP_IMPL_CODEGEN_SYNC_STREAM_H
bool Write(const W &msg, ::grpc::WriteOptions options) override
Definition: sync_stream.h:493
Server-side interface for streaming writes of message of type W.
Definition: sync_stream.h:626
void SendInitialMetadata()
Definition: sync_stream.h:713
static ClientWriter< W > * Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, R *response)
Definition: sync_stream.h:283
Definition: call_op_set.h:619
Definition: call_op_set.h:526
bool WritesDone() override
Definition: sync_stream.h:516
Synchronous (blocking) client-side API for doing client-streaming RPCs, where the outgoing message st...
Definition: channel_interface.h:31
A ServerContext or CallbackServerContext allows the code implementing a service handler to:
Definition: server_context.h:546
void WaitForInitialMetadata() override
See the ClientStreamingInterface.WaitForInitialMetadata method for semantics.
Definition: sync_stream.h:185
::grpc::Status Finish() override
See the ClientStreamingInterface.Finish method for semantics.
Definition: sync_stream.h:528
int max_receive_message_size() const
Definition: call.h:72
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:852
virtual ::grpc::Status Finish()=0
Block waiting until the stream finishes and a final status of the call is available.
Definition: call_op_set.h:282
bool Read(R *msg) override
See the ReaderInterface.Read method for semantics.
Definition: sync_stream.h:475
bool Write(const W &msg, ::grpc::WriteOptions options) override
Definition: sync_stream.h:320
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:156
bool Write(const W &msg)
Block to write msg to the stream with default write options.
Definition: sync_stream.h:123
Synchronous (blocking) server-side API for doing server-streaming RPCs,...
Definition: completion_queue.h:58
bool Write(const W &msg, ::grpc::WriteOptions options) override
Definition: sync_stream.h:660
virtual bool Write(const W &msg, ::grpc::WriteOptions options)=0
Block to write msg to the stream with WriteOptions options.
ServerReaderWriterBody(grpc::internal::Call *call, ::grpc::ServerContext *ctx)
Definition: sync_stream.h:710
Definition: sync_stream.h:160
Straightforward wrapping of the C call object.
Definition: call.h:35
virtual bool NextMessageSize(uint32_t *sz)=0
Get an upper bound on the next message size available for reading on this stream.
Client-side interface for bi-directional streaming with client-to-server stream messages of type W an...
Definition: sync_stream.h:410
bool Write(const W &msg, ::grpc::WriteOptions options)
Definition: sync_stream.h:740
bool NextMessageSize(uint32_t *sz) override
Get an upper bound on the request message size from the client.
Definition: sync_stream.h:896
::grpc::Status Finish() override
See the ClientStreamingInterface.Finish method for semantics.
Definition: sync_stream.h:356
static ClientReader< R > * Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const W &request)
Definition: sync_stream.h:163
A class to represent a flow-controlled server-side streaming call.
Definition: sync_stream.h:886
Definition: completion_queue.h:61
virtual ~ServerStreamingInterface()
Definition: sync_stream.h:68
::grpc::Status Finish() override
See the ClientStreamingInterface.Finish method for semantics.
Definition: sync_stream.h:223
bool NextMessageSize(uint32_t *sz) override
Definition: sync_stream.h:465
bool Write(const ResponseType &response, ::grpc::WriteOptions options) override
Block to write msg to the stream with WriteOptions options.
Definition: sync_stream.h:860
bool NextMessageSize(uint32_t *sz) override
Definition: sync_stream.h:600
bool Read(RequestType *request) override
Read a message of type R into msg.
Definition: sync_stream.h:844
Did it work? If it didn't, why?
Definition: status.h:31
@ GRPC_CQ_DEFAULT_POLLING
The completion queue will have an associated pollset and there is no restriction on the type of file ...
Definition: grpc_types.h:712
virtual void WaitForInitialMetadata()=0
Block to wait for initial metadata from server.
void set_compression_level(grpc_compression_level level)
Set level to be the compression level used for the server call.
Definition: server_context.h:240
Synchronous (blocking) client-side API for doing server-streaming RPCs, where the stream of messages ...
Definition: channel_interface.h:29
Definition: sync_stream.h:431
grpc_compression_level compression_level() const
Return the compression level to be used by the server call.
Definition: server_context.h:233
An interface that can be fed a sequence of messages of type W.
Definition: sync_stream.h:104
Synchronous (blocking) server-side API for doing client-streaming RPCs, where the incoming message st...
Definition: completion_queue.h:56
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:192
Server-side interface for bi-directional streaming.
Definition: sync_stream.h:701
virtual bool Read(R *msg)=0
Block to read a message and parse to msg.
Client-side interface for streaming writes of message type W.
Definition: sync_stream.h:266
bool Read(R *msg) override
See the ReaderInterface.Read method for semantics.
Definition: sync_stream.h:206
static ClientReaderWriter< W, R > * Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context)
Definition: sync_stream.h:433
Codegen interface for grpc::Channel.
Definition: channel_interface.h:71
bool NextMessageSize(uint32_t *sz) override
Definition: sync_stream.h:195
bool NextMessageSize(uint32_t *sz) override
Get an upper bound on the request message size from the client.
Definition: sync_stream.h:830
bool NextMessageSize(uint32_t *sz) override
Get an upper bound on the next message size available for reading on this stream.
Definition: sync_stream.h:786
void WaitForInitialMetadata()
See the ClientStreamingInterface.WaitForInitialMetadata method for semantics.
Definition: sync_stream.h:303
@ GRPC_CQ_PLUCK
Events are popped out by calling grpc_completion_queue_pluck() API ONLY.
Definition: grpc_types.h:732
::grpc::CompletionQueue * cq() const
Definition: call.h:70
bool Write(const W &msg, ::grpc::WriteOptions options) override
Block to write msg to the stream with WriteOptions options.
Definition: sync_stream.h:798
Definition: grpc_types.h:763
Common interface for all synchronous client side streaming.
Definition: sync_stream.h:34
bool NextMessageSize(uint32_t *sz)
Definition: sync_stream.h:727
::google::protobuf::util::Status Status
Definition: config_protobuf.h:91
void SendInitialMetadata() override
Block to send initial metadata to client.
Definition: sync_stream.h:893
bool Read(R *msg) override
Block to read a message and parse to msg.
Definition: sync_stream.h:790
virtual void SendInitialMetadata()=0
Block to send initial metadata to client.
Per-message write options.
Definition: call_op_set.h:79
Synchronous (blocking) server-side API for a bidirectional streaming call, where the incoming message...
Definition: sync_stream.h:779
void WriteLast(const W &msg, ::grpc::WriteOptions options)
Write msg and coalesce it with the writing of trailing metadata, using WriteOptions options.
Definition: sync_stream.h:139
A wrapper class of an application provided server streaming handler.
Definition: byte_buffer.h:47
virtual ~WriterInterface()
Definition: sync_stream.h:106
A wrapper class of an application provided client streaming handler.
Definition: completion_queue.h:71
Client-side interface for streaming reads of message of type R.
Definition: sync_stream.h:148
A wrapper class of an application provided bidi-streaming handler.
Definition: completion_queue.h:75
#define GRPC_CQ_CURRENT_VERSION
Definition: grpc_types.h:761
virtual void WaitForInitialMetadata()=0
Block to wait for initial metadata from server.
void SendInitialMetadata() override
See the ServerStreamingInterface.SendInitialMetadata method for semantics.
Definition: sync_stream.h:585
void PerformOps(CallOpSetInterface *ops)
Definition: call.h:65
bool Read(R *msg)
Definition: sync_stream.h:733
Synchronous (blocking) client-side API for bi-directional streaming RPCs, where the outgoing message ...
Definition: channel_interface.h:33
virtual ~ReaderInterface()
Definition: sync_stream.h:83
bool compression_level_set() const
Return a bool indicating whether the compression level for this call has been set (either implicitly ...
Definition: server_context.h:248
Server-side interface for streaming reads of message of type R.
Definition: sync_stream.h:573
void WaitForInitialMetadata() override
Block waiting to read initial metadata from the server.
Definition: sync_stream.h:455
Definition: call_op_set.h:769
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call_op_set.h:117
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue....
Definition: completion_queue.h:102
void SendInitialMetadata() override
See the ServerStreamingInterface.SendInitialMetadata method for semantics.
Definition: sync_stream.h:784
An interface that yields a sequence of messages of type R.
Definition: sync_stream.h:81
void SendInitialMetadata() override
See the ServerStreamingInterface.SendInitialMetadata method for semantics.
Definition: sync_stream.h:639
A class to represent a flow-controlled unary call.
Definition: sync_stream.h:820
void SendInitialMetadata() override
Block to send initial metadata to client.
Definition: sync_stream.h:827
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:146
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:181
Definition: byte_buffer.h:52
Definition: sync_stream.h:280
virtual bool WritesDone()=0
Half close writing from the client.
bool Read(RequestType *request) override
Read a message of type R into msg.
Definition: sync_stream.h:910
virtual bool WritesDone()=0
Half close writing from the client.
virtual ~ClientStreamingInterface()
Definition: sync_stream.h:36
bool WritesDone() override
Definition: sync_stream.h:343
bool Write(const ResponseType &response, ::grpc::WriteOptions options) override
Block to write msg to the stream with WriteOptions options.
Definition: sync_stream.h:926
void RecvMessage(R *message)
Definition: call_op_set.h:426
Common interface for all synchronous server side streaming.
Definition: sync_stream.h:66
bool Read(R *msg) override
Definition: sync_stream.h:606