GRPC C++  1.23.0
server_callback_impl.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_IMPL_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_IMPL_H
20 
21 #include <atomic>
22 #include <functional>
23 #include <type_traits>
24 
34 
35 namespace grpc_impl {
36 
37 // Declare base class of all reactors as internal
38 namespace internal {
39 
40 // Forward declarations
41 template <class Request, class Response>
43 template <class Request, class Response>
45 template <class Request, class Response>
47 
49  public:
50  virtual ~ServerReactor() = default;
51  virtual void OnDone() = 0;
52  virtual void OnCancel() = 0;
53 
54  private:
56  template <class Request, class Response>
58  template <class Request, class Response>
60  template <class Request, class Response>
61  friend class CallbackBidiHandler;
62 
63  // The ServerReactor is responsible for tracking when it is safe to call
64  // OnCancel. This function should not be called until after OnStarted is done
65  // and the RPC has completed with a cancellation. This is tracked by counting
66  // how many of these conditions have been met and calling OnCancel when none
67  // remain unmet.
68 
69  void MaybeCallOnCancel() {
70  if (GPR_UNLIKELY(on_cancel_conditions_remaining_.fetch_sub(
71  1, std::memory_order_acq_rel) == 1)) {
72  OnCancel();
73  }
74  }
75 
76  std::atomic<intptr_t> on_cancel_conditions_remaining_{2};
77 };
78 
79 template <class Request, class Response>
81  : public ::grpc::experimental::MessageHolder<Request, Response> {
82  public:
84  this->set_request(&request_obj_);
85  this->set_response(&response_obj_);
86  }
87  void Release() override {
88  // the object is allocated in the call arena.
90  }
91 
92  private:
93  Request request_obj_;
94  Response response_obj_;
95 };
96 
97 } // namespace internal
98 
99 namespace experimental {
100 
101 // Forward declarations
102 template <class Request, class Response>
104 template <class Request, class Response>
106 template <class Request, class Response>
108 
109 // For unary RPCs, the exposed controller class is only an interface
110 // and the actual implementation is an internal class.
112  public:
113  virtual ~ServerCallbackRpcController() = default;
114 
115  // The method handler must call this function when it is done so that
116  // the library knows to free its resources
117  virtual void Finish(::grpc::Status s) = 0;
118 
119  // Allow the method handler to push out the initial metadata before
120  // the response and status are ready
121  virtual void SendInitialMetadata(std::function<void(bool)>) = 0;
122 
154  virtual void SetCancelCallback(std::function<void()> callback) = 0;
155  virtual void ClearCancelCallback() = 0;
156 
157  // NOTE: This is an API for advanced users who need custom allocators.
158  // Get and maybe mutate the allocator state associated with the current RPC.
159  virtual grpc::experimental::RpcAllocatorState* GetRpcAllocatorState() = 0;
160 };
161 
162 // NOTE: The actual streaming object classes are provided
163 // as API only to support mocking. There are no implementations of
164 // these class interfaces in the API.
165 template <class Request>
167  public:
169  virtual void Finish(::grpc::Status s) = 0;
170  virtual void SendInitialMetadata() = 0;
171  virtual void Read(Request* msg) = 0;
172 
173  protected:
174  template <class Response>
176  reactor->InternalBindReader(this);
177  }
178 };
179 
180 template <class Response>
182  public:
184 
185  virtual void Finish(::grpc::Status s) = 0;
186  virtual void SendInitialMetadata() = 0;
187  virtual void Write(const Response* msg, ::grpc::WriteOptions options) = 0;
188  virtual void WriteAndFinish(const Response* msg, ::grpc::WriteOptions options,
189  ::grpc::Status s) {
190  // Default implementation that can/should be overridden
191  Write(msg, std::move(options));
192  Finish(std::move(s));
193  }
194 
195  protected:
196  template <class Request>
198  reactor->InternalBindWriter(this);
199  }
200 };
201 
202 template <class Request, class Response>
204  public:
206 
207  virtual void Finish(::grpc::Status s) = 0;
208  virtual void SendInitialMetadata() = 0;
209  virtual void Read(Request* msg) = 0;
210  virtual void Write(const Response* msg, ::grpc::WriteOptions options) = 0;
211  virtual void WriteAndFinish(const Response* msg, ::grpc::WriteOptions options,
212  ::grpc::Status s) {
213  // Default implementation that can/should be overridden
214  Write(msg, std::move(options));
215  Finish(std::move(s));
216  }
217 
218  protected:
220  reactor->InternalBindStream(this);
221  }
222 };
223 
224 // The following classes are the reactor interfaces that are to be implemented
225 // by the user, returned as the result of the method handler for a callback
226 // method, and activated by the call to OnStarted. The library guarantees that
227 // OnStarted will be called for any reactor that has been created using a
228 // method handler registered on a service. No operation initiation method may be
229 // called until after the call to OnStarted.
230 // Note that none of the classes are pure; all reactions have a default empty
231 // reaction so that the user class only needs to override those classes that it
232 // cares about.
233 
235 template <class Request, class Response>
237  public:
238  ~ServerBidiReactor() = default;
239 
242 
246  void StartSendInitialMetadata() { stream_->SendInitialMetadata(); }
247 
252  void StartRead(Request* req) { stream_->Read(req); }
253 
259  void StartWrite(const Response* resp) {
260  StartWrite(resp, ::grpc::WriteOptions());
261  }
262 
269  void StartWrite(const Response* resp, ::grpc::WriteOptions options) {
270  stream_->Write(resp, std::move(options));
271  }
272 
286  void StartWriteAndFinish(const Response* resp, ::grpc::WriteOptions options,
287  ::grpc::Status s) {
288  stream_->WriteAndFinish(resp, std::move(options), std::move(s));
289  }
290 
299  void StartWriteLast(const Response* resp, ::grpc::WriteOptions options) {
300  StartWrite(resp, std::move(options.set_last_message()));
301  }
302 
309  void Finish(::grpc::Status s) { stream_->Finish(std::move(s)); }
310 
317  virtual void OnStarted(::grpc_impl::ServerContext* context) {}
318 
325  virtual void OnSendInitialMetadataDone(bool ok) {}
326 
331  virtual void OnReadDone(bool ok) {}
332 
338  virtual void OnWriteDone(bool ok) {}
339 
343  void OnDone() override {}
344 
348  void OnCancel() override {}
349 
350  private:
351  friend class ServerCallbackReaderWriter<Request, Response>;
352  // May be overridden by internal implementation details. This is not a public
353  // customization point.
354  virtual void InternalBindStream(
356  stream_ = stream;
357  }
358 
360 };
361 
363 template <class Request, class Response>
365  public:
366  ~ServerReadReactor() = default;
367 
369  void StartSendInitialMetadata() { reader_->SendInitialMetadata(); }
370  void StartRead(Request* req) { reader_->Read(req); }
371  void Finish(::grpc::Status s) { reader_->Finish(std::move(s)); }
372 
379  virtual void OnStarted(::grpc_impl::ServerContext* context, Response* resp) {}
380 
382  virtual void OnSendInitialMetadataDone(bool ok) {}
383  virtual void OnReadDone(bool ok) {}
384  void OnDone() override {}
385  void OnCancel() override {}
386 
387  private:
388  friend class ServerCallbackReader<Request>;
389  // May be overridden by internal implementation details. This is not a public
390  // customization point.
391  virtual void InternalBindReader(ServerCallbackReader<Request>* reader) {
392  reader_ = reader;
393  }
394 
396 };
397 
399 template <class Request, class Response>
401  public:
402  ~ServerWriteReactor() = default;
403 
405  void StartSendInitialMetadata() { writer_->SendInitialMetadata(); }
406  void StartWrite(const Response* resp) {
407  StartWrite(resp, ::grpc::WriteOptions());
408  }
409  void StartWrite(const Response* resp, ::grpc::WriteOptions options) {
410  writer_->Write(resp, std::move(options));
411  }
412  void StartWriteAndFinish(const Response* resp, ::grpc::WriteOptions options,
413  ::grpc::Status s) {
414  writer_->WriteAndFinish(resp, std::move(options), std::move(s));
415  }
416  void StartWriteLast(const Response* resp, ::grpc::WriteOptions options) {
417  StartWrite(resp, std::move(options.set_last_message()));
418  }
419  void Finish(::grpc::Status s) { writer_->Finish(std::move(s)); }
420 
426  virtual void OnStarted(::grpc_impl::ServerContext* context,
427  const Request* req) {}
428 
430  virtual void OnSendInitialMetadataDone(bool ok) {}
431  virtual void OnWriteDone(bool ok) {}
432  void OnDone() override {}
433  void OnCancel() override {}
434 
435  private:
436  friend class ServerCallbackWriter<Response>;
437  // May be overridden by internal implementation details. This is not a public
438  // customization point.
439  virtual void InternalBindWriter(ServerCallbackWriter<Response>* writer) {
440  writer_ = writer;
441  }
442 
444 };
445 
446 } // namespace experimental
447 
448 namespace internal {
449 
450 template <class Request, class Response>
452  : public experimental::ServerReadReactor<Request, Response> {
453  public:
454  void OnDone() override { delete this; }
455  void OnStarted(::grpc_impl::ServerContext*, Response*) override {
456  this->Finish(::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
457  }
458 };
459 
460 template <class Request, class Response>
462  : public experimental::ServerWriteReactor<Request, Response> {
463  public:
464  void OnDone() override { delete this; }
465  void OnStarted(::grpc_impl::ServerContext*, const Request*) override {
466  this->Finish(::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
467  }
468 };
469 
470 template <class Request, class Response>
472  : public experimental::ServerBidiReactor<Request, Response> {
473  public:
474  void OnDone() override { delete this; }
476  this->Finish(::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
477  }
478 };
479 
480 template <class RequestType, class ResponseType>
482  public:
484  std::function<void(::grpc_impl::ServerContext*, const RequestType*,
485  ResponseType*,
487  func)
488  : func_(func) {}
489 
492  allocator) {
493  allocator_ = allocator;
494  }
495 
496  void RunHandler(const HandlerParameter& param) final {
497  // Arena allocate a controller structure (that includes request/response)
498  ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
499  auto* allocator_state = static_cast<
501  param.internal_data);
502  auto* controller =
504  param.call->call(), sizeof(ServerCallbackRpcControllerImpl)))
505  ServerCallbackRpcControllerImpl(param.server_context, param.call,
506  allocator_state,
507  std::move(param.call_requester));
508  ::grpc::Status status = param.status;
509  if (status.ok()) {
510  // Call the actual function handler and expect the user to call finish
511  grpc::internal::CatchingCallback(func_, param.server_context,
512  controller->request(),
513  controller->response(), controller);
514  } else {
515  // if deserialization failed, we need to fail the call
516  controller->Finish(status);
517  }
518  }
519 
521  ::grpc::Status* status, void** handler_data) final {
522  grpc::ByteBuffer buf;
523  buf.set_buffer(req);
524  RequestType* request = nullptr;
526  allocator_state = nullptr;
527  if (allocator_ != nullptr) {
528  allocator_state = allocator_->AllocateMessages();
529  } else {
530  allocator_state =
534  }
535  *handler_data = allocator_state;
536  request = allocator_state->request();
537  *status =
539  buf.Release();
540  if (status->ok()) {
541  return request;
542  }
543  // Clean up on deserialization failure.
544  allocator_state->Release();
545  return nullptr;
546  }
547 
548  private:
549  std::function<void(::grpc_impl::ServerContext*, const RequestType*,
551  func_;
553  nullptr;
554 
555  // The implementation class of ServerCallbackRpcController is a private member
556  // of CallbackUnaryHandler since it is never exposed anywhere, and this allows
557  // it to take advantage of CallbackUnaryHandler's friendships.
558  class ServerCallbackRpcControllerImpl
560  public:
561  void Finish(::grpc::Status s) override {
562  finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
563  &finish_ops_);
564  if (!ctx_->sent_initial_metadata_) {
565  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
566  ctx_->initial_metadata_flags());
567  if (ctx_->compression_level_set()) {
568  finish_ops_.set_compression_level(ctx_->compression_level());
569  }
570  ctx_->sent_initial_metadata_ = true;
571  }
572  // The response is dropped if the status is not OK.
573  if (s.ok()) {
574  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
575  finish_ops_.SendMessagePtr(response()));
576  } else {
577  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
578  }
579  finish_ops_.set_core_cq_tag(&finish_tag_);
580  call_.PerformOps(&finish_ops_);
581  }
582 
583  void SendInitialMetadata(std::function<void(bool)> f) override {
584  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
585  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
586  // TODO(vjpai): Consider taking f as a move-capture if we adopt C++14
587  // and if performance of this operation matters
588  meta_tag_.Set(call_.call(),
589  [this, f](bool ok) {
590  f(ok);
591  MaybeDone();
592  },
593  &meta_ops_);
594  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
595  ctx_->initial_metadata_flags());
596  if (ctx_->compression_level_set()) {
597  meta_ops_.set_compression_level(ctx_->compression_level());
598  }
599  ctx_->sent_initial_metadata_ = true;
600  meta_ops_.set_core_cq_tag(&meta_tag_);
601  call_.PerformOps(&meta_ops_);
602  }
603 
604  // Neither SetCancelCallback nor ClearCancelCallback should affect the
605  // callbacks_outstanding_ count since they are paired and both must precede
606  // the invocation of Finish (if they are used at all)
607  void SetCancelCallback(std::function<void()> callback) override {
608  ctx_->SetCancelCallback(std::move(callback));
609  }
610 
611  void ClearCancelCallback() override { ctx_->ClearCancelCallback(); }
612 
613  grpc::experimental::RpcAllocatorState* GetRpcAllocatorState() override {
614  return allocator_state_;
615  }
616 
617  private:
618  friend class CallbackUnaryHandler<RequestType, ResponseType>;
619 
620  ServerCallbackRpcControllerImpl(
621  ServerContext* ctx, ::grpc::internal::Call* call,
623  allocator_state,
624  std::function<void()> call_requester)
625  : ctx_(ctx),
626  call_(*call),
627  allocator_state_(allocator_state),
628  call_requester_(std::move(call_requester)) {
629  ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, nullptr);
630  }
631 
632  const RequestType* request() { return allocator_state_->request(); }
633  ResponseType* response() { return allocator_state_->response(); }
634 
635  void MaybeDone() {
636  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
637  1, std::memory_order_acq_rel) == 1)) {
638  grpc_call* call = call_.call();
639  auto call_requester = std::move(call_requester_);
640  allocator_state_->Release();
641  this->~ServerCallbackRpcControllerImpl(); // explicitly call destructor
643  call_requester();
644  }
645  }
646 
648  meta_ops_;
653  finish_ops_;
655 
657  grpc::internal::Call call_;
659  allocator_state_;
660  std::function<void()> call_requester_;
661  std::atomic<intptr_t> callbacks_outstanding_{
662  2}; // reserve for Finish and CompletionOp
663  };
664 };
665 
666 template <class RequestType, class ResponseType>
668  public:
670  std::function<
672  func)
673  : func_(std::move(func)) {}
674  void RunHandler(const HandlerParameter& param) final {
675  // Arena allocate a reader structure (that includes response)
676  ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
677 
679  param.status.ok()
682  func_)
683  : nullptr;
684 
685  if (reactor == nullptr) {
686  // if deserialization or reactor creator failed, we need to fail the call
688  }
689 
691  param.call->call(), sizeof(ServerCallbackReaderImpl)))
692  ServerCallbackReaderImpl(param.server_context, param.call,
693  std::move(param.call_requester), reactor);
694 
695  reader->BindReactor(reactor);
696  reactor->OnStarted(param.server_context, reader->response());
697  // The earliest that OnCancel can be called is after OnStarted is done.
698  reactor->MaybeCallOnCancel();
699  reader->MaybeDone();
700  }
701 
702  private:
703  std::function<experimental::ServerReadReactor<RequestType, ResponseType>*()>
704  func_;
705 
706  class ServerCallbackReaderImpl
707  : public experimental::ServerCallbackReader<RequestType> {
708  public:
709  void Finish(::grpc::Status s) override {
710  finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
711  &finish_ops_);
712  if (!ctx_->sent_initial_metadata_) {
713  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
714  ctx_->initial_metadata_flags());
715  if (ctx_->compression_level_set()) {
716  finish_ops_.set_compression_level(ctx_->compression_level());
717  }
718  ctx_->sent_initial_metadata_ = true;
719  }
720  // The response is dropped if the status is not OK.
721  if (s.ok()) {
722  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
723  finish_ops_.SendMessagePtr(&resp_));
724  } else {
725  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
726  }
727  finish_ops_.set_core_cq_tag(&finish_tag_);
728  call_.PerformOps(&finish_ops_);
729  }
730 
731  void SendInitialMetadata() override {
732  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
733  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
734  meta_tag_.Set(call_.call(),
735  [this](bool ok) {
736  reactor_->OnSendInitialMetadataDone(ok);
737  MaybeDone();
738  },
739  &meta_ops_);
740  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
741  ctx_->initial_metadata_flags());
742  if (ctx_->compression_level_set()) {
743  meta_ops_.set_compression_level(ctx_->compression_level());
744  }
745  ctx_->sent_initial_metadata_ = true;
746  meta_ops_.set_core_cq_tag(&meta_tag_);
747  call_.PerformOps(&meta_ops_);
748  }
749 
750  void Read(RequestType* req) override {
751  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
752  read_ops_.RecvMessage(req);
753  call_.PerformOps(&read_ops_);
754  }
755 
756  private:
757  friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
758 
759  ServerCallbackReaderImpl(
761  std::function<void()> call_requester,
763  : ctx_(ctx),
764  call_(*call),
765  call_requester_(std::move(call_requester)),
766  reactor_(reactor) {
767  ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
768  read_tag_.Set(call_.call(),
769  [this](bool ok) {
770  reactor_->OnReadDone(ok);
771  MaybeDone();
772  },
773  &read_ops_);
774  read_ops_.set_core_cq_tag(&read_tag_);
775  }
776 
777  ~ServerCallbackReaderImpl() {}
778 
779  ResponseType* response() { return &resp_; }
780 
781  void MaybeDone() {
782  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
783  1, std::memory_order_acq_rel) == 1)) {
784  reactor_->OnDone();
785  grpc_call* call = call_.call();
786  auto call_requester = std::move(call_requester_);
787  this->~ServerCallbackReaderImpl(); // explicitly call destructor
789  call_requester();
790  }
791  }
792 
794  meta_ops_;
799  finish_ops_;
802  read_ops_;
804 
806  grpc::internal::Call call_;
807  ResponseType resp_;
808  std::function<void()> call_requester_;
810  std::atomic<intptr_t> callbacks_outstanding_{
811  3}; // reserve for OnStarted, Finish, and CompletionOp
812  };
813 };
814 
815 template <class RequestType, class ResponseType>
817  public:
819  std::function<
821  func)
822  : func_(std::move(func)) {}
823  void RunHandler(const HandlerParameter& param) final {
824  // Arena allocate a writer structure
825  ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
826 
828  param.status.ok()
831  func_)
832  : nullptr;
833 
834  if (reactor == nullptr) {
835  // if deserialization or reactor creator failed, we need to fail the call
837  }
838 
840  param.call->call(), sizeof(ServerCallbackWriterImpl)))
841  ServerCallbackWriterImpl(param.server_context, param.call,
842  static_cast<RequestType*>(param.request),
843  std::move(param.call_requester), reactor);
844  writer->BindReactor(reactor);
845  reactor->OnStarted(param.server_context, writer->request());
846  // The earliest that OnCancel can be called is after OnStarted is done.
847  reactor->MaybeCallOnCancel();
848  writer->MaybeDone();
849  }
850 
852  ::grpc::Status* status, void** handler_data) final {
853  ::grpc::ByteBuffer buf;
854  buf.set_buffer(req);
855  auto* request =
857  call, sizeof(RequestType))) RequestType();
858  *status =
860  buf.Release();
861  if (status->ok()) {
862  return request;
863  }
864  request->~RequestType();
865  return nullptr;
866  }
867 
868  private:
869  std::function<experimental::ServerWriteReactor<RequestType, ResponseType>*()>
870  func_;
871 
872  class ServerCallbackWriterImpl
873  : public experimental::ServerCallbackWriter<ResponseType> {
874  public:
875  void Finish(::grpc::Status s) override {
876  finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
877  &finish_ops_);
878  finish_ops_.set_core_cq_tag(&finish_tag_);
879 
880  if (!ctx_->sent_initial_metadata_) {
881  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
882  ctx_->initial_metadata_flags());
883  if (ctx_->compression_level_set()) {
884  finish_ops_.set_compression_level(ctx_->compression_level());
885  }
886  ctx_->sent_initial_metadata_ = true;
887  }
888  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
889  call_.PerformOps(&finish_ops_);
890  }
891 
892  void SendInitialMetadata() override {
893  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
894  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
895  meta_tag_.Set(call_.call(),
896  [this](bool ok) {
897  reactor_->OnSendInitialMetadataDone(ok);
898  MaybeDone();
899  },
900  &meta_ops_);
901  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
902  ctx_->initial_metadata_flags());
903  if (ctx_->compression_level_set()) {
904  meta_ops_.set_compression_level(ctx_->compression_level());
905  }
906  ctx_->sent_initial_metadata_ = true;
907  meta_ops_.set_core_cq_tag(&meta_tag_);
908  call_.PerformOps(&meta_ops_);
909  }
910 
911  void Write(const ResponseType* resp,
912  ::grpc::WriteOptions options) override {
913  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
914  if (options.is_last_message()) {
915  options.set_buffer_hint();
916  }
917  if (!ctx_->sent_initial_metadata_) {
918  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
919  ctx_->initial_metadata_flags());
920  if (ctx_->compression_level_set()) {
921  write_ops_.set_compression_level(ctx_->compression_level());
922  }
923  ctx_->sent_initial_metadata_ = true;
924  }
925  // TODO(vjpai): don't assert
926  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
927  call_.PerformOps(&write_ops_);
928  }
929 
930  void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
931  ::grpc::Status s) override {
932  // This combines the write into the finish callback
933  // Don't send any message if the status is bad
934  if (s.ok()) {
935  // TODO(vjpai): don't assert
936  GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
937  }
938  Finish(std::move(s));
939  }
940 
941  private:
942  friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
943 
944  ServerCallbackWriterImpl(
946  const RequestType* req, std::function<void()> call_requester,
948  : ctx_(ctx),
949  call_(*call),
950  req_(req),
951  call_requester_(std::move(call_requester)),
952  reactor_(reactor) {
953  ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
954  write_tag_.Set(call_.call(),
955  [this](bool ok) {
956  reactor_->OnWriteDone(ok);
957  MaybeDone();
958  },
959  &write_ops_);
960  write_ops_.set_core_cq_tag(&write_tag_);
961  }
962  ~ServerCallbackWriterImpl() { req_->~RequestType(); }
963 
964  const RequestType* request() { return req_; }
965 
966  void MaybeDone() {
967  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
968  1, std::memory_order_acq_rel) == 1)) {
969  reactor_->OnDone();
970  grpc_call* call = call_.call();
971  auto call_requester = std::move(call_requester_);
972  this->~ServerCallbackWriterImpl(); // explicitly call destructor
974  call_requester();
975  }
976  }
977 
979  meta_ops_;
984  finish_ops_;
986  grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
987  grpc::internal::CallOpSendMessage>
988  write_ops_;
990 
992  grpc::internal::Call call_;
993  const RequestType* req_;
994  std::function<void()> call_requester_;
996  std::atomic<intptr_t> callbacks_outstanding_{
997  3}; // reserve for OnStarted, Finish, and CompletionOp
998  };
999 };
1000 
1001 template <class RequestType, class ResponseType>
1003  public:
1005  std::function<
1007  func)
1008  : func_(std::move(func)) {}
1009  void RunHandler(const HandlerParameter& param) final {
1010  ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
1011 
1013  param.status.ok()
1016  func_)
1017  : nullptr;
1018 
1019  if (reactor == nullptr) {
1020  // if deserialization or reactor creator failed, we need to fail the call
1022  }
1023 
1025  param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
1026  ServerCallbackReaderWriterImpl(param.server_context, param.call,
1027  std::move(param.call_requester),
1028  reactor);
1029 
1030  stream->BindReactor(reactor);
1031  reactor->OnStarted(param.server_context);
1032  // The earliest that OnCancel can be called is after OnStarted is done.
1033  reactor->MaybeCallOnCancel();
1034  stream->MaybeDone();
1035  }
1036 
1037  private:
1038  std::function<experimental::ServerBidiReactor<RequestType, ResponseType>*()>
1039  func_;
1040 
1041  class ServerCallbackReaderWriterImpl
1042  : public experimental::ServerCallbackReaderWriter<RequestType,
1043  ResponseType> {
1044  public:
1045  void Finish(::grpc::Status s) override {
1046  finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
1047  &finish_ops_);
1048  finish_ops_.set_core_cq_tag(&finish_tag_);
1049 
1050  if (!ctx_->sent_initial_metadata_) {
1051  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
1052  ctx_->initial_metadata_flags());
1053  if (ctx_->compression_level_set()) {
1054  finish_ops_.set_compression_level(ctx_->compression_level());
1055  }
1056  ctx_->sent_initial_metadata_ = true;
1057  }
1058  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
1059  call_.PerformOps(&finish_ops_);
1060  }
1061 
1062  void SendInitialMetadata() override {
1063  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
1064  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
1065  meta_tag_.Set(call_.call(),
1066  [this](bool ok) {
1067  reactor_->OnSendInitialMetadataDone(ok);
1068  MaybeDone();
1069  },
1070  &meta_ops_);
1071  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
1072  ctx_->initial_metadata_flags());
1073  if (ctx_->compression_level_set()) {
1074  meta_ops_.set_compression_level(ctx_->compression_level());
1075  }
1076  ctx_->sent_initial_metadata_ = true;
1077  meta_ops_.set_core_cq_tag(&meta_tag_);
1078  call_.PerformOps(&meta_ops_);
1079  }
1080 
1081  void Write(const ResponseType* resp,
1082  ::grpc::WriteOptions options) override {
1083  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
1084  if (options.is_last_message()) {
1085  options.set_buffer_hint();
1086  }
1087  if (!ctx_->sent_initial_metadata_) {
1088  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
1089  ctx_->initial_metadata_flags());
1090  if (ctx_->compression_level_set()) {
1091  write_ops_.set_compression_level(ctx_->compression_level());
1092  }
1093  ctx_->sent_initial_metadata_ = true;
1094  }
1095  // TODO(vjpai): don't assert
1096  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
1097  call_.PerformOps(&write_ops_);
1098  }
1099 
1100  void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
1101  ::grpc::Status s) override {
1102  // Don't send any message if the status is bad
1103  if (s.ok()) {
1104  // TODO(vjpai): don't assert
1105  GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
1106  }
1107  Finish(std::move(s));
1108  }
1109 
1110  void Read(RequestType* req) override {
1111  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
1112  read_ops_.RecvMessage(req);
1113  call_.PerformOps(&read_ops_);
1114  }
1115 
1116  private:
1117  friend class CallbackBidiHandler<RequestType, ResponseType>;
1118 
1119  ServerCallbackReaderWriterImpl(
1121  std::function<void()> call_requester,
1123  : ctx_(ctx),
1124  call_(*call),
1125  call_requester_(std::move(call_requester)),
1126  reactor_(reactor) {
1127  ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
1128  write_tag_.Set(call_.call(),
1129  [this](bool ok) {
1130  reactor_->OnWriteDone(ok);
1131  MaybeDone();
1132  },
1133  &write_ops_);
1134  write_ops_.set_core_cq_tag(&write_tag_);
1135  read_tag_.Set(call_.call(),
1136  [this](bool ok) {
1137  reactor_->OnReadDone(ok);
1138  MaybeDone();
1139  },
1140  &read_ops_);
1141  read_ops_.set_core_cq_tag(&read_tag_);
1142  }
1143  ~ServerCallbackReaderWriterImpl() {}
1144 
1145  void MaybeDone() {
1146  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1147  1, std::memory_order_acq_rel) == 1)) {
1148  reactor_->OnDone();
1149  grpc_call* call = call_.call();
1150  auto call_requester = std::move(call_requester_);
1151  this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor
1153  call_requester();
1154  }
1155  }
1156 
1158  meta_ops_;
1163  finish_ops_;
1165  grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1166  grpc::internal::CallOpSendMessage>
1167  write_ops_;
1170  read_ops_;
1172 
1174  grpc::internal::Call call_;
1175  std::function<void()> call_requester_;
1177  std::atomic<intptr_t> callbacks_outstanding_{
1178  3}; // reserve for OnStarted, Finish, and CompletionOp
1179  };
1180 };
1181 
1182 } // namespace internal
1183 
1184 } // namespace grpc_impl
1185 
1186 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_IMPL_H
ServerReadReactor is the interface for a client-streaming RPC.
Definition: server_callback_impl.h:103
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:70
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_impl.h:496
void OnStarted(::grpc_impl::ServerContext *) override
Notify the application that a streaming RPC has started and that it is now ok to call any operation i...
Definition: server_callback_impl.h:475
void OnStarted(::grpc_impl::ServerContext *, Response *) override
Similar to ServerBidiReactor::OnStarted, except that this also provides the response object that the ...
Definition: server_callback_impl.h:455
void StartWrite(const Response *resp, ::grpc::WriteOptions options)
Initiate a write operation with specified options.
Definition: server_callback_impl.h:269
void StartRead(Request *req)
Initiate a read operation.
Definition: server_callback_impl.h:252
virtual void WriteAndFinish(const Response *msg, ::grpc::WriteOptions options, ::grpc::Status s)
Definition: server_callback_impl.h:188
virtual void OnStarted(::grpc_impl::ServerContext *context, Response *resp)
Similar to ServerBidiReactor::OnStarted, except that this also provides the response object that the ...
Definition: server_callback_impl.h:379
virtual ~ServerCallbackWriter()
Definition: server_callback_impl.h:183
void BindReactor(ServerReadReactor< Request, Response > *reactor)
Definition: server_callback_impl.h:175
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call_op_set.h:125
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:145
virtual void grpc_call_ref(grpc_call *call)=0
virtual ~ServerCallbackReaderWriter()
Definition: server_callback_impl.h:205
ServerWriteReactor is the interface for a server-streaming RPC.
Definition: server_callback_impl.h:105
Definition: server_callback_impl.h:80
Definition: server_callback_impl.h:471
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:164
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_impl.h:674
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, ::grpc::Status *status, void **handler_data) final
Definition: server_callback_impl.h:851
void OnCancel() override
Notifies the application that this RPC has been cancelled.
Definition: server_callback_impl.h:348
virtual void OnStarted(::grpc_impl::ServerContext *context, const Request *req)
Similar to ServerBidiReactor::OnStarted, except that this also provides the request object sent by th...
Definition: server_callback_impl.h:426
void SetMessageAllocator(::grpc::experimental::MessageAllocator< RequestType, ResponseType > *allocator)
Definition: server_callback_impl.h:490
void StartSendInitialMetadata()
The following operation initiations are exactly like ServerBidiReactor.
Definition: server_callback_impl.h:405
#define GPR_UNLIKELY(x)
Definition: port_platform.h:654
virtual void grpc_call_unref(grpc_call *call)=0
Definition: server_callback_impl.h:181
Definition: server_callback_impl.h:203
Definition: async_unary_call_impl.h:302
void StartWrite(const Response *resp)
Initiate a write operation.
Definition: server_callback_impl.h:259
void Finish(::grpc::Status s)
Definition: server_callback_impl.h:419
virtual void OnReadDone(bool ok)
Definition: server_callback_impl.h:383
void OnCancel() override
Definition: server_callback_impl.h:433
Definition: grpc_types.h:40
void StartSendInitialMetadata()
The following operation initiations are exactly like ServerBidiReactor.
Definition: server_callback_impl.h:369
virtual void WriteAndFinish(const Response *msg, ::grpc::WriteOptions options, ::grpc::Status s)
Definition: server_callback_impl.h:211
grpc_call * call() const
Definition: call.h:72
void Finish(::grpc::Status s)
Definition: server_callback_impl.h:371
::grpc_impl::ServerContext ServerContext
Definition: server_context.h:25
A ServerContext allows the person implementing a service handler to:
Definition: server_context_impl.h:118
void OnDone() override
Definition: server_callback_impl.h:454
void StartWriteAndFinish(const Response *resp, ::grpc::WriteOptions options, ::grpc::Status s)
Initiate a write operation with specified options and final RPC Status, which also causes any trailin...
Definition: server_callback_impl.h:286
Definition: server_callback_impl.h:461
void BindReactor(ServerWriteReactor< Request, Response > *reactor)
Definition: server_callback_impl.h:197
void OnDone() override
Notifies the application that all operations associated with this RPC have completed.
Definition: server_callback_impl.h:474
void StartWrite(const Response *resp, ::grpc::WriteOptions options)
Definition: server_callback_impl.h:409
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
virtual ~ServerCallbackReader()
Definition: server_callback_impl.h:168
CallbackServerStreamingHandler(std::function< experimental::ServerWriteReactor< RequestType, ResponseType > *()> func)
Definition: server_callback_impl.h:818
void StartWriteLast(const Response *resp, ::grpc::WriteOptions options)
Inform system of a planned write operation with specified options, but allow the library to schedule ...
Definition: server_callback_impl.h:299
ResponseT * response()
Definition: message_allocator.h:47
Definition: call_op_set.h:629
RequestT * request()
Definition: message_allocator.h:46
CallbackBidiHandler(std::function< experimental::ServerBidiReactor< RequestType, ResponseType > *()> func)
Definition: server_callback_impl.h:1004
void StartRead(Request *req)
Definition: server_callback_impl.h:370
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_impl.h:823
Definition: call_op_set.h:218
virtual void OnStarted(::grpc_impl::ServerContext *context)
Notify the application that a streaming RPC has started and that it is now ok to call any operation i...
Definition: server_callback_impl.h:317
void OnCancel() override
Definition: server_callback_impl.h:385
void StartSendInitialMetadata()
Do NOT call any operation initiation method (names that start with Start) until after the library has...
Definition: server_callback_impl.h:246
void Release()
Forget underlying byte buffer without destroying Use this only for un-owned byte buffers.
Definition: byte_buffer.h:151
virtual void OnSendInitialMetadataDone(bool ok)
The following notifications are exactly like ServerBidiReactor.
Definition: server_callback_impl.h:382
Definition: call_op_set.h:288
Definition: server_callback_impl.h:111
void OnDone() override
Definition: server_callback_impl.h:384
void BindReactor(ServerBidiReactor< Request, Response > *reactor)
Definition: server_callback_impl.h:219
Definition: server_callback_impl.h:166
virtual void OnSendInitialMetadataDone(bool ok)
Notifies the application that an explicit StartSendInitialMetadata operation completed.
Definition: server_callback_impl.h:325
Definition: byte_buffer.h:36
CoreCodegenInterface * g_core_codegen_interface
Definition: completion_queue_impl.h:91
Definition: message_allocator.h:40
virtual void OnSendInitialMetadataDone(bool ok)
The following notifications are exactly like ServerBidiReactor.
Definition: server_callback_impl.h:430
void StartWriteLast(const Response *resp, ::grpc::WriteOptions options)
Definition: server_callback_impl.h:416
ReturnType * CatchingReactorCreator(Func &&func, Args &&... args)
Definition: callback_common.h:51
Definition: rpc_service_method.h:44
virtual void OnReadDone(bool ok)
Notifies the application that a StartRead operation completed.
Definition: server_callback_impl.h:331
DefaultMessageHolder()
Definition: server_callback_impl.h:83
Per-message write options.
Definition: call_op_set.h:85
void OnDone() override
Notifies the application that all operations associated with this RPC have completed.
Definition: server_callback_impl.h:343
void OnDone() override
Definition: server_callback_impl.h:432
CallbackUnaryHandler(std::function< void(::grpc_impl::ServerContext *, const RequestType *, ResponseType *, experimental::ServerCallbackRpcController *)> func)
Definition: server_callback_impl.h:483
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm_impl.h:33
Definition: server_callback_impl.h:46
void Finish(::grpc::Status s)
Indicate that the stream is to be finished and the trailing metadata and RPC status are to be sent...
Definition: server_callback_impl.h:309
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
void Release() override
Definition: server_callback_impl.h:87
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming o...
Definition: callback_common.h:132
bool ok() const
Is the status OK?
Definition: status.h:118
virtual void OnWriteDone(bool ok)
Definition: server_callback_impl.h:431
Base class for running an RPC handler.
Definition: rpc_service_method.h:41
void StartWrite(const Response *resp)
Definition: server_callback_impl.h:406
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, ::grpc::Status *status, void **handler_data) final
Definition: server_callback_impl.h:520
Did it work? If it didn&#39;t, why?
Definition: status.h:31
Operation is not implemented or not supported/enabled in this service.
Definition: status_code_enum.h:115
void CatchingCallback(Func &&func, Args &&... args)
An exception-safe way of invoking a user-specified callback function.
Definition: callback_common.h:38
void OnDone() override
Definition: server_callback_impl.h:464
Definition: message_allocator.h:27
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:189
virtual void SendInitialMetadata(std::function< void(bool)>)=0
Definition: server_callback_impl.h:42
virtual void OnWriteDone(bool ok)
Notifies the application that a StartWrite (or StartWriteLast) operation completed.
Definition: server_callback_impl.h:338
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_impl.h:1009
CallbackClientStreamingHandler(std::function< experimental::ServerReadReactor< RequestType, ResponseType > *()> func)
Definition: server_callback_impl.h:669
Definition: server_callback_impl.h:48
void StartWriteAndFinish(const Response *resp, ::grpc::WriteOptions options, ::grpc::Status s)
Definition: server_callback_impl.h:412
void OnStarted(::grpc_impl::ServerContext *, const Request *) override
Similar to ServerBidiReactor::OnStarted, except that this also provides the request object sent by th...
Definition: server_callback_impl.h:465
A sequence of bytes.
Definition: byte_buffer.h:72
ServerBidiReactor is the interface for a bidirectional streaming RPC.
Definition: server_callback_impl.h:107
Definition: server_callback_impl.h:451
Straightforward wrapping of the C call object.
Definition: call.h:38