GRPC C++  1.0.0
sync_stream.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015, Google Inc.
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  * * Redistributions of source code must retain the above copyright
11  * notice, this list of conditions and the following disclaimer.
12  * * Redistributions in binary form must reproduce the above
13  * copyright notice, this list of conditions and the following disclaimer
14  * in the documentation and/or other materials provided with the
15  * distribution.
16  * * Neither the name of Google Inc. nor the names of its
17  * contributors may be used to endorse or promote products derived from
18  * this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  *
32  */
33 
34 #ifndef GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H
35 #define GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H
36 
45 #include <grpc/impl/codegen/log.h>
46 
47 namespace grpc {
48 
// Common interface for all synchronous client side streaming.
// NOTE(review): this is an extract of the listing; line 50 (the class header)
// and line 52 (the virtual destructor, per the index at the end of this page)
// are absent here.
51  public:
53 
// Wait until the stream finishes, and return the final status.
64  virtual Status Finish() = 0;
65 };
66 
// An interface that yields a sequence of messages of type R.
// NOTE(review): listing line 69 (the class header) is absent from this extract.
68 template <class R>
70  public:
71  virtual ~ReaderInterface() {}
72 
// Reads one message into *msg. The implementations in this file describe it
// as a blocking read and return true only when the completion-queue wait
// reports true and a message was actually obtained (ops.got_message).
83  virtual bool Read(R* msg) = 0;
84 };
85 
// An interface that can be fed a sequence of messages of type W.
// NOTE(review): listing line 88 (the class header) is absent from this extract.
87 template <class W>
89  public:
90  virtual ~WriterInterface() {}
91 
// Blocking write msg to the stream with options. The implementations in this
// file return false when the message cannot be queued or when the
// completion-queue wait reports false.
99  virtual bool Write(const W& msg, const WriteOptions& options) = 0;
100 
// Blocking write msg to the stream with default options: forwards to the
// two-argument overload with a default-constructed WriteOptions.
107  inline bool Write(const W& msg) { return Write(msg, WriteOptions()); }
108 };
109 
// Client-side interface for streaming reads of message of type R.
// NOTE(review): listing line 112 (the start of the class header, before the
// ReaderInterface<R> base) is absent from this extract.
111 template <class R>
113  public ReaderInterface<R> {
114  public:
// Blocking wait for initial metadata from server (wording taken from the
// overriding implementations in this file).
119  virtual void WaitForInitialMetadata() = 0;
120 };
121 
// Synchronous client-side reader for messages of type R. Holds its own
// CompletionQueue (cq_) and Call (call_) by value, plus a pointer to the
// caller-supplied ClientContext.
// NOTE(review): this listing is an extract; gaps in the embedded line numbers
// (125, 130-131, 142, 145, 152, 161-162) mark lines that are absent here,
// including the declarations of the local `ops` objects and the headers of
// WaitForInitialMetadata() and Finish().
122 template <class R>
123 class ClientReader GRPC_FINAL : public ClientReaderInterface<R> {
124  public:
// Blocking create a stream and write the first request out: the initial
// metadata, the request message and the client-side close are queued on one
// op set, which is started on call_ and then waited for on cq_.
126  template <class W>
127  ClientReader(ChannelInterface* channel, const RpcMethod& method,
128  ClientContext* context, const W& request)
129  : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
132  ops;
133  ops.SendInitialMetadata(context->send_initial_metadata_,
134  context->initial_metadata_flags());
135  // TODO(ctiller): don't assert
136  GPR_CODEGEN_ASSERT(ops.SendMessage(request).ok());
137  ops.ClientSendClose();
138  call_.PerformOps(&ops);
// The result of Pluck() is not checked here.
139  cq_.Pluck(&ops);
140  }
141 
// WaitForInitialMetadata(): blocking wait for initial metadata from server.
// Asserts that the metadata has not been received already; the result of
// Pluck() is not checked.
143  GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
144 
146  ops.RecvInitialMetadata(context_);
147  call_.PerformOps(&ops);
148  cq_.Pluck(&ops);
149  }
150 
// Blocking read a message and parse to msg. If the initial metadata has not
// been received yet, receiving it is added to the same op set. Returns true
// only when cq_.Pluck() reports true and ops.got_message is set.
151  bool Read(R* msg) GRPC_OVERRIDE {
153  if (!context_->initial_metadata_received_) {
154  ops.RecvInitialMetadata(context_);
155  }
156  ops.RecvMessage(msg);
157  call_.PerformOps(&ops);
158  return cq_.Pluck(&ops) && ops.got_message;
159  }
160 
// Finish(): wait until the stream finishes, and return the final status.
// A false result from cq_.Pluck() trips the assertion.
163  Status status;
164  ops.ClientRecvStatus(context_, &status);
165  call_.PerformOps(&ops);
166  GPR_CODEGEN_ASSERT(cq_.Pluck(&ops));
167  return status;
168  }
169 
170  private:
// Context passed to the constructor; read for the initial-metadata state.
171  ClientContext* context_;
// Queue handed to CreateCall() and used for every blocking wait above.
172  CompletionQueue cq_;
// Call created in the constructor's initializer list.
173  Call call_;
174 };
175 
// Client-side interface for streaming writes of message of type W.
// NOTE(review): listing line 178 (the start of the class header, before the
// WriterInterface<W> base) is absent from this extract.
177 template <class W>
179  public WriterInterface<W> {
180  public:
// Half close writing from the client (wording taken from the ClientWriter
// override). The overrides in this file return the result of the
// completion-queue wait.
186  virtual bool WritesDone() = 0;
187 };
188 
// Synchronous client-side writer for messages of type W. Holds its own
// CompletionQueue (cq_) and Call (call_) by value, plus a pointer to the
// caller-supplied ClientContext and a member op set (finish_ops_) that is
// prepared in the constructor and run in Finish().
// NOTE(review): this listing is an extract; gaps in the embedded line numbers
// (192, 200, 207, 210, 216, 218, 226-227, 233-234, 247-248) mark lines that
// are absent here, including the declarations of the local `ops` objects, the
// type of finish_ops_, and the headers of WaitForInitialMetadata(),
// WritesDone() and Finish().
189 template <class W>
190 class ClientWriter : public ClientWriterInterface<W> {
191  public:
// Blocking create a stream. The response pointer is registered with
// finish_ops_ up front (with "no message" allowed), then the initial metadata
// is sent and waited for on cq_.
193  template <class R>
194  ClientWriter(ChannelInterface* channel, const RpcMethod& method,
195  ClientContext* context, R* response)
196  : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
197  finish_ops_.RecvMessage(response);
198  finish_ops_.AllowNoMessage();
199 
201  ops.SendInitialMetadata(context->send_initial_metadata_,
202  context->initial_metadata_flags());
203  call_.PerformOps(&ops);
// The result of Pluck() is not checked here.
204  cq_.Pluck(&ops);
205  }
206 
// WaitForInitialMetadata(): receives the initial metadata into context_ and
// blocks on cq_ until that op set completes. Asserts that the metadata has
// not been received already.
208  GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
209 
211  ops.RecvInitialMetadata(context_);
212  call_.PerformOps(&ops);
213  cq_.Pluck(&ops); // status ignored
214  }
215 
// Blocking write msg to the stream with options. Returns false without
// starting any op when SendMessage() does not report ok; otherwise returns
// the result of cq_.Pluck().
217  bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
219  if (!ops.SendMessage(msg, options).ok()) {
220  return false;
221  }
222  call_.PerformOps(&ops);
223  return cq_.Pluck(&ops);
224  }
225 
// WritesDone(): half close writing from the client. Returns the result of
// cq_.Pluck().
228  ops.ClientSendClose();
229  call_.PerformOps(&ops);
230  return cq_.Pluck(&ops);
231  }
232 
// Finish(): read the final response and wait for the final status. If the
// initial metadata has not been received yet, receiving it is added to
// finish_ops_ as well. A false result from cq_.Pluck() trips the assertion.
235  Status status;
236  if (!context_->initial_metadata_received_) {
237  finish_ops_.RecvInitialMetadata(context_);
238  }
239  finish_ops_.ClientRecvStatus(context_, &status);
240  call_.PerformOps(&finish_ops_);
241  GPR_CODEGEN_ASSERT(cq_.Pluck(&finish_ops_));
242  return status;
243  }
244 
245  private:
// Context passed to the constructor; read for the initial-metadata state.
246  ClientContext* context_;
// Op set filled in by the constructor and Finish(); its type (listing lines
// 247-248) is absent from this extract.
249  finish_ops_;
// Queue handed to CreateCall() and used for every blocking wait above.
250  CompletionQueue cq_;
// Call created in the constructor's initializer list.
251  Call call_;
252 };
253 
// Client-side interface for bi-directional streaming: combines
// WriterInterface<W> and ReaderInterface<R>.
// NOTE(review): listing line 256 (the start of the class header, before the
// two bases shown below) is absent from this extract.
255 template <class W, class R>
257  public WriterInterface<W>,
258  public ReaderInterface<R> {
259  public:
// Blocking wait for initial metadata from server (wording taken from the
// ClientReaderWriter override).
264  virtual void WaitForInitialMetadata() = 0;
265 
// Block until currently-pending writes are completed (wording taken from the
// ClientReaderWriter override, which sends the client-side close).
270  virtual bool WritesDone() = 0;
271 };
272 
// Synchronous client-side bi-directional stream: writes messages of type W
// and reads messages of type R. Holds its own CompletionQueue (cq_) and Call
// (call_) by value, plus a pointer to the caller-supplied ClientContext.
// NOTE(review): this listing is an extract; gaps in the embedded line numbers
// (276-277, 280, 287, 290, 297, 306, 308, 314-315, 321-322) mark lines that
// are absent here, including the first line of the constructor signature, the
// declarations of the local `ops` objects, and the headers of
// WaitForInitialMetadata(), WritesDone() and Finish().
273 template <class W, class R>
274 class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
275  public:
// Constructor (channel, method, context): blocking create a stream. Sends the
// initial metadata and waits for that op set on cq_.
278  ClientContext* context)
279  : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
281  ops.SendInitialMetadata(context->send_initial_metadata_,
282  context->initial_metadata_flags());
283  call_.PerformOps(&ops);
// The result of Pluck() is not checked here.
284  cq_.Pluck(&ops);
285  }
286 
// WaitForInitialMetadata(): blocking wait for initial metadata from server.
// Asserts that the metadata has not been received already.
288  GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
289 
291  ops.RecvInitialMetadata(context_);
292  call_.PerformOps(&ops);
293  cq_.Pluck(&ops); // status ignored
294  }
295 
// Blocking read a message and parse to msg. If the initial metadata has not
// been received yet, receiving it is added to the same op set. Returns true
// only when cq_.Pluck() reports true and ops.got_message is set.
296  bool Read(R* msg) GRPC_OVERRIDE {
298  if (!context_->initial_metadata_received_) {
299  ops.RecvInitialMetadata(context_);
300  }
301  ops.RecvMessage(msg);
302  call_.PerformOps(&ops);
303  return cq_.Pluck(&ops) && ops.got_message;
304  }
305 
// Blocking write msg to the stream with options. Returns false without
// starting any op when SendMessage() does not report ok; otherwise returns
// the result of cq_.Pluck().
307  bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
309  if (!ops.SendMessage(msg, options).ok()) return false;
310  call_.PerformOps(&ops);
311  return cq_.Pluck(&ops);
312  }
313 
// WritesDone(): block until currently-pending writes are completed. Sends the
// client-side close and returns the result of cq_.Pluck().
316  ops.ClientSendClose();
317  call_.PerformOps(&ops);
318  return cq_.Pluck(&ops);
319  }
320 
// Finish(): wait until the stream finishes, and return the final status. If
// the initial metadata has not been received yet, receiving it is added to
// the same op set. A false result from cq_.Pluck() trips the assertion.
323  if (!context_->initial_metadata_received_) {
324  ops.RecvInitialMetadata(context_);
325  }
326  Status status;
327  ops.ClientRecvStatus(context_, &status);
328  call_.PerformOps(&ops);
329  GPR_CODEGEN_ASSERT(cq_.Pluck(&ops));
330  return status;
331  }
332 
333  private:
// Context passed to the constructor; read for the initial-metadata state.
334  ClientContext* context_;
// Queue handed to CreateCall() and used for every blocking wait above.
335  CompletionQueue cq_;
// Call created in the constructor's initializer list.
336  Call call_;
337 };
338 
// Synchronous server-side reader for messages of type R. Works on a Call and
// a ServerContext supplied by the caller and waits on the call's own
// completion queue (call_->cq()).
// NOTE(review): this listing is an extract; gaps in the embedded line numbers
// (344, 347, 359) mark lines that are absent here: the header of
// SendInitialMetadata() and the declarations of the local `ops` objects.
339 template <class R>
340 class ServerReader GRPC_FINAL : public ReaderInterface<R> {
341  public:
// Stores the two pointers; no operation is started here.
342  ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
343 
// SendInitialMetadata(): sends the context's initial metadata now. Asserts
// that it has not been sent before, applies the compression level when one is
// set on the context, marks the metadata as sent, then waits for the op set.
345  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
346 
348  ops.SendInitialMetadata(ctx_->initial_metadata_,
349  ctx_->initial_metadata_flags());
350  if (ctx_->compression_level_set()) {
351  ops.set_compression_level(ctx_->compression_level());
352  }
// The flag is set before the ops are started.
353  ctx_->sent_initial_metadata_ = true;
354  call_->PerformOps(&ops);
// The result of Pluck() is not checked here.
355  call_->cq()->Pluck(&ops);
356  }
357 
// Blocking read a message and parse to msg. Returns true only when the
// completion-queue wait reports true and ops.got_message is set.
358  bool Read(R* msg) GRPC_OVERRIDE {
360  ops.RecvMessage(msg);
361  call_->PerformOps(&ops);
362  return call_->cq()->Pluck(&ops) && ops.got_message;
363  }
364 
365  private:
// Both pointers are fixed at construction (const pointers).
366  Call* const call_;
367  ServerContext* const ctx_;
368 };
369 
// Synchronous server-side writer for messages of type W. Works on a Call and
// a ServerContext supplied by the caller and waits on the call's own
// completion queue (call_->cq()).
// NOTE(review): this listing is an extract; gaps in the embedded line numbers
// (375, 378, 389, 391) mark lines that are absent here: the header of
// SendInitialMetadata() and the declarations of the local `ops` objects.
370 template <class W>
371 class ServerWriter GRPC_FINAL : public WriterInterface<W> {
372  public:
// Stores the two pointers; no operation is started here.
373  ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
374 
// SendInitialMetadata(): sends the context's initial metadata now. Asserts
// that it has not been sent before, applies the compression level when one is
// set on the context, marks the metadata as sent, then waits for the op set.
376  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
377 
379  ops.SendInitialMetadata(ctx_->initial_metadata_,
380  ctx_->initial_metadata_flags());
381  if (ctx_->compression_level_set()) {
382  ops.set_compression_level(ctx_->compression_level());
383  }
384  ctx_->sent_initial_metadata_ = true;
385  call_->PerformOps(&ops);
// The result of Pluck() is not checked here.
386  call_->cq()->Pluck(&ops);
387  }
388 
// Blocking write msg to the stream with options. Returns false without
// starting any op when SendMessage() does not report ok. If the initial
// metadata has not been sent yet, it is added to the same op set (with the
// compression level, when set) and marked as sent. Otherwise returns the
// result of the completion-queue wait.
390  bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
392  if (!ops.SendMessage(msg, options).ok()) {
393  return false;
394  }
395  if (!ctx_->sent_initial_metadata_) {
396  ops.SendInitialMetadata(ctx_->initial_metadata_,
397  ctx_->initial_metadata_flags());
398  if (ctx_->compression_level_set()) {
399  ops.set_compression_level(ctx_->compression_level());
400  }
401  ctx_->sent_initial_metadata_ = true;
402  }
403  call_->PerformOps(&ops);
404  return call_->cq()->Pluck(&ops);
405  }
406 
407  private:
// Both pointers are fixed at construction (const pointers).
408  Call* const call_;
409  ServerContext* const ctx_;
410 };
411 
// Server-side interface for bi-directional streaming: writes messages of type
// W and reads messages of type R. Works on a Call and a ServerContext
// supplied by the caller and waits on the call's own completion queue
// (call_->cq()).
// NOTE(review): this listing is an extract; gaps in the embedded line numbers
// (419, 422, 434, 440, 442) mark lines that are absent here: the header of
// SendInitialMetadata() and the declarations of the local `ops` objects.
413 template <class W, class R>
414 class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
415  public ReaderInterface<R> {
416  public:
// Stores the two pointers; no operation is started here.
417  ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
418 
// SendInitialMetadata(): sends the context's initial metadata now. Asserts
// that it has not been sent before, applies the compression level when one is
// set on the context, marks the metadata as sent, then waits for the op set.
420  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
421 
423  ops.SendInitialMetadata(ctx_->initial_metadata_,
424  ctx_->initial_metadata_flags());
425  if (ctx_->compression_level_set()) {
426  ops.set_compression_level(ctx_->compression_level());
427  }
428  ctx_->sent_initial_metadata_ = true;
429  call_->PerformOps(&ops);
// The result of Pluck() is not checked here.
430  call_->cq()->Pluck(&ops);
431  }
432 
// Blocking read a message and parse to msg. Returns true only when the
// completion-queue wait reports true and ops.got_message is set.
433  bool Read(R* msg) GRPC_OVERRIDE {
435  ops.RecvMessage(msg);
436  call_->PerformOps(&ops);
437  return call_->cq()->Pluck(&ops) && ops.got_message;
438  }
439 
// Blocking write msg to the stream with options. Returns false without
// starting any op when SendMessage() does not report ok. If the initial
// metadata has not been sent yet, it is added to the same op set (with the
// compression level, when set) and marked as sent. Otherwise returns the
// result of the completion-queue wait.
441  bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
443  if (!ops.SendMessage(msg, options).ok()) {
444  return false;
445  }
446  if (!ctx_->sent_initial_metadata_) {
447  ops.SendInitialMetadata(ctx_->initial_metadata_,
448  ctx_->initial_metadata_flags());
449  if (ctx_->compression_level_set()) {
450  ops.set_compression_level(ctx_->compression_level());
451  }
452  ctx_->sent_initial_metadata_ = true;
453  }
454  call_->PerformOps(&ops);
455  return call_->cq()->Pluck(&ops);
456  }
457 
458  private:
// Both pointers are fixed at construction (const pointers).
459  Call* const call_;
460  ServerContext* const ctx_;
461 };
462 
463 } // namespace grpc
464 
465 #endif // GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H
Definition: channel_interface.h:49
Client-side interface for streaming writes of message of type W.
Definition: sync_stream.h:178
Client-side interface for streaming reads of message of type R.
Definition: sync_stream.h:112
bool Read(R *msg) GRPC_OVERRIDE
Blocking read a message and parse to msg.
Definition: sync_stream.h:433
void SendInitialMetadata()
Definition: sync_stream.h:375
An interface that yields a sequence of messages of type R.
Definition: sync_stream.h:69
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:97
void SendInitialMetadata()
Definition: sync_stream.h:344
bool Read(R *msg) GRPC_OVERRIDE
Blocking read a message and parse to msg.
Definition: sync_stream.h:296
Definition: call.h:500
virtual ~ReaderInterface()
Definition: sync_stream.h:71
bool Write(const W &msg, const WriteOptions &options) GRPC_OVERRIDE
Blocking write msg to the stream with options.
Definition: sync_stream.h:390
Status Finish() GRPC_OVERRIDE
Wait until the stream finishes, and return the final status.
Definition: sync_stream.h:161
Definition: client_context.h:154
Definition: completion_queue.h:68
void WaitForInitialMetadata() GRPC_OVERRIDE
Blocking wait for initial metadata from server.
Definition: sync_stream.h:142
bool WritesDone() GRPC_OVERRIDE
Block until currently-pending writes are completed.
Definition: sync_stream.h:314
Definition: call.h:232
ServerReader(Call *call, ServerContext *ctx)
Definition: sync_stream.h:342
void SendInitialMetadata()
Definition: sync_stream.h:419
bool WritesDone() GRPC_OVERRIDE
Half close writing from the client.
Definition: sync_stream.h:226
bool Write(const W &msg, const WriteOptions &options) GRPC_OVERRIDE
Blocking write msg to the stream with options.
Definition: sync_stream.h:441
bool Write(const W &msg, const WriteOptions &options) GRPC_OVERRIDE
Blocking write msg to the stream with options.
Definition: sync_stream.h:217
Definition: call.h:407
virtual ~ClientStreamingInterface()
Definition: sync_stream.h:52
ClientReader(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, const W &request)
Blocking create a stream and write the first request out.
Definition: sync_stream.h:127
Status Finish() GRPC_OVERRIDE
Read the final response and wait for the final status.
Definition: sync_stream.h:234
Definition: call.h:645
Definition: alarm.h:48
bool Read(R *msg) GRPC_OVERRIDE
Blocking read a message and parse to msg.
Definition: sync_stream.h:151
Codegen interface for grpc::Channel.
Definition: channel_interface.h:64
Primary implementation of CallOpSetInterface.
Definition: call.h:593
Definition: server_context.h:91
Per-message write options.
Definition: call.h:98
ClientReaderWriter(ChannelInterface *channel, const RpcMethod &method, ClientContext *context)
Blocking create a stream.
Definition: sync_stream.h:277
bool Write(const W &msg)
Blocking write msg to the stream with default options.
Definition: sync_stream.h:107
Definition: completion_queue.h:70
A thin wrapper around grpc_completion_queue (see src/core/surface/completion_queue.h).
Definition: completion_queue.h:97
Definition: rpc_method.h:43
virtual Status Finish()=0
Wait until the stream finishes, and return the final status.
void WaitForInitialMetadata() GRPC_OVERRIDE
Blocking wait for initial metadata from server.
Definition: sync_stream.h:287
ServerReaderWriter(Call *call, ServerContext *ctx)
Definition: sync_stream.h:417
Server-side interface for bi-directional streaming.
Definition: completion_queue.h:72
An interface that can be fed a sequence of messages of type W.
Definition: sync_stream.h:88
Client-side interface for bi-directional streaming.
Definition: sync_stream.h:256
Did it work? If it didn't, why?
Definition: status.h:45
bool Write(const W &msg, const WriteOptions &options) GRPC_OVERRIDE
Blocking write msg to the stream with options.
Definition: sync_stream.h:307
Common interface for all synchronous client side streaming.
Definition: sync_stream.h:50
ServerWriter(Call *call, ServerContext *ctx)
Definition: sync_stream.h:373
Definition: call.h:181
bool Read(R *msg) GRPC_OVERRIDE
Blocking read a message and parse to msg.
Definition: sync_stream.h:358
void WaitForInitialMetadata()
Definition: sync_stream.h:207
Definition: channel_interface.h:53
Definition: channel_interface.h:51
#define GRPC_OVERRIDE
Definition: config.h:78
Definition: call.h:470
ClientWriter(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, R *response)
Blocking create a stream.
Definition: sync_stream.h:194
virtual ~WriterInterface()
Definition: sync_stream.h:90
Definition: call.h:353
Status Finish() GRPC_OVERRIDE
Wait until the stream finishes, and return the final status.
Definition: sync_stream.h:321