GRPC C++  0.13.1-pre1
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
async_stream.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015-2016, Google Inc.
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  * * Redistributions of source code must retain the above copyright
11  * notice, this list of conditions and the following disclaimer.
12  * * Redistributions in binary form must reproduce the above
13  * copyright notice, this list of conditions and the following disclaimer
14  * in the documentation and/or other materials provided with the
15  * distribution.
16  * * Neither the name of Google Inc. nor the names of its
17  * contributors may be used to endorse or promote products derived from
18  * this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  *
32  */
33 
34 #ifndef GRPCXX_IMPL_CODEGEN_ASYNC_STREAM_H
35 #define GRPCXX_IMPL_CODEGEN_ASYNC_STREAM_H
36 
42 
43 namespace grpc {
44 
45 class CompletionQueue;
46 
49  public:
51 
56  virtual void ReadInitialMetadata(void* tag) = 0;
57 
62  virtual void Finish(Status* status, void* tag) = 0;
63 };
64 
66 template <class R>
68  public:
69  virtual ~AsyncReaderInterface() {}
70 
76  virtual void Read(R* msg, void* tag) = 0;
77 };
78 
80 template <class W>
82  public:
83  virtual ~AsyncWriterInterface() {}
84 
93  virtual void Write(const W& msg, void* tag) = 0;
94 };
95 
96 template <class R>
98  public AsyncReaderInterface<R> {};
99 
100 template <class R>
102  public:
104  template <class W>
106  const RpcMethod& method, ClientContext* context,
107  const W& request, void* tag)
108  : context_(context), call_(channel->CreateCall(method, context, cq)) {
109  init_ops_.set_output_tag(tag);
110  init_ops_.SendInitialMetadata(context->send_initial_metadata_);
111  // TODO(ctiller): don't assert
112  GPR_ASSERT(init_ops_.SendMessage(request).ok());
113  init_ops_.ClientSendClose();
114  call_.PerformOps(&init_ops_);
115  }
116 
118  GPR_ASSERT(!context_->initial_metadata_received_);
119 
120  meta_ops_.set_output_tag(tag);
121  meta_ops_.RecvInitialMetadata(context_);
122  call_.PerformOps(&meta_ops_);
123  }
124 
125  void Read(R* msg, void* tag) GRPC_OVERRIDE {
126  read_ops_.set_output_tag(tag);
127  if (!context_->initial_metadata_received_) {
128  read_ops_.RecvInitialMetadata(context_);
129  }
130  read_ops_.RecvMessage(msg);
131  call_.PerformOps(&read_ops_);
132  }
133 
134  void Finish(Status* status, void* tag) GRPC_OVERRIDE {
135  finish_ops_.set_output_tag(tag);
136  if (!context_->initial_metadata_received_) {
137  finish_ops_.RecvInitialMetadata(context_);
138  }
139  finish_ops_.ClientRecvStatus(context_, status);
140  call_.PerformOps(&finish_ops_);
141  }
142 
143  private:
144  ClientContext* context_;
145  Call call_;
147  init_ops_;
151 };
152 
154 template <class W>
156  public AsyncWriterInterface<W> {
157  public:
161  virtual void WritesDone(void* tag) = 0;
162 };
163 
164 template <class W>
166  public:
167  template <class R>
169  const RpcMethod& method, ClientContext* context,
170  R* response, void* tag)
171  : context_(context), call_(channel->CreateCall(method, context, cq)) {
172  finish_ops_.RecvMessage(response);
173 
174  init_ops_.set_output_tag(tag);
175  init_ops_.SendInitialMetadata(context->send_initial_metadata_);
176  call_.PerformOps(&init_ops_);
177  }
178 
180  GPR_ASSERT(!context_->initial_metadata_received_);
181 
182  meta_ops_.set_output_tag(tag);
183  meta_ops_.RecvInitialMetadata(context_);
184  call_.PerformOps(&meta_ops_);
185  }
186 
187  void Write(const W& msg, void* tag) GRPC_OVERRIDE {
188  write_ops_.set_output_tag(tag);
189  // TODO(ctiller): don't assert
190  GPR_ASSERT(write_ops_.SendMessage(msg).ok());
191  call_.PerformOps(&write_ops_);
192  }
193 
194  void WritesDone(void* tag) GRPC_OVERRIDE {
195  writes_done_ops_.set_output_tag(tag);
196  writes_done_ops_.ClientSendClose();
197  call_.PerformOps(&writes_done_ops_);
198  }
199 
200  void Finish(Status* status, void* tag) GRPC_OVERRIDE {
201  finish_ops_.set_output_tag(tag);
202  if (!context_->initial_metadata_received_) {
203  finish_ops_.RecvInitialMetadata(context_);
204  }
205  finish_ops_.ClientRecvStatus(context_, status);
206  call_.PerformOps(&finish_ops_);
207  }
208 
209  private:
210  ClientContext* context_;
211  Call call_;
214  CallOpSet<CallOpSendMessage> write_ops_;
215  CallOpSet<CallOpClientSendClose> writes_done_ops_;
217  CallOpClientRecvStatus> finish_ops_;
218 };
219 
221 template <class W, class R>
223  public AsyncWriterInterface<W>,
224  public AsyncReaderInterface<R> {
225  public:
229  virtual void WritesDone(void* tag) = 0;
230 };
231 
232 template <class W, class R>
234  : public ClientAsyncReaderWriterInterface<W, R> {
235  public:
237  const RpcMethod& method, ClientContext* context,
238  void* tag)
239  : context_(context), call_(channel->CreateCall(method, context, cq)) {
240  init_ops_.set_output_tag(tag);
241  init_ops_.SendInitialMetadata(context->send_initial_metadata_);
242  call_.PerformOps(&init_ops_);
243  }
244 
246  GPR_ASSERT(!context_->initial_metadata_received_);
247 
248  meta_ops_.set_output_tag(tag);
249  meta_ops_.RecvInitialMetadata(context_);
250  call_.PerformOps(&meta_ops_);
251  }
252 
253  void Read(R* msg, void* tag) GRPC_OVERRIDE {
254  read_ops_.set_output_tag(tag);
255  if (!context_->initial_metadata_received_) {
256  read_ops_.RecvInitialMetadata(context_);
257  }
258  read_ops_.RecvMessage(msg);
259  call_.PerformOps(&read_ops_);
260  }
261 
262  void Write(const W& msg, void* tag) GRPC_OVERRIDE {
263  write_ops_.set_output_tag(tag);
264  // TODO(ctiller): don't assert
265  GPR_ASSERT(write_ops_.SendMessage(msg).ok());
266  call_.PerformOps(&write_ops_);
267  }
268 
269  void WritesDone(void* tag) GRPC_OVERRIDE {
270  writes_done_ops_.set_output_tag(tag);
271  writes_done_ops_.ClientSendClose();
272  call_.PerformOps(&writes_done_ops_);
273  }
274 
275  void Finish(Status* status, void* tag) GRPC_OVERRIDE {
276  finish_ops_.set_output_tag(tag);
277  if (!context_->initial_metadata_received_) {
278  finish_ops_.RecvInitialMetadata(context_);
279  }
280  finish_ops_.ClientRecvStatus(context_, status);
281  call_.PerformOps(&finish_ops_);
282  }
283 
284  private:
285  ClientContext* context_;
286  Call call_;
290  CallOpSet<CallOpSendMessage> write_ops_;
291  CallOpSet<CallOpClientSendClose> writes_done_ops_;
293 };
294 
295 template <class W, class R>
297  public AsyncReaderInterface<R> {
298  public:
300  : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
301 
303  GPR_ASSERT(!ctx_->sent_initial_metadata_);
304 
305  meta_ops_.set_output_tag(tag);
306  meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
307  ctx_->sent_initial_metadata_ = true;
308  call_.PerformOps(&meta_ops_);
309  }
310 
311  void Read(R* msg, void* tag) GRPC_OVERRIDE {
312  read_ops_.set_output_tag(tag);
313  read_ops_.RecvMessage(msg);
314  call_.PerformOps(&read_ops_);
315  }
316 
317  void Finish(const W& msg, const Status& status, void* tag) {
318  finish_ops_.set_output_tag(tag);
319  if (!ctx_->sent_initial_metadata_) {
320  finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
321  ctx_->sent_initial_metadata_ = true;
322  }
323  // The response is dropped if the status is not OK.
324  if (status.ok()) {
325  finish_ops_.ServerSendStatus(ctx_->trailing_metadata_,
326  finish_ops_.SendMessage(msg));
327  } else {
328  finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
329  }
330  call_.PerformOps(&finish_ops_);
331  }
332 
333  void FinishWithError(const Status& status, void* tag) {
334  GPR_ASSERT(!status.ok());
335  finish_ops_.set_output_tag(tag);
336  if (!ctx_->sent_initial_metadata_) {
337  finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
338  ctx_->sent_initial_metadata_ = true;
339  }
340  finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
341  call_.PerformOps(&finish_ops_);
342  }
343 
344  private:
345  void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; }
346 
347  Call call_;
348  ServerContext* ctx_;
349  CallOpSet<CallOpSendInitialMetadata> meta_ops_;
350  CallOpSet<CallOpRecvMessage<R>> read_ops_;
351  CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
352  CallOpServerSendStatus> finish_ops_;
353 };
354 
355 template <class W>
357  public AsyncWriterInterface<W> {
358  public:
360  : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
361 
363  GPR_ASSERT(!ctx_->sent_initial_metadata_);
364 
365  meta_ops_.set_output_tag(tag);
366  meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
367  ctx_->sent_initial_metadata_ = true;
368  call_.PerformOps(&meta_ops_);
369  }
370 
371  void Write(const W& msg, void* tag) GRPC_OVERRIDE {
372  write_ops_.set_output_tag(tag);
373  if (!ctx_->sent_initial_metadata_) {
374  write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
375  ctx_->sent_initial_metadata_ = true;
376  }
377  // TODO(ctiller): don't assert
378  GPR_ASSERT(write_ops_.SendMessage(msg).ok());
379  call_.PerformOps(&write_ops_);
380  }
381 
382  void Finish(const Status& status, void* tag) {
383  finish_ops_.set_output_tag(tag);
384  if (!ctx_->sent_initial_metadata_) {
385  finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
386  ctx_->sent_initial_metadata_ = true;
387  }
388  finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
389  call_.PerformOps(&finish_ops_);
390  }
391 
392  private:
393  void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; }
394 
395  Call call_;
396  ServerContext* ctx_;
397  CallOpSet<CallOpSendInitialMetadata> meta_ops_;
398  CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
399  CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
400 };
401 
403 template <class W, class R>
405  public AsyncWriterInterface<W>,
406  public AsyncReaderInterface<R> {
407  public:
409  : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
410 
412  GPR_ASSERT(!ctx_->sent_initial_metadata_);
413 
414  meta_ops_.set_output_tag(tag);
415  meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
416  ctx_->sent_initial_metadata_ = true;
417  call_.PerformOps(&meta_ops_);
418  }
419 
420  void Read(R* msg, void* tag) GRPC_OVERRIDE {
421  read_ops_.set_output_tag(tag);
422  read_ops_.RecvMessage(msg);
423  call_.PerformOps(&read_ops_);
424  }
425 
426  void Write(const W& msg, void* tag) GRPC_OVERRIDE {
427  write_ops_.set_output_tag(tag);
428  if (!ctx_->sent_initial_metadata_) {
429  write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
430  ctx_->sent_initial_metadata_ = true;
431  }
432  // TODO(ctiller): don't assert
433  GPR_ASSERT(write_ops_.SendMessage(msg).ok());
434  call_.PerformOps(&write_ops_);
435  }
436 
437  void Finish(const Status& status, void* tag) {
438  finish_ops_.set_output_tag(tag);
439  if (!ctx_->sent_initial_metadata_) {
440  finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
441  ctx_->sent_initial_metadata_ = true;
442  }
443  finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
444  call_.PerformOps(&finish_ops_);
445  }
446 
447  private:
448  friend class ::grpc::Server;
449 
450  void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; }
451 
452  Call call_;
453  ServerContext* ctx_;
458 };
459 
460 } // namespace grpc
461 
462 #endif // GRPCXX_IMPL_CODEGEN_ASYNC_STREAM_H
Common interface for all client side asynchronous streaming.
Definition: async_stream.h:48
Definition: async_stream.h:233
void SendInitialMetadata(void *tag) GRPC_OVERRIDE
Definition: async_stream.h:411
void Write(const W &msg, void *tag) GRPC_OVERRIDE
Request the writing of msg with identifying tag tag.
Definition: async_stream.h:187
void WritesDone(void *tag) GRPC_OVERRIDE
Signal the client is done with the writes.
Definition: async_stream.h:194
void Read(R *msg, void *tag) GRPC_OVERRIDE
Read a message of type R into msg.
Definition: async_stream.h:420
void SendInitialMetadata(void *tag) GRPC_OVERRIDE
Definition: async_stream.h:362
void ReadInitialMetadata(void *tag) GRPC_OVERRIDE
Request notification of the reading of the initial metadata.
Definition: async_stream.h:245
virtual void WritesDone(void *tag)=0
Signal the client is done with the writes.
Definition: call.h:431
virtual void Write(const W &msg, void *tag)=0
Request the writing of msg with identifying tag tag.
Server-side interface for asynchronous bi-directional streaming.
Definition: async_stream.h:404
Definition: service_type.h:52
void Write(const W &msg, void *tag) GRPC_OVERRIDE
Request the writing of msg with identifying tag tag.
Definition: async_stream.h:371
void Read(R *msg, void *tag) GRPC_OVERRIDE
Read a message of type R into msg.
Definition: async_stream.h:253
void Read(R *msg, void *tag) GRPC_OVERRIDE
Read a message of type R into msg.
Definition: async_stream.h:311
virtual void WritesDone(void *tag)=0
Signal the client is done with the writes.
Definition: client_context.h:152
void FinishWithError(const Status &status, void *tag)
Definition: async_stream.h:333
Definition: async_stream.h:356
ClientAsyncWriter(ChannelInterface *channel, CompletionQueue *cq, const RpcMethod &method, ClientContext *context, R *response, void *tag)
Definition: async_stream.h:168
virtual ~AsyncReaderInterface()
Definition: async_stream.h:69
ServerAsyncReaderWriter(ServerContext *ctx)
Definition: async_stream.h:408
ClientAsyncReaderWriter(ChannelInterface *channel, CompletionQueue *cq, const RpcMethod &method, ClientContext *context, void *tag)
Definition: async_stream.h:236
Definition: async_stream.h:97
Definition: call.h:576
void Write(const W &msg, void *tag) GRPC_OVERRIDE
Request the writing of msg with identifying tag tag.
Definition: async_stream.h:262
Client-side interface for asynchronous bi-directional streaming.
Definition: async_stream.h:222
Codegen interface for grpc::Channel.
Definition: channel_interface.h:64
ServerAsyncWriter(ServerContext *ctx)
Definition: async_stream.h:359
void ReadInitialMetadata(void *tag) GRPC_OVERRIDE
Request notification of the reading of the initial metadata.
Definition: async_stream.h:179
An interface that can be fed a sequence of messages of type W.
Definition: async_stream.h:81
Definition: async_stream.h:165
Primary implementation of CallOpSetInterface.
Definition: call.h:524
Definition: server_context.h:90
void Finish(const W &msg, const Status &status, void *tag)
Definition: async_stream.h:317
A thin wrapper around grpc_completion_queue (see src/core/surface/completion_queue.h).
Definition: completion_queue.h:81
#define GRPC_FINAL
Definition: config.h:71
virtual ~ClientAsyncStreamingInterface()
Definition: async_stream.h:50
void Finish(Status *status, void *tag) GRPC_OVERRIDE
Request notification of completion.
Definition: async_stream.h:200
void Finish(const Status &status, void *tag)
Definition: async_stream.h:382
virtual void ReadInitialMetadata(void *tag)=0
Request notification of the reading of the initial metadata.
Definition: rpc_method.h:43
void Finish(Status *status, void *tag) GRPC_OVERRIDE
Request notification of completion.
Definition: async_stream.h:134
void PerformOps(CallOpSetInterface *ops)
bool ok() const
Is the status OK?
Definition: status.h:67
Did it work? If it didn't, why?
Definition: status.h:45
void Finish(const Status &status, void *tag)
Definition: async_stream.h:437
void Write(const W &msg, void *tag) GRPC_OVERRIDE
Request the writing of msg with identifying tag tag.
Definition: async_stream.h:426
virtual void Finish(Status *status, void *tag)=0
Request notification of completion.
void SendInitialMetadata(void *tag) GRPC_OVERRIDE
Definition: async_stream.h:302
void Read(R *msg, void *tag) GRPC_OVERRIDE
Read a message of type R into msg.
Definition: async_stream.h:125
Definition: async_stream.h:101
virtual ~AsyncWriterInterface()
Definition: async_stream.h:83
#define GRPC_OVERRIDE
Definition: config.h:77
void Finish(Status *status, void *tag) GRPC_OVERRIDE
Request notification of completion.
Definition: async_stream.h:275
Definition: async_stream.h:296
Definition: call.h:401
void WritesDone(void *tag) GRPC_OVERRIDE
Signal the client is done with the writes.
Definition: async_stream.h:269
ClientAsyncReader(ChannelInterface *channel, CompletionQueue *cq, const RpcMethod &method, ClientContext *context, const W &request, void *tag)
Create a stream and write the first request out.
Definition: async_stream.h:105
virtual void Read(R *msg, void *tag)=0
Read a message of type R into msg.
Definition: call.h:294
ServerAsyncReader(ServerContext *ctx)
Definition: async_stream.h:299
An interface that yields a sequence of messages of type R.
Definition: async_stream.h:67
void ReadInitialMetadata(void *tag) GRPC_OVERRIDE
Request notification of the reading of the initial metadata.
Definition: async_stream.h:117
Common interface for client side asynchronous writing.
Definition: async_stream.h:155