|  | @@ -146,6 +146,7 @@ class ClientWriter final : public ClientStreamingInterface,
 | 
	
		
			
				|  |  |    virtual Status Finish() override {
 | 
	
		
			
				|  |  |      CallOpBuffer buf;
 | 
	
		
			
				|  |  |      Status status;
 | 
	
		
			
				|  |  | +    buf.AddRecvMessage(response_);
 | 
	
		
			
				|  |  |      buf.AddClientRecvStatus(&status);
 | 
	
		
			
				|  |  |      call_.PerformOps(&buf, (void *)4);
 | 
	
		
			
				|  |  |      GPR_ASSERT(cq_.Pluck((void *)4));
 | 
	
	
		
			
				|  | @@ -207,125 +208,255 @@ class ClientReaderWriter final : public ClientStreamingInterface,
 | 
	
		
			
				|  |  |  template <class R>
 | 
	
		
			
				|  |  |  class ServerReader final : public ReaderInterface<R> {
 | 
	
		
			
				|  |  |   public:
 | 
	
		
			
				|  |  | -  ServerReader(CompletionQueue* cq, Call* call) : cq_(cq), call_(call) {}
 | 
	
		
			
				|  |  | +  explicit ServerReader(Call* call) : call_(call) {}
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    virtual bool Read(R* msg) override {
 | 
	
		
			
				|  |  |      CallOpBuffer buf;
 | 
	
		
			
				|  |  |      buf.AddRecvMessage(msg);
 | 
	
		
			
				|  |  |      call_->PerformOps(&buf, (void *)2);
 | 
	
		
			
				|  |  | -    return cq_->Pluck((void *)2);
 | 
	
		
			
				|  |  | +    return call_->cq()->Pluck((void *)2);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |   private:
 | 
	
		
			
				|  |  | -  CompletionQueue* cq_;
 | 
	
		
			
				|  |  |    Call* call_;
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  template <class W>
 | 
	
		
			
				|  |  | -class ServerWriter : public WriterInterface<W> {
 | 
	
		
			
				|  |  | +class ServerWriter final : public WriterInterface<W> {
 | 
	
		
			
				|  |  |   public:
 | 
	
		
			
				|  |  | -  explicit ServerWriter(StreamContextInterface* context) : context_(context) {
 | 
	
		
			
				|  |  | -    GPR_ASSERT(context_);
 | 
	
		
			
				|  |  | -    context_->Start(true);
 | 
	
		
			
				|  |  | -    context_->Read(context_->request());
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | +  explicit ServerWriter(Call* call) : call_(call) {}
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  virtual bool Write(const W& msg) {
 | 
	
		
			
				|  |  | -    return context_->Write(const_cast<W*>(&msg), false);
 | 
	
		
			
				|  |  | +  virtual bool Write(const W& msg) override {
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    buf.AddSendMessage(msg);
 | 
	
		
			
				|  |  | +    call_->PerformOps(&buf, (void *)2);
 | 
	
		
			
				|  |  | +    return call_->cq()->Pluck((void *)2);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |   private:
 | 
	
		
			
				|  |  | -  StreamContextInterface* const context_;  // not owned
 | 
	
		
			
				|  |  | +  Call* call_;
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  // Server-side interface for bi-directional streaming.
 | 
	
		
			
				|  |  |  template <class W, class R>
 | 
	
		
			
				|  |  | -class ServerReaderWriter : public WriterInterface<W>,
 | 
	
		
			
				|  |  | +class ServerReaderWriter final : public WriterInterface<W>,
 | 
	
		
			
				|  |  |                             public ReaderInterface<R> {
 | 
	
		
			
				|  |  |   public:
 | 
	
		
			
				|  |  | -  explicit ServerReaderWriter(StreamContextInterface* context)
 | 
	
		
			
				|  |  | -      : context_(context) {
 | 
	
		
			
				|  |  | -    GPR_ASSERT(context_);
 | 
	
		
			
				|  |  | -    context_->Start(true);
 | 
	
		
			
				|  |  | +  explicit ServerReaderWriter(Call* call) : call_(call) {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  virtual bool Read(R* msg) override {
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    buf.AddRecvMessage(msg);
 | 
	
		
			
				|  |  | +    call_->PerformOps(&buf, (void *)2);
 | 
	
		
			
				|  |  | +    return call_->cq()->Pluck((void *)2);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  virtual bool Write(const W& msg) override {
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    buf.AddSendMessage(msg);
 | 
	
		
			
				|  |  | +    call_->PerformOps(&buf, (void *)3);
 | 
	
		
			
				|  |  | +    return call_->cq()->Pluck((void *)3);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  virtual bool Read(R* msg) { return context_->Read(msg); }
 | 
	
		
			
				|  |  | + private:
 | 
	
		
			
				|  |  | +  CompletionQueue* cq_;
 | 
	
		
			
				|  |  | +  Call* call_;
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +// Async interfaces
 | 
	
		
			
				|  |  | +// Common interface for all client side streaming.
 | 
	
		
			
				|  |  | +class ClientAsyncStreamingInterface {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  virtual ~ClientAsyncStreamingInterface() {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  virtual void Finish(Status* status, void* tag) = 0;
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +// An interface that yields a sequence of R messages.
 | 
	
		
			
				|  |  | +template <class R>
 | 
	
		
			
				|  |  | +class AsyncReaderInterface {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  virtual ~AsyncReaderInterface() {}
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  virtual bool Write(const W& msg) {
 | 
	
		
			
				|  |  | -    return context_->Write(const_cast<W*>(&msg), false);
 | 
	
		
			
				|  |  | +  virtual void Read(R* msg, void* tag) = 0;
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +// An interface that can be fed a sequence of W messages.
 | 
	
		
			
				|  |  | +template <class W>
 | 
	
		
			
				|  |  | +class AsyncWriterInterface {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  virtual ~Async WriterInterface() {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  virtual void Write(const W& msg, void* tag) = 0;
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +template <class R>
 | 
	
		
			
				|  |  | +class ClientAsyncReader final : public ClientAsyncStreamingInterface,
 | 
	
		
			
				|  |  | +                           public AsyncReaderInterface<R> {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  // Blocking create a stream and write the first request out.
 | 
	
		
			
				|  |  | +  ClientAsyncReader(ChannelInterface *channel, const RpcMethod &method,
 | 
	
		
			
				|  |  | +               ClientContext *context,
 | 
	
		
			
				|  |  | +               const google::protobuf::Message &request, void* tag)
 | 
	
		
			
				|  |  | +      : call_(channel->CreateCall(method, context, &cq_)) {
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    buf.AddSendMessage(request);
 | 
	
		
			
				|  |  | +    buf.AddClientSendClose();
 | 
	
		
			
				|  |  | +    call_.PerformOps(&buf, tag);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  virtual void Read(R *msg, void* tag) override {
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    buf.AddRecvMessage(msg);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&buf, tag);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  virtual void Finish(Status* status, void* tag) override {
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    buf.AddClientRecvStatus(status);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&buf, tag);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |   private:
 | 
	
		
			
				|  |  | -  StreamContextInterface* const context_;  // not owned
 | 
	
		
			
				|  |  | +  CompletionQueue cq_;
 | 
	
		
			
				|  |  | +  Call call_;
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  template <class W>
 | 
	
		
			
				|  |  | -class ServerAsyncResponseWriter {
 | 
	
		
			
				|  |  | +class ClientWriter final : public ClientAsyncStreamingInterface,
 | 
	
		
			
				|  |  | +                           public WriterInterface<W> {
 | 
	
		
			
				|  |  |   public:
 | 
	
		
			
				|  |  | -  explicit ServerAsyncResponseWriter(StreamContextInterface* context) : context_(context) {
 | 
	
		
			
				|  |  | -    GPR_ASSERT(context_);
 | 
	
		
			
				|  |  | -    context_->Start(true);
 | 
	
		
			
				|  |  | -    context_->Read(context_->request());
 | 
	
		
			
				|  |  | +  // Blocking create a stream.
 | 
	
		
			
				|  |  | +  ClientAsyncWriter(ChannelInterface *channel, const RpcMethod &method,
 | 
	
		
			
				|  |  | +               ClientContext *context,
 | 
	
		
			
				|  |  | +               google::protobuf::Message *response)
 | 
	
		
			
				|  |  | +      : response_(response),
 | 
	
		
			
				|  |  | +        call_(channel->CreateCall(method, context, &cq_)) {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  virtual void Write(const W& msg, void* tag) override {
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    buf.AddSendMessage(msg);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&buf, tag);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  virtual bool Write(const W& msg) {
 | 
	
		
			
				|  |  | -    return context_->Write(const_cast<W*>(&msg), false);
 | 
	
		
			
				|  |  | +  virtual void WritesDone(void* tag) {
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    buf.AddClientSendClose();
 | 
	
		
			
				|  |  | +    call_.PerformOps(&buf, tag);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  virtual void Finish(Status* status, void* tag) override {
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    buf.AddRecvMessage(response_);
 | 
	
		
			
				|  |  | +    buf.AddClientRecvStatus(status);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&buf, tag);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |   private:
 | 
	
		
			
				|  |  | -  StreamContextInterface* const context_;  // not owned
 | 
	
		
			
				|  |  | +  google::protobuf::Message *const response_;
 | 
	
		
			
				|  |  | +  CompletionQueue cq_;
 | 
	
		
			
				|  |  | +  Call call_;
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -template <class R>
 | 
	
		
			
				|  |  | -class ServerAsyncReader : public ReaderInterface<R> {
 | 
	
		
			
				|  |  | +// Client-side interface for bi-directional streaming.
 | 
	
		
			
				|  |  | +template <class W, class R>
 | 
	
		
			
				|  |  | +class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface,
 | 
	
		
			
				|  |  | +                                 public AsyncWriterInterface<W>,
 | 
	
		
			
				|  |  | +                                 public AsyncReaderInterface<R> {
 | 
	
		
			
				|  |  |   public:
 | 
	
		
			
				|  |  | -  explicit ServerAsyncReader(StreamContextInterface* context) : context_(context) {
 | 
	
		
			
				|  |  | -    GPR_ASSERT(context_);
 | 
	
		
			
				|  |  | -    context_->Start(true);
 | 
	
		
			
				|  |  | +  ClientAsyncReaderWriter(ChannelInterface *channel,
 | 
	
		
			
				|  |  | +                     const RpcMethod &method, ClientContext *context)
 | 
	
		
			
				|  |  | +      : call_(channel->CreateCall(method, context, &cq_)) {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  virtual void Read(R *msg, void* tag) override {
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    buf.AddRecvMessage(msg);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&buf, tag);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  virtual void Write(const W& msg, void* tag) override {
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    buf.AddSendMessage(msg);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&buf, tag);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  virtual bool WritesDone(void* tag) {
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    buf.AddClientSendClose();
 | 
	
		
			
				|  |  | +    call_.PerformOps(&buf, tag);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  virtual bool Read(R* msg) { return context_->Read(msg); }
 | 
	
		
			
				|  |  | +  virtual void Finish(Status* status, void* tag) override {
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    Status status;
 | 
	
		
			
				|  |  | +    buf.AddClientRecvStatus(status);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&buf, tag);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |   private:
 | 
	
		
			
				|  |  | -  StreamContextInterface* const context_;  // not owned
 | 
	
		
			
				|  |  | +  CompletionQueue cq_;
 | 
	
		
			
				|  |  | +  Call call_;
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +// TODO(yangg) Move out of stream.h
 | 
	
		
			
				|  |  |  template <class W>
 | 
	
		
			
				|  |  | -class ServerAsyncWriter : public WriterInterface<W> {
 | 
	
		
			
				|  |  | +class ServerAsyncResponseWriter final {
 | 
	
		
			
				|  |  |   public:
 | 
	
		
			
				|  |  | -  explicit ServerAsyncWriter(StreamContextInterface* context) : context_(context) {
 | 
	
		
			
				|  |  | -    GPR_ASSERT(context_);
 | 
	
		
			
				|  |  | -    context_->Start(true);
 | 
	
		
			
				|  |  | -    context_->Read(context_->request());
 | 
	
		
			
				|  |  | +  explicit ServerAsyncResponseWriter(Call* call) : call_(call) {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  virtual void Write(const W& msg, void* tag) override {
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    buf.AddSendMessage(msg);
 | 
	
		
			
				|  |  | +    call_->PerformOps(&buf, tag);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  virtual bool Write(const W& msg) {
 | 
	
		
			
				|  |  | -    return context_->Write(const_cast<W*>(&msg), false);
 | 
	
		
			
				|  |  | + private:
 | 
	
		
			
				|  |  | +  Call* call_;
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +template <class R>
 | 
	
		
			
				|  |  | +class ServerAsyncReader : public AsyncReaderInterface<R> {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  explicit ServerAsyncReader(Call* call) : call_(call) {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  virtual void Read(R* msg, void* tag) {
 | 
	
		
			
				|  |  | +    // TODO
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |   private:
 | 
	
		
			
				|  |  | -  StreamContextInterface* const context_;  // not owned
 | 
	
		
			
				|  |  | +  Call* call_;
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +template <class W>
 | 
	
		
			
				|  |  | +class ServerAsyncWriter : public AsyncWriterInterface<W> {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  explicit ServerAsyncWriter(Call* call) : call_(call) {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  virtual void Write(const W& msg, void* tag) {
 | 
	
		
			
				|  |  | +    // TODO
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | + private:
 | 
	
		
			
				|  |  | +  Call* call_;
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  // Server-side interface for bi-directional streaming.
 | 
	
		
			
				|  |  |  template <class W, class R>
 | 
	
		
			
				|  |  | -class ServerAsyncReaderWriter : public WriterInterface<W>,
 | 
	
		
			
				|  |  | -                           public ReaderInterface<R> {
 | 
	
		
			
				|  |  | +class ServerAsyncReaderWriter : public AsyncWriterInterface<W>,
 | 
	
		
			
				|  |  | +                           public AsyncReaderInterface<R> {
 | 
	
		
			
				|  |  |   public:
 | 
	
		
			
				|  |  | -  explicit ServerAsyncReaderWriter(StreamContextInterface* context)
 | 
	
		
			
				|  |  | -      : context_(context) {
 | 
	
		
			
				|  |  | -    GPR_ASSERT(context_);
 | 
	
		
			
				|  |  | -    context_->Start(true);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | +  explicit ServerAsyncReaderWriter(Call* call) : call_(call) {}
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  virtual bool Read(R* msg) { return context_->Read(msg); }
 | 
	
		
			
				|  |  | +  virtual void Read(R* msg, void* tag) {
 | 
	
		
			
				|  |  | +    // TODO
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  virtual bool Write(const W& msg) {
 | 
	
		
			
				|  |  | -    return context_->Write(const_cast<W*>(&msg), false);
 | 
	
		
			
				|  |  | +  virtual void Write(const W& msg, void* tag) {
 | 
	
		
			
				|  |  | +    // TODO
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |   private:
 | 
	
		
			
				|  |  | -  StreamContextInterface* const context_;  // not owned
 | 
	
		
			
				|  |  | +  Call* call_;
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  }  // namespace grpc
 |