GRPC C++  1.2.0
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
sync_stream.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015, Google Inc.
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  * * Redistributions of source code must retain the above copyright
11  * notice, this list of conditions and the following disclaimer.
12  * * Redistributions in binary form must reproduce the above
13  * copyright notice, this list of conditions and the following disclaimer
14  * in the documentation and/or other materials provided with the
15  * distribution.
16  * * Neither the name of Google Inc. nor the names of its
17  * contributors may be used to endorse or promote products derived from
18  * this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  *
32  */
33 
34 #ifndef GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H
35 #define GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H
36 
45 
46 namespace grpc {
47 
50  public:
52 
63  virtual Status Finish() = 0;
64 };
65 
68  public:
70 
72  virtual void SendInitialMetadata() = 0;
73 };
74 
76 template <class R>
78  public:
79  virtual ~ReaderInterface() {}
80 
82  virtual bool NextMessageSize(uint32_t* sz) = 0;
83 
94  virtual bool Read(R* msg) = 0;
95 };
96 
98 template <class W>
100  public:
101  virtual ~WriterInterface() {}
102 
110  virtual bool Write(const W& msg, const WriteOptions& options) = 0;
111 
118  inline bool Write(const W& msg) { return Write(msg, WriteOptions()); }
119 };
120 
122 template <class R>
124  public ReaderInterface<R> {
125  public:
130  virtual void WaitForInitialMetadata() = 0;
131 };
132 
133 template <class R>
134 class ClientReader final : public ClientReaderInterface<R> {
135  public:
137  template <class W>
138  ClientReader(ChannelInterface* channel, const RpcMethod& method,
139  ClientContext* context, const W& request)
140  : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
143  ops;
144  ops.SendInitialMetadata(context->send_initial_metadata_,
145  context->initial_metadata_flags());
146  // TODO(ctiller): don't assert
147  GPR_CODEGEN_ASSERT(ops.SendMessage(request).ok());
148  ops.ClientSendClose();
149  call_.PerformOps(&ops);
150  cq_.Pluck(&ops);
151  }
152 
153  void WaitForInitialMetadata() override {
154  GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
155 
157  ops.RecvInitialMetadata(context_);
158  call_.PerformOps(&ops);
159  cq_.Pluck(&ops);
160  }
161 
162  bool NextMessageSize(uint32_t* sz) override {
163  *sz = call_.max_receive_message_size();
164  return true;
165  }
166 
167  bool Read(R* msg) override {
169  if (!context_->initial_metadata_received_) {
170  ops.RecvInitialMetadata(context_);
171  }
172  ops.RecvMessage(msg);
173  call_.PerformOps(&ops);
174  return cq_.Pluck(&ops) && ops.got_message;
175  }
176 
177  Status Finish() override {
179  Status status;
180  ops.ClientRecvStatus(context_, &status);
181  call_.PerformOps(&ops);
182  GPR_CODEGEN_ASSERT(cq_.Pluck(&ops));
183  return status;
184  }
185 
186  private:
187  ClientContext* context_;
188  CompletionQueue cq_;
189  Call call_;
190 };
191 
193 template <class W>
195  public WriterInterface<W> {
196  public:
202  virtual bool WritesDone() = 0;
203 };
204 
205 template <class W>
206 class ClientWriter : public ClientWriterInterface<W> {
207  public:
209  template <class R>
210  ClientWriter(ChannelInterface* channel, const RpcMethod& method,
211  ClientContext* context, R* response)
212  : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
213  finish_ops_.RecvMessage(response);
214  finish_ops_.AllowNoMessage();
215 
217  ops.SendInitialMetadata(context->send_initial_metadata_,
218  context->initial_metadata_flags());
219  call_.PerformOps(&ops);
220  cq_.Pluck(&ops);
221  }
222 
224  GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
225 
227  ops.RecvInitialMetadata(context_);
228  call_.PerformOps(&ops);
229  cq_.Pluck(&ops); // status ignored
230  }
231 
233  bool Write(const W& msg, const WriteOptions& options) override {
235  if (!ops.SendMessage(msg, options).ok()) {
236  return false;
237  }
238  call_.PerformOps(&ops);
239  return cq_.Pluck(&ops);
240  }
241 
242  bool WritesDone() override {
244  ops.ClientSendClose();
245  call_.PerformOps(&ops);
246  return cq_.Pluck(&ops);
247  }
248 
250  Status Finish() override {
251  Status status;
252  if (!context_->initial_metadata_received_) {
253  finish_ops_.RecvInitialMetadata(context_);
254  }
255  finish_ops_.ClientRecvStatus(context_, &status);
256  call_.PerformOps(&finish_ops_);
257  GPR_CODEGEN_ASSERT(cq_.Pluck(&finish_ops_));
258  return status;
259  }
260 
261  private:
262  ClientContext* context_;
265  finish_ops_;
266  CompletionQueue cq_;
267  Call call_;
268 };
269 
271 template <class W, class R>
273  public WriterInterface<W>,
274  public ReaderInterface<R> {
275  public:
280  virtual void WaitForInitialMetadata() = 0;
281 
286  virtual bool WritesDone() = 0;
287 };
288 
289 template <class W, class R>
290 class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
291  public:
294  ClientContext* context)
295  : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
297  ops.SendInitialMetadata(context->send_initial_metadata_,
298  context->initial_metadata_flags());
299  call_.PerformOps(&ops);
300  cq_.Pluck(&ops);
301  }
302 
303  void WaitForInitialMetadata() override {
304  GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
305 
307  ops.RecvInitialMetadata(context_);
308  call_.PerformOps(&ops);
309  cq_.Pluck(&ops); // status ignored
310  }
311 
312  bool NextMessageSize(uint32_t* sz) override {
313  *sz = call_.max_receive_message_size();
314  return true;
315  }
316 
317  bool Read(R* msg) override {
319  if (!context_->initial_metadata_received_) {
320  ops.RecvInitialMetadata(context_);
321  }
322  ops.RecvMessage(msg);
323  call_.PerformOps(&ops);
324  return cq_.Pluck(&ops) && ops.got_message;
325  }
326 
328  bool Write(const W& msg, const WriteOptions& options) override {
330  if (!ops.SendMessage(msg, options).ok()) return false;
331  call_.PerformOps(&ops);
332  return cq_.Pluck(&ops);
333  }
334 
335  bool WritesDone() override {
337  ops.ClientSendClose();
338  call_.PerformOps(&ops);
339  return cq_.Pluck(&ops);
340  }
341 
342  Status Finish() override {
344  if (!context_->initial_metadata_received_) {
345  ops.RecvInitialMetadata(context_);
346  }
347  Status status;
348  ops.ClientRecvStatus(context_, &status);
349  call_.PerformOps(&ops);
350  GPR_CODEGEN_ASSERT(cq_.Pluck(&ops));
351  return status;
352  }
353 
354  private:
355  ClientContext* context_;
356  CompletionQueue cq_;
357  Call call_;
358 };
359 
361 template <class R>
363  public ReaderInterface<R> {};
364 
365 template <class R>
366 class ServerReader final : public ServerReaderInterface<R> {
367  public:
368  ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
369 
370  void SendInitialMetadata() override {
371  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
372 
374  ops.SendInitialMetadata(ctx_->initial_metadata_,
375  ctx_->initial_metadata_flags());
376  if (ctx_->compression_level_set()) {
377  ops.set_compression_level(ctx_->compression_level());
378  }
379  ctx_->sent_initial_metadata_ = true;
380  call_->PerformOps(&ops);
381  call_->cq()->Pluck(&ops);
382  }
383 
384  bool NextMessageSize(uint32_t* sz) override {
385  *sz = call_->max_receive_message_size();
386  return true;
387  }
388 
389  bool Read(R* msg) override {
391  ops.RecvMessage(msg);
392  call_->PerformOps(&ops);
393  return call_->cq()->Pluck(&ops) && ops.got_message;
394  }
395 
396  private:
397  Call* const call_;
398  ServerContext* const ctx_;
399 };
400 
402 template <class W>
404  public WriterInterface<W> {};
405 
406 template <class W>
407 class ServerWriter final : public ServerWriterInterface<W> {
408  public:
409  ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
410 
411  void SendInitialMetadata() override {
412  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
413 
415  ops.SendInitialMetadata(ctx_->initial_metadata_,
416  ctx_->initial_metadata_flags());
417  if (ctx_->compression_level_set()) {
418  ops.set_compression_level(ctx_->compression_level());
419  }
420  ctx_->sent_initial_metadata_ = true;
421  call_->PerformOps(&ops);
422  call_->cq()->Pluck(&ops);
423  }
424 
426  bool Write(const W& msg, const WriteOptions& options) override {
428  if (!ops.SendMessage(msg, options).ok()) {
429  return false;
430  }
431  if (!ctx_->sent_initial_metadata_) {
432  ops.SendInitialMetadata(ctx_->initial_metadata_,
433  ctx_->initial_metadata_flags());
434  if (ctx_->compression_level_set()) {
435  ops.set_compression_level(ctx_->compression_level());
436  }
437  ctx_->sent_initial_metadata_ = true;
438  }
439  call_->PerformOps(&ops);
440  return call_->cq()->Pluck(&ops);
441  }
442 
443  private:
444  Call* const call_;
445  ServerContext* const ctx_;
446 };
447 
449 template <class W, class R>
451  public WriterInterface<W>,
452  public ReaderInterface<R> {};
453 
454 // Actual implementation of bi-directional streaming
455 namespace internal {
456 template <class W, class R>
457 class ServerReaderWriterBody final {
458  public:
460  : call_(call), ctx_(ctx) {}
461 
463  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
464 
466  ops.SendInitialMetadata(ctx_->initial_metadata_,
467  ctx_->initial_metadata_flags());
468  if (ctx_->compression_level_set()) {
469  ops.set_compression_level(ctx_->compression_level());
470  }
471  ctx_->sent_initial_metadata_ = true;
472  call_->PerformOps(&ops);
473  call_->cq()->Pluck(&ops);
474  }
475 
476  bool NextMessageSize(uint32_t* sz) {
477  *sz = call_->max_receive_message_size();
478  return true;
479  }
480 
481  bool Read(R* msg) {
483  ops.RecvMessage(msg);
484  call_->PerformOps(&ops);
485  return call_->cq()->Pluck(&ops) && ops.got_message;
486  }
487 
488  bool Write(const W& msg, const WriteOptions& options) {
490  if (!ops.SendMessage(msg, options).ok()) {
491  return false;
492  }
493  if (!ctx_->sent_initial_metadata_) {
494  ops.SendInitialMetadata(ctx_->initial_metadata_,
495  ctx_->initial_metadata_flags());
496  if (ctx_->compression_level_set()) {
497  ops.set_compression_level(ctx_->compression_level());
498  }
499  ctx_->sent_initial_metadata_ = true;
500  }
501  call_->PerformOps(&ops);
502  return call_->cq()->Pluck(&ops);
503  }
504 
505  private:
506  Call* const call_;
507  ServerContext* const ctx_;
508 };
509 }
510 
511 // class to represent the user API for a bidirectional streaming call
512 template <class W, class R>
514  public:
515  ServerReaderWriter(Call* call, ServerContext* ctx) : body_(call, ctx) {}
516 
517  void SendInitialMetadata() override { body_.SendInitialMetadata(); }
518 
519  bool NextMessageSize(uint32_t* sz) override {
520  return body_.NextMessageSize(sz);
521  }
522 
523  bool Read(R* msg) override { return body_.Read(msg); }
524 
526  bool Write(const W& msg, const WriteOptions& options) override {
527  return body_.Write(msg, options);
528  }
529 
530  private:
532 };
533 
543 template <class RequestType, class ResponseType>
545  : public ServerReaderWriterInterface<ResponseType, RequestType> {
546  public:
548  : body_(call, ctx), read_done_(false), write_done_(false) {}
549 
550  void SendInitialMetadata() override { body_.SendInitialMetadata(); }
551 
552  bool NextMessageSize(uint32_t* sz) override {
553  return body_.NextMessageSize(sz);
554  }
555 
556  bool Read(RequestType* request) override {
557  if (read_done_) {
558  return false;
559  }
560  read_done_ = true;
561  return body_.Read(request);
562  }
563 
565  bool Write(const ResponseType& response,
566  const WriteOptions& options) override {
567  if (write_done_ || !read_done_) {
568  return false;
569  }
570  write_done_ = true;
571  return body_.Write(response, options);
572  }
573 
574  private:
576  bool read_done_;
577  bool write_done_;
578 };
579 
585 template <class RequestType, class ResponseType>
587  : public ServerReaderWriterInterface<ResponseType, RequestType> {
588  public:
590  : body_(call, ctx), read_done_(false) {}
591 
592  void SendInitialMetadata() override { body_.SendInitialMetadata(); }
593 
594  bool NextMessageSize(uint32_t* sz) override {
595  return body_.NextMessageSize(sz);
596  }
597 
598  bool Read(RequestType* request) override {
599  if (read_done_) {
600  return false;
601  }
602  read_done_ = true;
603  return body_.Read(request);
604  }
605 
607  bool Write(const ResponseType& response,
608  const WriteOptions& options) override {
609  return read_done_ && body_.Write(response, options);
610  }
611 
612  private:
614  bool read_done_;
615 };
616 
617 } // namespace grpc
618 
619 #endif // GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H
Definition: channel_interface.h:49
Client-side interface for streaming writes of message of type W.
Definition: sync_stream.h:194
grpc_compression_level compression_level() const
Definition: server_context.h:132
bool Write(const W &msg, const WriteOptions &options) override
Blocking write msg to the stream with options.
Definition: sync_stream.h:233
bool Read(R *msg) override
Blocking read a message and parse to msg.
Definition: sync_stream.h:523
virtual void WaitForInitialMetadata()=0
Blocking wait for initial metadata from server.
Common interface for all synchronous server side streaming.
Definition: sync_stream.h:67
Server-side interface for bi-directional streaming.
Definition: sync_stream.h:450
A class to represent a flow-controlled server-side streaming call.
Definition: sync_stream.h:586
ServerReaderWriterBody(Call *call, ServerContext *ctx)
Definition: sync_stream.h:459
bool NextMessageSize(uint32_t *sz)
Definition: sync_stream.h:476
bool NextMessageSize(uint32_t *sz) override
Upper bound on the next message size available for reading on this stream.
Definition: sync_stream.h:312
Client-side interface for streaming reads of message of type R.
Definition: sync_stream.h:123
Definition: completion_queue.h:73
bool Write(const W &msg, const WriteOptions &options) override
Blocking write msg to the stream with options.
Definition: sync_stream.h:426
virtual ~ServerStreamingInterface()
Definition: sync_stream.h:69
An interface that yields a sequence of messages of type R.
Definition: sync_stream.h:77
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:121
Definition: call.h:486
A class to represent a flow-controlled unary call.
Definition: sync_stream.h:544
bool Read(R *msg) override
Blocking read a message and parse to msg.
Definition: sync_stream.h:167
void SendInitialMetadata() override
Blocking send initial metadata to client.
Definition: sync_stream.h:370
virtual ~ReaderInterface()
Definition: sync_stream.h:79
void SendInitialMetadata() override
Blocking send initial metadata to client.
Definition: sync_stream.h:592
bool Read(R *msg)
Definition: sync_stream.h:481
Server-side interface for streaming reads of message of type R.
Definition: sync_stream.h:362
Status Finish() override
Wait until the stream finishes, and return the final status.
Definition: sync_stream.h:177
bool Read(R *msg) override
Blocking read a message and parse to msg.
Definition: sync_stream.h:389
bool WritesDone() override
Half close writing from the client.
Definition: sync_stream.h:242
bool Read(R *msg) override
Blocking read a message and parse to msg.
Definition: sync_stream.h:317
Definition: client_context.h:154
Definition: completion_queue.h:68
bool NextMessageSize(uint32_t *sz) override
Upper bound on the next message size available for reading on this stream.
Definition: sync_stream.h:162
bool NextMessageSize(uint32_t *sz) override
Upper bound on the next message size available for reading on this stream.
Definition: sync_stream.h:594
void SendInitialMetadata() override
Blocking send initial metadata to client.
Definition: sync_stream.h:517
bool NextMessageSize(uint32_t *sz) override
Upper bound on the next message size available for reading on this stream.
Definition: sync_stream.h:519
void SendInitialMetadata()
Definition: sync_stream.h:462
void SendInitialMetadata() override
Blocking send initial metadata to client.
Definition: sync_stream.h:411
Definition: call.h:219
ServerReader(Call *call, ServerContext *ctx)
Definition: sync_stream.h:368
bool NextMessageSize(uint32_t *sz) override
Upper bound on the next message size available for reading on this stream.
Definition: sync_stream.h:552
bool Read(RequestType *request) override
Blocking read a message and parse to msg.
Definition: sync_stream.h:556
Definition: call.h:392
virtual ~ClientStreamingInterface()
Definition: sync_stream.h:51
Server-side interface for streaming writes of message of type W.
Definition: sync_stream.h:403
virtual bool NextMessageSize(uint32_t *sz)=0
Upper bound on the next message size available for reading on this stream.
ClientReader(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, const W &request)
Blocking create a stream and write the first request out.
Definition: sync_stream.h:138
CompletionQueue * cq() const
Definition: call.h:638
int max_receive_message_size() const
Definition: call.h:640
Definition: call.h:617
bool Write(const ResponseType &response, const WriteOptions &options) override
Blocking write msg to the stream with options.
Definition: sync_stream.h:607
virtual bool WritesDone()=0
Half close writing from the client.
Codegen interface for grpc::Channel.
Definition: channel_interface.h:64
virtual void SendInitialMetadata()=0
Blocking send initial metadata to client.
Primary implementaiton of CallOpSetInterface.
Definition: call.h:565
Status Finish() override
Read the final response and wait for the final status.
Definition: sync_stream.h:250
Definition: server_context.h:95
ServerSplitStreamer(Call *call, ServerContext *ctx)
Definition: sync_stream.h:589
Per-message write options.
Definition: call.h:85
void SendInitialMetadata() override
Blocking send initial metadata to client.
Definition: sync_stream.h:550
ClientReaderWriter(ChannelInterface *channel, const RpcMethod &method, ClientContext *context)
Blocking create a stream.
Definition: sync_stream.h:293
bool NextMessageSize(uint32_t *sz) override
Upper bound on the next message size available for reading on this stream.
Definition: sync_stream.h:384
virtual void WaitForInitialMetadata()=0
Blocking wait for initial metadata from server.
virtual bool WritesDone()=0
Block until currently-pending writes are completed.
bool Write(const W &msg)
Blocking write msg to the stream with default options.
Definition: sync_stream.h:118
Definition: completion_queue.h:70
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue.h).
Definition: completion_queue.h:101
ServerUnaryStreamer(Call *call, ServerContext *ctx)
Definition: sync_stream.h:547
bool WritesDone() override
Block until currently-pending writes are completed.
Definition: sync_stream.h:335
Definition: rpc_method.h:43
bool Write(const ResponseType &response, const WriteOptions &options) override
Blocking write msg to the stream with options.
Definition: sync_stream.h:565
Status Finish() override
Wait until the stream finishes, and return the final status.
Definition: sync_stream.h:342
virtual Status Finish()=0
Wait until the stream finishes, and return the final status.
bool Write(const W &msg, const WriteOptions &options)
Definition: sync_stream.h:488
void WaitForInitialMetadata() override
Blocking wait for initial metadata from server.
Definition: sync_stream.h:303
ServerReaderWriter(Call *call, ServerContext *ctx)
Definition: sync_stream.h:515
Definition: sync_stream.h:513
void PerformOps(CallOpSetInterface *ops)
Definition: call.h:633
An interface that can be fed a sequence of messages of type W.
Definition: sync_stream.h:99
Client-side interface for bi-directional streaming.
Definition: sync_stream.h:272
Did it work? If it didn't, why?
Definition: status.h:45
void WaitForInitialMetadata() override
Blocking wait for initial metadata from server.
Definition: sync_stream.h:153
virtual bool Read(R *msg)=0
Blocking read a message and parse to msg.
Common interface for all synchronous client side streaming.
Definition: sync_stream.h:49
ServerWriter(Call *call, ServerContext *ctx)
Definition: sync_stream.h:409
Definition: call.h:168
bool Read(RequestType *request) override
Blocking read a message and parse to msg.
Definition: sync_stream.h:598
void WaitForInitialMetadata()
Definition: sync_stream.h:223
virtual bool Write(const W &msg, const WriteOptions &options)=0
Blocking write msg to the stream with options.
Definition: channel_interface.h:53
bool Write(const W &msg, const WriteOptions &options) override
Blocking write msg to the stream with options.
Definition: sync_stream.h:328
bool compression_level_set() const
Definition: server_context.h:141
bool Write(const W &msg, const WriteOptions &options) override
Blocking write msg to the stream with options.
Definition: sync_stream.h:526
Definition: channel_interface.h:51
Definition: call.h:457
ClientWriter(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, R *response)
Blocking create a stream.
Definition: sync_stream.h:210
virtual ~WriterInterface()
Definition: sync_stream.h:101
Definition: call.h:338