GRPC C++  1.24.0
server_callback_impl.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_IMPL_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_IMPL_H
20 
21 #include <atomic>
22 #include <functional>
23 #include <type_traits>
24 
34 
35 namespace grpc_impl {
36 
37 // Declare base class of all reactors as internal
38 namespace internal {
39 
40 // Forward declarations
41 template <class Request, class Response>
43 template <class Request, class Response>
45 template <class Request, class Response>
47 
49  public:
50  virtual ~ServerReactor() = default;
51  virtual void OnDone() = 0;
52  virtual void OnCancel() = 0;
53 
54  private:
56  template <class Request, class Response>
58  template <class Request, class Response>
60  template <class Request, class Response>
61  friend class CallbackBidiHandler;
62 
63  // The ServerReactor is responsible for tracking when it is safe to call
64  // OnCancel. This function should not be called until after OnStarted is done
65  // and the RPC has completed with a cancellation. This is tracked by counting
66  // how many of these conditions have been met and calling OnCancel when none
67  // remain unmet.
68 
69  void MaybeCallOnCancel() {
70  if (GPR_UNLIKELY(on_cancel_conditions_remaining_.fetch_sub(
71  1, std::memory_order_acq_rel) == 1)) {
72  OnCancel();
73  }
74  }
75 
76  std::atomic<intptr_t> on_cancel_conditions_remaining_{2};
77 };
78 
79 template <class Request, class Response>
81  : public ::grpc::experimental::MessageHolder<Request, Response> {
82  public:
84  this->set_request(&request_obj_);
85  this->set_response(&response_obj_);
86  }
87  void Release() override {
88  // the object is allocated in the call arena.
90  }
91 
92  private:
93  Request request_obj_;
94  Response response_obj_;
95 };
96 
97 } // namespace internal
98 
99 namespace experimental {
100 
101 // Forward declarations
102 template <class Request, class Response>
104 template <class Request, class Response>
106 template <class Request, class Response>
108 
109 // For unary RPCs, the exposed controller class is only an interface
110 // and the actual implementation is an internal class.
112  public:
113  virtual ~ServerCallbackRpcController() = default;
114 
115  // The method handler must call this function when it is done so that
116  // the library knows to free its resources
117  virtual void Finish(::grpc::Status s) = 0;
118 
119  // Allow the method handler to push out the initial metadata before
120  // the response and status are ready
121  virtual void SendInitialMetadata(std::function<void(bool)>) = 0;
122 
154  virtual void SetCancelCallback(std::function<void()> callback) = 0;
155  virtual void ClearCancelCallback() = 0;
156 
157  // NOTE: This is an API for advanced users who need custom allocators.
158  // Get and maybe mutate the allocator state associated with the current RPC.
159  virtual grpc::experimental::RpcAllocatorState* GetRpcAllocatorState() = 0;
160 };
161 
162 // NOTE: The actual streaming object classes are provided
163 // as API only to support mocking. There are no implementations of
164 // these class interfaces in the API.
165 template <class Request>
167  public:
169  virtual void Finish(::grpc::Status s) = 0;
170  virtual void SendInitialMetadata() = 0;
171  virtual void Read(Request* msg) = 0;
172 
173  protected:
174  template <class Response>
176  reactor->InternalBindReader(this);
177  }
178 };
179 
180 template <class Response>
182  public:
184 
185  virtual void Finish(::grpc::Status s) = 0;
186  virtual void SendInitialMetadata() = 0;
187  virtual void Write(const Response* msg, ::grpc::WriteOptions options) = 0;
188  virtual void WriteAndFinish(const Response* msg, ::grpc::WriteOptions options,
189  ::grpc::Status s) {
190  // Default implementation that can/should be overridden
191  Write(msg, std::move(options));
192  Finish(std::move(s));
193  }
194 
195  protected:
196  template <class Request>
198  reactor->InternalBindWriter(this);
199  }
200 };
201 
202 template <class Request, class Response>
204  public:
206 
207  virtual void Finish(::grpc::Status s) = 0;
208  virtual void SendInitialMetadata() = 0;
209  virtual void Read(Request* msg) = 0;
210  virtual void Write(const Response* msg, ::grpc::WriteOptions options) = 0;
211  virtual void WriteAndFinish(const Response* msg, ::grpc::WriteOptions options,
212  ::grpc::Status s) {
213  // Default implementation that can/should be overridden
214  Write(msg, std::move(options));
215  Finish(std::move(s));
216  }
217 
218  protected:
220  reactor->InternalBindStream(this);
221  }
222 };
223 
224 // The following classes are the reactor interfaces that are to be implemented
225 // by the user, returned as the result of the method handler for a callback
226 // method, and activated by the call to OnStarted. The library guarantees that
227 // OnStarted will be called for any reactor that has been created using a
228 // method handler registered on a service. No operation initiation method may be
229 // called until after the call to OnStarted.
230 // Note that none of the classes are pure; all reactions have a default empty
231 // reaction so that the user class only needs to override those classes that it
232 // cares about.
233 
235 template <class Request, class Response>
237  public:
238  ~ServerBidiReactor() = default;
239 
242 
246  void StartSendInitialMetadata() { stream_->SendInitialMetadata(); }
247 
252  void StartRead(Request* req) { stream_->Read(req); }
253 
259  void StartWrite(const Response* resp) {
260  StartWrite(resp, ::grpc::WriteOptions());
261  }
262 
269  void StartWrite(const Response* resp, ::grpc::WriteOptions options) {
270  stream_->Write(resp, std::move(options));
271  }
272 
286  void StartWriteAndFinish(const Response* resp, ::grpc::WriteOptions options,
287  ::grpc::Status s) {
288  stream_->WriteAndFinish(resp, std::move(options), std::move(s));
289  }
290 
299  void StartWriteLast(const Response* resp, ::grpc::WriteOptions options) {
300  StartWrite(resp, std::move(options.set_last_message()));
301  }
302 
309  void Finish(::grpc::Status s) { stream_->Finish(std::move(s)); }
310 
317  virtual void OnStarted(::grpc_impl::ServerContext* /*context*/) {}
318 
325  virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
326 
331  virtual void OnReadDone(bool /*ok*/) {}
332 
338  virtual void OnWriteDone(bool /*ok*/) {}
339 
343  void OnDone() override {}
344 
348  void OnCancel() override {}
349 
350  private:
351  friend class ServerCallbackReaderWriter<Request, Response>;
352  // May be overridden by internal implementation details. This is not a public
353  // customization point.
354  virtual void InternalBindStream(
356  stream_ = stream;
357  }
358 
360 };
361 
363 template <class Request, class Response>
365  public:
366  ~ServerReadReactor() = default;
367 
369  void StartSendInitialMetadata() { reader_->SendInitialMetadata(); }
370  void StartRead(Request* req) { reader_->Read(req); }
371  void Finish(::grpc::Status s) { reader_->Finish(std::move(s)); }
372 
379  virtual void OnStarted(::grpc_impl::ServerContext* /*context*/,
380  Response* /*resp*/) {}
381 
383  virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
384  virtual void OnReadDone(bool /*ok*/) {}
385  void OnDone() override {}
386  void OnCancel() override {}
387 
388  private:
389  friend class ServerCallbackReader<Request>;
390  // May be overridden by internal implementation details. This is not a public
391  // customization point.
392  virtual void InternalBindReader(ServerCallbackReader<Request>* reader) {
393  reader_ = reader;
394  }
395 
397 };
398 
400 template <class Request, class Response>
402  public:
403  ~ServerWriteReactor() = default;
404 
406  void StartSendInitialMetadata() { writer_->SendInitialMetadata(); }
407  void StartWrite(const Response* resp) {
408  StartWrite(resp, ::grpc::WriteOptions());
409  }
410  void StartWrite(const Response* resp, ::grpc::WriteOptions options) {
411  writer_->Write(resp, std::move(options));
412  }
413  void StartWriteAndFinish(const Response* resp, ::grpc::WriteOptions options,
414  ::grpc::Status s) {
415  writer_->WriteAndFinish(resp, std::move(options), std::move(s));
416  }
417  void StartWriteLast(const Response* resp, ::grpc::WriteOptions options) {
418  StartWrite(resp, std::move(options.set_last_message()));
419  }
420  void Finish(::grpc::Status s) { writer_->Finish(std::move(s)); }
421 
427  virtual void OnStarted(::grpc_impl::ServerContext* /*context*/,
428  const Request* /*req*/) {}
429 
431  virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
432  virtual void OnWriteDone(bool /*ok*/) {}
433  void OnDone() override {}
434  void OnCancel() override {}
435 
436  private:
437  friend class ServerCallbackWriter<Response>;
438  // May be overridden by internal implementation details. This is not a public
439  // customization point.
440  virtual void InternalBindWriter(ServerCallbackWriter<Response>* writer) {
441  writer_ = writer;
442  }
443 
445 };
446 
447 } // namespace experimental
448 
449 namespace internal {
450 
451 template <class Request, class Response>
453  : public experimental::ServerReadReactor<Request, Response> {
454  public:
455  void OnDone() override { delete this; }
456  void OnStarted(::grpc_impl::ServerContext*, Response*) override {
457  this->Finish(::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
458  }
459 };
460 
461 template <class Request, class Response>
463  : public experimental::ServerWriteReactor<Request, Response> {
464  public:
465  void OnDone() override { delete this; }
466  void OnStarted(::grpc_impl::ServerContext*, const Request*) override {
467  this->Finish(::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
468  }
469 };
470 
471 template <class Request, class Response>
473  : public experimental::ServerBidiReactor<Request, Response> {
474  public:
475  void OnDone() override { delete this; }
477  this->Finish(::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
478  }
479 };
480 
481 template <class RequestType, class ResponseType>
483  public:
485  std::function<void(::grpc_impl::ServerContext*, const RequestType*,
486  ResponseType*,
488  func)
489  : func_(func) {}
490 
493  allocator) {
494  allocator_ = allocator;
495  }
496 
497  void RunHandler(const HandlerParameter& param) final {
498  // Arena allocate a controller structure (that includes request/response)
499  ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
500  auto* allocator_state = static_cast<
502  param.internal_data);
503  auto* controller =
505  param.call->call(), sizeof(ServerCallbackRpcControllerImpl)))
506  ServerCallbackRpcControllerImpl(param.server_context, param.call,
507  allocator_state,
508  std::move(param.call_requester));
509  ::grpc::Status status = param.status;
510  if (status.ok()) {
511  // Call the actual function handler and expect the user to call finish
512  grpc::internal::CatchingCallback(func_, param.server_context,
513  controller->request(),
514  controller->response(), controller);
515  } else {
516  // if deserialization failed, we need to fail the call
517  controller->Finish(status);
518  }
519  }
520 
522  ::grpc::Status* status, void** handler_data) final {
523  grpc::ByteBuffer buf;
524  buf.set_buffer(req);
525  RequestType* request = nullptr;
527  allocator_state = nullptr;
528  if (allocator_ != nullptr) {
529  allocator_state = allocator_->AllocateMessages();
530  } else {
531  allocator_state =
535  }
536  *handler_data = allocator_state;
537  request = allocator_state->request();
538  *status =
540  buf.Release();
541  if (status->ok()) {
542  return request;
543  }
544  // Clean up on deserialization failure.
545  allocator_state->Release();
546  return nullptr;
547  }
548 
549  private:
550  std::function<void(::grpc_impl::ServerContext*, const RequestType*,
552  func_;
554  nullptr;
555 
556  // The implementation class of ServerCallbackRpcController is a private member
557  // of CallbackUnaryHandler since it is never exposed anywhere, and this allows
558  // it to take advantage of CallbackUnaryHandler's friendships.
559  class ServerCallbackRpcControllerImpl
561  public:
562  void Finish(::grpc::Status s) override {
563  finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
564  &finish_ops_);
565  if (!ctx_->sent_initial_metadata_) {
566  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
567  ctx_->initial_metadata_flags());
568  if (ctx_->compression_level_set()) {
569  finish_ops_.set_compression_level(ctx_->compression_level());
570  }
571  ctx_->sent_initial_metadata_ = true;
572  }
573  // The response is dropped if the status is not OK.
574  if (s.ok()) {
575  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
576  finish_ops_.SendMessagePtr(response()));
577  } else {
578  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
579  }
580  finish_ops_.set_core_cq_tag(&finish_tag_);
581  call_.PerformOps(&finish_ops_);
582  }
583 
584  void SendInitialMetadata(std::function<void(bool)> f) override {
585  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
586  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
587  // TODO(vjpai): Consider taking f as a move-capture if we adopt C++14
588  // and if performance of this operation matters
589  meta_tag_.Set(call_.call(),
590  [this, f](bool ok) {
591  f(ok);
592  MaybeDone();
593  },
594  &meta_ops_);
595  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
596  ctx_->initial_metadata_flags());
597  if (ctx_->compression_level_set()) {
598  meta_ops_.set_compression_level(ctx_->compression_level());
599  }
600  ctx_->sent_initial_metadata_ = true;
601  meta_ops_.set_core_cq_tag(&meta_tag_);
602  call_.PerformOps(&meta_ops_);
603  }
604 
605  // Neither SetCancelCallback nor ClearCancelCallback should affect the
606  // callbacks_outstanding_ count since they are paired and both must precede
607  // the invocation of Finish (if they are used at all)
608  void SetCancelCallback(std::function<void()> callback) override {
609  ctx_->SetCancelCallback(std::move(callback));
610  }
611 
612  void ClearCancelCallback() override { ctx_->ClearCancelCallback(); }
613 
614  grpc::experimental::RpcAllocatorState* GetRpcAllocatorState() override {
615  return allocator_state_;
616  }
617 
618  private:
619  friend class CallbackUnaryHandler<RequestType, ResponseType>;
620 
621  ServerCallbackRpcControllerImpl(
622  ServerContext* ctx, ::grpc::internal::Call* call,
624  allocator_state,
625  std::function<void()> call_requester)
626  : ctx_(ctx),
627  call_(*call),
628  allocator_state_(allocator_state),
629  call_requester_(std::move(call_requester)) {
630  ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, nullptr);
631  }
632 
633  const RequestType* request() { return allocator_state_->request(); }
634  ResponseType* response() { return allocator_state_->response(); }
635 
636  void MaybeDone() {
637  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
638  1, std::memory_order_acq_rel) == 1)) {
639  grpc_call* call = call_.call();
640  auto call_requester = std::move(call_requester_);
641  allocator_state_->Release();
642  this->~ServerCallbackRpcControllerImpl(); // explicitly call destructor
644  call_requester();
645  }
646  }
647 
649  meta_ops_;
654  finish_ops_;
656 
658  grpc::internal::Call call_;
660  allocator_state_;
661  std::function<void()> call_requester_;
662  std::atomic<intptr_t> callbacks_outstanding_{
663  2}; // reserve for Finish and CompletionOp
664  };
665 };
666 
667 template <class RequestType, class ResponseType>
669  public:
671  std::function<
673  func)
674  : func_(std::move(func)) {}
675  void RunHandler(const HandlerParameter& param) final {
676  // Arena allocate a reader structure (that includes response)
677  ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
678 
680  param.status.ok()
683  func_)
684  : nullptr;
685 
686  if (reactor == nullptr) {
687  // if deserialization or reactor creator failed, we need to fail the call
689  }
690 
692  param.call->call(), sizeof(ServerCallbackReaderImpl)))
693  ServerCallbackReaderImpl(param.server_context, param.call,
694  std::move(param.call_requester), reactor);
695 
696  reader->BindReactor(reactor);
697  reactor->OnStarted(param.server_context, reader->response());
698  // The earliest that OnCancel can be called is after OnStarted is done.
699  reactor->MaybeCallOnCancel();
700  reader->MaybeDone();
701  }
702 
703  private:
704  std::function<experimental::ServerReadReactor<RequestType, ResponseType>*()>
705  func_;
706 
707  class ServerCallbackReaderImpl
708  : public experimental::ServerCallbackReader<RequestType> {
709  public:
710  void Finish(::grpc::Status s) override {
711  finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
712  &finish_ops_);
713  if (!ctx_->sent_initial_metadata_) {
714  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
715  ctx_->initial_metadata_flags());
716  if (ctx_->compression_level_set()) {
717  finish_ops_.set_compression_level(ctx_->compression_level());
718  }
719  ctx_->sent_initial_metadata_ = true;
720  }
721  // The response is dropped if the status is not OK.
722  if (s.ok()) {
723  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
724  finish_ops_.SendMessagePtr(&resp_));
725  } else {
726  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
727  }
728  finish_ops_.set_core_cq_tag(&finish_tag_);
729  call_.PerformOps(&finish_ops_);
730  }
731 
732  void SendInitialMetadata() override {
733  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
734  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
735  meta_tag_.Set(call_.call(),
736  [this](bool ok) {
737  reactor_->OnSendInitialMetadataDone(ok);
738  MaybeDone();
739  },
740  &meta_ops_);
741  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
742  ctx_->initial_metadata_flags());
743  if (ctx_->compression_level_set()) {
744  meta_ops_.set_compression_level(ctx_->compression_level());
745  }
746  ctx_->sent_initial_metadata_ = true;
747  meta_ops_.set_core_cq_tag(&meta_tag_);
748  call_.PerformOps(&meta_ops_);
749  }
750 
751  void Read(RequestType* req) override {
752  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
753  read_ops_.RecvMessage(req);
754  call_.PerformOps(&read_ops_);
755  }
756 
757  private:
758  friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
759 
760  ServerCallbackReaderImpl(
762  std::function<void()> call_requester,
764  : ctx_(ctx),
765  call_(*call),
766  call_requester_(std::move(call_requester)),
767  reactor_(reactor) {
768  ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
769  read_tag_.Set(call_.call(),
770  [this](bool ok) {
771  reactor_->OnReadDone(ok);
772  MaybeDone();
773  },
774  &read_ops_);
775  read_ops_.set_core_cq_tag(&read_tag_);
776  }
777 
778  ~ServerCallbackReaderImpl() {}
779 
780  ResponseType* response() { return &resp_; }
781 
782  void MaybeDone() {
783  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
784  1, std::memory_order_acq_rel) == 1)) {
785  reactor_->OnDone();
786  grpc_call* call = call_.call();
787  auto call_requester = std::move(call_requester_);
788  this->~ServerCallbackReaderImpl(); // explicitly call destructor
790  call_requester();
791  }
792  }
793 
795  meta_ops_;
800  finish_ops_;
803  read_ops_;
805 
807  grpc::internal::Call call_;
808  ResponseType resp_;
809  std::function<void()> call_requester_;
811  std::atomic<intptr_t> callbacks_outstanding_{
812  3}; // reserve for OnStarted, Finish, and CompletionOp
813  };
814 };
815 
816 template <class RequestType, class ResponseType>
818  public:
820  std::function<
822  func)
823  : func_(std::move(func)) {}
824  void RunHandler(const HandlerParameter& param) final {
825  // Arena allocate a writer structure
826  ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
827 
829  param.status.ok()
832  func_)
833  : nullptr;
834 
835  if (reactor == nullptr) {
836  // if deserialization or reactor creator failed, we need to fail the call
838  }
839 
841  param.call->call(), sizeof(ServerCallbackWriterImpl)))
842  ServerCallbackWriterImpl(param.server_context, param.call,
843  static_cast<RequestType*>(param.request),
844  std::move(param.call_requester), reactor);
845  writer->BindReactor(reactor);
846  reactor->OnStarted(param.server_context, writer->request());
847  // The earliest that OnCancel can be called is after OnStarted is done.
848  reactor->MaybeCallOnCancel();
849  writer->MaybeDone();
850  }
851 
853  ::grpc::Status* status, void** /*handler_data*/) final {
854  ::grpc::ByteBuffer buf;
855  buf.set_buffer(req);
856  auto* request =
858  call, sizeof(RequestType))) RequestType();
859  *status =
861  buf.Release();
862  if (status->ok()) {
863  return request;
864  }
865  request->~RequestType();
866  return nullptr;
867  }
868 
869  private:
870  std::function<experimental::ServerWriteReactor<RequestType, ResponseType>*()>
871  func_;
872 
873  class ServerCallbackWriterImpl
874  : public experimental::ServerCallbackWriter<ResponseType> {
875  public:
876  void Finish(::grpc::Status s) override {
877  finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
878  &finish_ops_);
879  finish_ops_.set_core_cq_tag(&finish_tag_);
880 
881  if (!ctx_->sent_initial_metadata_) {
882  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
883  ctx_->initial_metadata_flags());
884  if (ctx_->compression_level_set()) {
885  finish_ops_.set_compression_level(ctx_->compression_level());
886  }
887  ctx_->sent_initial_metadata_ = true;
888  }
889  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
890  call_.PerformOps(&finish_ops_);
891  }
892 
893  void SendInitialMetadata() override {
894  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
895  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
896  meta_tag_.Set(call_.call(),
897  [this](bool ok) {
898  reactor_->OnSendInitialMetadataDone(ok);
899  MaybeDone();
900  },
901  &meta_ops_);
902  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
903  ctx_->initial_metadata_flags());
904  if (ctx_->compression_level_set()) {
905  meta_ops_.set_compression_level(ctx_->compression_level());
906  }
907  ctx_->sent_initial_metadata_ = true;
908  meta_ops_.set_core_cq_tag(&meta_tag_);
909  call_.PerformOps(&meta_ops_);
910  }
911 
912  void Write(const ResponseType* resp,
913  ::grpc::WriteOptions options) override {
914  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
915  if (options.is_last_message()) {
916  options.set_buffer_hint();
917  }
918  if (!ctx_->sent_initial_metadata_) {
919  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
920  ctx_->initial_metadata_flags());
921  if (ctx_->compression_level_set()) {
922  write_ops_.set_compression_level(ctx_->compression_level());
923  }
924  ctx_->sent_initial_metadata_ = true;
925  }
926  // TODO(vjpai): don't assert
927  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
928  call_.PerformOps(&write_ops_);
929  }
930 
931  void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
932  ::grpc::Status s) override {
933  // This combines the write into the finish callback
934  // Don't send any message if the status is bad
935  if (s.ok()) {
936  // TODO(vjpai): don't assert
937  GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
938  }
939  Finish(std::move(s));
940  }
941 
942  private:
943  friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
944 
945  ServerCallbackWriterImpl(
947  const RequestType* req, std::function<void()> call_requester,
949  : ctx_(ctx),
950  call_(*call),
951  req_(req),
952  call_requester_(std::move(call_requester)),
953  reactor_(reactor) {
954  ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
955  write_tag_.Set(call_.call(),
956  [this](bool ok) {
957  reactor_->OnWriteDone(ok);
958  MaybeDone();
959  },
960  &write_ops_);
961  write_ops_.set_core_cq_tag(&write_tag_);
962  }
963  ~ServerCallbackWriterImpl() { req_->~RequestType(); }
964 
965  const RequestType* request() { return req_; }
966 
967  void MaybeDone() {
968  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
969  1, std::memory_order_acq_rel) == 1)) {
970  reactor_->OnDone();
971  grpc_call* call = call_.call();
972  auto call_requester = std::move(call_requester_);
973  this->~ServerCallbackWriterImpl(); // explicitly call destructor
975  call_requester();
976  }
977  }
978 
980  meta_ops_;
985  finish_ops_;
987  grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
988  grpc::internal::CallOpSendMessage>
989  write_ops_;
991 
993  grpc::internal::Call call_;
994  const RequestType* req_;
995  std::function<void()> call_requester_;
997  std::atomic<intptr_t> callbacks_outstanding_{
998  3}; // reserve for OnStarted, Finish, and CompletionOp
999  };
1000 };
1001 
1002 template <class RequestType, class ResponseType>
1004  public:
1006  std::function<
1008  func)
1009  : func_(std::move(func)) {}
1010  void RunHandler(const HandlerParameter& param) final {
1011  ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
1012 
1014  param.status.ok()
1017  func_)
1018  : nullptr;
1019 
1020  if (reactor == nullptr) {
1021  // if deserialization or reactor creator failed, we need to fail the call
1023  }
1024 
1026  param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
1027  ServerCallbackReaderWriterImpl(param.server_context, param.call,
1028  std::move(param.call_requester),
1029  reactor);
1030 
1031  stream->BindReactor(reactor);
1032  reactor->OnStarted(param.server_context);
1033  // The earliest that OnCancel can be called is after OnStarted is done.
1034  reactor->MaybeCallOnCancel();
1035  stream->MaybeDone();
1036  }
1037 
1038  private:
1039  std::function<experimental::ServerBidiReactor<RequestType, ResponseType>*()>
1040  func_;
1041 
1042  class ServerCallbackReaderWriterImpl
1043  : public experimental::ServerCallbackReaderWriter<RequestType,
1044  ResponseType> {
1045  public:
1046  void Finish(::grpc::Status s) override {
1047  finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
1048  &finish_ops_);
1049  finish_ops_.set_core_cq_tag(&finish_tag_);
1050 
1051  if (!ctx_->sent_initial_metadata_) {
1052  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
1053  ctx_->initial_metadata_flags());
1054  if (ctx_->compression_level_set()) {
1055  finish_ops_.set_compression_level(ctx_->compression_level());
1056  }
1057  ctx_->sent_initial_metadata_ = true;
1058  }
1059  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
1060  call_.PerformOps(&finish_ops_);
1061  }
1062 
1063  void SendInitialMetadata() override {
1064  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
1065  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
1066  meta_tag_.Set(call_.call(),
1067  [this](bool ok) {
1068  reactor_->OnSendInitialMetadataDone(ok);
1069  MaybeDone();
1070  },
1071  &meta_ops_);
1072  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
1073  ctx_->initial_metadata_flags());
1074  if (ctx_->compression_level_set()) {
1075  meta_ops_.set_compression_level(ctx_->compression_level());
1076  }
1077  ctx_->sent_initial_metadata_ = true;
1078  meta_ops_.set_core_cq_tag(&meta_tag_);
1079  call_.PerformOps(&meta_ops_);
1080  }
1081 
1082  void Write(const ResponseType* resp,
1083  ::grpc::WriteOptions options) override {
1084  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
1085  if (options.is_last_message()) {
1086  options.set_buffer_hint();
1087  }
1088  if (!ctx_->sent_initial_metadata_) {
1089  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
1090  ctx_->initial_metadata_flags());
1091  if (ctx_->compression_level_set()) {
1092  write_ops_.set_compression_level(ctx_->compression_level());
1093  }
1094  ctx_->sent_initial_metadata_ = true;
1095  }
1096  // TODO(vjpai): don't assert
1097  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
1098  call_.PerformOps(&write_ops_);
1099  }
1100 
1101  void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
1102  ::grpc::Status s) override {
1103  // Don't send any message if the status is bad
1104  if (s.ok()) {
1105  // TODO(vjpai): don't assert
1106  GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
1107  }
1108  Finish(std::move(s));
1109  }
1110 
1111  void Read(RequestType* req) override {
1112  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
1113  read_ops_.RecvMessage(req);
1114  call_.PerformOps(&read_ops_);
1115  }
1116 
1117  private:
1118  friend class CallbackBidiHandler<RequestType, ResponseType>;
1119 
1120  ServerCallbackReaderWriterImpl(
1122  std::function<void()> call_requester,
1124  : ctx_(ctx),
1125  call_(*call),
1126  call_requester_(std::move(call_requester)),
1127  reactor_(reactor) {
1128  ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
1129  write_tag_.Set(call_.call(),
1130  [this](bool ok) {
1131  reactor_->OnWriteDone(ok);
1132  MaybeDone();
1133  },
1134  &write_ops_);
1135  write_ops_.set_core_cq_tag(&write_tag_);
1136  read_tag_.Set(call_.call(),
1137  [this](bool ok) {
1138  reactor_->OnReadDone(ok);
1139  MaybeDone();
1140  },
1141  &read_ops_);
1142  read_ops_.set_core_cq_tag(&read_tag_);
1143  }
1144  ~ServerCallbackReaderWriterImpl() {}
1145 
1146  void MaybeDone() {
1147  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1148  1, std::memory_order_acq_rel) == 1)) {
1149  reactor_->OnDone();
1150  grpc_call* call = call_.call();
1151  auto call_requester = std::move(call_requester_);
1152  this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor
1154  call_requester();
1155  }
1156  }
1157 
1159  meta_ops_;
1164  finish_ops_;
1166  grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1167  grpc::internal::CallOpSendMessage>
1168  write_ops_;
1171  read_ops_;
1173 
1175  grpc::internal::Call call_;
1176  std::function<void()> call_requester_;
1178  std::atomic<intptr_t> callbacks_outstanding_{
1179  3}; // reserve for OnStarted, Finish, and CompletionOp
1180  };
1181 };
1182 
1183 } // namespace internal
1184 
1185 } // namespace grpc_impl
1186 
1187 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_IMPL_H
ServerReadReactor is the interface for a client-streaming RPC.
Definition: server_callback_impl.h:103
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:70
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_impl.h:497
void OnStarted(::grpc_impl::ServerContext *) override
Notify the application that a streaming RPC has started and that it is now ok to call any operation i...
Definition: server_callback_impl.h:476
void OnStarted(::grpc_impl::ServerContext *, Response *) override
Similar to ServerBidiReactor::OnStarted, except that this also provides the response object that the ...
Definition: server_callback_impl.h:456
void StartWrite(const Response *resp, ::grpc::WriteOptions options)
Initiate a write operation with specified options.
Definition: server_callback_impl.h:269
void StartRead(Request *req)
Initiate a read operation.
Definition: server_callback_impl.h:252
virtual void WriteAndFinish(const Response *msg, ::grpc::WriteOptions options, ::grpc::Status s)
Definition: server_callback_impl.h:188
virtual ~ServerCallbackWriter()
Definition: server_callback_impl.h:183
void BindReactor(ServerReadReactor< Request, Response > *reactor)
Definition: server_callback_impl.h:175
virtual void OnStarted(::grpc_impl::ServerContext *)
Notify the application that a streaming RPC has started and that it is now ok to call any operation i...
Definition: server_callback_impl.h:317
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call_op_set.h:128
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:145
virtual void grpc_call_ref(grpc_call *call)=0
virtual void OnSendInitialMetadataDone(bool)
The following notifications are exactly like ServerBidiReactor.
Definition: server_callback_impl.h:383
virtual ~ServerCallbackReaderWriter()
Definition: server_callback_impl.h:205
ServerWriteReactor is the interface for a server-streaming RPC.
Definition: server_callback_impl.h:105
Definition: server_callback_impl.h:80
Definition: server_callback_impl.h:472
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:167
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_impl.h:675
void OnCancel() override
Notifies the application that this RPC has been cancelled.
Definition: server_callback_impl.h:348
void SetMessageAllocator(::grpc::experimental::MessageAllocator< RequestType, ResponseType > *allocator)
Definition: server_callback_impl.h:491
void StartSendInitialMetadata()
The following operation initiations are exactly like ServerBidiReactor.
Definition: server_callback_impl.h:406
virtual void OnWriteDone(bool)
Notifies the application that a StartWrite (or StartWriteLast) operation completed.
Definition: server_callback_impl.h:338
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, ::grpc::Status *status, void **) final
Definition: server_callback_impl.h:852
#define GPR_UNLIKELY(x)
Definition: port_platform.h:679
virtual void grpc_call_unref(grpc_call *call)=0
Definition: server_callback_impl.h:181
Definition: server_callback_impl.h:203
Definition: async_unary_call_impl.h:302
void StartWrite(const Response *resp)
Initiate a write operation.
Definition: server_callback_impl.h:259
void Finish(::grpc::Status s)
Definition: server_callback_impl.h:420
void OnCancel() override
Definition: server_callback_impl.h:434
Definition: grpc_types.h:40
void StartSendInitialMetadata()
The following operation initiations are exactly like ServerBidiReactor.
Definition: server_callback_impl.h:369
virtual void WriteAndFinish(const Response *msg, ::grpc::WriteOptions options, ::grpc::Status s)
Definition: server_callback_impl.h:211
grpc_call * call() const
Definition: call.h:72
void Finish(::grpc::Status s)
Definition: server_callback_impl.h:371
::grpc_impl::ServerContext ServerContext
Definition: server_context.h:25
A ServerContext allows the person implementing a service handler to:
Definition: server_context_impl.h:118
void OnDone() override
Definition: server_callback_impl.h:455
void StartWriteAndFinish(const Response *resp, ::grpc::WriteOptions options, ::grpc::Status s)
Initiate a write operation with specified options and final RPC Status, which also causes any trailin...
Definition: server_callback_impl.h:286
Definition: server_callback_impl.h:462
void BindReactor(ServerWriteReactor< Request, Response > *reactor)
Definition: server_callback_impl.h:197
void OnDone() override
Notifies the application that all operations associated with this RPC have completed.
Definition: server_callback_impl.h:475
virtual void OnSendInitialMetadataDone(bool)
Notifies the application that an explicit StartSendInitialMetadata operation completed.
Definition: server_callback_impl.h:325
void StartWrite(const Response *resp, ::grpc::WriteOptions options)
Definition: server_callback_impl.h:410
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
virtual ~ServerCallbackReader()
Definition: server_callback_impl.h:168
CallbackServerStreamingHandler(std::function< experimental::ServerWriteReactor< RequestType, ResponseType > *()> func)
Definition: server_callback_impl.h:819
void StartWriteLast(const Response *resp, ::grpc::WriteOptions options)
Inform system of a planned write operation with specified options, but allow the library to schedule ...
Definition: server_callback_impl.h:299
ResponseT * response()
Definition: message_allocator.h:47
Definition: call_op_set.h:633
RequestT * request()
Definition: message_allocator.h:46
CallbackBidiHandler(std::function< experimental::ServerBidiReactor< RequestType, ResponseType > *()> func)
Definition: server_callback_impl.h:1005
void StartRead(Request *req)
Definition: server_callback_impl.h:370
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_impl.h:824
virtual void OnSendInitialMetadataDone(bool)
The following notifications are exactly like ServerBidiReactor.
Definition: server_callback_impl.h:431
Definition: call_op_set.h:222
void OnCancel() override
Definition: server_callback_impl.h:386
void StartSendInitialMetadata()
Do NOT call any operation initiation method (names that start with Start) until after the library has...
Definition: server_callback_impl.h:246
void Release()
Forget underlying byte buffer without destroying Use this only for un-owned byte buffers.
Definition: byte_buffer.h:151
Definition: call_op_set.h:292
Definition: server_callback_impl.h:111
void OnDone() override
Definition: server_callback_impl.h:385
virtual void OnReadDone(bool)
Definition: server_callback_impl.h:384
void BindReactor(ServerBidiReactor< Request, Response > *reactor)
Definition: server_callback_impl.h:219
Definition: server_callback_impl.h:166
virtual void OnWriteDone(bool)
Definition: server_callback_impl.h:432
Definition: byte_buffer.h:36
CoreCodegenInterface * g_core_codegen_interface
Definition: completion_queue_impl.h:91
Definition: message_allocator.h:40
void StartWriteLast(const Response *resp, ::grpc::WriteOptions options)
Definition: server_callback_impl.h:417
ReturnType * CatchingReactorCreator(Func &&func, Args &&... args)
Definition: callback_common.h:51
Definition: rpc_service_method.h:44
DefaultMessageHolder()
Definition: server_callback_impl.h:83
Per-message write options.
Definition: call_op_set.h:85
void OnDone() override
Notifies the application that all operations associated with this RPC have completed.
Definition: server_callback_impl.h:343
void OnDone() override
Definition: server_callback_impl.h:433
CallbackUnaryHandler(std::function< void(::grpc_impl::ServerContext *, const RequestType *, ResponseType *, experimental::ServerCallbackRpcController *)> func)
Definition: server_callback_impl.h:484
virtual void OnReadDone(bool)
Notifies the application that a StartRead operation completed.
Definition: server_callback_impl.h:331
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm_impl.h:33
Definition: server_callback_impl.h:46
void Finish(::grpc::Status s)
Indicate that the stream is to be finished and the trailing metadata and RPC status are to be sent...
Definition: server_callback_impl.h:309
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
void Release() override
Definition: server_callback_impl.h:87
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming o...
Definition: callback_common.h:132
bool ok() const
Is the status OK?
Definition: status.h:118
Base class for running an RPC handler.
Definition: rpc_service_method.h:41
void StartWrite(const Response *resp)
Definition: server_callback_impl.h:407
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, ::grpc::Status *status, void **handler_data) final
Definition: server_callback_impl.h:521
Did it work? If it didn&#39;t, why?
Definition: status.h:31
Operation is not implemented or not supported/enabled in this service.
Definition: status_code_enum.h:115
void CatchingCallback(Func &&func, Args &&... args)
An exception-safe way of invoking a user-specified callback function.
Definition: callback_common.h:38
void OnDone() override
Definition: server_callback_impl.h:465
Definition: message_allocator.h:27
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:192
virtual void SendInitialMetadata(std::function< void(bool)>)=0
Definition: server_callback_impl.h:42
virtual void OnStarted(::grpc_impl::ServerContext *, const Request *)
Similar to ServerBidiReactor::OnStarted, except that this also provides the request object sent by th...
Definition: server_callback_impl.h:427
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_impl.h:1010
CallbackClientStreamingHandler(std::function< experimental::ServerReadReactor< RequestType, ResponseType > *()> func)
Definition: server_callback_impl.h:670
Definition: server_callback_impl.h:48
void StartWriteAndFinish(const Response *resp, ::grpc::WriteOptions options, ::grpc::Status s)
Definition: server_callback_impl.h:413
void OnStarted(::grpc_impl::ServerContext *, const Request *) override
Similar to ServerBidiReactor::OnStarted, except that this also provides the request object sent by th...
Definition: server_callback_impl.h:466
A sequence of bytes.
Definition: byte_buffer.h:72
ServerBidiReactor is the interface for a bidirectional streaming RPC.
Definition: server_callback_impl.h:107
virtual void OnStarted(::grpc_impl::ServerContext *, Response *)
Similar to ServerBidiReactor::OnStarted, except that this also provides the response object that the ...
Definition: server_callback_impl.h:379
Definition: server_callback_impl.h:452
Straightforward wrapping of the C call object.
Definition: call.h:38