34 #ifndef GRPCXX_STREAM_H 
   35 #define GRPCXX_STREAM_H 
   44 #include <grpc/support/log.h> 
   71   virtual bool Read(R* msg) = 0;
 
   84   inline bool Write(
const W& msg) {
 
  103       : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
 
  106     ops.SendInitialMetadata(context->send_initial_metadata_);
 
  108     GPR_ASSERT(ops.SendMessage(request).ok());
 
  119     GPR_ASSERT(!context_->initial_metadata_received_);
 
  122     ops.RecvInitialMetadata(context_);
 
  129     if (!context_->initial_metadata_received_) {
 
  130       ops.RecvInitialMetadata(context_);
 
  132     ops.RecvMessage(msg);
 
  134     return cq_.Pluck(&ops) && ops.got_message;
 
  140     ops.ClientRecvStatus(context_, &status);
 
  142     GPR_ASSERT(cq_.Pluck(&ops));
 
  166       : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
 
  167     finish_ops_.RecvMessage(response);
 
  170     ops.SendInitialMetadata(context->send_initial_metadata_);
 
  178     if (!ops.SendMessage(msg, options).ok()) {
 
  182     return cq_.Pluck(&ops);
 
  187     ops.ClientSendClose();
 
  189     return cq_.Pluck(&ops);
 
  195     finish_ops_.ClientRecvStatus(context_, &status);
 
  197     GPR_ASSERT(cq_.Pluck(&finish_ops_));
 
  209 template <
class W, 
class R>
 
  218 template <
class W, 
class R>
 
  224       : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
 
  226     ops.SendInitialMetadata(context->send_initial_metadata_);
 
  236     GPR_ASSERT(!context_->initial_metadata_received_);
 
  239     ops.RecvInitialMetadata(context_);
 
  246     if (!context_->initial_metadata_received_) {
 
  247       ops.RecvInitialMetadata(context_);
 
  249     ops.RecvMessage(msg);
 
  251     return cq_.Pluck(&ops) && ops.got_message;
 
  257     if (!ops.SendMessage(msg, options).ok()) 
return false;
 
  259     return cq_.Pluck(&ops);
 
  264     ops.ClientSendClose();
 
  266     return cq_.Pluck(&ops);
 
  272     ops.ClientRecvStatus(context_, &status);
 
  274     GPR_ASSERT(cq_.Pluck(&ops));
 
  285 class ServerReader 
GRPC_FINAL : 
public ReaderInterface<R> {
 
  290     GPR_ASSERT(!ctx_->sent_initial_metadata_);
 
  293     ops.SendInitialMetadata(ctx_->initial_metadata_);
 
  294     ctx_->sent_initial_metadata_ = 
true;
 
  296     call_->
cq()->Pluck(&ops);
 
  301     ops.RecvMessage(msg);
 
  303     return call_->
cq()->Pluck(&ops) && ops.got_message;
 
  312 class ServerWriter 
GRPC_FINAL : 
public WriterInterface<W> {
 
  317     GPR_ASSERT(!ctx_->sent_initial_metadata_);
 
  320     ops.SendInitialMetadata(ctx_->initial_metadata_);
 
  321     ctx_->sent_initial_metadata_ = 
true;
 
  323     call_->
cq()->Pluck(&ops);
 
  329     if (!ops.SendMessage(msg, options).ok()) {
 
  332     if (!ctx_->sent_initial_metadata_) {
 
  333       ops.SendInitialMetadata(ctx_->initial_metadata_);
 
  334       ctx_->sent_initial_metadata_ = 
true;
 
  337     return call_->
cq()->Pluck(&ops);
 
  346 template <
class W, 
class R>
 
  347 class ServerReaderWriter 
GRPC_FINAL : 
public WriterInterface<W>,
 
  348                                       public ReaderInterface<R> {
 
  353     GPR_ASSERT(!ctx_->sent_initial_metadata_);
 
  356     ops.SendInitialMetadata(ctx_->initial_metadata_);
 
  357     ctx_->sent_initial_metadata_ = 
true;
 
  359     call_->
cq()->Pluck(&ops);
 
  364     ops.RecvMessage(msg);
 
  366     return call_->
cq()->Pluck(&ops) && ops.got_message;
 
  372     if (!ops.SendMessage(msg, options).ok()) {
 
  375     if (!ctx_->sent_initial_metadata_) {
 
  376       ops.SendInitialMetadata(ctx_->initial_metadata_);
 
  377       ctx_->sent_initial_metadata_ = 
true;
 
  380     return call_->
cq()->Pluck(&ops);
 
  405   virtual void Read(R* msg, 
void* tag) = 0;
 
  414   virtual void Write(
const W& msg, 
void* tag) = 0;
 
  428                     const W& request, 
void* tag)
 
  429       : context_(context), call_(channel->CreateCall(method, context, cq)) {
 
  430     init_ops_.set_output_tag(tag);
 
  431     init_ops_.SendInitialMetadata(context->send_initial_metadata_);
 
  433     GPR_ASSERT(init_ops_.SendMessage(request).ok());
 
  434     init_ops_.ClientSendClose();
 
  439     GPR_ASSERT(!context_->initial_metadata_received_);
 
  441     meta_ops_.set_output_tag(tag);
 
  442     meta_ops_.RecvInitialMetadata(context_);
 
  447     read_ops_.set_output_tag(tag);
 
  448     if (!context_->initial_metadata_received_) {
 
  449       read_ops_.RecvInitialMetadata(context_);
 
  451     read_ops_.RecvMessage(msg);
 
  456     finish_ops_.set_output_tag(tag);
 
  457     if (!context_->initial_metadata_received_) {
 
  458       finish_ops_.RecvInitialMetadata(context_);
 
  460     finish_ops_.ClientRecvStatus(context_, status);
 
  487                     R* response, 
void* tag)
 
  488       : context_(context), call_(channel->CreateCall(method, context, cq)) {
 
  489     finish_ops_.RecvMessage(response);
 
  491     init_ops_.set_output_tag(tag);
 
  492     init_ops_.SendInitialMetadata(context->send_initial_metadata_);
 
  497     GPR_ASSERT(!context_->initial_metadata_received_);
 
  499     meta_ops_.set_output_tag(tag);
 
  500     meta_ops_.RecvInitialMetadata(context_);
 
  505     write_ops_.set_output_tag(tag);
 
  507     GPR_ASSERT(write_ops_.SendMessage(msg).ok());
 
  512     writes_done_ops_.set_output_tag(tag);
 
  513     writes_done_ops_.ClientSendClose();
 
  518     finish_ops_.set_output_tag(tag);
 
  519     if (!context_->initial_metadata_received_) {
 
  520       finish_ops_.RecvInitialMetadata(context_);
 
  522     finish_ops_.ClientRecvStatus(context_, status);
 
  538 template <
class W, 
class R>
 
  546 template <
class W, 
class R>
 
  553       : context_(context), call_(channel->CreateCall(method, context, cq)) {
 
  554     init_ops_.set_output_tag(tag);
 
  555     init_ops_.SendInitialMetadata(context->send_initial_metadata_);
 
  560     GPR_ASSERT(!context_->initial_metadata_received_);
 
  562     meta_ops_.set_output_tag(tag);
 
  563     meta_ops_.RecvInitialMetadata(context_);
 
  568     read_ops_.set_output_tag(tag);
 
  569     if (!context_->initial_metadata_received_) {
 
  570       read_ops_.RecvInitialMetadata(context_);
 
  572     read_ops_.RecvMessage(msg);
 
  577     write_ops_.set_output_tag(tag);
 
  579     GPR_ASSERT(write_ops_.SendMessage(msg).ok());
 
  584     writes_done_ops_.set_output_tag(tag);
 
  585     writes_done_ops_.ClientSendClose();
 
  590     finish_ops_.set_output_tag(tag);
 
  591     if (!context_->initial_metadata_received_) {
 
  592       finish_ops_.RecvInitialMetadata(context_);
 
  594     finish_ops_.ClientRecvStatus(context_, status);
 
  609 template <
class W, 
class R>
 
  610 class ServerAsyncReader 
GRPC_FINAL : 
public ServerAsyncStreamingInterface,
 
  611                                      public AsyncReaderInterface<R> {
 
  614       : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
 
  617     GPR_ASSERT(!ctx_->sent_initial_metadata_);
 
  619     meta_ops_.set_output_tag(tag);
 
  620     meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
 
  621     ctx_->sent_initial_metadata_ = 
true;
 
  626     read_ops_.set_output_tag(tag);
 
  627     read_ops_.RecvMessage(msg);
 
  632     finish_ops_.set_output_tag(tag);
 
  633     if (!ctx_->sent_initial_metadata_) {
 
  634       finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
 
  635       ctx_->sent_initial_metadata_ = 
true;
 
  639       finish_ops_.ServerSendStatus(
 
  640           ctx_->trailing_metadata_,
 
  641           finish_ops_.SendMessage(msg));
 
  643       finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
 
  649     GPR_ASSERT(!status.
ok());
 
  650     finish_ops_.set_output_tag(tag);
 
  651     if (!ctx_->sent_initial_metadata_) {
 
  652       finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
 
  653       ctx_->sent_initial_metadata_ = 
true;
 
  655     finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
 
  664   CallOpSet<CallOpSendInitialMetadata> meta_ops_;
 
  665   CallOpSet<CallOpRecvMessage<R>> read_ops_;
 
  666   CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
 
  667             CallOpServerSendStatus> finish_ops_;
 
  671 class ServerAsyncWriter 
GRPC_FINAL : 
public ServerAsyncStreamingInterface,
 
  672                                      public AsyncWriterInterface<W> {
 
  675       : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
 
  678     GPR_ASSERT(!ctx_->sent_initial_metadata_);
 
  680     meta_ops_.set_output_tag(tag);
 
  681     meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
 
  682     ctx_->sent_initial_metadata_ = 
true;
 
  687     write_ops_.set_output_tag(tag);
 
  688     if (!ctx_->sent_initial_metadata_) {
 
  689       write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
 
  690       ctx_->sent_initial_metadata_ = 
true;
 
  693     GPR_ASSERT(write_ops_.SendMessage(msg).ok());
 
  698     finish_ops_.set_output_tag(tag);
 
  699     if (!ctx_->sent_initial_metadata_) {
 
  700       finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
 
  701       ctx_->sent_initial_metadata_ = 
true;
 
  703     finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
 
  712   CallOpSet<CallOpSendInitialMetadata> meta_ops_;
 
  713   CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
 
  714   CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
 
  718 template <
class W, 
class R>
 
  719 class ServerAsyncReaderWriter 
GRPC_FINAL : 
public ServerAsyncStreamingInterface,
 
  720                                            public AsyncWriterInterface<W>,
 
  721                                            public AsyncReaderInterface<R> {
 
  724       : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
 
  727     GPR_ASSERT(!ctx_->sent_initial_metadata_);
 
  729     meta_ops_.set_output_tag(tag);
 
  730     meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
 
  731     ctx_->sent_initial_metadata_ = 
true;
 
  736     read_ops_.set_output_tag(tag);
 
  737     read_ops_.RecvMessage(msg);
 
  742     write_ops_.set_output_tag(tag);
 
  743     if (!ctx_->sent_initial_metadata_) {
 
  744       write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
 
  745       ctx_->sent_initial_metadata_ = 
true;
 
  748     GPR_ASSERT(write_ops_.SendMessage(msg).ok());
 
  753     finish_ops_.set_output_tag(tag);
 
  754     if (!ctx_->sent_initial_metadata_) {
 
  755       finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
 
  756       ctx_->sent_initial_metadata_ = 
true;
 
  758     finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
 
  767   CallOpSet<CallOpSendInitialMetadata> meta_ops_;
 
  768   CallOpSet<CallOpRecvMessage<R>> read_ops_;
 
  769   CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
 
  770   CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
 
  775 #endif  // GRPCXX_STREAM_H 
Definition: client_context.h:70
Definition: client_context.h:60
bool Read(R *msg) GRPC_OVERRIDE
Definition: stream.h:244
void Write(const W &msg, void *tag) GRPC_OVERRIDE
Definition: stream.h:504
void Read(R *msg, void *tag) GRPC_OVERRIDE
Definition: stream.h:735
void WritesDone(void *tag) GRPC_OVERRIDE
Definition: stream.h:511
virtual void WaitForInitialMetadata()=0
CompletionQueue * cq()
Definition: call.h:575
void SendInitialMetadata()
Definition: stream.h:352
void SendInitialMetadata()
Definition: stream.h:316
ServerReaderWriter(Call *call, ServerContext *ctx)
Definition: stream.h:350
void SendInitialMetadata(void *tag) GRPC_OVERRIDE
Definition: stream.h:677
void SendInitialMetadata()
Definition: stream.h:289
virtual void WritesDone(void *tag)=0
virtual void Write(const W &msg, void *tag)=0
virtual ~ReaderInterface()
Definition: stream.h:65
void Write(const W &msg, void *tag) GRPC_OVERRIDE
Definition: stream.h:686
#define GRPC_FINAL
Definition: config.h:71
void Write(const W &msg, void *tag) GRPC_OVERRIDE
Definition: stream.h:576
void Read(R *msg, void *tag) GRPC_OVERRIDE
Definition: stream.h:625
void SendInitialMetadata(void *tag) GRPC_OVERRIDE
Definition: stream.h:726
bool Write(const W &msg, const WriteOptions &options) GRPC_OVERRIDE
Definition: stream.h:327
Status Finish() GRPC_OVERRIDE
Definition: stream.h:137
virtual void WritesDone(void *tag)=0
Definition: client_context.h:74
Status Finish() GRPC_OVERRIDE
Definition: stream.h:269
void FinishWithError(const Status &status, void *tag)
Definition: stream.h:648
ClientAsyncWriter(ChannelInterface *channel, CompletionQueue *cq, const RpcMethod &method, ClientContext *context, R *response, void *tag)
Definition: stream.h:485
void Write(const W &msg, void *tag) GRPC_OVERRIDE
Definition: stream.h:741
ServerReader(Call *call, ServerContext *ctx)
Definition: stream.h:287
bool WritesDone() GRPC_OVERRIDE
Definition: stream.h:185
virtual ~AsyncReaderInterface()
Definition: stream.h:403
bool Write(const W &msg, const WriteOptions &options) GRPC_OVERRIDE
Definition: stream.h:176
virtual ~ClientStreamingInterface()
Definition: stream.h:51
ClientReader(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, const W &request)
Definition: stream.h:101
bool Read(R *msg) GRPC_OVERRIDE
Definition: stream.h:362
void WritesDone(void *tag) GRPC_OVERRIDE
Definition: stream.h:583
Status Finish() GRPC_OVERRIDE
Definition: stream.h:193
bool Read(R *msg) GRPC_OVERRIDE
Definition: stream.h:127
virtual bool WritesDone()=0
Definition: channel_interface.h:52
ServerAsyncWriter(ServerContext *ctx)
Definition: stream.h:674
void Finish(Status *status, void *tag) GRPC_OVERRIDE
Definition: stream.h:589
void ReadInitialMetadata(void *tag) GRPC_OVERRIDE
Definition: stream.h:496
Definition: client_context.h:68
Primary implementaiton of CallOpSetInterface. 
Definition: call.h:506
void ClientSendClose()
Definition: call.h:345
Definition: server_context.h:86
void Finish(const W &msg, const Status &status, void *tag)
Definition: stream.h:631
Per-message write options. 
Definition: call.h:64
virtual void WaitForInitialMetadata()=0
virtual bool WritesDone()=0
bool Write(const W &msg)
Definition: stream.h:84
Definition: completion_queue.h:87
virtual ~ClientAsyncStreamingInterface()
Definition: stream.h:392
ClientAsyncReaderWriter(ChannelInterface *channel, CompletionQueue *cq, const RpcMethod &method, ClientContext *context, void *tag)
Definition: stream.h:550
void Finish(Status *status, void *tag) GRPC_OVERRIDE
Definition: stream.h:517
void Finish(const Status &status, void *tag)
Definition: stream.h:697
virtual void ReadInitialMetadata(void *tag)=0
Definition: rpc_method.h:39
virtual Status Finish()=0
bool Write(const W &msg, const WriteOptions &options) GRPC_OVERRIDE
Definition: stream.h:370
void Finish(Status *status, void *tag) GRPC_OVERRIDE
Definition: stream.h:455
void PerformOps(CallOpSetInterface *ops)
bool ok() const 
Definition: status.h:55
ClientReaderWriter(ChannelInterface *channel, const RpcMethod &method, ClientContext *context)
Definition: stream.h:222
virtual bool Read(R *msg)=0
virtual void Finish(Status *status, void *tag)=0
void SendInitialMetadata(void *tag) GRPC_OVERRIDE
Definition: stream.h:616
ServerWriter(Call *call, ServerContext *ctx)
Definition: stream.h:314
void WaitForInitialMetadata()
Definition: stream.h:235
bool Read(R *msg) GRPC_OVERRIDE
Definition: stream.h:299
void Read(R *msg, void *tag) GRPC_OVERRIDE
Definition: stream.h:446
virtual bool Write(const W &msg, const WriteOptions &options)=0
void Read(R *msg, void *tag) GRPC_OVERRIDE
Definition: stream.h:567
Definition: client_context.h:64
ServerAsyncReaderWriter(ServerContext *ctx)
Definition: stream.h:723
Definition: client_context.h:66
virtual ~AsyncWriterInterface()
Definition: stream.h:412
void Finish(const Status &status, void *tag)
Definition: stream.h:752
void WaitForInitialMetadata()
Definition: stream.h:118
#define GRPC_OVERRIDE
Definition: config.h:77
void ReadInitialMetadata(void *tag) GRPC_OVERRIDE
Definition: stream.h:559
Definition: client_context.h:62
ClientAsyncReader(ChannelInterface *channel, CompletionQueue *cq, const RpcMethod &method, ClientContext *context, const W &request, void *tag)
Definition: stream.h:426
ClientWriter(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, R *response)
Definition: stream.h:164
bool Write(const W &msg, const WriteOptions &options) GRPC_OVERRIDE
Definition: stream.h:255
virtual void Read(R *msg, void *tag)=0
virtual ~WriterInterface()
Definition: stream.h:78
ServerAsyncReader(ServerContext *ctx)
Definition: stream.h:613
bool WritesDone() GRPC_OVERRIDE
Definition: stream.h:262
void ReadInitialMetadata(void *tag) GRPC_OVERRIDE
Definition: stream.h:438