GRPC C++  1.0.0
async_stream.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015, Google Inc.
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  * * Redistributions of source code must retain the above copyright
11  * notice, this list of conditions and the following disclaimer.
12  * * Redistributions in binary form must reproduce the above
13  * copyright notice, this list of conditions and the following disclaimer
14  * in the documentation and/or other materials provided with the
15  * distribution.
16  * * Neither the name of Google Inc. nor the names of its
17  * contributors may be used to endorse or promote products derived from
18  * this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  *
32  */
33 
34 #ifndef GRPCXX_IMPL_CODEGEN_ASYNC_STREAM_H
35 #define GRPCXX_IMPL_CODEGEN_ASYNC_STREAM_H
36 
43 
44 namespace grpc {
45 
46 class CompletionQueue;
47 
50  public:
52 
// Pure virtual: request notification of the reading of the initial metadata.
// `tag` is the opaque pointer the caller passes to identify this request.
59  virtual void ReadInitialMetadata(void* tag) = 0;
60 
// Pure virtual: indicate that the stream is to be finished and request
// notification. `status` is an out-pointer; the client implementations in this
// listing hand it to ClientRecvStatus().
66  virtual void Finish(Status* status, void* tag) = 0;
67 };
68 
70 template <class R>
72  public:
73  virtual ~AsyncReaderInterface() {}
74 
// Pure virtual: read a message of type R into `msg`; `tag` identifies the
// request.
85  virtual void Read(R* msg, void* tag) = 0;
86 };
87 
89 template <class W>
91  public:
92  virtual ~AsyncWriterInterface() {}
93 
// Pure virtual: request the writing of `msg` with identifying tag `tag`.
103  virtual void Write(const W& msg, void* tag) = 0;
104 };
105 
106 template <class R>
108  public AsyncReaderInterface<R> {};
109 
110 template <class R>
112  public:
114  template <class W>
116  const RpcMethod& method, ClientContext* context,
117  const W& request, void* tag)
118  : context_(context), call_(channel->CreateCall(method, context, cq)) {
119  init_ops_.set_output_tag(tag);
120  init_ops_.SendInitialMetadata(context->send_initial_metadata_,
121  context->initial_metadata_flags());
122  // TODO(ctiller): don't assert
123  GPR_CODEGEN_ASSERT(init_ops_.SendMessage(request).ok());
124  init_ops_.ClientSendClose();
125  call_.PerformOps(&init_ops_);
126  }
127 
129  GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
130 
131  meta_ops_.set_output_tag(tag);
132  meta_ops_.RecvInitialMetadata(context_);
133  call_.PerformOps(&meta_ops_);
134  }
135 
// Read a message of type R into `msg`.
// Tags read_ops_ with `tag`. If context_ has not yet flagged its initial
// metadata as received, the same op set is also asked to receive it, so the
// caller does not have to call ReadInitialMetadata() first. The op set is then
// handed to call_.PerformOps().
136  void Read(R* msg, void* tag) GRPC_OVERRIDE {
137  read_ops_.set_output_tag(tag);
138  if (!context_->initial_metadata_received_) {
139  read_ops_.RecvInitialMetadata(context_);
140  }
141  read_ops_.RecvMessage(msg);
142  call_.PerformOps(&read_ops_);
143  }
144 
// Indicate that the stream is to be finished and request notification.
// Tags finish_ops_ with `tag`, piggybacks a receive of the initial metadata if
// context_ has not flagged it as received yet, adds the receive-status op that
// targets `status`, and hands the op set to call_.PerformOps().
145  void Finish(Status* status, void* tag) GRPC_OVERRIDE {
146  finish_ops_.set_output_tag(tag);
147  if (!context_->initial_metadata_received_) {
148  finish_ops_.RecvInitialMetadata(context_);
149  }
150  finish_ops_.ClientRecvStatus(context_, status);
151  call_.PerformOps(&finish_ops_);
152  }
153 
154  private:
155  ClientContext* context_;
156  Call call_;
158  init_ops_;
162 };
163 
165 template <class W>
167  public AsyncWriterInterface<W> {
168  public:
// Pure virtual: signal that the client is done with the writes; `tag`
// identifies the request.
173  virtual void WritesDone(void* tag) = 0;
174 };
175 
176 template <class W>
178  public:
179  template <class R>
181  const RpcMethod& method, ClientContext* context,
182  R* response, void* tag)
183  : context_(context), call_(channel->CreateCall(method, context, cq)) {
184  finish_ops_.RecvMessage(response);
185  finish_ops_.AllowNoMessage();
186 
187  init_ops_.set_output_tag(tag);
188  init_ops_.SendInitialMetadata(context->send_initial_metadata_,
189  context->initial_metadata_flags());
190  call_.PerformOps(&init_ops_);
191  }
192 
194  GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
195 
196  meta_ops_.set_output_tag(tag);
197  meta_ops_.RecvInitialMetadata(context_);
198  call_.PerformOps(&meta_ops_);
199  }
200 
// Request the writing of `msg` with identifying tag `tag`.
// Tags write_ops_, adds `msg` through SendMessage(), and hands the op set to
// call_.PerformOps(). A non-ok result from SendMessage() trips the assert
// rather than being reported to the caller (see the TODO).
// NOTE(review): the SendMessage() call sits inside the assert macro's
// argument; confirm GPR_CODEGEN_ASSERT always evaluates its argument.
201  void Write(const W& msg, void* tag) GRPC_OVERRIDE {
202  write_ops_.set_output_tag(tag);
203  // TODO(ctiller): don't assert
204  GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
205  call_.PerformOps(&write_ops_);
206  }
207 
// Signal the client is done with the writes.
// Tags writes_done_ops_ with `tag`, adds the client send-close op, and hands
// the op set to call_.PerformOps().
208  void WritesDone(void* tag) GRPC_OVERRIDE {
209  writes_done_ops_.set_output_tag(tag);
210  writes_done_ops_.ClientSendClose();
211  call_.PerformOps(&writes_done_ops_);
212  }
213 
// Indicate that the stream is to be finished and request notification.
// Tags finish_ops_ with `tag`, piggybacks a receive of the initial metadata if
// context_ has not flagged it as received yet, adds the receive-status op that
// targets `status`, and hands the op set to call_.PerformOps().
// The constructor body visible in this listing already added
// RecvMessage(response) and AllowNoMessage() to finish_ops_, so this same
// batch also carries the receive of the response.
214  void Finish(Status* status, void* tag) GRPC_OVERRIDE {
215  finish_ops_.set_output_tag(tag);
216  if (!context_->initial_metadata_received_) {
217  finish_ops_.RecvInitialMetadata(context_);
218  }
219  finish_ops_.ClientRecvStatus(context_, status);
220  call_.PerformOps(&finish_ops_);
221  }
222 
223  private:
224  ClientContext* context_;
225  Call call_;
228  CallOpSet<CallOpSendMessage> write_ops_;
229  CallOpSet<CallOpClientSendClose> writes_done_ops_;
232  finish_ops_;
233 };
234 
236 template <class W, class R>
238  public AsyncWriterInterface<W>,
239  public AsyncReaderInterface<R> {
240  public:
// Pure virtual: signal that the client is done with the writes; `tag`
// identifies the request.
245  virtual void WritesDone(void* tag) = 0;
246 };
247 
248 template <class W, class R>
250  : public ClientAsyncReaderWriterInterface<W, R> {
251  public:
253  const RpcMethod& method, ClientContext* context,
254  void* tag)
255  : context_(context), call_(channel->CreateCall(method, context, cq)) {
256  init_ops_.set_output_tag(tag);
257  init_ops_.SendInitialMetadata(context->send_initial_metadata_,
258  context->initial_metadata_flags());
259  call_.PerformOps(&init_ops_);
260  }
261 
263  GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
264 
265  meta_ops_.set_output_tag(tag);
266  meta_ops_.RecvInitialMetadata(context_);
267  call_.PerformOps(&meta_ops_);
268  }
269 
// Read a message of type R into `msg`.
// Tags read_ops_ with `tag`. If context_ has not yet flagged its initial
// metadata as received, the same op set is also asked to receive it. The
// receive-message op is then added and the op set is handed to
// call_.PerformOps().
270  void Read(R* msg, void* tag) GRPC_OVERRIDE {
271  read_ops_.set_output_tag(tag);
272  if (!context_->initial_metadata_received_) {
273  read_ops_.RecvInitialMetadata(context_);
274  }
275  read_ops_.RecvMessage(msg);
276  call_.PerformOps(&read_ops_);
277  }
278 
// Request the writing of `msg` with identifying tag `tag`.
// Tags write_ops_, adds `msg` through SendMessage(), and hands the op set to
// call_.PerformOps(). A non-ok result from SendMessage() trips the assert
// rather than being reported to the caller (see the TODO).
// NOTE(review): the SendMessage() call sits inside the assert macro's
// argument; confirm GPR_CODEGEN_ASSERT always evaluates its argument.
279  void Write(const W& msg, void* tag) GRPC_OVERRIDE {
280  write_ops_.set_output_tag(tag);
281  // TODO(ctiller): don't assert
282  GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
283  call_.PerformOps(&write_ops_);
284  }
285 
// Signal the client is done with the writes.
// Tags writes_done_ops_ with `tag`, adds the client send-close op, and hands
// the op set to call_.PerformOps().
286  void WritesDone(void* tag) GRPC_OVERRIDE {
287  writes_done_ops_.set_output_tag(tag);
288  writes_done_ops_.ClientSendClose();
289  call_.PerformOps(&writes_done_ops_);
290  }
291 
// Indicate that the stream is to be finished and request notification.
// Tags finish_ops_ with `tag`, piggybacks a receive of the initial metadata if
// context_ has not flagged it as received yet, adds the receive-status op that
// targets `status`, and hands the op set to call_.PerformOps().
292  void Finish(Status* status, void* tag) GRPC_OVERRIDE {
293  finish_ops_.set_output_tag(tag);
294  if (!context_->initial_metadata_received_) {
295  finish_ops_.RecvInitialMetadata(context_);
296  }
297  finish_ops_.ClientRecvStatus(context_, status);
298  call_.PerformOps(&finish_ops_);
299  }
300 
301  private:
302  ClientContext* context_;
303  Call call_;
307  CallOpSet<CallOpSendMessage> write_ops_;
308  CallOpSet<CallOpClientSendClose> writes_done_ops_;
310 };
311 
312 template <class W, class R>
314  public AsyncReaderInterface<R> {
315  public:
// Pure virtual: finish the server-side stream with response `msg` and
// `status`; `tag` identifies the request. The implementation in this listing
// sends `msg` only when `status` is ok.
316  virtual void Finish(const W& msg, const Status& status, void* tag) = 0;
317 
// Pure virtual: finish the server-side stream with `status` and no response
// message. The implementation in this listing asserts that `status` is not ok.
318  virtual void FinishWithError(const Status& status, void* tag) = 0;
319 };
320 
321 template <class W, class R>
323  public:
325  : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
326 
328  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
329 
330  meta_ops_.set_output_tag(tag);
331  meta_ops_.SendInitialMetadata(ctx_->initial_metadata_,
332  ctx_->initial_metadata_flags());
333  if (ctx_->compression_level_set()) {
334  meta_ops_.set_compression_level(ctx_->compression_level());
335  }
336  ctx_->sent_initial_metadata_ = true;
337  call_.PerformOps(&meta_ops_);
338  }
339 
// Read a message of type R into `msg`.
// Tags read_ops_ with `tag`, adds the receive-message op, and hands the op set
// to call_.PerformOps().
340  void Read(R* msg, void* tag) GRPC_OVERRIDE {
341  read_ops_.set_output_tag(tag);
342  read_ops_.RecvMessage(msg);
343  call_.PerformOps(&read_ops_);
344  }
345 
// Finish the stream, sending `msg` as the response when `status` is ok.
// If the initial metadata has not been sent yet, it is added to this same op
// set (with the compression level when one is set on ctx_) and ctx_ is marked
// as having sent it.
// For an ok `status`, the status sent to the peer is whatever
// finish_ops_.SendMessage(msg) returns, so a failure to add the message
// becomes the call's final status. For a non-ok `status`, `msg` is not sent.
346  void Finish(const W& msg, const Status& status, void* tag) GRPC_OVERRIDE {
347  finish_ops_.set_output_tag(tag);
348  if (!ctx_->sent_initial_metadata_) {
349  finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
350  ctx_->initial_metadata_flags());
351  if (ctx_->compression_level_set()) {
352  finish_ops_.set_compression_level(ctx_->compression_level());
353  }
354  ctx_->sent_initial_metadata_ = true;
355  }
356  // The response is dropped if the status is not OK.
357  if (status.ok()) {
358  finish_ops_.ServerSendStatus(ctx_->trailing_metadata_,
359  finish_ops_.SendMessage(msg));
360  } else {
361  finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
362  }
363  call_.PerformOps(&finish_ops_);
364  }
365 
// Finish the stream with a non-ok `status` and no response message.
// Asserts that `status` is not ok. If the initial metadata has not been sent
// yet, it is added to this same op set (with the compression level when one is
// set on ctx_) and ctx_ is marked as having sent it. The status is then sent
// together with ctx_->trailing_metadata_ via call_.PerformOps().
366  void FinishWithError(const Status& status, void* tag) GRPC_OVERRIDE {
367  GPR_CODEGEN_ASSERT(!status.ok());
368  finish_ops_.set_output_tag(tag);
369  if (!ctx_->sent_initial_metadata_) {
370  finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
371  ctx_->initial_metadata_flags());
372  if (ctx_->compression_level_set()) {
373  finish_ops_.set_compression_level(ctx_->compression_level());
374  }
375  ctx_->sent_initial_metadata_ = true;
376  }
377  finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
378  call_.PerformOps(&finish_ops_);
379  }
380 
381  private:
// Replaces call_ with a copy of `*call`. The constructor in this listing
// initializes call_ from three nullptr arguments, so this supplies the call
// that later operations run on.
382  void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; }
383 
384  Call call_;
385  ServerContext* ctx_;
390  finish_ops_;
391 };
392 
393 template <class W>
395  public AsyncWriterInterface<W> {
396  public:
// Pure virtual: finish the server-side stream with `status`; `tag` identifies
// the request.
397  virtual void Finish(const Status& status, void* tag) = 0;
398 };
399 
400 template <class W>
402  public:
404  : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
405 
407  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
408 
409  meta_ops_.set_output_tag(tag);
410  meta_ops_.SendInitialMetadata(ctx_->initial_metadata_,
411  ctx_->initial_metadata_flags());
412  if (ctx_->compression_level_set()) {
413  meta_ops_.set_compression_level(ctx_->compression_level());
414  }
415  ctx_->sent_initial_metadata_ = true;
416  call_.PerformOps(&meta_ops_);
417  }
418 
// Request the writing of `msg` with identifying tag `tag`.
// If the initial metadata has not been sent yet, it is added to this same op
// set (with the compression level when one is set on ctx_) and ctx_ is marked
// as having sent it. A non-ok result from SendMessage() trips the assert
// rather than being reported to the caller (see the TODO).
// NOTE(review): the SendMessage() call sits inside the assert macro's
// argument; confirm GPR_CODEGEN_ASSERT always evaluates its argument.
419  void Write(const W& msg, void* tag) GRPC_OVERRIDE {
420  write_ops_.set_output_tag(tag);
421  if (!ctx_->sent_initial_metadata_) {
422  write_ops_.SendInitialMetadata(ctx_->initial_metadata_,
423  ctx_->initial_metadata_flags());
424  if (ctx_->compression_level_set()) {
425  write_ops_.set_compression_level(ctx_->compression_level());
426  }
427  ctx_->sent_initial_metadata_ = true;
428  }
429  // TODO(ctiller): don't assert
430  GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
431  call_.PerformOps(&write_ops_);
432  }
433 
// Finish the stream by sending `status` together with
// ctx_->trailing_metadata_.
// If the initial metadata has not been sent yet, it is added to this same op
// set (with the compression level when one is set on ctx_) and ctx_ is marked
// as having sent it.
434  void Finish(const Status& status, void* tag) GRPC_OVERRIDE {
435  finish_ops_.set_output_tag(tag);
436  if (!ctx_->sent_initial_metadata_) {
437  finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
438  ctx_->initial_metadata_flags());
439  if (ctx_->compression_level_set()) {
440  finish_ops_.set_compression_level(ctx_->compression_level());
441  }
442  ctx_->sent_initial_metadata_ = true;
443  }
444  finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
445  call_.PerformOps(&finish_ops_);
446  }
447 
448  private:
// Replaces call_ with a copy of `*call`. The constructor in this listing
// initializes call_ from three nullptr arguments, so this supplies the call
// that later operations run on.
449  void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; }
450 
451  Call call_;
452  ServerContext* ctx_;
456 };
457 
459 template <class W, class R>
461  public AsyncWriterInterface<W>,
462  public AsyncReaderInterface<R> {
463  public:
// Pure virtual: finish the server-side bi-directional stream with `status`;
// `tag` identifies the request.
464  virtual void Finish(const Status& status, void* tag) = 0;
465 };
466 
467 template <class W, class R>
469  : public ServerAsyncReaderWriterInterface<W, R> {
470  public:
472  : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
473 
475  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
476 
477  meta_ops_.set_output_tag(tag);
478  meta_ops_.SendInitialMetadata(ctx_->initial_metadata_,
479  ctx_->initial_metadata_flags());
480  if (ctx_->compression_level_set()) {
481  meta_ops_.set_compression_level(ctx_->compression_level());
482  }
483  ctx_->sent_initial_metadata_ = true;
484  call_.PerformOps(&meta_ops_);
485  }
486 
// Read a message of type R into `msg`.
// Tags read_ops_ with `tag`, adds the receive-message op, and hands the op set
// to call_.PerformOps().
487  void Read(R* msg, void* tag) GRPC_OVERRIDE {
488  read_ops_.set_output_tag(tag);
489  read_ops_.RecvMessage(msg);
490  call_.PerformOps(&read_ops_);
491  }
492 
// Request the writing of `msg` with identifying tag `tag`.
// If the initial metadata has not been sent yet, it is added to this same op
// set (with the compression level when one is set on ctx_) and ctx_ is marked
// as having sent it. A non-ok result from SendMessage() trips the assert
// rather than being reported to the caller (see the TODO).
// NOTE(review): the SendMessage() call sits inside the assert macro's
// argument; confirm GPR_CODEGEN_ASSERT always evaluates its argument.
493  void Write(const W& msg, void* tag) GRPC_OVERRIDE {
494  write_ops_.set_output_tag(tag);
495  if (!ctx_->sent_initial_metadata_) {
496  write_ops_.SendInitialMetadata(ctx_->initial_metadata_,
497  ctx_->initial_metadata_flags());
498  if (ctx_->compression_level_set()) {
499  write_ops_.set_compression_level(ctx_->compression_level());
500  }
501  ctx_->sent_initial_metadata_ = true;
502  }
503  // TODO(ctiller): don't assert
504  GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
505  call_.PerformOps(&write_ops_);
506  }
507 
// Finish the stream by sending `status` together with
// ctx_->trailing_metadata_.
// If the initial metadata has not been sent yet, it is added to this same op
// set (with the compression level when one is set on ctx_) and ctx_ is marked
// as having sent it.
508  void Finish(const Status& status, void* tag) GRPC_OVERRIDE {
509  finish_ops_.set_output_tag(tag);
510  if (!ctx_->sent_initial_metadata_) {
511  finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
512  ctx_->initial_metadata_flags());
513  if (ctx_->compression_level_set()) {
514  finish_ops_.set_compression_level(ctx_->compression_level());
515  }
516  ctx_->sent_initial_metadata_ = true;
517  }
518  finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
519  call_.PerformOps(&finish_ops_);
520  }
521 
522  private:
523  friend class ::grpc::Server;
524 
// Replaces call_ with a copy of `*call`. The constructor in this listing
// initializes call_ from three nullptr arguments, so this supplies the call
// that later operations run on.
525  void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; }
526 
527  Call call_;
528  ServerContext* ctx_;
533 };
534 
535 } // namespace grpc
536 
537 #endif // GRPCXX_IMPL_CODEGEN_ASYNC_STREAM_H
Common interface for all client side asynchronous streaming.
Definition: async_stream.h:49
Definition: async_stream.h:249
void SendInitialMetadata(void *tag) GRPC_OVERRIDE
Definition: async_stream.h:474
void Write(const W &msg, void *tag) GRPC_OVERRIDE
Request the writing of msg with identifying tag tag.
Definition: async_stream.h:201
void WritesDone(void *tag) GRPC_OVERRIDE
Signal the client is done with the writes.
Definition: async_stream.h:208
void Read(R *msg, void *tag) GRPC_OVERRIDE
Read a message of type R into msg.
Definition: async_stream.h:487
void SendInitialMetadata(void *tag) GRPC_OVERRIDE
Definition: async_stream.h:406
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:97
void ReadInitialMetadata(void *tag) GRPC_OVERRIDE
Request notification of the reading of the initial metadata.
Definition: async_stream.h:262
Definition: call.h:500
Definition: async_stream.h:468
Definition: service_type.h:53
void Finish(const Status &status, void *tag) GRPC_OVERRIDE
Definition: async_stream.h:434
void Write(const W &msg, void *tag) GRPC_OVERRIDE
Request the writing of msg with identifying tag tag.
Definition: async_stream.h:419
Definition: async_stream.h:313
void Read(R *msg, void *tag) GRPC_OVERRIDE
Read a message of type R into msg.
Definition: async_stream.h:270
Definition: call.h:427
void Read(R *msg, void *tag) GRPC_OVERRIDE
Read a message of type R into msg.
Definition: async_stream.h:340
Definition: client_context.h:154
void Finish(const Status &status, void *tag) GRPC_OVERRIDE
Definition: async_stream.h:508
Definition: async_stream.h:401
ClientAsyncWriter(ChannelInterface *channel, CompletionQueue *cq, const RpcMethod &method, ClientContext *context, R *response, void *tag)
Definition: async_stream.h:180
Definition: call.h:232
virtual ~AsyncReaderInterface()
Definition: async_stream.h:73
ServerAsyncReaderWriter(ServerContext *ctx)
Definition: async_stream.h:471
ClientAsyncReaderWriter(ChannelInterface *channel, CompletionQueue *cq, const RpcMethod &method, ClientContext *context, void *tag)
Definition: async_stream.h:252
Definition: async_stream.h:107
Definition: call.h:645
void Write(const W &msg, void *tag) GRPC_OVERRIDE
Request the writing of msg with identifying tag tag.
Definition: async_stream.h:279
Client-side interface for asynchronous bi-directional streaming.
Definition: async_stream.h:237
Definition: alarm.h:48
Codegen interface for grpc::Channel.
Definition: channel_interface.h:64
ServerAsyncWriter(ServerContext *ctx)
Definition: async_stream.h:403
void ReadInitialMetadata(void *tag) GRPC_OVERRIDE
Request notification of the reading of the initial metadata.
Definition: async_stream.h:193
An interface that can be fed a sequence of messages of type W.
Definition: async_stream.h:90
Definition: async_stream.h:177
Primary implementation of CallOpSetInterface.
Definition: call.h:593
void FinishWithError(const Status &status, void *tag) GRPC_OVERRIDE
Definition: async_stream.h:366
Server-side interface for asynchronous bi-directional streaming.
Definition: async_stream.h:460
Definition: server_context.h:91
A thin wrapper around grpc_completion_queue (see src/core/surface/completion_queue.h).
Definition: completion_queue.h:97
#define GRPC_FINAL
Definition: config.h:72
virtual ~ClientAsyncStreamingInterface()
Definition: async_stream.h:51
void Finish(Status *status, void *tag) GRPC_OVERRIDE
Indicate that the stream is to be finished and request notification Should not be used concurrently w...
Definition: async_stream.h:214
virtual void ReadInitialMetadata(void *tag)=0
Request notification of the reading of the initial metadata.
Definition: rpc_method.h:43
void Finish(Status *status, void *tag) GRPC_OVERRIDE
Indicate that the stream is to be finished and request notification Should not be used concurrently w...
Definition: async_stream.h:145
Did it work? If it didn't, why?
Definition: status.h:45
void Finish(const W &msg, const Status &status, void *tag) GRPC_OVERRIDE
Definition: async_stream.h:346
void Write(const W &msg, void *tag) GRPC_OVERRIDE
Request the writing of msg with identifying tag tag.
Definition: async_stream.h:493
virtual void Finish(Status *status, void *tag)=0
Indicate that the stream is to be finished and request notification Should not be used concurrently w...
void SendInitialMetadata(void *tag) GRPC_OVERRIDE
Definition: async_stream.h:327
Definition: async_stream.h:394
Definition: call.h:181
void Read(R *msg, void *tag) GRPC_OVERRIDE
Read a message of type R into msg.
Definition: async_stream.h:136
Definition: async_stream.h:111
virtual ~AsyncWriterInterface()
Definition: async_stream.h:92
#define GRPC_OVERRIDE
Definition: config.h:78
void Finish(Status *status, void *tag) GRPC_OVERRIDE
Indicate that the stream is to be finished and request notification Should not be used concurrently w...
Definition: async_stream.h:292
Definition: async_stream.h:322
Definition: call.h:470
void WritesDone(void *tag) GRPC_OVERRIDE
Signal the client is done with the writes.
Definition: async_stream.h:286
ClientAsyncReader(ChannelInterface *channel, CompletionQueue *cq, const RpcMethod &method, ClientContext *context, const W &request, void *tag)
Create a stream and write the first request out.
Definition: async_stream.h:115
Definition: call.h:353
ServerAsyncReader(ServerContext *ctx)
Definition: async_stream.h:324
An interface that yields a sequence of messages of type R.
Definition: async_stream.h:71
void ReadInitialMetadata(void *tag) GRPC_OVERRIDE
Request notification of the reading of the initial metadata.
Definition: async_stream.h:128
Common interface for client side asynchronous writing.
Definition: async_stream.h:166