34 #ifndef GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H
35 #define GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H
94 virtual bool Read(R* msg) = 0;
140 : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
144 ops.SendInitialMetadata(context->send_initial_metadata_,
145 context->initial_metadata_flags());
148 ops.ClientSendClose();
157 ops.RecvInitialMetadata(context_);
169 if (!context_->initial_metadata_received_) {
170 ops.RecvInitialMetadata(context_);
172 ops.RecvMessage(msg);
174 return cq_.Pluck(&ops) && ops.got_message;
180 ops.ClientRecvStatus(context_, &status);
212 : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
213 finish_ops_.RecvMessage(response);
214 finish_ops_.AllowNoMessage();
217 ops.SendInitialMetadata(context->send_initial_metadata_,
218 context->initial_metadata_flags());
227 ops.RecvInitialMetadata(context_);
235 if (!ops.SendMessage(msg, options).ok()) {
239 return cq_.Pluck(&ops);
244 ops.ClientSendClose();
246 return cq_.Pluck(&ops);
252 if (!context_->initial_metadata_received_) {
253 finish_ops_.RecvInitialMetadata(context_);
255 finish_ops_.ClientRecvStatus(context_, &status);
271 template <
class W,
class R>
289 template <
class W,
class R>
295 : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
297 ops.SendInitialMetadata(context->send_initial_metadata_,
298 context->initial_metadata_flags());
307 ops.RecvInitialMetadata(context_);
319 if (!context_->initial_metadata_received_) {
320 ops.RecvInitialMetadata(context_);
322 ops.RecvMessage(msg);
324 return cq_.Pluck(&ops) && ops.got_message;
330 if (!ops.SendMessage(msg, options).ok())
return false;
332 return cq_.Pluck(&ops);
337 ops.ClientSendClose();
339 return cq_.Pluck(&ops);
344 if (!context_->initial_metadata_received_) {
345 ops.RecvInitialMetadata(context_);
348 ops.ClientRecvStatus(context_, &status);
374 ops.SendInitialMetadata(ctx_->initial_metadata_,
375 ctx_->initial_metadata_flags());
379 ctx_->sent_initial_metadata_ =
true;
381 call_->
cq()->Pluck(&ops);
391 ops.RecvMessage(msg);
393 return call_->
cq()->Pluck(&ops) && ops.got_message;
415 ops.SendInitialMetadata(ctx_->initial_metadata_,
416 ctx_->initial_metadata_flags());
420 ctx_->sent_initial_metadata_ =
true;
422 call_->
cq()->Pluck(&ops);
428 if (!ops.SendMessage(msg, options).ok()) {
431 if (!ctx_->sent_initial_metadata_) {
432 ops.SendInitialMetadata(ctx_->initial_metadata_,
433 ctx_->initial_metadata_flags());
437 ctx_->sent_initial_metadata_ =
true;
440 return call_->
cq()->Pluck(&ops);
449 template <
class W,
class R>
456 template <
class W,
class R>
457 class ServerReaderWriterBody final {
460 : call_(call), ctx_(ctx) {}
466 ops.SendInitialMetadata(ctx_->initial_metadata_,
467 ctx_->initial_metadata_flags());
471 ctx_->sent_initial_metadata_ =
true;
473 call_->
cq()->Pluck(&ops);
483 ops.RecvMessage(msg);
485 return call_->
cq()->Pluck(&ops) && ops.got_message;
490 if (!ops.SendMessage(msg, options).ok()) {
493 if (!ctx_->sent_initial_metadata_) {
494 ops.SendInitialMetadata(ctx_->initial_metadata_,
495 ctx_->initial_metadata_flags());
499 ctx_->sent_initial_metadata_ =
true;
502 return call_->
cq()->Pluck(&ops);
512 template <
class W,
class R>
520 return body_.NextMessageSize(sz);
523 bool Read(R* msg)
override {
return body_.Read(msg); }
527 return body_.Write(msg, options);
543 template <
class RequestType,
class ResponseType>
548 : body_(call, ctx), read_done_(false), write_done_(false) {}
556 bool Read(RequestType* request)
override {
561 return body_.
Read(request);
565 bool Write(
const ResponseType& response,
567 if (write_done_ || !read_done_) {
571 return body_.
Write(response, options);
585 template <
class RequestType,
class ResponseType>
590 : body_(call, ctx), read_done_(false) {}
598 bool Read(RequestType* request)
override {
603 return body_.
Read(request);
607 bool Write(
const ResponseType& response,
609 return read_done_ && body_.
Write(response, options);
619 #endif // GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H
Definition: channel_interface.h:49
Client-side interface for streaming writes of messages of type W.
Definition: sync_stream.h:194
grpc_compression_level compression_level() const
Definition: server_context.h:132
bool Write(const W &msg, const WriteOptions &options) override
Blocking write msg to the stream with options.
Definition: sync_stream.h:233
bool Read(R *msg) override
Blocking read a message and parse to msg.
Definition: sync_stream.h:523
virtual void WaitForInitialMetadata()=0
Blocking wait for initial metadata from server.
Common interface for all synchronous server side streaming.
Definition: sync_stream.h:67
Server-side interface for bi-directional streaming.
Definition: sync_stream.h:450
A class to represent a flow-controlled server-side streaming call.
Definition: sync_stream.h:586
ServerReaderWriterBody(Call *call, ServerContext *ctx)
Definition: sync_stream.h:459
bool NextMessageSize(uint32_t *sz)
Definition: sync_stream.h:476
bool NextMessageSize(uint32_t *sz) override
Upper bound on the next message size available for reading on this stream.
Definition: sync_stream.h:312
Client-side interface for streaming reads of messages of type R.
Definition: sync_stream.h:123
Definition: completion_queue.h:73
bool Write(const W &msg, const WriteOptions &options) override
Blocking write msg to the stream with options.
Definition: sync_stream.h:426
virtual ~ServerStreamingInterface()
Definition: sync_stream.h:69
An interface that yields a sequence of messages of type R.
Definition: sync_stream.h:77
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:121
A class to represent a flow-controlled unary call.
Definition: sync_stream.h:544
bool Read(R *msg) override
Blocking read a message and parse to msg.
Definition: sync_stream.h:167
void SendInitialMetadata() override
Blocking send initial metadata to client.
Definition: sync_stream.h:370
virtual ~ReaderInterface()
Definition: sync_stream.h:79
void SendInitialMetadata() override
Blocking send initial metadata to client.
Definition: sync_stream.h:592
bool Read(R *msg)
Definition: sync_stream.h:481
Server-side interface for streaming reads of messages of type R.
Definition: sync_stream.h:362
Status Finish() override
Wait until the stream finishes, and return the final status.
Definition: sync_stream.h:177
bool Read(R *msg) override
Blocking read a message and parse to msg.
Definition: sync_stream.h:389
bool WritesDone() override
Half close writing from the client.
Definition: sync_stream.h:242
bool Read(R *msg) override
Blocking read a message and parse to msg.
Definition: sync_stream.h:317
Definition: client_context.h:154
Definition: completion_queue.h:68
bool NextMessageSize(uint32_t *sz) override
Upper bound on the next message size available for reading on this stream.
Definition: sync_stream.h:162
bool NextMessageSize(uint32_t *sz) override
Upper bound on the next message size available for reading on this stream.
Definition: sync_stream.h:594
void SendInitialMetadata() override
Blocking send initial metadata to client.
Definition: sync_stream.h:517
bool NextMessageSize(uint32_t *sz) override
Upper bound on the next message size available for reading on this stream.
Definition: sync_stream.h:519
void SendInitialMetadata()
Definition: sync_stream.h:462
void SendInitialMetadata() override
Blocking send initial metadata to client.
Definition: sync_stream.h:411
ServerReader(Call *call, ServerContext *ctx)
Definition: sync_stream.h:368
bool NextMessageSize(uint32_t *sz) override
Upper bound on the next message size available for reading on this stream.
Definition: sync_stream.h:552
bool Read(RequestType *request) override
Blocking read a message and parse to msg.
Definition: sync_stream.h:556
virtual ~ClientStreamingInterface()
Definition: sync_stream.h:51
Server-side interface for streaming writes of messages of type W.
Definition: sync_stream.h:403
virtual bool NextMessageSize(uint32_t *sz)=0
Upper bound on the next message size available for reading on this stream.
ClientReader(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, const W &request)
Blocking create a stream and write the first request out.
Definition: sync_stream.h:138
CompletionQueue * cq() const
Definition: call.h:638
int max_receive_message_size() const
Definition: call.h:640
bool Write(const ResponseType &response, const WriteOptions &options) override
Blocking write msg to the stream with options.
Definition: sync_stream.h:607
virtual bool WritesDone()=0
Half close writing from the client.
Codegen interface for grpc::Channel.
Definition: channel_interface.h:64
virtual void SendInitialMetadata()=0
Blocking send initial metadata to client.
Primary implementation of CallOpSetInterface.
Definition: call.h:565
Status Finish() override
Read the final response and wait for the final status.
Definition: sync_stream.h:250
Definition: server_context.h:95
ServerSplitStreamer(Call *call, ServerContext *ctx)
Definition: sync_stream.h:589
Per-message write options.
Definition: call.h:85
void SendInitialMetadata() override
Blocking send initial metadata to client.
Definition: sync_stream.h:550
ClientReaderWriter(ChannelInterface *channel, const RpcMethod &method, ClientContext *context)
Blocking create a stream.
Definition: sync_stream.h:293
bool NextMessageSize(uint32_t *sz) override
Upper bound on the next message size available for reading on this stream.
Definition: sync_stream.h:384
virtual void WaitForInitialMetadata()=0
Blocking wait for initial metadata from server.
virtual bool WritesDone()=0
Block until currently-pending writes are completed.
bool Write(const W &msg)
Blocking write msg to the stream with default options.
Definition: sync_stream.h:118
Definition: completion_queue.h:70
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue.h).
Definition: completion_queue.h:101
ServerUnaryStreamer(Call *call, ServerContext *ctx)
Definition: sync_stream.h:547
bool WritesDone() override
Block until currently-pending writes are completed.
Definition: sync_stream.h:335
Definition: rpc_method.h:43
bool Write(const ResponseType &response, const WriteOptions &options) override
Blocking write msg to the stream with options.
Definition: sync_stream.h:565
Status Finish() override
Wait until the stream finishes, and return the final status.
Definition: sync_stream.h:342
virtual Status Finish()=0
Wait until the stream finishes, and return the final status.
bool Write(const W &msg, const WriteOptions &options)
Definition: sync_stream.h:488
void WaitForInitialMetadata() override
Blocking wait for initial metadata from server.
Definition: sync_stream.h:303
ServerReaderWriter(Call *call, ServerContext *ctx)
Definition: sync_stream.h:515
Definition: sync_stream.h:513
void PerformOps(CallOpSetInterface *ops)
Definition: call.h:633
An interface that can be fed a sequence of messages of type W.
Definition: sync_stream.h:99
Client-side interface for bi-directional streaming.
Definition: sync_stream.h:272
Did it work? If it didn't, why?
Definition: status.h:45
void WaitForInitialMetadata() override
Blocking wait for initial metadata from server.
Definition: sync_stream.h:153
virtual bool Read(R *msg)=0
Blocking read a message and parse to msg.
Common interface for all synchronous client side streaming.
Definition: sync_stream.h:49
ServerWriter(Call *call, ServerContext *ctx)
Definition: sync_stream.h:409
bool Read(RequestType *request) override
Blocking read a message and parse to msg.
Definition: sync_stream.h:598
void WaitForInitialMetadata()
Definition: sync_stream.h:223
virtual bool Write(const W &msg, const WriteOptions &options)=0
Blocking write msg to the stream with options.
Definition: channel_interface.h:53
bool Write(const W &msg, const WriteOptions &options) override
Blocking write msg to the stream with options.
Definition: sync_stream.h:328
bool compression_level_set() const
Definition: server_context.h:141
bool Write(const W &msg, const WriteOptions &options) override
Blocking write msg to the stream with options.
Definition: sync_stream.h:526
Definition: channel_interface.h:51
ClientWriter(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, R *response)
Blocking create a stream.
Definition: sync_stream.h:210
virtual ~WriterInterface()
Definition: sync_stream.h:101