GRPC C++  0.13.1-pre1
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
sync_stream.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015-2016, Google Inc.
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  * * Redistributions of source code must retain the above copyright
11  * notice, this list of conditions and the following disclaimer.
12  * * Redistributions in binary form must reproduce the above
13  * copyright notice, this list of conditions and the following disclaimer
14  * in the documentation and/or other materials provided with the
15  * distribution.
16  * * Neither the name of Google Inc. nor the names of its
17  * contributors may be used to endorse or promote products derived from
18  * this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  *
32  */
33 
34 #ifndef GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H
35 #define GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H
36 
44 #include <grpc/impl/codegen/log.h>
45 
46 namespace grpc {
47 
50  public:
52 
63  virtual Status Finish() = 0;
64 };
65 
67 template <class R>
69  public:
70  virtual ~ReaderInterface() {}
71 
79  virtual bool Read(R* msg) = 0;
80 };
81 
83 template <class W>
85  public:
86  virtual ~WriterInterface() {}
87 
94  virtual bool Write(const W& msg, const WriteOptions& options) = 0;
95 
101  inline bool Write(const W& msg) { return Write(msg, WriteOptions()); }
102 };
103 
105 template <class R>
107  public ReaderInterface<R> {
108  public:
113  virtual void WaitForInitialMetadata() = 0;
114 };
115 
116 template <class R>
118  public:
120  template <class W>
121  ClientReader(ChannelInterface* channel, const RpcMethod& method,
122  ClientContext* context, const W& request)
123  : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
126  ops.SendInitialMetadata(context->send_initial_metadata_);
127  // TODO(ctiller): don't assert
128  GPR_ASSERT(ops.SendMessage(request).ok());
129  ops.ClientSendClose();
130  call_.PerformOps(&ops);
131  cq_.Pluck(&ops);
132  }
133 
135  GPR_ASSERT(!context_->initial_metadata_received_);
136 
138  ops.RecvInitialMetadata(context_);
139  call_.PerformOps(&ops);
140  cq_.Pluck(&ops);
141  }
142 
143  bool Read(R* msg) GRPC_OVERRIDE {
145  if (!context_->initial_metadata_received_) {
146  ops.RecvInitialMetadata(context_);
147  }
148  ops.RecvMessage(msg);
149  call_.PerformOps(&ops);
150  return cq_.Pluck(&ops) && ops.got_message;
151  }
152 
155  Status status;
156  ops.ClientRecvStatus(context_, &status);
157  call_.PerformOps(&ops);
158  GPR_ASSERT(cq_.Pluck(&ops));
159  return status;
160  }
161 
162  private:
163  ClientContext* context_;
164  CompletionQueue cq_;
165  Call call_;
166 };
167 
169 template <class W>
171  public WriterInterface<W> {
172  public:
177  virtual bool WritesDone() = 0;
178 };
179 
180 template <class W>
181 class ClientWriter : public ClientWriterInterface<W> {
182  public:
184  template <class R>
185  ClientWriter(ChannelInterface* channel, const RpcMethod& method,
186  ClientContext* context, R* response)
187  : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
188  finish_ops_.RecvMessage(response);
189 
191  ops.SendInitialMetadata(context->send_initial_metadata_);
192  call_.PerformOps(&ops);
193  cq_.Pluck(&ops);
194  }
195 
197  GPR_ASSERT(!context_->initial_metadata_received_);
198 
200  ops.RecvInitialMetadata(context_);
201  call_.PerformOps(&ops);
202  cq_.Pluck(&ops); // status ignored
203  }
204 
206  bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
208  if (!ops.SendMessage(msg, options).ok()) {
209  return false;
210  }
211  call_.PerformOps(&ops);
212  return cq_.Pluck(&ops);
213  }
214 
217  ops.ClientSendClose();
218  call_.PerformOps(&ops);
219  return cq_.Pluck(&ops);
220  }
221 
224  Status status;
225  if (!context_->initial_metadata_received_) {
226  finish_ops_.RecvInitialMetadata(context_);
227  }
228  finish_ops_.ClientRecvStatus(context_, &status);
229  call_.PerformOps(&finish_ops_);
230  GPR_ASSERT(cq_.Pluck(&finish_ops_));
231  return status;
232  }
233 
234  private:
235  ClientContext* context_;
237  CallOpClientRecvStatus> finish_ops_;
238  CompletionQueue cq_;
239  Call call_;
240 };
241 
243 template <class W, class R>
245  public WriterInterface<W>,
246  public ReaderInterface<R> {
247  public:
252  virtual void WaitForInitialMetadata() = 0;
253 
257  virtual bool WritesDone() = 0;
258 };
259 
260 template <class W, class R>
262  public:
265  ClientContext* context)
266  : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
268  ops.SendInitialMetadata(context->send_initial_metadata_);
269  call_.PerformOps(&ops);
270  cq_.Pluck(&ops);
271  }
272 
274  GPR_ASSERT(!context_->initial_metadata_received_);
275 
277  ops.RecvInitialMetadata(context_);
278  call_.PerformOps(&ops);
279  cq_.Pluck(&ops); // status ignored
280  }
281 
282  bool Read(R* msg) GRPC_OVERRIDE {
284  if (!context_->initial_metadata_received_) {
285  ops.RecvInitialMetadata(context_);
286  }
287  ops.RecvMessage(msg);
288  call_.PerformOps(&ops);
289  return cq_.Pluck(&ops) && ops.got_message;
290  }
291 
293  bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
295  if (!ops.SendMessage(msg, options).ok()) return false;
296  call_.PerformOps(&ops);
297  return cq_.Pluck(&ops);
298  }
299 
302  ops.ClientSendClose();
303  call_.PerformOps(&ops);
304  return cq_.Pluck(&ops);
305  }
306 
309  if (!context_->initial_metadata_received_) {
310  ops.RecvInitialMetadata(context_);
311  }
312  Status status;
313  ops.ClientRecvStatus(context_, &status);
314  call_.PerformOps(&ops);
315  GPR_ASSERT(cq_.Pluck(&ops));
316  return status;
317  }
318 
319  private:
320  ClientContext* context_;
321  CompletionQueue cq_;
322  Call call_;
323 };
324 
325 template <class R>
326 class ServerReader GRPC_FINAL : public ReaderInterface<R> {
327  public:
328  ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
329 
331  GPR_ASSERT(!ctx_->sent_initial_metadata_);
332 
334  ops.SendInitialMetadata(ctx_->initial_metadata_);
335  ctx_->sent_initial_metadata_ = true;
336  call_->PerformOps(&ops);
337  call_->cq()->Pluck(&ops);
338  }
339 
340  bool Read(R* msg) GRPC_OVERRIDE {
342  ops.RecvMessage(msg);
343  call_->PerformOps(&ops);
344  return call_->cq()->Pluck(&ops) && ops.got_message;
345  }
346 
347  private:
348  Call* const call_;
349  ServerContext* const ctx_;
350 };
351 
352 template <class W>
353 class ServerWriter GRPC_FINAL : public WriterInterface<W> {
354  public:
355  ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
356 
358  GPR_ASSERT(!ctx_->sent_initial_metadata_);
359 
361  ops.SendInitialMetadata(ctx_->initial_metadata_);
362  ctx_->sent_initial_metadata_ = true;
363  call_->PerformOps(&ops);
364  call_->cq()->Pluck(&ops);
365  }
366 
368  bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
370  if (!ops.SendMessage(msg, options).ok()) {
371  return false;
372  }
373  if (!ctx_->sent_initial_metadata_) {
374  ops.SendInitialMetadata(ctx_->initial_metadata_);
375  ctx_->sent_initial_metadata_ = true;
376  }
377  call_->PerformOps(&ops);
378  return call_->cq()->Pluck(&ops);
379  }
380 
381  private:
382  Call* const call_;
383  ServerContext* const ctx_;
384 };
385 
387 template <class W, class R>
388 class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
389  public ReaderInterface<R> {
390  public:
391  ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
392 
394  GPR_ASSERT(!ctx_->sent_initial_metadata_);
395 
397  ops.SendInitialMetadata(ctx_->initial_metadata_);
398  ctx_->sent_initial_metadata_ = true;
399  call_->PerformOps(&ops);
400  call_->cq()->Pluck(&ops);
401  }
402 
403  bool Read(R* msg) GRPC_OVERRIDE {
405  ops.RecvMessage(msg);
406  call_->PerformOps(&ops);
407  return call_->cq()->Pluck(&ops) && ops.got_message;
408  }
409 
411  bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
413  if (!ops.SendMessage(msg, options).ok()) {
414  return false;
415  }
416  if (!ctx_->sent_initial_metadata_) {
417  ops.SendInitialMetadata(ctx_->initial_metadata_);
418  ctx_->sent_initial_metadata_ = true;
419  }
420  call_->PerformOps(&ops);
421  return call_->cq()->Pluck(&ops);
422  }
423 
424  private:
425  Call* const call_;
426  ServerContext* const ctx_;
427 };
428 
429 } // namespace grpc
430 
431 #endif // GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H
Definition: channel_interface.h:49
Client-side interface for streaming writes of message of type W.
Definition: sync_stream.h:170
virtual void WaitForInitialMetadata()=0
Blocking wait for initial metadata from server.
CompletionQueue * cq()
Definition: call.h:586
Client-side interface for streaming reads of message of type R.
Definition: sync_stream.h:106
bool Read(R *msg) GRPC_OVERRIDE
Blocking read a message and parse to msg.
Definition: sync_stream.h:403
void SendInitialMetadata()
Definition: sync_stream.h:357
An interface that yields a sequence of messages of type R.
Definition: sync_stream.h:68
void SendInitialMetadata()
Definition: sync_stream.h:330
bool Read(R *msg) GRPC_OVERRIDE
Blocking read a message and parse to msg.
Definition: sync_stream.h:282
Definition: call.h:431
virtual ~ReaderInterface()
Definition: sync_stream.h:70
bool Write(const W &msg, const WriteOptions &options) GRPC_OVERRIDE
Blocking write msg to the stream with options.
Definition: sync_stream.h:368
Status Finish() GRPC_OVERRIDE
Wait until the stream finishes, and return the final status.
Definition: sync_stream.h:153
Definition: client_context.h:152
void WaitForInitialMetadata() GRPC_OVERRIDE
Blocking wait for initial metadata from server.
Definition: sync_stream.h:134
bool WritesDone() GRPC_OVERRIDE
Block until writes are completed.
Definition: sync_stream.h:300
Definition: call.h:182
ServerReader(Call *call, ServerContext *ctx)
Definition: sync_stream.h:328
void SendInitialMetadata()
Definition: sync_stream.h:393
bool WritesDone() GRPC_OVERRIDE
Half close writing from the client.
Definition: sync_stream.h:215
bool Write(const W &msg, const WriteOptions &options) GRPC_OVERRIDE
Blocking write msg to the stream with options.
Definition: sync_stream.h:411
bool Write(const W &msg, const WriteOptions &options) GRPC_OVERRIDE
Blocking write msg to the stream with options.
Definition: sync_stream.h:206
Definition: call.h:338
virtual ~ClientStreamingInterface()
Definition: sync_stream.h:51
ClientReader(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, const W &request)
Blocking create a stream and write the first request out.
Definition: sync_stream.h:121
Status Finish() GRPC_OVERRIDE
Read the final response and wait for the final status.
Definition: sync_stream.h:223
Definition: call.h:576
bool Read(R *msg) GRPC_OVERRIDE
Blocking read a message and parse to msg.
Definition: sync_stream.h:143
virtual bool WritesDone()=0
Half close writing from the client.
Codegen interface for grpc::Channel.
Definition: channel_interface.h:64
Primary implementation of CallOpSetInterface.
Definition: call.h:524
void ClientSendClose()
Definition: call.h:342
Definition: server_context.h:90
Per-message write options.
Definition: call.h:67
ClientReaderWriter(ChannelInterface *channel, const RpcMethod &method, ClientContext *context)
Blocking create a stream.
Definition: sync_stream.h:264
virtual void WaitForInitialMetadata()=0
Blocking wait for initial metadata from server.
virtual bool WritesDone()=0
Block until writes are completed.
bool Write(const W &msg)
Blocking write msg to the stream with default options.
Definition: sync_stream.h:101
A thin wrapper around grpc_completion_queue (see src/core/surface/completion_queue.h).
Definition: completion_queue.h:81
#define GRPC_FINAL
Definition: config.h:71
Definition: rpc_method.h:43
virtual Status Finish()=0
Wait until the stream finishes, and return the final status.
void WaitForInitialMetadata() GRPC_OVERRIDE
Blocking wait for initial metadata from server.
Definition: sync_stream.h:273
ServerReaderWriter(Call *call, ServerContext *ctx)
Definition: sync_stream.h:391
void PerformOps(CallOpSetInterface *ops)
An interface that can be fed a sequence of messages of type W.
Definition: sync_stream.h:84
Client-side interface for bi-directional streaming.
Definition: sync_stream.h:244
Did it work? If it didn't, why?
Definition: status.h:45
bool Write(const W &msg, const WriteOptions &options) GRPC_OVERRIDE
Blocking write msg to the stream with options.
Definition: sync_stream.h:293
virtual bool Read(R *msg)=0
Blocking read a message and parse to msg.
Common interface for all synchronous client side streaming.
Definition: sync_stream.h:49
ServerWriter(Call *call, ServerContext *ctx)
Definition: sync_stream.h:355
Definition: call.h:150
bool Read(R *msg) GRPC_OVERRIDE
Blocking read a message and parse to msg.
Definition: sync_stream.h:340
void WaitForInitialMetadata()
Definition: sync_stream.h:196
virtual bool Write(const W &msg, const WriteOptions &options)=0
Blocking write msg to the stream with options.
Definition: channel_interface.h:53
Definition: channel_interface.h:51
#define GRPC_OVERRIDE
Definition: config.h:77
Definition: call.h:401
ClientWriter(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, R *response)
Blocking create a stream.
Definition: sync_stream.h:185
virtual ~WriterInterface()
Definition: sync_stream.h:86
Definition: call.h:294
Status Finish() GRPC_OVERRIDE
Wait until the stream finishes, and return the final status.
Definition: sync_stream.h:307