GRPC C++  1.3.0
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
sync_stream.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015, Google Inc.
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  * * Redistributions of source code must retain the above copyright
11  * notice, this list of conditions and the following disclaimer.
12  * * Redistributions in binary form must reproduce the above
13  * copyright notice, this list of conditions and the following disclaimer
14  * in the documentation and/or other materials provided with the
15  * distribution.
16  * * Neither the name of Google Inc. nor the names of its
17  * contributors may be used to endorse or promote products derived from
18  * this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  *
32  */
33 
34 #ifndef GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H
35 #define GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H
36 
45 
46 namespace grpc {
47 
50  public:
52 
63  virtual Status Finish() = 0;
64 };
65 
68  public:
70 
72  virtual void SendInitialMetadata() = 0;
73 };
74 
76 template <class R>
78  public:
79  virtual ~ReaderInterface() {}
80 
82  virtual bool NextMessageSize(uint32_t* sz) = 0;
83 
94  virtual bool Read(R* msg) = 0;
95 };
96 
98 template <class W>
100  public:
101  virtual ~WriterInterface() {}
102 
110  virtual bool Write(const W& msg, WriteOptions options) = 0;
111 
118  inline bool Write(const W& msg) { return Write(msg, WriteOptions()); }
119 
134  void WriteLast(const W& msg, WriteOptions options) {
135  Write(msg, options.set_last_message());
136  }
137 };
138 
140 template <class R>
142  public ReaderInterface<R> {
143  public:
148  virtual void WaitForInitialMetadata() = 0;
149 };
150 
151 template <class R>
152 class ClientReader final : public ClientReaderInterface<R> {
153  public:
155  template <class W>
156  ClientReader(ChannelInterface* channel, const RpcMethod& method,
157  ClientContext* context, const W& request)
158  : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
161  ops;
162  ops.SendInitialMetadata(context->send_initial_metadata_,
163  context->initial_metadata_flags());
164  // TODO(ctiller): don't assert
165  GPR_CODEGEN_ASSERT(ops.SendMessage(request).ok());
166  ops.ClientSendClose();
167  call_.PerformOps(&ops);
168  cq_.Pluck(&ops);
169  }
170 
171  void WaitForInitialMetadata() override {
172  GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
173 
175  ops.RecvInitialMetadata(context_);
176  call_.PerformOps(&ops);
177  cq_.Pluck(&ops);
178  }
179 
180  bool NextMessageSize(uint32_t* sz) override {
181  *sz = call_.max_receive_message_size();
182  return true;
183  }
184 
185  bool Read(R* msg) override {
187  if (!context_->initial_metadata_received_) {
188  ops.RecvInitialMetadata(context_);
189  }
190  ops.RecvMessage(msg);
191  call_.PerformOps(&ops);
192  return cq_.Pluck(&ops) && ops.got_message;
193  }
194 
195  Status Finish() override {
197  Status status;
198  ops.ClientRecvStatus(context_, &status);
199  call_.PerformOps(&ops);
200  GPR_CODEGEN_ASSERT(cq_.Pluck(&ops));
201  return status;
202  }
203 
204  private:
205  ClientContext* context_;
206  CompletionQueue cq_;
207  Call call_;
208 };
209 
211 template <class W>
213  public WriterInterface<W> {
214  public:
220  virtual bool WritesDone() = 0;
221 };
222 
223 template <class W>
224 class ClientWriter : public ClientWriterInterface<W> {
225  public:
227  template <class R>
228  ClientWriter(ChannelInterface* channel, const RpcMethod& method,
229  ClientContext* context, R* response)
230  : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
231  finish_ops_.RecvMessage(response);
232  finish_ops_.AllowNoMessage();
233 
234  if (!context_->initial_metadata_corked_) {
236  ops.SendInitialMetadata(context->send_initial_metadata_,
237  context->initial_metadata_flags());
238  call_.PerformOps(&ops);
239  cq_.Pluck(&ops);
240  }
241  }
242 
244  GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
245 
247  ops.RecvInitialMetadata(context_);
248  call_.PerformOps(&ops);
249  cq_.Pluck(&ops); // status ignored
250  }
251 
253  bool Write(const W& msg, WriteOptions options) override {
256  ops;
257 
258  if (options.is_last_message()) {
259  options.set_buffer_hint();
260  ops.ClientSendClose();
261  }
262  if (context_->initial_metadata_corked_) {
263  ops.SendInitialMetadata(context_->send_initial_metadata_,
264  context_->initial_metadata_flags());
265  context_->set_initial_metadata_corked(false);
266  }
267  if (!ops.SendMessage(msg, options).ok()) {
268  return false;
269  }
270 
271  call_.PerformOps(&ops);
272  return cq_.Pluck(&ops);
273  }
274 
275  bool WritesDone() override {
277  ops.ClientSendClose();
278  call_.PerformOps(&ops);
279  return cq_.Pluck(&ops);
280  }
281 
283  Status Finish() override {
284  Status status;
285  if (!context_->initial_metadata_received_) {
286  finish_ops_.RecvInitialMetadata(context_);
287  }
288  finish_ops_.ClientRecvStatus(context_, &status);
289  call_.PerformOps(&finish_ops_);
290  GPR_CODEGEN_ASSERT(cq_.Pluck(&finish_ops_));
291  return status;
292  }
293 
294  private:
295  ClientContext* context_;
298  finish_ops_;
299  CompletionQueue cq_;
300  Call call_;
301 };
302 
304 template <class W, class R>
306  public WriterInterface<W>,
307  public ReaderInterface<R> {
308  public:
313  virtual void WaitForInitialMetadata() = 0;
314 
319  virtual bool WritesDone() = 0;
320 };
321 
322 template <class W, class R>
323 class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
324  public:
327  ClientContext* context)
328  : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
329  if (!context_->initial_metadata_corked_) {
331  ops.SendInitialMetadata(context->send_initial_metadata_,
332  context->initial_metadata_flags());
333  call_.PerformOps(&ops);
334  cq_.Pluck(&ops);
335  }
336  }
337 
338  void WaitForInitialMetadata() override {
339  GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
340 
342  ops.RecvInitialMetadata(context_);
343  call_.PerformOps(&ops);
344  cq_.Pluck(&ops); // status ignored
345  }
346 
347  bool NextMessageSize(uint32_t* sz) override {
348  *sz = call_.max_receive_message_size();
349  return true;
350  }
351 
352  bool Read(R* msg) override {
354  if (!context_->initial_metadata_received_) {
355  ops.RecvInitialMetadata(context_);
356  }
357  ops.RecvMessage(msg);
358  call_.PerformOps(&ops);
359  return cq_.Pluck(&ops) && ops.got_message;
360  }
361 
363  bool Write(const W& msg, WriteOptions options) override {
366  ops;
367 
368  if (options.is_last_message()) {
369  options.set_buffer_hint();
370  ops.ClientSendClose();
371  }
372  if (context_->initial_metadata_corked_) {
373  ops.SendInitialMetadata(context_->send_initial_metadata_,
374  context_->initial_metadata_flags());
375  context_->set_initial_metadata_corked(false);
376  }
377  if (!ops.SendMessage(msg, options).ok()) {
378  return false;
379  }
380 
381  call_.PerformOps(&ops);
382  return cq_.Pluck(&ops);
383  }
384 
385  bool WritesDone() override {
387  ops.ClientSendClose();
388  call_.PerformOps(&ops);
389  return cq_.Pluck(&ops);
390  }
391 
392  Status Finish() override {
394  if (!context_->initial_metadata_received_) {
395  ops.RecvInitialMetadata(context_);
396  }
397  Status status;
398  ops.ClientRecvStatus(context_, &status);
399  call_.PerformOps(&ops);
400  GPR_CODEGEN_ASSERT(cq_.Pluck(&ops));
401  return status;
402  }
403 
404  private:
405  ClientContext* context_;
406  CompletionQueue cq_;
407  Call call_;
408 };
409 
411 template <class R>
413  public ReaderInterface<R> {};
414 
415 template <class R>
416 class ServerReader final : public ServerReaderInterface<R> {
417  public:
418  ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
419 
420  void SendInitialMetadata() override {
421  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
422 
424  ops.SendInitialMetadata(ctx_->initial_metadata_,
425  ctx_->initial_metadata_flags());
426  if (ctx_->compression_level_set()) {
427  ops.set_compression_level(ctx_->compression_level());
428  }
429  ctx_->sent_initial_metadata_ = true;
430  call_->PerformOps(&ops);
431  call_->cq()->Pluck(&ops);
432  }
433 
434  bool NextMessageSize(uint32_t* sz) override {
435  *sz = call_->max_receive_message_size();
436  return true;
437  }
438 
439  bool Read(R* msg) override {
441  ops.RecvMessage(msg);
442  call_->PerformOps(&ops);
443  return call_->cq()->Pluck(&ops) && ops.got_message;
444  }
445 
446  private:
447  Call* const call_;
448  ServerContext* const ctx_;
449 };
450 
452 template <class W>
454  public WriterInterface<W> {};
455 
456 template <class W>
457 class ServerWriter final : public ServerWriterInterface<W> {
458  public:
459  ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
460 
461  void SendInitialMetadata() override {
462  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
463 
465  ops.SendInitialMetadata(ctx_->initial_metadata_,
466  ctx_->initial_metadata_flags());
467  if (ctx_->compression_level_set()) {
468  ops.set_compression_level(ctx_->compression_level());
469  }
470  ctx_->sent_initial_metadata_ = true;
471  call_->PerformOps(&ops);
472  call_->cq()->Pluck(&ops);
473  }
474 
476  bool Write(const W& msg, WriteOptions options) override {
477  if (options.is_last_message()) {
478  options.set_buffer_hint();
479  }
481  if (!ops.SendMessage(msg, options).ok()) {
482  return false;
483  }
484  if (!ctx_->sent_initial_metadata_) {
485  ops.SendInitialMetadata(ctx_->initial_metadata_,
486  ctx_->initial_metadata_flags());
487  if (ctx_->compression_level_set()) {
488  ops.set_compression_level(ctx_->compression_level());
489  }
490  ctx_->sent_initial_metadata_ = true;
491  }
492  call_->PerformOps(&ops);
493  return call_->cq()->Pluck(&ops);
494  }
495 
496  private:
497  Call* const call_;
498  ServerContext* const ctx_;
499 };
500 
502 template <class W, class R>
504  public WriterInterface<W>,
505  public ReaderInterface<R> {};
506 
507 // Actual implementation of bi-directional streaming
508 namespace internal {
509 template <class W, class R>
510 class ServerReaderWriterBody final {
511  public:
513  : call_(call), ctx_(ctx) {}
514 
516  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
517 
519  ops.SendInitialMetadata(ctx_->initial_metadata_,
520  ctx_->initial_metadata_flags());
521  if (ctx_->compression_level_set()) {
522  ops.set_compression_level(ctx_->compression_level());
523  }
524  ctx_->sent_initial_metadata_ = true;
525  call_->PerformOps(&ops);
526  call_->cq()->Pluck(&ops);
527  }
528 
529  bool NextMessageSize(uint32_t* sz) {
530  *sz = call_->max_receive_message_size();
531  return true;
532  }
533 
534  bool Read(R* msg) {
536  ops.RecvMessage(msg);
537  call_->PerformOps(&ops);
538  return call_->cq()->Pluck(&ops) && ops.got_message;
539  }
540 
541  bool Write(const W& msg, WriteOptions options) {
542  if (options.is_last_message()) {
543  options.set_buffer_hint();
544  }
546  if (!ops.SendMessage(msg, options).ok()) {
547  return false;
548  }
549  if (!ctx_->sent_initial_metadata_) {
550  ops.SendInitialMetadata(ctx_->initial_metadata_,
551  ctx_->initial_metadata_flags());
552  if (ctx_->compression_level_set()) {
553  ops.set_compression_level(ctx_->compression_level());
554  }
555  ctx_->sent_initial_metadata_ = true;
556  }
557  call_->PerformOps(&ops);
558  return call_->cq()->Pluck(&ops);
559  }
560 
561  private:
562  Call* const call_;
563  ServerContext* const ctx_;
564 };
565 }
566 
567 // class to represent the user API for a bidirectional streaming call
568 template <class W, class R>
570  public:
571  ServerReaderWriter(Call* call, ServerContext* ctx) : body_(call, ctx) {}
572 
573  void SendInitialMetadata() override { body_.SendInitialMetadata(); }
574 
575  bool NextMessageSize(uint32_t* sz) override {
576  return body_.NextMessageSize(sz);
577  }
578 
579  bool Read(R* msg) override { return body_.Read(msg); }
580 
582  bool Write(const W& msg, WriteOptions options) override {
583  return body_.Write(msg, options);
584  }
585 
586  private:
588 };
589 
599 template <class RequestType, class ResponseType>
601  : public ServerReaderWriterInterface<ResponseType, RequestType> {
602  public:
604  : body_(call, ctx), read_done_(false), write_done_(false) {}
605 
606  void SendInitialMetadata() override { body_.SendInitialMetadata(); }
607 
608  bool NextMessageSize(uint32_t* sz) override {
609  return body_.NextMessageSize(sz);
610  }
611 
612  bool Read(RequestType* request) override {
613  if (read_done_) {
614  return false;
615  }
616  read_done_ = true;
617  return body_.Read(request);
618  }
619 
621  bool Write(const ResponseType& response, WriteOptions options) override {
622  if (write_done_ || !read_done_) {
623  return false;
624  }
625  write_done_ = true;
626  return body_.Write(response, options);
627  }
628 
629  private:
631  bool read_done_;
632  bool write_done_;
633 };
634 
640 template <class RequestType, class ResponseType>
642  : public ServerReaderWriterInterface<ResponseType, RequestType> {
643  public:
645  : body_(call, ctx), read_done_(false) {}
646 
647  void SendInitialMetadata() override { body_.SendInitialMetadata(); }
648 
649  bool NextMessageSize(uint32_t* sz) override {
650  return body_.NextMessageSize(sz);
651  }
652 
653  bool Read(RequestType* request) override {
654  if (read_done_) {
655  return false;
656  }
657  read_done_ = true;
658  return body_.Read(request);
659  }
660 
662  bool Write(const ResponseType& response, WriteOptions options) override {
663  return read_done_ && body_.Write(response, options);
664  }
665 
666  private:
668  bool read_done_;
669 };
670 
671 } // namespace grpc
672 
673 #endif // GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H
Definition: channel_interface.h:49
Client-side interface for streaming writes of message of type W.
Definition: sync_stream.h:212
grpc_compression_level compression_level() const
Definition: server_context.h:131
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call.h:135
bool Read(R *msg) override
Blocking read a message and parse to msg.
Definition: sync_stream.h:579
virtual void WaitForInitialMetadata()=0
Blocking wait for initial metadata from server.
Common interface for all synchronous server side streaming.
Definition: sync_stream.h:67
bool Write(const W &msg, WriteOptions options)
Definition: sync_stream.h:541
Server-side interface for bi-directional streaming.
Definition: sync_stream.h:503
A class to represent a flow-controlled server-side streaming call.
Definition: sync_stream.h:641
ServerReaderWriterBody(Call *call, ServerContext *ctx)
Definition: sync_stream.h:512
bool NextMessageSize(uint32_t *sz)
Definition: sync_stream.h:529
bool NextMessageSize(uint32_t *sz) override
Upper bound on the next message size available for reading on this stream.
Definition: sync_stream.h:347
Client-side interface for streaming reads of message of type R.
Definition: sync_stream.h:141
Definition: completion_queue.h:73
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call.h:174
virtual ~ServerStreamingInterface()
Definition: sync_stream.h:69
An interface that yields a sequence of messages of type R.
Definition: sync_stream.h:77
bool Write(const ResponseType &response, WriteOptions options) override
Blocking write msg to the stream with WriteOptions options.
Definition: sync_stream.h:621
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:122
Definition: call.h:536
A class to represent a flow-controlled unary call.
Definition: sync_stream.h:600
bool Read(R *msg) override
Blocking read a message and parse to msg.
Definition: sync_stream.h:185
void SendInitialMetadata() override
Blocking send initial metadata to client.
Definition: sync_stream.h:420
virtual ~ReaderInterface()
Definition: sync_stream.h:79
void set_initial_metadata_corked(bool corked)
Flag whether the initial metadata should be corked.
Definition: client_context.h:291
void SendInitialMetadata() override
Blocking send initial metadata to client.
Definition: sync_stream.h:647
bool Read(R *msg)
Definition: sync_stream.h:534
Server-side interface for streaming reads of message of type R.
Definition: sync_stream.h:412
Status Finish() override
Wait until the stream finishes, and return the final status.
Definition: sync_stream.h:195
bool Read(R *msg) override
Blocking read a message and parse to msg.
Definition: sync_stream.h:439
bool Write(const ResponseType &response, WriteOptions options) override
Blocking write msg to the stream with WriteOptions options.
Definition: sync_stream.h:662
bool WritesDone() override
Half close writing from the client.
Definition: sync_stream.h:275
bool Read(R *msg) override
Blocking read a message and parse to msg.
Definition: sync_stream.h:352
Definition: client_context.h:154
Definition: completion_queue.h:68
bool NextMessageSize(uint32_t *sz) override
Upper bound on the next message size available for reading on this stream.
Definition: sync_stream.h:180
bool Write(const W &msg, WriteOptions options) override
Blocking write msg to the stream with WriteOptions options.
Definition: sync_stream.h:363
bool NextMessageSize(uint32_t *sz) override
Upper bound on the next message size available for reading on this stream.
Definition: sync_stream.h:649
void SendInitialMetadata() override
Blocking send initial metadata to client.
Definition: sync_stream.h:573
bool NextMessageSize(uint32_t *sz) override
Upper bound on the next message size available for reading on this stream.
Definition: sync_stream.h:575
void SendInitialMetadata()
Definition: sync_stream.h:515
void SendInitialMetadata() override
Blocking send initial metadata to client.
Definition: sync_stream.h:461
Definition: call.h:268
ServerReader(Call *call, ServerContext *ctx)
Definition: sync_stream.h:418
bool NextMessageSize(uint32_t *sz) override
Upper bound on the next message size available for reading on this stream.
Definition: sync_stream.h:608
bool Read(RequestType *request) override
Blocking read a message and parse to msg.
Definition: sync_stream.h:612
bool Write(const W &msg, WriteOptions options) override
Blocking write msg to the stream with WriteOptions options.
Definition: sync_stream.h:582
Definition: call.h:440
virtual ~ClientStreamingInterface()
Definition: sync_stream.h:51
Server-side interface for streaming writes of message of type W.
Definition: sync_stream.h:453
virtual bool NextMessageSize(uint32_t *sz)=0
Upper bound on the next message size available for reading on this stream.
ClientReader(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, const W &request)
Blocking create a stream and write the first request out.
Definition: sync_stream.h:156
CompletionQueue * cq() const
Definition: call.h:696
int max_receive_message_size() const
Definition: call.h:698
bool Write(const W &msg, WriteOptions options) override
Blocking write msg to the stream with WriteOptions options.
Definition: sync_stream.h:476
Definition: call.h:675
virtual bool WritesDone()=0
Half close writing from the client.
Codegen interface for grpc::Channel.
Definition: channel_interface.h:64
virtual bool Write(const W &msg, WriteOptions options)=0
Blocking write msg to the stream with WriteOptions options.
virtual void SendInitialMetadata()=0
Blocking send initial metadata to client.
Primary implementation of CallOpSetInterface.
Definition: call.h:623
Status Finish() override
Read the final response and wait for the final status.
Definition: sync_stream.h:283
Definition: server_context.h:94
ServerSplitStreamer(Call *call, ServerContext *ctx)
Definition: sync_stream.h:644
Per-message write options.
Definition: call.h:95
void SendInitialMetadata() override
Blocking send initial metadata to client.
Definition: sync_stream.h:606
ClientReaderWriter(ChannelInterface *channel, const RpcMethod &method, ClientContext *context)
Blocking create a stream.
Definition: sync_stream.h:326
bool NextMessageSize(uint32_t *sz) override
Upper bound on the next message size available for reading on this stream.
Definition: sync_stream.h:434
virtual void WaitForInitialMetadata()=0
Blocking wait for initial metadata from server.
virtual bool WritesDone()=0
Block until currently-pending writes are completed.
bool Write(const W &msg)
Blocking write msg to the stream with default write options.
Definition: sync_stream.h:118
Definition: completion_queue.h:70
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue.h).
Definition: completion_queue.h:101
ServerUnaryStreamer(Call *call, ServerContext *ctx)
Definition: sync_stream.h:603
bool WritesDone() override
Block until currently-pending writes are completed.
Definition: sync_stream.h:385
Definition: rpc_method.h:43
Status Finish() override
Wait until the stream finishes, and return the final status.
Definition: sync_stream.h:392
virtual Status Finish()=0
Wait until the stream finishes, and return the final status.
void WaitForInitialMetadata() override
Blocking wait for initial metadata from server.
Definition: sync_stream.h:338
ServerReaderWriter(Call *call, ServerContext *ctx)
Definition: sync_stream.h:571
Definition: sync_stream.h:569
void PerformOps(CallOpSetInterface *ops)
Definition: call.h:691
An interface that can be fed a sequence of messages of type W.
Definition: sync_stream.h:99
Client-side interface for bi-directional streaming.
Definition: sync_stream.h:305
Did it work? If it didn't, why?
Definition: status.h:45
void WaitForInitialMetadata() override
Blocking wait for initial metadata from server.
Definition: sync_stream.h:171
virtual bool Read(R *msg)=0
Blocking read a message and parse to msg.
Common interface for all synchronous client side streaming.
Definition: sync_stream.h:49
ServerWriter(Call *call, ServerContext *ctx)
Definition: sync_stream.h:459
Definition: call.h:217
bool Read(RequestType *request) override
Blocking read a message and parse to msg.
Definition: sync_stream.h:653
void WriteLast(const W &msg, WriteOptions options)
Write msg and coalesce it with the writing of trailing metadata, using WriteOptions options...
Definition: sync_stream.h:134
void WaitForInitialMetadata()
Definition: sync_stream.h:243
Definition: channel_interface.h:53
bool compression_level_set() const
Definition: server_context.h:140
Definition: channel_interface.h:51
Definition: call.h:507
ClientWriter(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, R *response)
Blocking create a stream.
Definition: sync_stream.h:228
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailing metadata.
Definition: call.h:190
virtual ~WriterInterface()
Definition: sync_stream.h:101
Definition: call.h:386
bool Write(const W &msg, WriteOptions options) override
Blocking write msg to the stream with WriteOptions options.
Definition: sync_stream.h:253