|  | @@ -19,7 +19,9 @@
 | 
	
		
			
				|  |  |  #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
 | 
	
		
			
				|  |  |  #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +#include <atomic>
 | 
	
		
			
				|  |  |  #include <functional>
 | 
	
		
			
				|  |  | +#include <type_traits>
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  #include <grpcpp/impl/codegen/call.h>
 | 
	
		
			
				|  |  |  #include <grpcpp/impl/codegen/call_op_set.h>
 | 
	
	
		
			
				|  | @@ -32,19 +34,33 @@
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  namespace grpc {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -// forward declarations
 | 
	
		
			
				|  |  | +// Declare base class of all reactors as internal
 | 
	
		
			
				|  |  |  namespace internal {
 | 
	
		
			
				|  |  | -template <class ServiceType, class RequestType, class ResponseType>
 | 
	
		
			
				|  |  | -class CallbackUnaryHandler;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +class ServerReactor {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  virtual ~ServerReactor() = default;
 | 
	
		
			
				|  |  | +  virtual void OnDone() {}
 | 
	
		
			
				|  |  | +  virtual void OnCancel() {}
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  }  // namespace internal
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  namespace experimental {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +// Forward declarations
 | 
	
		
			
				|  |  | +template <class Request, class Response>
 | 
	
		
			
				|  |  | +class ServerReadReactor;
 | 
	
		
			
				|  |  | +template <class Request, class Response>
 | 
	
		
			
				|  |  | +class ServerWriteReactor;
 | 
	
		
			
				|  |  | +template <class Request, class Response>
 | 
	
		
			
				|  |  | +class ServerBidiReactor;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  // For unary RPCs, the exposed controller class is only an interface
 | 
	
		
			
				|  |  |  // and the actual implementation is an internal class.
 | 
	
		
			
				|  |  |  class ServerCallbackRpcController {
 | 
	
		
			
				|  |  |   public:
 | 
	
		
			
				|  |  | -  virtual ~ServerCallbackRpcController() {}
 | 
	
		
			
				|  |  | +  virtual ~ServerCallbackRpcController() = default;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    // The method handler must call this function when it is done so that
 | 
	
		
			
				|  |  |    // the library knows to free its resources
 | 
	
	
		
			
				|  | @@ -55,18 +71,193 @@ class ServerCallbackRpcController {
 | 
	
		
			
				|  |  |    virtual void SendInitialMetadata(std::function<void(bool)>) = 0;
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +// NOTE: The actual streaming object classes are provided
 | 
	
		
			
				|  |  | +// as API only to support mocking. There are no implementations of
 | 
	
		
			
				|  |  | +// these class interfaces in the API.
 | 
	
		
			
				|  |  | +template <class Request>
 | 
	
		
			
				|  |  | +class ServerCallbackReader {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  virtual ~ServerCallbackReader() {}
 | 
	
		
			
				|  |  | +  virtual void Finish(Status s) = 0;
 | 
	
		
			
				|  |  | +  virtual void SendInitialMetadata() = 0;
 | 
	
		
			
				|  |  | +  virtual void Read(Request* msg) = 0;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | + protected:
 | 
	
		
			
				|  |  | +  template <class Response>
 | 
	
		
			
				|  |  | +  void BindReactor(ServerReadReactor<Request, Response>* reactor) {
 | 
	
		
			
				|  |  | +    reactor->BindReader(this);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +template <class Response>
 | 
	
		
			
				|  |  | +class ServerCallbackWriter {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  virtual ~ServerCallbackWriter() {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  virtual void Finish(Status s) = 0;
 | 
	
		
			
				|  |  | +  virtual void SendInitialMetadata() = 0;
 | 
	
		
			
				|  |  | +  virtual void Write(const Response* msg, WriteOptions options) = 0;
 | 
	
		
			
				|  |  | +  virtual void WriteAndFinish(const Response* msg, WriteOptions options,
 | 
	
		
			
				|  |  | +                              Status s) {
 | 
	
		
			
				|  |  | +    // Default implementation that can/should be overridden
 | 
	
		
			
				|  |  | +    Write(msg, std::move(options));
 | 
	
		
			
				|  |  | +    Finish(std::move(s));
 | 
	
		
			
				|  |  | +  };
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | + protected:
 | 
	
		
			
				|  |  | +  template <class Request>
 | 
	
		
			
				|  |  | +  void BindReactor(ServerWriteReactor<Request, Response>* reactor) {
 | 
	
		
			
				|  |  | +    reactor->BindWriter(this);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +template <class Request, class Response>
 | 
	
		
			
				|  |  | +class ServerCallbackReaderWriter {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  virtual ~ServerCallbackReaderWriter() {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  virtual void Finish(Status s) = 0;
 | 
	
		
			
				|  |  | +  virtual void SendInitialMetadata() = 0;
 | 
	
		
			
				|  |  | +  virtual void Read(Request* msg) = 0;
 | 
	
		
			
				|  |  | +  virtual void Write(const Response* msg, WriteOptions options) = 0;
 | 
	
		
			
				|  |  | +  virtual void WriteAndFinish(const Response* msg, WriteOptions options,
 | 
	
		
			
				|  |  | +                              Status s) {
 | 
	
		
			
				|  |  | +    // Default implementation that can/should be overridden
 | 
	
		
			
				|  |  | +    Write(msg, std::move(options));
 | 
	
		
			
				|  |  | +    Finish(std::move(s));
 | 
	
		
			
				|  |  | +  };
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | + protected:
 | 
	
		
			
				|  |  | +  void BindReactor(ServerBidiReactor<Request, Response>* reactor) {
 | 
	
		
			
				|  |  | +    reactor->BindStream(this);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +// The following classes are reactors that are to be implemented
 | 
	
		
			
				|  |  | +// by the user, returned as the result of the method handler for
 | 
	
		
			
				|  |  | +// a callback method, and activated by the call to OnStarted
 | 
	
		
			
				|  |  | +template <class Request, class Response>
 | 
	
		
			
				|  |  | +class ServerBidiReactor : public internal::ServerReactor {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  ~ServerBidiReactor() = default;
 | 
	
		
			
				|  |  | +  virtual void OnStarted(ServerContext*) {}
 | 
	
		
			
				|  |  | +  virtual void OnSendInitialMetadataDone(bool ok) {}
 | 
	
		
			
				|  |  | +  virtual void OnReadDone(bool ok) {}
 | 
	
		
			
				|  |  | +  virtual void OnWriteDone(bool ok) {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  void StartSendInitialMetadata() { stream_->SendInitialMetadata(); }
 | 
	
		
			
				|  |  | +  void StartRead(Request* msg) { stream_->Read(msg); }
 | 
	
		
			
				|  |  | +  void StartWrite(const Response* msg) { StartWrite(msg, WriteOptions()); }
 | 
	
		
			
				|  |  | +  void StartWrite(const Response* msg, WriteOptions options) {
 | 
	
		
			
				|  |  | +    stream_->Write(msg, std::move(options));
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  void StartWriteAndFinish(const Response* msg, WriteOptions options,
 | 
	
		
			
				|  |  | +                           Status s) {
 | 
	
		
			
				|  |  | +    stream_->WriteAndFinish(msg, std::move(options), std::move(s));
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  void StartWriteLast(const Response* msg, WriteOptions options) {
 | 
	
		
			
				|  |  | +    StartWrite(msg, std::move(options.set_last_message()));
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  void Finish(Status s) { stream_->Finish(std::move(s)); }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | + private:
 | 
	
		
			
				|  |  | +  friend class ServerCallbackReaderWriter<Request, Response>;
 | 
	
		
			
				|  |  | +  void BindStream(ServerCallbackReaderWriter<Request, Response>* stream) {
 | 
	
		
			
				|  |  | +    stream_ = stream;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  ServerCallbackReaderWriter<Request, Response>* stream_;
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +template <class Request, class Response>
 | 
	
		
			
				|  |  | +class ServerReadReactor : public internal::ServerReactor {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  ~ServerReadReactor() = default;
 | 
	
		
			
				|  |  | +  virtual void OnStarted(ServerContext*, Response* resp) {}
 | 
	
		
			
				|  |  | +  virtual void OnSendInitialMetadataDone(bool ok) {}
 | 
	
		
			
				|  |  | +  virtual void OnReadDone(bool ok) {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  void StartSendInitialMetadata() { reader_->SendInitialMetadata(); }
 | 
	
		
			
				|  |  | +  void StartRead(Request* msg) { reader_->Read(msg); }
 | 
	
		
			
				|  |  | +  void Finish(Status s) { reader_->Finish(std::move(s)); }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | + private:
 | 
	
		
			
				|  |  | +  friend class ServerCallbackReader<Request>;
 | 
	
		
			
				|  |  | +  void BindReader(ServerCallbackReader<Request>* reader) { reader_ = reader; }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  ServerCallbackReader<Request>* reader_;
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +template <class Request, class Response>
 | 
	
		
			
				|  |  | +class ServerWriteReactor : public internal::ServerReactor {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  ~ServerWriteReactor() = default;
 | 
	
		
			
				|  |  | +  virtual void OnStarted(ServerContext*, const Request* req) {}
 | 
	
		
			
				|  |  | +  virtual void OnSendInitialMetadataDone(bool ok) {}
 | 
	
		
			
				|  |  | +  virtual void OnWriteDone(bool ok) {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  void StartSendInitialMetadata() { writer_->SendInitialMetadata(); }
 | 
	
		
			
				|  |  | +  void StartWrite(const Response* msg) { StartWrite(msg, WriteOptions()); }
 | 
	
		
			
				|  |  | +  void StartWrite(const Response* msg, WriteOptions options) {
 | 
	
		
			
				|  |  | +    writer_->Write(msg, std::move(options));
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  void StartWriteAndFinish(const Response* msg, WriteOptions options,
 | 
	
		
			
				|  |  | +                           Status s) {
 | 
	
		
			
				|  |  | +    writer_->WriteAndFinish(msg, std::move(options), std::move(s));
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  void StartWriteLast(const Response* msg, WriteOptions options) {
 | 
	
		
			
				|  |  | +    StartWrite(msg, std::move(options.set_last_message()));
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  void Finish(Status s) { writer_->Finish(std::move(s)); }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | + private:
 | 
	
		
			
				|  |  | +  friend class ServerCallbackWriter<Response>;
 | 
	
		
			
				|  |  | +  void BindWriter(ServerCallbackWriter<Response>* writer) { writer_ = writer; }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  ServerCallbackWriter<Response>* writer_;
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  }  // namespace experimental
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  namespace internal {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -template <class ServiceType, class RequestType, class ResponseType>
 | 
	
		
			
				|  |  | +template <class Request, class Response>
 | 
	
		
			
				|  |  | +class UnimplementedReadReactor
 | 
	
		
			
				|  |  | +    : public experimental::ServerReadReactor<Request, Response> {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  void OnDone() override { delete this; }
 | 
	
		
			
				|  |  | +  void OnStarted(ServerContext*, Response*) override {
 | 
	
		
			
				|  |  | +    this->Finish(Status(StatusCode::UNIMPLEMENTED, ""));
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +template <class Request, class Response>
 | 
	
		
			
				|  |  | +class UnimplementedWriteReactor
 | 
	
		
			
				|  |  | +    : public experimental::ServerWriteReactor<Request, Response> {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  void OnDone() override { delete this; }
 | 
	
		
			
				|  |  | +  void OnStarted(ServerContext*, const Request*) override {
 | 
	
		
			
				|  |  | +    this->Finish(Status(StatusCode::UNIMPLEMENTED, ""));
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +template <class Request, class Response>
 | 
	
		
			
				|  |  | +class UnimplementedBidiReactor
 | 
	
		
			
				|  |  | +    : public experimental::ServerBidiReactor<Request, Response> {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  void OnDone() override { delete this; }
 | 
	
		
			
				|  |  | +  void OnStarted(ServerContext*) override {
 | 
	
		
			
				|  |  | +    this->Finish(Status(StatusCode::UNIMPLEMENTED, ""));
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +template <class RequestType, class ResponseType>
 | 
	
		
			
				|  |  |  class CallbackUnaryHandler : public MethodHandler {
 | 
	
		
			
				|  |  |   public:
 | 
	
		
			
				|  |  |    CallbackUnaryHandler(
 | 
	
		
			
				|  |  |        std::function<void(ServerContext*, const RequestType*, ResponseType*,
 | 
	
		
			
				|  |  |                           experimental::ServerCallbackRpcController*)>
 | 
	
		
			
				|  |  | -          func,
 | 
	
		
			
				|  |  | -      ServiceType* service)
 | 
	
		
			
				|  |  | +          func)
 | 
	
		
			
				|  |  |        : func_(func) {}
 | 
	
		
			
				|  |  |    void RunHandler(const HandlerParameter& param) final {
 | 
	
		
			
				|  |  |      // Arena allocate a controller structure (that includes request/response)
 | 
	
	
		
			
				|  | @@ -81,9 +272,8 @@ class CallbackUnaryHandler : public MethodHandler {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      if (status.ok()) {
 | 
	
		
			
				|  |  |        // Call the actual function handler and expect the user to call finish
 | 
	
		
			
				|  |  | -      CatchingCallback(std::move(func_), param.server_context,
 | 
	
		
			
				|  |  | -                       controller->request(), controller->response(),
 | 
	
		
			
				|  |  | -                       controller);
 | 
	
		
			
				|  |  | +      CatchingCallback(func_, param.server_context, controller->request(),
 | 
	
		
			
				|  |  | +                       controller->response(), controller);
 | 
	
		
			
				|  |  |      } else {
 | 
	
		
			
				|  |  |        // if deserialization failed, we need to fail the call
 | 
	
		
			
				|  |  |        controller->Finish(status);
 | 
	
	
		
			
				|  | @@ -117,79 +307,579 @@ class CallbackUnaryHandler : public MethodHandler {
 | 
	
		
			
				|  |  |        : public experimental::ServerCallbackRpcController {
 | 
	
		
			
				|  |  |     public:
 | 
	
		
			
				|  |  |      void Finish(Status s) override {
 | 
	
		
			
				|  |  | -      finish_tag_.Set(
 | 
	
		
			
				|  |  | -          call_.call(),
 | 
	
		
			
				|  |  | -          [this](bool) {
 | 
	
		
			
				|  |  | -            grpc_call* call = call_.call();
 | 
	
		
			
				|  |  | -            auto call_requester = std::move(call_requester_);
 | 
	
		
			
				|  |  | -            this->~ServerCallbackRpcControllerImpl();  // explicitly call
 | 
	
		
			
				|  |  | -                                                       // destructor
 | 
	
		
			
				|  |  | -            g_core_codegen_interface->grpc_call_unref(call);
 | 
	
		
			
				|  |  | -            call_requester();
 | 
	
		
			
				|  |  | -          },
 | 
	
		
			
				|  |  | -          &finish_buf_);
 | 
	
		
			
				|  |  | +      finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
 | 
	
		
			
				|  |  | +                      &finish_ops_);
 | 
	
		
			
				|  |  |        if (!ctx_->sent_initial_metadata_) {
 | 
	
		
			
				|  |  | -        finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
 | 
	
		
			
				|  |  | +        finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
 | 
	
		
			
				|  |  |                                          ctx_->initial_metadata_flags());
 | 
	
		
			
				|  |  |          if (ctx_->compression_level_set()) {
 | 
	
		
			
				|  |  | -          finish_buf_.set_compression_level(ctx_->compression_level());
 | 
	
		
			
				|  |  | +          finish_ops_.set_compression_level(ctx_->compression_level());
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |          ctx_->sent_initial_metadata_ = true;
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |        // The response is dropped if the status is not OK.
 | 
	
		
			
				|  |  |        if (s.ok()) {
 | 
	
		
			
				|  |  | -        finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_,
 | 
	
		
			
				|  |  | -                                     finish_buf_.SendMessage(resp_));
 | 
	
		
			
				|  |  | +        finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
 | 
	
		
			
				|  |  | +                                     finish_ops_.SendMessage(resp_));
 | 
	
		
			
				|  |  |        } else {
 | 
	
		
			
				|  |  | -        finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, s);
 | 
	
		
			
				|  |  | +        finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  | -      finish_buf_.set_core_cq_tag(&finish_tag_);
 | 
	
		
			
				|  |  | -      call_.PerformOps(&finish_buf_);
 | 
	
		
			
				|  |  | +      finish_ops_.set_core_cq_tag(&finish_tag_);
 | 
	
		
			
				|  |  | +      call_.PerformOps(&finish_ops_);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      void SendInitialMetadata(std::function<void(bool)> f) override {
 | 
	
		
			
				|  |  |        GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -      meta_tag_.Set(call_.call(), std::move(f), &meta_buf_);
 | 
	
		
			
				|  |  | -      meta_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
 | 
	
		
			
				|  |  | +      callbacks_outstanding_++;
 | 
	
		
			
				|  |  | +      // TODO(vjpai): Consider taking f as a move-capture if we adopt C++14
 | 
	
		
			
				|  |  | +      //              and if performance of this operation matters
 | 
	
		
			
				|  |  | +      meta_tag_.Set(call_.call(),
 | 
	
		
			
				|  |  | +                    [this, f](bool ok) {
 | 
	
		
			
				|  |  | +                      f(ok);
 | 
	
		
			
				|  |  | +                      MaybeDone();
 | 
	
		
			
				|  |  | +                    },
 | 
	
		
			
				|  |  | +                    &meta_ops_);
 | 
	
		
			
				|  |  | +      meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
 | 
	
		
			
				|  |  |                                      ctx_->initial_metadata_flags());
 | 
	
		
			
				|  |  |        if (ctx_->compression_level_set()) {
 | 
	
		
			
				|  |  | -        meta_buf_.set_compression_level(ctx_->compression_level());
 | 
	
		
			
				|  |  | +        meta_ops_.set_compression_level(ctx_->compression_level());
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |        ctx_->sent_initial_metadata_ = true;
 | 
	
		
			
				|  |  | -      meta_buf_.set_core_cq_tag(&meta_tag_);
 | 
	
		
			
				|  |  | -      call_.PerformOps(&meta_buf_);
 | 
	
		
			
				|  |  | +      meta_ops_.set_core_cq_tag(&meta_tag_);
 | 
	
		
			
				|  |  | +      call_.PerformOps(&meta_ops_);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |     private:
 | 
	
		
			
				|  |  | -    template <class SrvType, class ReqType, class RespType>
 | 
	
		
			
				|  |  | -    friend class CallbackUnaryHandler;
 | 
	
		
			
				|  |  | +    friend class CallbackUnaryHandler<RequestType, ResponseType>;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      ServerCallbackRpcControllerImpl(ServerContext* ctx, Call* call,
 | 
	
		
			
				|  |  | -                                    RequestType* req,
 | 
	
		
			
				|  |  | +                                    const RequestType* req,
 | 
	
		
			
				|  |  |                                      std::function<void()> call_requester)
 | 
	
		
			
				|  |  |          : ctx_(ctx),
 | 
	
		
			
				|  |  |            call_(*call),
 | 
	
		
			
				|  |  |            req_(req),
 | 
	
		
			
				|  |  | -          call_requester_(std::move(call_requester)) {}
 | 
	
		
			
				|  |  | +          call_requester_(std::move(call_requester)) {
 | 
	
		
			
				|  |  | +      ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, nullptr);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      ~ServerCallbackRpcControllerImpl() { req_->~RequestType(); }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    RequestType* request() { return req_; }
 | 
	
		
			
				|  |  | +    const RequestType* request() { return req_; }
 | 
	
		
			
				|  |  |      ResponseType* response() { return &resp_; }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    CallOpSet<CallOpSendInitialMetadata> meta_buf_;
 | 
	
		
			
				|  |  | +    void MaybeDone() {
 | 
	
		
			
				|  |  | +      if (--callbacks_outstanding_ == 0) {
 | 
	
		
			
				|  |  | +        grpc_call* call = call_.call();
 | 
	
		
			
				|  |  | +        auto call_requester = std::move(call_requester_);
 | 
	
		
			
				|  |  | +        this->~ServerCallbackRpcControllerImpl();  // explicitly call destructor
 | 
	
		
			
				|  |  | +        g_core_codegen_interface->grpc_call_unref(call);
 | 
	
		
			
				|  |  | +        call_requester();
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    CallOpSet<CallOpSendInitialMetadata> meta_ops_;
 | 
	
		
			
				|  |  |      CallbackWithSuccessTag meta_tag_;
 | 
	
		
			
				|  |  |      CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
 | 
	
		
			
				|  |  |                CallOpServerSendStatus>
 | 
	
		
			
				|  |  | -        finish_buf_;
 | 
	
		
			
				|  |  | +        finish_ops_;
 | 
	
		
			
				|  |  |      CallbackWithSuccessTag finish_tag_;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      ServerContext* ctx_;
 | 
	
		
			
				|  |  |      Call call_;
 | 
	
		
			
				|  |  | -    RequestType* req_;
 | 
	
		
			
				|  |  | +    const RequestType* req_;
 | 
	
		
			
				|  |  |      ResponseType resp_;
 | 
	
		
			
				|  |  |      std::function<void()> call_requester_;
 | 
	
		
			
				|  |  | +    std::atomic_int callbacks_outstanding_{
 | 
	
		
			
				|  |  | +        2};  // reserve for Finish and CompletionOp
 | 
	
		
			
				|  |  | +  };
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +template <class RequestType, class ResponseType>
 | 
	
		
			
				|  |  | +class CallbackClientStreamingHandler : public MethodHandler {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  CallbackClientStreamingHandler(
 | 
	
		
			
				|  |  | +      std::function<
 | 
	
		
			
				|  |  | +          experimental::ServerReadReactor<RequestType, ResponseType>*()>
 | 
	
		
			
				|  |  | +          func)
 | 
	
		
			
				|  |  | +      : func_(std::move(func)) {}
 | 
	
		
			
				|  |  | +  void RunHandler(const HandlerParameter& param) final {
 | 
	
		
			
				|  |  | +    // Arena allocate a reader structure (that includes response)
 | 
	
		
			
				|  |  | +    g_core_codegen_interface->grpc_call_ref(param.call->call());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    experimental::ServerReadReactor<RequestType, ResponseType>* reactor =
 | 
	
		
			
				|  |  | +        param.status.ok()
 | 
	
		
			
				|  |  | +            ? CatchingReactorCreator<
 | 
	
		
			
				|  |  | +                  experimental::ServerReadReactor<RequestType, ResponseType>>(
 | 
	
		
			
				|  |  | +                  func_)
 | 
	
		
			
				|  |  | +            : nullptr;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    if (reactor == nullptr) {
 | 
	
		
			
				|  |  | +      // if deserialization or reactor creator failed, we need to fail the call
 | 
	
		
			
				|  |  | +      reactor = new UnimplementedReadReactor<RequestType, ResponseType>;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    auto* reader = new (g_core_codegen_interface->grpc_call_arena_alloc(
 | 
	
		
			
				|  |  | +        param.call->call(), sizeof(ServerCallbackReaderImpl)))
 | 
	
		
			
				|  |  | +        ServerCallbackReaderImpl(param.server_context, param.call,
 | 
	
		
			
				|  |  | +                                 std::move(param.call_requester), reactor);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    reader->BindReactor(reactor);
 | 
	
		
			
				|  |  | +    reactor->OnStarted(param.server_context, reader->response());
 | 
	
		
			
				|  |  | +    reader->MaybeDone();
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | + private:
 | 
	
		
			
				|  |  | +  std::function<experimental::ServerReadReactor<RequestType, ResponseType>*()>
 | 
	
		
			
				|  |  | +      func_;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  class ServerCallbackReaderImpl
 | 
	
		
			
				|  |  | +      : public experimental::ServerCallbackReader<RequestType> {
 | 
	
		
			
				|  |  | +   public:
 | 
	
		
			
				|  |  | +    void Finish(Status s) override {
 | 
	
		
			
				|  |  | +      finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
 | 
	
		
			
				|  |  | +                      &finish_ops_);
 | 
	
		
			
				|  |  | +      if (!ctx_->sent_initial_metadata_) {
 | 
	
		
			
				|  |  | +        finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
 | 
	
		
			
				|  |  | +                                        ctx_->initial_metadata_flags());
 | 
	
		
			
				|  |  | +        if (ctx_->compression_level_set()) {
 | 
	
		
			
				|  |  | +          finish_ops_.set_compression_level(ctx_->compression_level());
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        ctx_->sent_initial_metadata_ = true;
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +      // The response is dropped if the status is not OK.
 | 
	
		
			
				|  |  | +      if (s.ok()) {
 | 
	
		
			
				|  |  | +        finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
 | 
	
		
			
				|  |  | +                                     finish_ops_.SendMessage(resp_));
 | 
	
		
			
				|  |  | +      } else {
 | 
	
		
			
				|  |  | +        finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +      finish_ops_.set_core_cq_tag(&finish_tag_);
 | 
	
		
			
				|  |  | +      call_.PerformOps(&finish_ops_);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    void SendInitialMetadata() override {
 | 
	
		
			
				|  |  | +      GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
 | 
	
		
			
				|  |  | +      callbacks_outstanding_++;
 | 
	
		
			
				|  |  | +      meta_tag_.Set(call_.call(),
 | 
	
		
			
				|  |  | +                    [this](bool ok) {
 | 
	
		
			
				|  |  | +                      reactor_->OnSendInitialMetadataDone(ok);
 | 
	
		
			
				|  |  | +                      MaybeDone();
 | 
	
		
			
				|  |  | +                    },
 | 
	
		
			
				|  |  | +                    &meta_ops_);
 | 
	
		
			
				|  |  | +      meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
 | 
	
		
			
				|  |  | +                                    ctx_->initial_metadata_flags());
 | 
	
		
			
				|  |  | +      if (ctx_->compression_level_set()) {
 | 
	
		
			
				|  |  | +        meta_ops_.set_compression_level(ctx_->compression_level());
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +      ctx_->sent_initial_metadata_ = true;
 | 
	
		
			
				|  |  | +      meta_ops_.set_core_cq_tag(&meta_tag_);
 | 
	
		
			
				|  |  | +      call_.PerformOps(&meta_ops_);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    void Read(RequestType* req) override {
 | 
	
		
			
				|  |  | +      callbacks_outstanding_++;
 | 
	
		
			
				|  |  | +      read_ops_.RecvMessage(req);
 | 
	
		
			
				|  |  | +      call_.PerformOps(&read_ops_);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +   private:
 | 
	
		
			
				|  |  | +    friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    ServerCallbackReaderImpl(
 | 
	
		
			
				|  |  | +        ServerContext* ctx, Call* call, std::function<void()> call_requester,
 | 
	
		
			
				|  |  | +        experimental::ServerReadReactor<RequestType, ResponseType>* reactor)
 | 
	
		
			
				|  |  | +        : ctx_(ctx),
 | 
	
		
			
				|  |  | +          call_(*call),
 | 
	
		
			
				|  |  | +          call_requester_(std::move(call_requester)),
 | 
	
		
			
				|  |  | +          reactor_(reactor) {
 | 
	
		
			
				|  |  | +      ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
 | 
	
		
			
				|  |  | +      read_tag_.Set(call_.call(),
 | 
	
		
			
				|  |  | +                    [this](bool ok) {
 | 
	
		
			
				|  |  | +                      reactor_->OnReadDone(ok);
 | 
	
		
			
				|  |  | +                      MaybeDone();
 | 
	
		
			
				|  |  | +                    },
 | 
	
		
			
				|  |  | +                    &read_ops_);
 | 
	
		
			
				|  |  | +      read_ops_.set_core_cq_tag(&read_tag_);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    ~ServerCallbackReaderImpl() {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    ResponseType* response() { return &resp_; }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    void MaybeDone() {
 | 
	
		
			
				|  |  | +      if (--callbacks_outstanding_ == 0) {
 | 
	
		
			
				|  |  | +        reactor_->OnDone();
 | 
	
		
			
				|  |  | +        grpc_call* call = call_.call();
 | 
	
		
			
				|  |  | +        auto call_requester = std::move(call_requester_);
 | 
	
		
			
				|  |  | +        this->~ServerCallbackReaderImpl();  // explicitly call destructor
 | 
	
		
			
				|  |  | +        g_core_codegen_interface->grpc_call_unref(call);
 | 
	
		
			
				|  |  | +        call_requester();
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    CallOpSet<CallOpSendInitialMetadata> meta_ops_;
 | 
	
		
			
				|  |  | +    CallbackWithSuccessTag meta_tag_;
 | 
	
		
			
				|  |  | +    CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
 | 
	
		
			
				|  |  | +              CallOpServerSendStatus>
 | 
	
		
			
				|  |  | +        finish_ops_;
 | 
	
		
			
				|  |  | +    CallbackWithSuccessTag finish_tag_;
 | 
	
		
			
				|  |  | +    CallOpSet<CallOpRecvMessage<RequestType>> read_ops_;
 | 
	
		
			
				|  |  | +    CallbackWithSuccessTag read_tag_;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    ServerContext* ctx_;
 | 
	
		
			
				|  |  | +    Call call_;
 | 
	
		
			
				|  |  | +    ResponseType resp_;
 | 
	
		
			
				|  |  | +    std::function<void()> call_requester_;
 | 
	
		
			
				|  |  | +    experimental::ServerReadReactor<RequestType, ResponseType>* reactor_;
 | 
	
		
			
				|  |  | +    std::atomic_int callbacks_outstanding_{
 | 
	
		
			
				|  |  | +        3};  // reserve for OnStarted, Finish, and CompletionOp
 | 
	
		
			
				|  |  | +  };
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +template <class RequestType, class ResponseType>
 | 
	
		
			
				|  |  | +class CallbackServerStreamingHandler : public MethodHandler {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  CallbackServerStreamingHandler(
 | 
	
		
			
				|  |  | +      std::function<
 | 
	
		
			
				|  |  | +          experimental::ServerWriteReactor<RequestType, ResponseType>*()>
 | 
	
		
			
				|  |  | +          func)
 | 
	
		
			
				|  |  | +      : func_(std::move(func)) {}
 | 
	
		
			
				|  |  | +  void RunHandler(const HandlerParameter& param) final {
 | 
	
		
			
				|  |  | +    // Arena allocate a writer structure
 | 
	
		
			
				|  |  | +    g_core_codegen_interface->grpc_call_ref(param.call->call());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    experimental::ServerWriteReactor<RequestType, ResponseType>* reactor =
 | 
	
		
			
				|  |  | +        param.status.ok()
 | 
	
		
			
				|  |  | +            ? CatchingReactorCreator<
 | 
	
		
			
				|  |  | +                  experimental::ServerWriteReactor<RequestType, ResponseType>>(
 | 
	
		
			
				|  |  | +                  func_)
 | 
	
		
			
				|  |  | +            : nullptr;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    if (reactor == nullptr) {
 | 
	
		
			
				|  |  | +      // if deserialization or reactor creator failed, we need to fail the call
 | 
	
		
			
				|  |  | +      reactor = new UnimplementedWriteReactor<RequestType, ResponseType>;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    auto* writer = new (g_core_codegen_interface->grpc_call_arena_alloc(
 | 
	
		
			
				|  |  | +        param.call->call(), sizeof(ServerCallbackWriterImpl)))
 | 
	
		
			
				|  |  | +        ServerCallbackWriterImpl(param.server_context, param.call,
 | 
	
		
			
				|  |  | +                                 static_cast<RequestType*>(param.request),
 | 
	
		
			
				|  |  | +                                 std::move(param.call_requester), reactor);
 | 
	
		
			
				|  |  | +    writer->BindReactor(reactor);
 | 
	
		
			
				|  |  | +    reactor->OnStarted(param.server_context, writer->request());
 | 
	
		
			
				|  |  | +    writer->MaybeDone();
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
 | 
	
		
			
				|  |  | +                    Status* status) final {
 | 
	
		
			
				|  |  | +    ByteBuffer buf;
 | 
	
		
			
				|  |  | +    buf.set_buffer(req);
 | 
	
		
			
				|  |  | +    auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc(
 | 
	
		
			
				|  |  | +        call, sizeof(RequestType))) RequestType();
 | 
	
		
			
				|  |  | +    *status = SerializationTraits<RequestType>::Deserialize(&buf, request);
 | 
	
		
			
				|  |  | +    buf.Release();
 | 
	
		
			
				|  |  | +    if (status->ok()) {
 | 
	
		
			
				|  |  | +      return request;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    request->~RequestType();
 | 
	
		
			
				|  |  | +    return nullptr;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | + private:
 | 
	
		
			
				|  |  | +  std::function<experimental::ServerWriteReactor<RequestType, ResponseType>*()>
 | 
	
		
			
				|  |  | +      func_;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  class ServerCallbackWriterImpl
 | 
	
		
			
				|  |  | +      : public experimental::ServerCallbackWriter<ResponseType> {
 | 
	
		
			
				|  |  | +   public:
 | 
	
		
			
				|  |  | +    void Finish(Status s) override {
 | 
	
		
			
				|  |  | +      finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
 | 
	
		
			
				|  |  | +                      &finish_ops_);
 | 
	
		
			
				|  |  | +      finish_ops_.set_core_cq_tag(&finish_tag_);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +      if (!ctx_->sent_initial_metadata_) {
 | 
	
		
			
				|  |  | +        finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
 | 
	
		
			
				|  |  | +                                        ctx_->initial_metadata_flags());
 | 
	
		
			
				|  |  | +        if (ctx_->compression_level_set()) {
 | 
	
		
			
				|  |  | +          finish_ops_.set_compression_level(ctx_->compression_level());
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        ctx_->sent_initial_metadata_ = true;
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +      finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
 | 
	
		
			
				|  |  | +      call_.PerformOps(&finish_ops_);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    void SendInitialMetadata() override {
 | 
	
		
			
				|  |  | +      GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
 | 
	
		
			
				|  |  | +      callbacks_outstanding_++;
 | 
	
		
			
				|  |  | +      meta_tag_.Set(call_.call(),
 | 
	
		
			
				|  |  | +                    [this](bool ok) {
 | 
	
		
			
				|  |  | +                      reactor_->OnSendInitialMetadataDone(ok);
 | 
	
		
			
				|  |  | +                      MaybeDone();
 | 
	
		
			
				|  |  | +                    },
 | 
	
		
			
				|  |  | +                    &meta_ops_);
 | 
	
		
			
				|  |  | +      meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
 | 
	
		
			
				|  |  | +                                    ctx_->initial_metadata_flags());
 | 
	
		
			
				|  |  | +      if (ctx_->compression_level_set()) {
 | 
	
		
			
				|  |  | +        meta_ops_.set_compression_level(ctx_->compression_level());
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +      ctx_->sent_initial_metadata_ = true;
 | 
	
		
			
				|  |  | +      meta_ops_.set_core_cq_tag(&meta_tag_);
 | 
	
		
			
				|  |  | +      call_.PerformOps(&meta_ops_);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    void Write(const ResponseType* resp, WriteOptions options) override {
 | 
	
		
			
				|  |  | +      callbacks_outstanding_++;
 | 
	
		
			
				|  |  | +      if (options.is_last_message()) {
 | 
	
		
			
				|  |  | +        options.set_buffer_hint();
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +      if (!ctx_->sent_initial_metadata_) {
 | 
	
		
			
				|  |  | +        write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
 | 
	
		
			
				|  |  | +                                       ctx_->initial_metadata_flags());
 | 
	
		
			
				|  |  | +        if (ctx_->compression_level_set()) {
 | 
	
		
			
				|  |  | +          write_ops_.set_compression_level(ctx_->compression_level());
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        ctx_->sent_initial_metadata_ = true;
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +      // TODO(vjpai): don't assert
 | 
	
		
			
				|  |  | +      GPR_CODEGEN_ASSERT(write_ops_.SendMessage(*resp, options).ok());
 | 
	
		
			
				|  |  | +      call_.PerformOps(&write_ops_);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    void WriteAndFinish(const ResponseType* resp, WriteOptions options,
 | 
	
		
			
				|  |  | +                        Status s) override {
 | 
	
		
			
				|  |  | +      // This combines the write into the finish callback
 | 
	
		
			
				|  |  | +      // Don't send any message if the status is bad
 | 
	
		
			
				|  |  | +      if (s.ok()) {
 | 
	
		
			
				|  |  | +        // TODO(vjpai): don't assert
 | 
	
		
			
				|  |  | +        GPR_CODEGEN_ASSERT(finish_ops_.SendMessage(*resp, options).ok());
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +      Finish(std::move(s));
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +   private:
 | 
	
		
			
				|  |  | +    friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    ServerCallbackWriterImpl(
 | 
	
		
			
				|  |  | +        ServerContext* ctx, Call* call, const RequestType* req,
 | 
	
		
			
				|  |  | +        std::function<void()> call_requester,
 | 
	
		
			
				|  |  | +        experimental::ServerWriteReactor<RequestType, ResponseType>* reactor)
 | 
	
		
			
				|  |  | +        : ctx_(ctx),
 | 
	
		
			
				|  |  | +          call_(*call),
 | 
	
		
			
				|  |  | +          req_(req),
 | 
	
		
			
				|  |  | +          call_requester_(std::move(call_requester)),
 | 
	
		
			
				|  |  | +          reactor_(reactor) {
 | 
	
		
			
				|  |  | +      ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
 | 
	
		
			
				|  |  | +      write_tag_.Set(call_.call(),
 | 
	
		
			
				|  |  | +                     [this](bool ok) {
 | 
	
		
			
				|  |  | +                       reactor_->OnWriteDone(ok);
 | 
	
		
			
				|  |  | +                       MaybeDone();
 | 
	
		
			
				|  |  | +                     },
 | 
	
		
			
				|  |  | +                     &write_ops_);
 | 
	
		
			
				|  |  | +      write_ops_.set_core_cq_tag(&write_tag_);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    ~ServerCallbackWriterImpl() { req_->~RequestType(); }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    const RequestType* request() { return req_; }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    void MaybeDone() {
 | 
	
		
			
				|  |  | +      if (--callbacks_outstanding_ == 0) {
 | 
	
		
			
				|  |  | +        reactor_->OnDone();
 | 
	
		
			
				|  |  | +        grpc_call* call = call_.call();
 | 
	
		
			
				|  |  | +        auto call_requester = std::move(call_requester_);
 | 
	
		
			
				|  |  | +        this->~ServerCallbackWriterImpl();  // explicitly call destructor
 | 
	
		
			
				|  |  | +        g_core_codegen_interface->grpc_call_unref(call);
 | 
	
		
			
				|  |  | +        call_requester();
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    CallOpSet<CallOpSendInitialMetadata> meta_ops_;
 | 
	
		
			
				|  |  | +    CallbackWithSuccessTag meta_tag_;
 | 
	
		
			
				|  |  | +    CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
 | 
	
		
			
				|  |  | +              CallOpServerSendStatus>
 | 
	
		
			
				|  |  | +        finish_ops_;
 | 
	
		
			
				|  |  | +    CallbackWithSuccessTag finish_tag_;
 | 
	
		
			
				|  |  | +    CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
 | 
	
		
			
				|  |  | +    CallbackWithSuccessTag write_tag_;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    ServerContext* ctx_;
 | 
	
		
			
				|  |  | +    Call call_;
 | 
	
		
			
				|  |  | +    const RequestType* req_;
 | 
	
		
			
				|  |  | +    std::function<void()> call_requester_;
 | 
	
		
			
				|  |  | +    experimental::ServerWriteReactor<RequestType, ResponseType>* reactor_;
 | 
	
		
			
				|  |  | +    std::atomic_int callbacks_outstanding_{
 | 
	
		
			
				|  |  | +        3};  // reserve for OnStarted, Finish, and CompletionOp
 | 
	
		
			
				|  |  | +  };
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +template <class RequestType, class ResponseType>
 | 
	
		
			
				|  |  | +class CallbackBidiHandler : public MethodHandler {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  CallbackBidiHandler(
 | 
	
		
			
				|  |  | +      std::function<
 | 
	
		
			
				|  |  | +          experimental::ServerBidiReactor<RequestType, ResponseType>*()>
 | 
	
		
			
				|  |  | +          func)
 | 
	
		
			
				|  |  | +      : func_(std::move(func)) {}
 | 
	
		
			
				|  |  | +  void RunHandler(const HandlerParameter& param) final {
 | 
	
		
			
				|  |  | +    g_core_codegen_interface->grpc_call_ref(param.call->call());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    experimental::ServerBidiReactor<RequestType, ResponseType>* reactor =
 | 
	
		
			
				|  |  | +        param.status.ok()
 | 
	
		
			
				|  |  | +            ? CatchingReactorCreator<
 | 
	
		
			
				|  |  | +                  experimental::ServerBidiReactor<RequestType, ResponseType>>(
 | 
	
		
			
				|  |  | +                  func_)
 | 
	
		
			
				|  |  | +            : nullptr;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    if (reactor == nullptr) {
 | 
	
		
			
				|  |  | +      // if deserialization or reactor creator failed, we need to fail the call
 | 
	
		
			
				|  |  | +      reactor = new UnimplementedBidiReactor<RequestType, ResponseType>;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    auto* stream = new (g_core_codegen_interface->grpc_call_arena_alloc(
 | 
	
		
			
				|  |  | +        param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
 | 
	
		
			
				|  |  | +        ServerCallbackReaderWriterImpl(param.server_context, param.call,
 | 
	
		
			
				|  |  | +                                       std::move(param.call_requester),
 | 
	
		
			
				|  |  | +                                       reactor);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    stream->BindReactor(reactor);
 | 
	
		
			
				|  |  | +    reactor->OnStarted(param.server_context);
 | 
	
		
			
				|  |  | +    stream->MaybeDone();
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | + private:
 | 
	
		
			
				|  |  | +  std::function<experimental::ServerBidiReactor<RequestType, ResponseType>*()>
 | 
	
		
			
				|  |  | +      func_;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  class ServerCallbackReaderWriterImpl
 | 
	
		
			
				|  |  | +      : public experimental::ServerCallbackReaderWriter<RequestType,
 | 
	
		
			
				|  |  | +                                                        ResponseType> {
 | 
	
		
			
				|  |  | +   public:
 | 
	
		
			
				|  |  | +    void Finish(Status s) override {
 | 
	
		
			
				|  |  | +      finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
 | 
	
		
			
				|  |  | +                      &finish_ops_);
 | 
	
		
			
				|  |  | +      finish_ops_.set_core_cq_tag(&finish_tag_);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +      if (!ctx_->sent_initial_metadata_) {
 | 
	
		
			
				|  |  | +        finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
 | 
	
		
			
				|  |  | +                                        ctx_->initial_metadata_flags());
 | 
	
		
			
				|  |  | +        if (ctx_->compression_level_set()) {
 | 
	
		
			
				|  |  | +          finish_ops_.set_compression_level(ctx_->compression_level());
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        ctx_->sent_initial_metadata_ = true;
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +      finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
 | 
	
		
			
				|  |  | +      call_.PerformOps(&finish_ops_);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    void SendInitialMetadata() override {
 | 
	
		
			
				|  |  | +      GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
 | 
	
		
			
				|  |  | +      callbacks_outstanding_++;
 | 
	
		
			
				|  |  | +      meta_tag_.Set(call_.call(),
 | 
	
		
			
				|  |  | +                    [this](bool ok) {
 | 
	
		
			
				|  |  | +                      reactor_->OnSendInitialMetadataDone(ok);
 | 
	
		
			
				|  |  | +                      MaybeDone();
 | 
	
		
			
				|  |  | +                    },
 | 
	
		
			
				|  |  | +                    &meta_ops_);
 | 
	
		
			
				|  |  | +      meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
 | 
	
		
			
				|  |  | +                                    ctx_->initial_metadata_flags());
 | 
	
		
			
				|  |  | +      if (ctx_->compression_level_set()) {
 | 
	
		
			
				|  |  | +        meta_ops_.set_compression_level(ctx_->compression_level());
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +      ctx_->sent_initial_metadata_ = true;
 | 
	
		
			
				|  |  | +      meta_ops_.set_core_cq_tag(&meta_tag_);
 | 
	
		
			
				|  |  | +      call_.PerformOps(&meta_ops_);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    void Write(const ResponseType* resp, WriteOptions options) override {
 | 
	
		
			
				|  |  | +      callbacks_outstanding_++;
 | 
	
		
			
				|  |  | +      if (options.is_last_message()) {
 | 
	
		
			
				|  |  | +        options.set_buffer_hint();
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +      if (!ctx_->sent_initial_metadata_) {
 | 
	
		
			
				|  |  | +        write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
 | 
	
		
			
				|  |  | +                                       ctx_->initial_metadata_flags());
 | 
	
		
			
				|  |  | +        if (ctx_->compression_level_set()) {
 | 
	
		
			
				|  |  | +          write_ops_.set_compression_level(ctx_->compression_level());
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        ctx_->sent_initial_metadata_ = true;
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +      // TODO(vjpai): don't assert
 | 
	
		
			
				|  |  | +      GPR_CODEGEN_ASSERT(write_ops_.SendMessage(*resp, options).ok());
 | 
	
		
			
				|  |  | +      call_.PerformOps(&write_ops_);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    void WriteAndFinish(const ResponseType* resp, WriteOptions options,
 | 
	
		
			
				|  |  | +                        Status s) override {
 | 
	
		
			
				|  |  | +      // Don't send any message if the status is bad
 | 
	
		
			
				|  |  | +      if (s.ok()) {
 | 
	
		
			
				|  |  | +        // TODO(vjpai): don't assert
 | 
	
		
			
				|  |  | +        GPR_CODEGEN_ASSERT(finish_ops_.SendMessage(*resp, options).ok());
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +      Finish(std::move(s));
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    void Read(RequestType* req) override {
 | 
	
		
			
				|  |  | +      callbacks_outstanding_++;
 | 
	
		
			
				|  |  | +      read_ops_.RecvMessage(req);
 | 
	
		
			
				|  |  | +      call_.PerformOps(&read_ops_);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +   private:
 | 
	
		
			
				|  |  | +    friend class CallbackBidiHandler<RequestType, ResponseType>;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    ServerCallbackReaderWriterImpl(
 | 
	
		
			
				|  |  | +        ServerContext* ctx, Call* call, std::function<void()> call_requester,
 | 
	
		
			
				|  |  | +        experimental::ServerBidiReactor<RequestType, ResponseType>* reactor)
 | 
	
		
			
				|  |  | +        : ctx_(ctx),
 | 
	
		
			
				|  |  | +          call_(*call),
 | 
	
		
			
				|  |  | +          call_requester_(std::move(call_requester)),
 | 
	
		
			
				|  |  | +          reactor_(reactor) {
 | 
	
		
			
				|  |  | +      ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
 | 
	
		
			
				|  |  | +      write_tag_.Set(call_.call(),
 | 
	
		
			
				|  |  | +                     [this](bool ok) {
 | 
	
		
			
				|  |  | +                       reactor_->OnWriteDone(ok);
 | 
	
		
			
				|  |  | +                       MaybeDone();
 | 
	
		
			
				|  |  | +                     },
 | 
	
		
			
				|  |  | +                     &write_ops_);
 | 
	
		
			
				|  |  | +      write_ops_.set_core_cq_tag(&write_tag_);
 | 
	
		
			
				|  |  | +      read_tag_.Set(call_.call(),
 | 
	
		
			
				|  |  | +                    [this](bool ok) {
 | 
	
		
			
				|  |  | +                      reactor_->OnReadDone(ok);
 | 
	
		
			
				|  |  | +                      MaybeDone();
 | 
	
		
			
				|  |  | +                    },
 | 
	
		
			
				|  |  | +                    &read_ops_);
 | 
	
		
			
				|  |  | +      read_ops_.set_core_cq_tag(&read_tag_);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    ~ServerCallbackReaderWriterImpl() {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    void MaybeDone() {
 | 
	
		
			
				|  |  | +      if (--callbacks_outstanding_ == 0) {
 | 
	
		
			
				|  |  | +        reactor_->OnDone();
 | 
	
		
			
				|  |  | +        grpc_call* call = call_.call();
 | 
	
		
			
				|  |  | +        auto call_requester = std::move(call_requester_);
 | 
	
		
			
				|  |  | +        this->~ServerCallbackReaderWriterImpl();  // explicitly call destructor
 | 
	
		
			
				|  |  | +        g_core_codegen_interface->grpc_call_unref(call);
 | 
	
		
			
				|  |  | +        call_requester();
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    CallOpSet<CallOpSendInitialMetadata> meta_ops_;
 | 
	
		
			
				|  |  | +    CallbackWithSuccessTag meta_tag_;
 | 
	
		
			
				|  |  | +    CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
 | 
	
		
			
				|  |  | +              CallOpServerSendStatus>
 | 
	
		
			
				|  |  | +        finish_ops_;
 | 
	
		
			
				|  |  | +    CallbackWithSuccessTag finish_tag_;
 | 
	
		
			
				|  |  | +    CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
 | 
	
		
			
				|  |  | +    CallbackWithSuccessTag write_tag_;
 | 
	
		
			
				|  |  | +    CallOpSet<CallOpRecvMessage<RequestType>> read_ops_;
 | 
	
		
			
				|  |  | +    CallbackWithSuccessTag read_tag_;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    ServerContext* ctx_;
 | 
	
		
			
				|  |  | +    Call call_;
 | 
	
		
			
				|  |  | +    std::function<void()> call_requester_;
 | 
	
		
			
				|  |  | +    experimental::ServerBidiReactor<RequestType, ResponseType>* reactor_;
 | 
	
		
			
				|  |  | +    std::atomic_int callbacks_outstanding_{
 | 
	
		
			
				|  |  | +        3};  // reserve for OnStarted, Finish, and CompletionOp
 | 
	
		
			
				|  |  |    };
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 |