|  | @@ -1,6 +1,6 @@
 | 
	
		
			
				|  |  |  /*
 | 
	
		
			
				|  |  |   *
 | 
	
		
			
				|  |  | - * Copyright 2014, Google Inc.
 | 
	
		
			
				|  |  | + * Copyright 2015, Google Inc.
 | 
	
		
			
				|  |  |   * All rights reserved.
 | 
	
		
			
				|  |  |   *
 | 
	
		
			
				|  |  |   * Redistribution and use in source and binary forms, with or without
 | 
	
	
		
			
				|  | @@ -34,7 +34,12 @@
 | 
	
		
			
				|  |  |  #ifndef __GRPCPP_STREAM_H__
 | 
	
		
			
				|  |  |  #define __GRPCPP_STREAM_H__
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -#include <grpc++/stream_context_interface.h>
 | 
	
		
			
				|  |  | +#include <grpc++/channel_interface.h>
 | 
	
		
			
				|  |  | +#include <grpc++/client_context.h>
 | 
	
		
			
				|  |  | +#include <grpc++/completion_queue.h>
 | 
	
		
			
				|  |  | +#include <grpc++/server_context.h>
 | 
	
		
			
				|  |  | +#include <grpc++/impl/call.h>
 | 
	
		
			
				|  |  | +#include <grpc++/impl/service_type.h>
 | 
	
		
			
				|  |  |  #include <grpc++/status.h>
 | 
	
		
			
				|  |  |  #include <grpc/support/log.h>
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -45,16 +50,12 @@ class ClientStreamingInterface {
 | 
	
		
			
				|  |  |   public:
 | 
	
		
			
				|  |  |    virtual ~ClientStreamingInterface() {}
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  // Try to cancel the stream. Wait() still needs to be called to get the final
 | 
	
		
			
				|  |  | -  // status. Cancelling after the stream has finished has no effects.
 | 
	
		
			
				|  |  | -  virtual void Cancel() = 0;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |    // Wait until the stream finishes, and return the final status. When the
 | 
	
		
			
				|  |  |    // client side declares it has no more message to send, either implicitly or
 | 
	
		
			
				|  |  |    // by calling WritesDone, it needs to make sure there is no more message to
 | 
	
		
			
				|  |  |    // be received from the server, either implicitly or by getting a false from
 | 
	
		
			
				|  |  |    // a Read(). Otherwise, this implicitly cancels the stream.
 | 
	
		
			
				|  |  | -  virtual const Status& Wait() = 0;
 | 
	
		
			
				|  |  | +  virtual Status Finish() = 0;
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  // An interface that yields a sequence of R messages.
 | 
	
	
		
			
				|  | @@ -82,147 +83,629 @@ class WriterInterface {
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  template <class R>
 | 
	
		
			
				|  |  | -class ClientReader : public ClientStreamingInterface,
 | 
	
		
			
				|  |  | -                     public ReaderInterface<R> {
 | 
	
		
			
				|  |  | +class ClientReader final : public ClientStreamingInterface,
 | 
	
		
			
				|  |  | +                           public ReaderInterface<R> {
 | 
	
		
			
				|  |  |   public:
 | 
	
		
			
				|  |  |    // Blocking create a stream and write the first request out.
 | 
	
		
			
				|  |  | -  explicit ClientReader(StreamContextInterface* context) : context_(context) {
 | 
	
		
			
				|  |  | -    GPR_ASSERT(context_);
 | 
	
		
			
				|  |  | -    context_->Start(true);
 | 
	
		
			
				|  |  | -    context_->Write(context_->request(), true);
 | 
	
		
			
				|  |  | +  ClientReader(ChannelInterface* channel, const RpcMethod& method,
 | 
	
		
			
				|  |  | +               ClientContext* context, const google::protobuf::Message& request)
 | 
	
		
			
				|  |  | +      : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    buf.AddSendInitialMetadata(&context->send_initial_metadata_);
 | 
	
		
			
				|  |  | +    buf.AddSendMessage(request);
 | 
	
		
			
				|  |  | +    buf.AddClientSendClose();
 | 
	
		
			
				|  |  | +    call_.PerformOps(&buf);
 | 
	
		
			
				|  |  | +    cq_.Pluck(&buf);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  ~ClientReader() { delete context_; }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  virtual bool Read(R* msg) { return context_->Read(msg); }
 | 
	
		
			
				|  |  | +  // Blocking wait for initial metadata from server. The received metadata
 | 
	
		
			
				|  |  | +  // can only be accessed after this call returns. Should only be called before
 | 
	
		
			
				|  |  | +  // the first read. Calling this method is optional, and if it is not called
 | 
	
		
			
				|  |  | +  // the metadata will be available in ClientContext after the first read.
 | 
	
		
			
				|  |  | +  void WaitForInitialMetadata() {
 | 
	
		
			
				|  |  | +    GPR_ASSERT(!context_->initial_metadata_received_);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    buf.AddRecvInitialMetadata(context_);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&buf);
 | 
	
		
			
				|  |  | +    GPR_ASSERT(cq_.Pluck(&buf));
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  virtual void Cancel() { context_->Cancel(); }
 | 
	
		
			
				|  |  | +  virtual bool Read(R* msg) override {
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    if (!context_->initial_metadata_received_) {
 | 
	
		
			
				|  |  | +      buf.AddRecvInitialMetadata(context_);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    buf.AddRecvMessage(msg);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&buf);
 | 
	
		
			
				|  |  | +    return cq_.Pluck(&buf) && buf.got_message;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  virtual const Status& Wait() { return context_->Wait(); }
 | 
	
		
			
				|  |  | +  virtual Status Finish() override {
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    Status status;
 | 
	
		
			
				|  |  | +    buf.AddClientRecvStatus(context_, &status);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&buf);
 | 
	
		
			
				|  |  | +    GPR_ASSERT(cq_.Pluck(&buf));
 | 
	
		
			
				|  |  | +    return status;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |   private:
 | 
	
		
			
				|  |  | -  StreamContextInterface* const context_;
 | 
	
		
			
				|  |  | +  ClientContext* context_;
 | 
	
		
			
				|  |  | +  CompletionQueue cq_;
 | 
	
		
			
				|  |  | +  Call call_;
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  template <class W>
 | 
	
		
			
				|  |  | -class ClientWriter : public ClientStreamingInterface,
 | 
	
		
			
				|  |  | -                     public WriterInterface<W> {
 | 
	
		
			
				|  |  | +class ClientWriter final : public ClientStreamingInterface,
 | 
	
		
			
				|  |  | +                           public WriterInterface<W> {
 | 
	
		
			
				|  |  |   public:
 | 
	
		
			
				|  |  |    // Blocking create a stream.
 | 
	
		
			
				|  |  | -  explicit ClientWriter(StreamContextInterface* context) : context_(context) {
 | 
	
		
			
				|  |  | -    GPR_ASSERT(context_);
 | 
	
		
			
				|  |  | -    context_->Start(false);
 | 
	
		
			
				|  |  | +  ClientWriter(ChannelInterface* channel, const RpcMethod& method,
 | 
	
		
			
				|  |  | +               ClientContext* context, google::protobuf::Message* response)
 | 
	
		
			
				|  |  | +      : context_(context),
 | 
	
		
			
				|  |  | +        response_(response),
 | 
	
		
			
				|  |  | +        call_(channel->CreateCall(method, context, &cq_)) {
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    buf.AddSendInitialMetadata(&context->send_initial_metadata_);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&buf);
 | 
	
		
			
				|  |  | +    cq_.Pluck(&buf);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  ~ClientWriter() { delete context_; }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  virtual bool Write(const W& msg) {
 | 
	
		
			
				|  |  | -    return context_->Write(const_cast<W*>(&msg), false);
 | 
	
		
			
				|  |  | +  virtual bool Write(const W& msg) override {
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    buf.AddSendMessage(msg);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&buf);
 | 
	
		
			
				|  |  | +    return cq_.Pluck(&buf);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  virtual void WritesDone() { context_->Write(nullptr, true); }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  virtual void Cancel() { context_->Cancel(); }
 | 
	
		
			
				|  |  | +  virtual bool WritesDone() {
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    buf.AddClientSendClose();
 | 
	
		
			
				|  |  | +    call_.PerformOps(&buf);
 | 
	
		
			
				|  |  | +    return cq_.Pluck(&buf);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    // Read the final response and wait for the final status.
 | 
	
		
			
				|  |  | -  virtual const Status& Wait() {
 | 
	
		
			
				|  |  | -    bool success = context_->Read(context_->response());
 | 
	
		
			
				|  |  | -    if (!success) {
 | 
	
		
			
				|  |  | -      Cancel();
 | 
	
		
			
				|  |  | -    } else {
 | 
	
		
			
				|  |  | -      success = context_->Read(nullptr);
 | 
	
		
			
				|  |  | -      if (success) {
 | 
	
		
			
				|  |  | -        Cancel();
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -    return context_->Wait();
 | 
	
		
			
				|  |  | +  virtual Status Finish() override {
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    Status status;
 | 
	
		
			
				|  |  | +    buf.AddRecvMessage(response_);
 | 
	
		
			
				|  |  | +    buf.AddClientRecvStatus(context_, &status);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&buf);
 | 
	
		
			
				|  |  | +    GPR_ASSERT(cq_.Pluck(&buf) && buf.got_message);
 | 
	
		
			
				|  |  | +    return status;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |   private:
 | 
	
		
			
				|  |  | -  StreamContextInterface* const context_;
 | 
	
		
			
				|  |  | +  ClientContext* context_;
 | 
	
		
			
				|  |  | +  google::protobuf::Message* const response_;
 | 
	
		
			
				|  |  | +  CompletionQueue cq_;
 | 
	
		
			
				|  |  | +  Call call_;
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  // Client-side interface for bi-directional streaming.
 | 
	
		
			
				|  |  |  template <class W, class R>
 | 
	
		
			
				|  |  | -class ClientReaderWriter : public ClientStreamingInterface,
 | 
	
		
			
				|  |  | -                           public WriterInterface<W>,
 | 
	
		
			
				|  |  | -                           public ReaderInterface<R> {
 | 
	
		
			
				|  |  | +class ClientReaderWriter final : public ClientStreamingInterface,
 | 
	
		
			
				|  |  | +                                 public WriterInterface<W>,
 | 
	
		
			
				|  |  | +                                 public ReaderInterface<R> {
 | 
	
		
			
				|  |  |   public:
 | 
	
		
			
				|  |  |    // Blocking create a stream.
 | 
	
		
			
				|  |  | -  explicit ClientReaderWriter(StreamContextInterface* context)
 | 
	
		
			
				|  |  | -      : context_(context) {
 | 
	
		
			
				|  |  | -    GPR_ASSERT(context_);
 | 
	
		
			
				|  |  | -    context_->Start(false);
 | 
	
		
			
				|  |  | +  ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method,
 | 
	
		
			
				|  |  | +                     ClientContext* context)
 | 
	
		
			
				|  |  | +      : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    buf.AddSendInitialMetadata(&context->send_initial_metadata_);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&buf);
 | 
	
		
			
				|  |  | +    GPR_ASSERT(cq_.Pluck(&buf));
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  // Blocking wait for initial metadata from server. The received metadata
 | 
	
		
			
				|  |  | +  // can only be accessed after this call returns. Should only be called before
 | 
	
		
			
				|  |  | +  // the first read. Calling this method is optional, and if it is not called
 | 
	
		
			
				|  |  | +  // the metadata will be available in ClientContext after the first read.
 | 
	
		
			
				|  |  | +  void WaitForInitialMetadata() {
 | 
	
		
			
				|  |  | +    GPR_ASSERT(!context_->initial_metadata_received_);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    buf.AddRecvInitialMetadata(context_);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&buf);
 | 
	
		
			
				|  |  | +    GPR_ASSERT(cq_.Pluck(&buf));
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  virtual bool Read(R* msg) override {
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    if (!context_->initial_metadata_received_) {
 | 
	
		
			
				|  |  | +      buf.AddRecvInitialMetadata(context_);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    buf.AddRecvMessage(msg);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&buf);
 | 
	
		
			
				|  |  | +    return cq_.Pluck(&buf) && buf.got_message;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  virtual bool Write(const W& msg) override {
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    buf.AddSendMessage(msg);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&buf);
 | 
	
		
			
				|  |  | +    return cq_.Pluck(&buf);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  ~ClientReaderWriter() { delete context_; }
 | 
	
		
			
				|  |  | +  virtual bool WritesDone() {
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    buf.AddClientSendClose();
 | 
	
		
			
				|  |  | +    call_.PerformOps(&buf);
 | 
	
		
			
				|  |  | +    return cq_.Pluck(&buf);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  virtual Status Finish() override {
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    Status status;
 | 
	
		
			
				|  |  | +    buf.AddClientRecvStatus(context_, &status);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&buf);
 | 
	
		
			
				|  |  | +    GPR_ASSERT(cq_.Pluck(&buf));
 | 
	
		
			
				|  |  | +    return status;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | + private:
 | 
	
		
			
				|  |  | +  ClientContext* context_;
 | 
	
		
			
				|  |  | +  CompletionQueue cq_;
 | 
	
		
			
				|  |  | +  Call call_;
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +template <class R>
 | 
	
		
			
				|  |  | +class ServerReader final : public ReaderInterface<R> {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  void SendInitialMetadata() {
 | 
	
		
			
				|  |  | +    GPR_ASSERT(!ctx_->sent_initial_metadata_);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
 | 
	
		
			
				|  |  | +    ctx_->sent_initial_metadata_ = true;
 | 
	
		
			
				|  |  | +    call_->PerformOps(&buf);
 | 
	
		
			
				|  |  | +    call_->cq()->Pluck(&buf);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  virtual bool Read(R* msg) override {
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    buf.AddRecvMessage(msg);
 | 
	
		
			
				|  |  | +    call_->PerformOps(&buf);
 | 
	
		
			
				|  |  | +    return call_->cq()->Pluck(&buf) && buf.got_message;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | + private:
 | 
	
		
			
				|  |  | +  Call* const call_;
 | 
	
		
			
				|  |  | +  ServerContext* const ctx_;
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +template <class W>
 | 
	
		
			
				|  |  | +class ServerWriter final : public WriterInterface<W> {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  void SendInitialMetadata() {
 | 
	
		
			
				|  |  | +    GPR_ASSERT(!ctx_->sent_initial_metadata_);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  virtual bool Read(R* msg) { return context_->Read(msg); }
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
 | 
	
		
			
				|  |  | +    ctx_->sent_initial_metadata_ = true;
 | 
	
		
			
				|  |  | +    call_->PerformOps(&buf);
 | 
	
		
			
				|  |  | +    call_->cq()->Pluck(&buf);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  virtual bool Write(const W& msg) {
 | 
	
		
			
				|  |  | -    return context_->Write(const_cast<W*>(&msg), false);
 | 
	
		
			
				|  |  | +  virtual bool Write(const W& msg) override {
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    if (!ctx_->sent_initial_metadata_) {
 | 
	
		
			
				|  |  | +      buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
 | 
	
		
			
				|  |  | +      ctx_->sent_initial_metadata_ = true;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    buf.AddSendMessage(msg);
 | 
	
		
			
				|  |  | +    call_->PerformOps(&buf);
 | 
	
		
			
				|  |  | +    return call_->cq()->Pluck(&buf);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  virtual void WritesDone() { context_->Write(nullptr, true); }
 | 
	
		
			
				|  |  | + private:
 | 
	
		
			
				|  |  | +  Call* const call_;
 | 
	
		
			
				|  |  | +  ServerContext* const ctx_;
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  virtual void Cancel() { context_->Cancel(); }
 | 
	
		
			
				|  |  | +// Server-side interface for bi-directional streaming.
 | 
	
		
			
				|  |  | +template <class W, class R>
 | 
	
		
			
				|  |  | +class ServerReaderWriter final : public WriterInterface<W>,
 | 
	
		
			
				|  |  | +                                 public ReaderInterface<R> {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  virtual const Status& Wait() { return context_->Wait(); }
 | 
	
		
			
				|  |  | +  void SendInitialMetadata() {
 | 
	
		
			
				|  |  | +    GPR_ASSERT(!ctx_->sent_initial_metadata_);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
 | 
	
		
			
				|  |  | +    ctx_->sent_initial_metadata_ = true;
 | 
	
		
			
				|  |  | +    call_->PerformOps(&buf);
 | 
	
		
			
				|  |  | +    call_->cq()->Pluck(&buf);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  virtual bool Read(R* msg) override {
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    buf.AddRecvMessage(msg);
 | 
	
		
			
				|  |  | +    call_->PerformOps(&buf);
 | 
	
		
			
				|  |  | +    return call_->cq()->Pluck(&buf) && buf.got_message;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  virtual bool Write(const W& msg) override {
 | 
	
		
			
				|  |  | +    CallOpBuffer buf;
 | 
	
		
			
				|  |  | +    if (!ctx_->sent_initial_metadata_) {
 | 
	
		
			
				|  |  | +      buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
 | 
	
		
			
				|  |  | +      ctx_->sent_initial_metadata_ = true;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    buf.AddSendMessage(msg);
 | 
	
		
			
				|  |  | +    call_->PerformOps(&buf);
 | 
	
		
			
				|  |  | +    return call_->cq()->Pluck(&buf);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |   private:
 | 
	
		
			
				|  |  | -  StreamContextInterface* const context_;
 | 
	
		
			
				|  |  | +  Call* const call_;
 | 
	
		
			
				|  |  | +  ServerContext* const ctx_;
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +// Async interfaces
 | 
	
		
			
				|  |  | +// Common interface for all client side streaming.
 | 
	
		
			
				|  |  | +class ClientAsyncStreamingInterface {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  virtual ~ClientAsyncStreamingInterface() {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  virtual void ReadInitialMetadata(void* tag) = 0;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  virtual void Finish(Status* status, void* tag) = 0;
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +// An interface that yields a sequence of R messages.
 | 
	
		
			
				|  |  |  template <class R>
 | 
	
		
			
				|  |  | -class ServerReader : public ReaderInterface<R> {
 | 
	
		
			
				|  |  | +class AsyncReaderInterface {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  virtual ~AsyncReaderInterface() {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  virtual void Read(R* msg, void* tag) = 0;
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +// An interface that can be fed a sequence of W messages.
 | 
	
		
			
				|  |  | +template <class W>
 | 
	
		
			
				|  |  | +class AsyncWriterInterface {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  virtual ~AsyncWriterInterface() {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  virtual void Write(const W& msg, void* tag) = 0;
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +template <class R>
 | 
	
		
			
				|  |  | +class ClientAsyncReader final : public ClientAsyncStreamingInterface,
 | 
	
		
			
				|  |  | +                                public AsyncReaderInterface<R> {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  // Create a stream and write the first request out.
 | 
	
		
			
				|  |  | +  ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq,
 | 
	
		
			
				|  |  | +                    const RpcMethod& method, ClientContext* context,
 | 
	
		
			
				|  |  | +                    const google::protobuf::Message& request, void* tag)
 | 
	
		
			
				|  |  | +      : context_(context), call_(channel->CreateCall(method, context, cq)) {
 | 
	
		
			
				|  |  | +    init_buf_.Reset(tag);
 | 
	
		
			
				|  |  | +    init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
 | 
	
		
			
				|  |  | +    init_buf_.AddSendMessage(request);
 | 
	
		
			
				|  |  | +    init_buf_.AddClientSendClose();
 | 
	
		
			
				|  |  | +    call_.PerformOps(&init_buf_);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  void ReadInitialMetadata(void* tag) override {
 | 
	
		
			
				|  |  | +    GPR_ASSERT(!context_->initial_metadata_received_);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    meta_buf_.Reset(tag);
 | 
	
		
			
				|  |  | +    meta_buf_.AddRecvInitialMetadata(context_);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&meta_buf_);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  void Read(R* msg, void* tag) override {
 | 
	
		
			
				|  |  | +    read_buf_.Reset(tag);
 | 
	
		
			
				|  |  | +    if (!context_->initial_metadata_received_) {
 | 
	
		
			
				|  |  | +      read_buf_.AddRecvInitialMetadata(context_);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    read_buf_.AddRecvMessage(msg);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&read_buf_);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  void Finish(Status* status, void* tag) override {
 | 
	
		
			
				|  |  | +    finish_buf_.Reset(tag);
 | 
	
		
			
				|  |  | +    if (!context_->initial_metadata_received_) {
 | 
	
		
			
				|  |  | +      finish_buf_.AddRecvInitialMetadata(context_);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    finish_buf_.AddClientRecvStatus(context_, status);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&finish_buf_);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | + private:
 | 
	
		
			
				|  |  | +  ClientContext* context_ = nullptr;
 | 
	
		
			
				|  |  | +  Call call_;
 | 
	
		
			
				|  |  | +  CallOpBuffer init_buf_;
 | 
	
		
			
				|  |  | +  CallOpBuffer meta_buf_;
 | 
	
		
			
				|  |  | +  CallOpBuffer read_buf_;
 | 
	
		
			
				|  |  | +  CallOpBuffer finish_buf_;
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +template <class W>
 | 
	
		
			
				|  |  | +class ClientAsyncWriter final : public ClientAsyncStreamingInterface,
 | 
	
		
			
				|  |  | +                                public AsyncWriterInterface<W> {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq,
 | 
	
		
			
				|  |  | +                    const RpcMethod& method, ClientContext* context,
 | 
	
		
			
				|  |  | +                    google::protobuf::Message* response, void* tag)
 | 
	
		
			
				|  |  | +      : context_(context),
 | 
	
		
			
				|  |  | +        response_(response),
 | 
	
		
			
				|  |  | +        call_(channel->CreateCall(method, context, cq)) {
 | 
	
		
			
				|  |  | +    init_buf_.Reset(tag);
 | 
	
		
			
				|  |  | +    init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&init_buf_);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  void ReadInitialMetadata(void* tag) override {
 | 
	
		
			
				|  |  | +    GPR_ASSERT(!context_->initial_metadata_received_);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    meta_buf_.Reset(tag);
 | 
	
		
			
				|  |  | +    meta_buf_.AddRecvInitialMetadata(context_);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&meta_buf_);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  void Write(const W& msg, void* tag) override {
 | 
	
		
			
				|  |  | +    write_buf_.Reset(tag);
 | 
	
		
			
				|  |  | +    write_buf_.AddSendMessage(msg);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&write_buf_);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  void WritesDone(void* tag) {
 | 
	
		
			
				|  |  | +    writes_done_buf_.Reset(tag);
 | 
	
		
			
				|  |  | +    writes_done_buf_.AddClientSendClose();
 | 
	
		
			
				|  |  | +    call_.PerformOps(&writes_done_buf_);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  void Finish(Status* status, void* tag) override {
 | 
	
		
			
				|  |  | +    finish_buf_.Reset(tag);
 | 
	
		
			
				|  |  | +    if (!context_->initial_metadata_received_) {
 | 
	
		
			
				|  |  | +      finish_buf_.AddRecvInitialMetadata(context_);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    finish_buf_.AddRecvMessage(response_);
 | 
	
		
			
				|  |  | +    finish_buf_.AddClientRecvStatus(context_, status);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&finish_buf_);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | + private:
 | 
	
		
			
				|  |  | +  ClientContext* context_ = nullptr;
 | 
	
		
			
				|  |  | +  google::protobuf::Message* const response_;
 | 
	
		
			
				|  |  | +  Call call_;
 | 
	
		
			
				|  |  | +  CallOpBuffer init_buf_;
 | 
	
		
			
				|  |  | +  CallOpBuffer meta_buf_;
 | 
	
		
			
				|  |  | +  CallOpBuffer write_buf_;
 | 
	
		
			
				|  |  | +  CallOpBuffer writes_done_buf_;
 | 
	
		
			
				|  |  | +  CallOpBuffer finish_buf_;
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +// Client-side interface for bi-directional streaming.
 | 
	
		
			
				|  |  | +template <class W, class R>
 | 
	
		
			
				|  |  | +class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface,
 | 
	
		
			
				|  |  | +                                      public AsyncWriterInterface<W>,
 | 
	
		
			
				|  |  | +                                      public AsyncReaderInterface<R> {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  ClientAsyncReaderWriter(ChannelInterface* channel, CompletionQueue* cq,
 | 
	
		
			
				|  |  | +                          const RpcMethod& method, ClientContext* context,
 | 
	
		
			
				|  |  | +                          void* tag)
 | 
	
		
			
				|  |  | +      : context_(context), call_(channel->CreateCall(method, context, cq)) {
 | 
	
		
			
				|  |  | +    init_buf_.Reset(tag);
 | 
	
		
			
				|  |  | +    init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&init_buf_);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  void ReadInitialMetadata(void* tag) override {
 | 
	
		
			
				|  |  | +    GPR_ASSERT(!context_->initial_metadata_received_);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    meta_buf_.Reset(tag);
 | 
	
		
			
				|  |  | +    meta_buf_.AddRecvInitialMetadata(context_);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&meta_buf_);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  void Read(R* msg, void* tag) override {
 | 
	
		
			
				|  |  | +    read_buf_.Reset(tag);
 | 
	
		
			
				|  |  | +    if (!context_->initial_metadata_received_) {
 | 
	
		
			
				|  |  | +      read_buf_.AddRecvInitialMetadata(context_);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    read_buf_.AddRecvMessage(msg);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&read_buf_);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  void Write(const W& msg, void* tag) override {
 | 
	
		
			
				|  |  | +    write_buf_.Reset(tag);
 | 
	
		
			
				|  |  | +    write_buf_.AddSendMessage(msg);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&write_buf_);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  void WritesDone(void* tag) {
 | 
	
		
			
				|  |  | +    writes_done_buf_.Reset(tag);
 | 
	
		
			
				|  |  | +    writes_done_buf_.AddClientSendClose();
 | 
	
		
			
				|  |  | +    call_.PerformOps(&writes_done_buf_);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  void Finish(Status* status, void* tag) override {
 | 
	
		
			
				|  |  | +    finish_buf_.Reset(tag);
 | 
	
		
			
				|  |  | +    if (!context_->initial_metadata_received_) {
 | 
	
		
			
				|  |  | +      finish_buf_.AddRecvInitialMetadata(context_);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    finish_buf_.AddClientRecvStatus(context_, status);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&finish_buf_);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | + private:
 | 
	
		
			
				|  |  | +  ClientContext* context_ = nullptr;
 | 
	
		
			
				|  |  | +  Call call_;
 | 
	
		
			
				|  |  | +  CallOpBuffer init_buf_;
 | 
	
		
			
				|  |  | +  CallOpBuffer meta_buf_;
 | 
	
		
			
				|  |  | +  CallOpBuffer read_buf_;
 | 
	
		
			
				|  |  | +  CallOpBuffer write_buf_;
 | 
	
		
			
				|  |  | +  CallOpBuffer writes_done_buf_;
 | 
	
		
			
				|  |  | +  CallOpBuffer finish_buf_;
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +template <class W, class R>
 | 
	
		
			
				|  |  | +class ServerAsyncReader : public ServerAsyncStreamingInterface,
 | 
	
		
			
				|  |  | +                          public AsyncReaderInterface<R> {
 | 
	
		
			
				|  |  |   public:
 | 
	
		
			
				|  |  | -  explicit ServerReader(StreamContextInterface* context) : context_(context) {
 | 
	
		
			
				|  |  | -    GPR_ASSERT(context_);
 | 
	
		
			
				|  |  | -    context_->Start(true);
 | 
	
		
			
				|  |  | +  explicit ServerAsyncReader(ServerContext* ctx)
 | 
	
		
			
				|  |  | +      : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  void SendInitialMetadata(void* tag) override {
 | 
	
		
			
				|  |  | +    GPR_ASSERT(!ctx_->sent_initial_metadata_);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    meta_buf_.Reset(tag);
 | 
	
		
			
				|  |  | +    meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
 | 
	
		
			
				|  |  | +    ctx_->sent_initial_metadata_ = true;
 | 
	
		
			
				|  |  | +    call_.PerformOps(&meta_buf_);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  void Read(R* msg, void* tag) override {
 | 
	
		
			
				|  |  | +    read_buf_.Reset(tag);
 | 
	
		
			
				|  |  | +    read_buf_.AddRecvMessage(msg);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&read_buf_);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  void Finish(const W& msg, const Status& status, void* tag) {
 | 
	
		
			
				|  |  | +    finish_buf_.Reset(tag);
 | 
	
		
			
				|  |  | +    if (!ctx_->sent_initial_metadata_) {
 | 
	
		
			
				|  |  | +      finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
 | 
	
		
			
				|  |  | +      ctx_->sent_initial_metadata_ = true;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    // The response is dropped if the status is not OK.
 | 
	
		
			
				|  |  | +    if (status.IsOk()) {
 | 
	
		
			
				|  |  | +      finish_buf_.AddSendMessage(msg);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&finish_buf_);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  virtual bool Read(R* msg) { return context_->Read(msg); }
 | 
	
		
			
				|  |  | +  void FinishWithError(const Status& status, void* tag) {
 | 
	
		
			
				|  |  | +    GPR_ASSERT(!status.IsOk());
 | 
	
		
			
				|  |  | +    finish_buf_.Reset(tag);
 | 
	
		
			
				|  |  | +    if (!ctx_->sent_initial_metadata_) {
 | 
	
		
			
				|  |  | +      finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
 | 
	
		
			
				|  |  | +      ctx_->sent_initial_metadata_ = true;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&finish_buf_);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |   private:
 | 
	
		
			
				|  |  | -  StreamContextInterface* const context_;  // not owned
 | 
	
		
			
				|  |  | +  void BindCall(Call* call) override { call_ = *call; }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  Call call_;
 | 
	
		
			
				|  |  | +  ServerContext* ctx_;
 | 
	
		
			
				|  |  | +  CallOpBuffer meta_buf_;
 | 
	
		
			
				|  |  | +  CallOpBuffer read_buf_;
 | 
	
		
			
				|  |  | +  CallOpBuffer finish_buf_;
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  template <class W>
 | 
	
		
			
				|  |  | -class ServerWriter : public WriterInterface<W> {
 | 
	
		
			
				|  |  | +class ServerAsyncWriter : public ServerAsyncStreamingInterface,
 | 
	
		
			
				|  |  | +                          public AsyncWriterInterface<W> {
 | 
	
		
			
				|  |  |   public:
 | 
	
		
			
				|  |  | -  explicit ServerWriter(StreamContextInterface* context) : context_(context) {
 | 
	
		
			
				|  |  | -    GPR_ASSERT(context_);
 | 
	
		
			
				|  |  | -    context_->Start(true);
 | 
	
		
			
				|  |  | -    context_->Read(context_->request());
 | 
	
		
			
				|  |  | +  explicit ServerAsyncWriter(ServerContext* ctx)
 | 
	
		
			
				|  |  | +      : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  void SendInitialMetadata(void* tag) override {
 | 
	
		
			
				|  |  | +    GPR_ASSERT(!ctx_->sent_initial_metadata_);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    meta_buf_.Reset(tag);
 | 
	
		
			
				|  |  | +    meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
 | 
	
		
			
				|  |  | +    ctx_->sent_initial_metadata_ = true;
 | 
	
		
			
				|  |  | +    call_.PerformOps(&meta_buf_);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  virtual bool Write(const W& msg) {
 | 
	
		
			
				|  |  | -    return context_->Write(const_cast<W*>(&msg), false);
 | 
	
		
			
				|  |  | +  void Write(const W& msg, void* tag) override {
 | 
	
		
			
				|  |  | +    write_buf_.Reset(tag);
 | 
	
		
			
				|  |  | +    if (!ctx_->sent_initial_metadata_) {
 | 
	
		
			
				|  |  | +      write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
 | 
	
		
			
				|  |  | +      ctx_->sent_initial_metadata_ = true;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    write_buf_.AddSendMessage(msg);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&write_buf_);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  void Finish(const Status& status, void* tag) {
 | 
	
		
			
				|  |  | +    finish_buf_.Reset(tag);
 | 
	
		
			
				|  |  | +    if (!ctx_->sent_initial_metadata_) {
 | 
	
		
			
				|  |  | +      finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
 | 
	
		
			
				|  |  | +      ctx_->sent_initial_metadata_ = true;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&finish_buf_);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |   private:
 | 
	
		
			
				|  |  | -  StreamContextInterface* const context_;  // not owned
 | 
	
		
			
				|  |  | +  void BindCall(Call* call) override { call_ = *call; }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  Call call_;
 | 
	
		
			
				|  |  | +  ServerContext* ctx_;
 | 
	
		
			
				|  |  | +  CallOpBuffer meta_buf_;
 | 
	
		
			
				|  |  | +  CallOpBuffer write_buf_;
 | 
	
		
			
				|  |  | +  CallOpBuffer finish_buf_;
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  // Server-side interface for bi-directional streaming.
 | 
	
		
			
				|  |  |  template <class W, class R>
 | 
	
		
			
				|  |  | -class ServerReaderWriter : public WriterInterface<W>,
 | 
	
		
			
				|  |  | -                           public ReaderInterface<R> {
 | 
	
		
			
				|  |  | +class ServerAsyncReaderWriter : public ServerAsyncStreamingInterface,
 | 
	
		
			
				|  |  | +                                public AsyncWriterInterface<W>,
 | 
	
		
			
				|  |  | +                                public AsyncReaderInterface<R> {
 | 
	
		
			
				|  |  |   public:
 | 
	
		
			
				|  |  | -  explicit ServerReaderWriter(StreamContextInterface* context)
 | 
	
		
			
				|  |  | -      : context_(context) {
 | 
	
		
			
				|  |  | -    GPR_ASSERT(context_);
 | 
	
		
			
				|  |  | -    context_->Start(true);
 | 
	
		
			
				|  |  | +  explicit ServerAsyncReaderWriter(ServerContext* ctx)
 | 
	
		
			
				|  |  | +      : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  void SendInitialMetadata(void* tag) override {
 | 
	
		
			
				|  |  | +    GPR_ASSERT(!ctx_->sent_initial_metadata_);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    meta_buf_.Reset(tag);
 | 
	
		
			
				|  |  | +    meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
 | 
	
		
			
				|  |  | +    ctx_->sent_initial_metadata_ = true;
 | 
	
		
			
				|  |  | +    call_.PerformOps(&meta_buf_);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  virtual bool Read(R* msg) { return context_->Read(msg); }
 | 
	
		
			
				|  |  | +  virtual void Read(R* msg, void* tag) override {
 | 
	
		
			
				|  |  | +    read_buf_.Reset(tag);
 | 
	
		
			
				|  |  | +    read_buf_.AddRecvMessage(msg);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&read_buf_);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  virtual bool Write(const W& msg) {
 | 
	
		
			
				|  |  | -    return context_->Write(const_cast<W*>(&msg), false);
 | 
	
		
			
				|  |  | +  virtual void Write(const W& msg, void* tag) override {
 | 
	
		
			
				|  |  | +    write_buf_.Reset(tag);
 | 
	
		
			
				|  |  | +    if (!ctx_->sent_initial_metadata_) {
 | 
	
		
			
				|  |  | +      write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
 | 
	
		
			
				|  |  | +      ctx_->sent_initial_metadata_ = true;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    write_buf_.AddSendMessage(msg);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&write_buf_);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  void Finish(const Status& status, void* tag) {
 | 
	
		
			
				|  |  | +    finish_buf_.Reset(tag);
 | 
	
		
			
				|  |  | +    if (!ctx_->sent_initial_metadata_) {
 | 
	
		
			
				|  |  | +      finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
 | 
	
		
			
				|  |  | +      ctx_->sent_initial_metadata_ = true;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
 | 
	
		
			
				|  |  | +    call_.PerformOps(&finish_buf_);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |   private:
 | 
	
		
			
				|  |  | -  StreamContextInterface* const context_;  // not owned
 | 
	
		
			
				|  |  | +  void BindCall(Call* call) override { call_ = *call; }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  Call call_;
 | 
	
		
			
				|  |  | +  ServerContext* ctx_;
 | 
	
		
			
				|  |  | +  CallOpBuffer meta_buf_;
 | 
	
		
			
				|  |  | +  CallOpBuffer read_buf_;
 | 
	
		
			
				|  |  | +  CallOpBuffer write_buf_;
 | 
	
		
			
				|  |  | +  CallOpBuffer finish_buf_;
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  }  // namespace grpc
 |