34 #ifndef GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H
35 #define GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H
94 virtual bool Read(R* msg) = 0;
158 : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
162 ops.SendInitialMetadata(context->send_initial_metadata_,
163 context->initial_metadata_flags());
166 ops.ClientSendClose();
175 ops.RecvInitialMetadata(context_);
187 if (!context_->initial_metadata_received_) {
188 ops.RecvInitialMetadata(context_);
190 ops.RecvMessage(msg);
192 return cq_.Pluck(&ops) && ops.got_message;
198 ops.ClientRecvStatus(context_, &status);
230 : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
231 finish_ops_.RecvMessage(response);
232 finish_ops_.AllowNoMessage();
234 if (!context_->initial_metadata_corked_) {
236 ops.SendInitialMetadata(context->send_initial_metadata_,
237 context->initial_metadata_flags());
247 ops.RecvInitialMetadata(context_);
260 ops.ClientSendClose();
262 if (context_->initial_metadata_corked_) {
263 ops.SendInitialMetadata(context_->send_initial_metadata_,
264 context_->initial_metadata_flags());
267 if (!ops.SendMessage(msg, options).ok()) {
272 return cq_.Pluck(&ops);
277 ops.ClientSendClose();
279 return cq_.Pluck(&ops);
285 if (!context_->initial_metadata_received_) {
286 finish_ops_.RecvInitialMetadata(context_);
288 finish_ops_.ClientRecvStatus(context_, &status);
304 template <
class W,
class R>
322 template <
class W,
class R>
328 : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
329 if (!context_->initial_metadata_corked_) {
331 ops.SendInitialMetadata(context->send_initial_metadata_,
332 context->initial_metadata_flags());
342 ops.RecvInitialMetadata(context_);
354 if (!context_->initial_metadata_received_) {
355 ops.RecvInitialMetadata(context_);
357 ops.RecvMessage(msg);
359 return cq_.Pluck(&ops) && ops.got_message;
370 ops.ClientSendClose();
372 if (context_->initial_metadata_corked_) {
373 ops.SendInitialMetadata(context_->send_initial_metadata_,
374 context_->initial_metadata_flags());
377 if (!ops.SendMessage(msg, options).ok()) {
382 return cq_.Pluck(&ops);
387 ops.ClientSendClose();
389 return cq_.Pluck(&ops);
394 if (!context_->initial_metadata_received_) {
395 ops.RecvInitialMetadata(context_);
398 ops.ClientRecvStatus(context_, &status);
424 ops.SendInitialMetadata(ctx_->initial_metadata_,
425 ctx_->initial_metadata_flags());
429 ctx_->sent_initial_metadata_ =
true;
431 call_->
cq()->Pluck(&ops);
441 ops.RecvMessage(msg);
443 return call_->
cq()->Pluck(&ops) && ops.got_message;
465 ops.SendInitialMetadata(ctx_->initial_metadata_,
466 ctx_->initial_metadata_flags());
470 ctx_->sent_initial_metadata_ =
true;
472 call_->
cq()->Pluck(&ops);
481 if (!ops.SendMessage(msg, options).ok()) {
484 if (!ctx_->sent_initial_metadata_) {
485 ops.SendInitialMetadata(ctx_->initial_metadata_,
486 ctx_->initial_metadata_flags());
490 ctx_->sent_initial_metadata_ =
true;
493 return call_->
cq()->Pluck(&ops);
502 template <
class W,
class R>
509 template <
class W,
class R>
510 class ServerReaderWriterBody final {
513 : call_(call), ctx_(ctx) {}
519 ops.SendInitialMetadata(ctx_->initial_metadata_,
520 ctx_->initial_metadata_flags());
524 ctx_->sent_initial_metadata_ =
true;
526 call_->
cq()->Pluck(&ops);
536 ops.RecvMessage(msg);
538 return call_->
cq()->Pluck(&ops) && ops.got_message;
546 if (!ops.SendMessage(msg, options).ok()) {
549 if (!ctx_->sent_initial_metadata_) {
550 ops.SendInitialMetadata(ctx_->initial_metadata_,
551 ctx_->initial_metadata_flags());
555 ctx_->sent_initial_metadata_ =
true;
558 return call_->
cq()->Pluck(&ops);
568 template <
class W,
class R>
576 return body_.NextMessageSize(sz);
579 bool Read(R* msg)
override {
// Forwards the read to body_ and returns its result unchanged.
return body_.Read(msg); }
583 return body_.Write(msg, options);
599 template <
class RequestType,
class ResponseType>
604 : body_(call, ctx), read_done_(false), write_done_(false) {}
612 bool Read(RequestType* request)
override {
617 return body_.
Read(request);
622 if (write_done_ || !read_done_) {
626 return body_.
Write(response, options);
640 template <
class RequestType,
class ResponseType>
645 : body_(call, ctx), read_done_(false) {}
653 bool Read(RequestType* request)
override {
658 return body_.
Read(request);
663 return read_done_ && body_.
Write(response, options);
673 #endif // GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H
Definition: channel_interface.h:49
Client-side interface for streaming writes of messages of type W.
Definition: sync_stream.h:212
grpc_compression_level compression_level() const
Definition: server_context.h:131
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call.h:135
bool Read(R *msg) override
Blocking read a message and parse to msg.
Definition: sync_stream.h:579
virtual void WaitForInitialMetadata()=0
Blocking wait for initial metadata from server.
Common interface for all synchronous server side streaming.
Definition: sync_stream.h:67
bool Write(const W &msg, WriteOptions options)
Definition: sync_stream.h:541
Server-side interface for bi-directional streaming.
Definition: sync_stream.h:503
A class to represent a flow-controlled server-side streaming call.
Definition: sync_stream.h:641
ServerReaderWriterBody(Call *call, ServerContext *ctx)
Definition: sync_stream.h:512
bool NextMessageSize(uint32_t *sz)
Definition: sync_stream.h:529
bool NextMessageSize(uint32_t *sz) override
Upper bound on the next message size available for reading on this stream.
Definition: sync_stream.h:347
Client-side interface for streaming reads of messages of type R.
Definition: sync_stream.h:141
Definition: completion_queue.h:73
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call.h:174
virtual ~ServerStreamingInterface()
Definition: sync_stream.h:69
An interface that yields a sequence of messages of type R.
Definition: sync_stream.h:77
bool Write(const ResponseType &response, WriteOptions options) override
Blocking write msg to the stream with WriteOptions options.
Definition: sync_stream.h:621
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:122
A class to represent a flow-controlled unary call.
Definition: sync_stream.h:600
bool Read(R *msg) override
Blocking read a message and parse to msg.
Definition: sync_stream.h:185
void SendInitialMetadata() override
Blocking send initial metadata to client.
Definition: sync_stream.h:420
virtual ~ReaderInterface()
Definition: sync_stream.h:79
void set_initial_metadata_corked(bool corked)
Flag whether the initial metadata should be corked.
Definition: client_context.h:291
void SendInitialMetadata() override
Blocking send initial metadata to client.
Definition: sync_stream.h:647
bool Read(R *msg)
Definition: sync_stream.h:534
Server-side interface for streaming reads of messages of type R.
Definition: sync_stream.h:412
Status Finish() override
Wait until the stream finishes, and return the final status.
Definition: sync_stream.h:195
bool Read(R *msg) override
Blocking read a message and parse to msg.
Definition: sync_stream.h:439
bool Write(const ResponseType &response, WriteOptions options) override
Blocking write msg to the stream with WriteOptions options.
Definition: sync_stream.h:662
bool WritesDone() override
Half close writing from the client.
Definition: sync_stream.h:275
bool Read(R *msg) override
Blocking read a message and parse to msg.
Definition: sync_stream.h:352
Definition: client_context.h:154
Definition: completion_queue.h:68
bool NextMessageSize(uint32_t *sz) override
Upper bound on the next message size available for reading on this stream.
Definition: sync_stream.h:180
bool Write(const W &msg, WriteOptions options) override
Blocking write msg to the stream with WriteOptions options.
Definition: sync_stream.h:363
bool NextMessageSize(uint32_t *sz) override
Upper bound on the next message size available for reading on this stream.
Definition: sync_stream.h:649
void SendInitialMetadata() override
Blocking send initial metadata to client.
Definition: sync_stream.h:573
bool NextMessageSize(uint32_t *sz) override
Upper bound on the next message size available for reading on this stream.
Definition: sync_stream.h:575
void SendInitialMetadata()
Definition: sync_stream.h:515
void SendInitialMetadata() override
Blocking send initial metadata to client.
Definition: sync_stream.h:461
ServerReader(Call *call, ServerContext *ctx)
Definition: sync_stream.h:418
bool NextMessageSize(uint32_t *sz) override
Upper bound on the next message size available for reading on this stream.
Definition: sync_stream.h:608
bool Read(RequestType *request) override
Blocking read a message and parse to msg.
Definition: sync_stream.h:612
bool Write(const W &msg, WriteOptions options) override
Blocking write msg to the stream with WriteOptions options.
Definition: sync_stream.h:582
virtual ~ClientStreamingInterface()
Definition: sync_stream.h:51
Server-side interface for streaming writes of messages of type W.
Definition: sync_stream.h:453
virtual bool NextMessageSize(uint32_t *sz)=0
Upper bound on the next message size available for reading on this stream.
ClientReader(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, const W &request)
Blocking create a stream and write the first request out.
Definition: sync_stream.h:156
CompletionQueue * cq() const
Definition: call.h:696
int max_receive_message_size() const
Definition: call.h:698
bool Write(const W &msg, WriteOptions options) override
Blocking write msg to the stream with WriteOptions options.
Definition: sync_stream.h:476
virtual bool WritesDone()=0
Half close writing from the client.
Codegen interface for grpc::Channel.
Definition: channel_interface.h:64
virtual bool Write(const W &msg, WriteOptions options)=0
Blocking write msg to the stream with WriteOptions options.
virtual void SendInitialMetadata()=0
Blocking send initial metadata to client.
Primary implementation of CallOpSetInterface.
Definition: call.h:623
Status Finish() override
Read the final response and wait for the final status.
Definition: sync_stream.h:283
Definition: server_context.h:94
ServerSplitStreamer(Call *call, ServerContext *ctx)
Definition: sync_stream.h:644
Per-message write options.
Definition: call.h:95
void SendInitialMetadata() override
Blocking send initial metadata to client.
Definition: sync_stream.h:606
ClientReaderWriter(ChannelInterface *channel, const RpcMethod &method, ClientContext *context)
Blocking create a stream.
Definition: sync_stream.h:326
bool NextMessageSize(uint32_t *sz) override
Upper bound on the next message size available for reading on this stream.
Definition: sync_stream.h:434
virtual void WaitForInitialMetadata()=0
Blocking wait for initial metadata from server.
virtual bool WritesDone()=0
Block until currently-pending writes are completed.
bool Write(const W &msg)
Blocking write msg to the stream with default write options.
Definition: sync_stream.h:118
Definition: completion_queue.h:70
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue.h).
Definition: completion_queue.h:101
ServerUnaryStreamer(Call *call, ServerContext *ctx)
Definition: sync_stream.h:603
bool WritesDone() override
Block until currently-pending writes are completed.
Definition: sync_stream.h:385
Definition: rpc_method.h:43
Status Finish() override
Wait until the stream finishes, and return the final status.
Definition: sync_stream.h:392
virtual Status Finish()=0
Wait until the stream finishes, and return the final status.
void WaitForInitialMetadata() override
Blocking wait for initial metadata from server.
Definition: sync_stream.h:338
ServerReaderWriter(Call *call, ServerContext *ctx)
Definition: sync_stream.h:571
Definition: sync_stream.h:569
void PerformOps(CallOpSetInterface *ops)
Definition: call.h:691
An interface that can be fed a sequence of messages of type W.
Definition: sync_stream.h:99
Client-side interface for bi-directional streaming.
Definition: sync_stream.h:305
Did it work? If it didn't, why?
Definition: status.h:45
void WaitForInitialMetadata() override
Blocking wait for initial metadata from server.
Definition: sync_stream.h:171
virtual bool Read(R *msg)=0
Blocking read a message and parse to msg.
Common interface for all synchronous client side streaming.
Definition: sync_stream.h:49
ServerWriter(Call *call, ServerContext *ctx)
Definition: sync_stream.h:459
bool Read(RequestType *request) override
Blocking read a message and parse to msg.
Definition: sync_stream.h:653
void WriteLast(const W &msg, WriteOptions options)
Write msg and coalesce it with the writing of trailing metadata, using WriteOptions options...
Definition: sync_stream.h:134
void WaitForInitialMetadata()
Definition: sync_stream.h:243
Definition: channel_interface.h:53
bool compression_level_set() const
Definition: server_context.h:140
Definition: channel_interface.h:51
ClientWriter(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, R *response)
Blocking create a stream.
Definition: sync_stream.h:228
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call.h:190
virtual ~WriterInterface()
Definition: sync_stream.h:101
bool Write(const W &msg, WriteOptions options) override
Blocking write msg to the stream with WriteOptions options.
Definition: sync_stream.h:253