GRPC C++  1.22.0
server_callback.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
20 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
21 
22 #include <atomic>
23 #include <functional>
24 #include <type_traits>
25 
35 
36 namespace grpc {
37 
38 // Declare base class of all reactors as internal
39 namespace internal {
40 
41 // Forward declarations
42 template <class Request, class Response>
44 template <class Request, class Response>
46 template <class Request, class Response>
48 
50  public:
51  virtual ~ServerReactor() = default;
52  virtual void OnDone() = 0;
53  virtual void OnCancel() = 0;
54 
55  private:
57  template <class Request, class Response>
59  template <class Request, class Response>
61  template <class Request, class Response>
62  friend class CallbackBidiHandler;
63 
64  // The ServerReactor is responsible for tracking when it is safe to call
65  // OnCancel. This function should not be called until after OnStarted is done
66  // and the RPC has completed with a cancellation. This is tracked by counting
67  // how many of these conditions have been met and calling OnCancel when none
68  // remain unmet.
69 
70  void MaybeCallOnCancel() {
71  if (on_cancel_conditions_remaining_.fetch_sub(
72  1, std::memory_order_acq_rel) == 1) {
73  OnCancel();
74  }
75  }
76 
77  std::atomic_int on_cancel_conditions_remaining_{2};
78 };
79 
80 template <class Request, class Response>
82  : public experimental::MessageHolder<Request, Response> {
83  public:
85  this->set_request(&request_obj_);
86  this->set_response(&response_obj_);
87  }
88  void Release() override {
89  // the object is allocated in the call arena.
91  }
92 
93  private:
94  Request request_obj_;
95  Response response_obj_;
96 };
97 
98 } // namespace internal
99 
100 namespace experimental {
101 
102 // Forward declarations
103 template <class Request, class Response>
105 template <class Request, class Response>
107 template <class Request, class Response>
109 
110 // For unary RPCs, the exposed controller class is only an interface
111 // and the actual implementation is an internal class.
113  public:
114  virtual ~ServerCallbackRpcController() = default;
115 
116  // The method handler must call this function when it is done so that
117  // the library knows to free its resources
118  virtual void Finish(Status s) = 0;
119 
120  // Allow the method handler to push out the initial metadata before
121  // the response and status are ready
122  virtual void SendInitialMetadata(std::function<void(bool)>) = 0;
123 
155  virtual void SetCancelCallback(std::function<void()> callback) = 0;
156  virtual void ClearCancelCallback() = 0;
157 
158  // NOTE: This is an API for advanced users who need custom allocators.
159  // Get and maybe mutate the allocator state associated with the current RPC.
160  virtual RpcAllocatorState* GetRpcAllocatorState() = 0;
161 };
162 
163 // NOTE: The actual streaming object classes are provided
164 // as API only to support mocking. There are no implementations of
165 // these class interfaces in the API.
166 template <class Request>
168  public:
170  virtual void Finish(Status s) = 0;
171  virtual void SendInitialMetadata() = 0;
172  virtual void Read(Request* msg) = 0;
173 
174  protected:
175  template <class Response>
177  reactor->BindReader(this);
178  }
179 };
180 
181 template <class Response>
183  public:
185 
186  virtual void Finish(Status s) = 0;
187  virtual void SendInitialMetadata() = 0;
188  virtual void Write(const Response* msg, WriteOptions options) = 0;
189  virtual void WriteAndFinish(const Response* msg, WriteOptions options,
190  Status s) {
191  // Default implementation that can/should be overridden
192  Write(msg, std::move(options));
193  Finish(std::move(s));
194  }
195 
196  protected:
197  template <class Request>
199  reactor->BindWriter(this);
200  }
201 };
202 
203 template <class Request, class Response>
205  public:
207 
208  virtual void Finish(Status s) = 0;
209  virtual void SendInitialMetadata() = 0;
210  virtual void Read(Request* msg) = 0;
211  virtual void Write(const Response* msg, WriteOptions options) = 0;
212  virtual void WriteAndFinish(const Response* msg, WriteOptions options,
213  Status s) {
214  // Default implementation that can/should be overridden
215  Write(msg, std::move(options));
216  Finish(std::move(s));
217  }
218 
219  protected:
221  reactor->BindStream(this);
222  }
223 };
224 
225 // The following classes are the reactor interfaces that are to be implemented
226 // by the user, returned as the result of the method handler for a callback
227 // method, and activated by the call to OnStarted. The library guarantees that
228 // OnStarted will be called for any reactor that has been created using a
229 // method handler registered on a service. No operation initiation method may be
230 // called until after the call to OnStarted.
231 // Note that none of the classes are pure; all reactions have a default empty
232 // reaction so that the user class only needs to override those classes that it
233 // cares about.
234 
236 template <class Request, class Response>
238  public:
239  ~ServerBidiReactor() = default;
240 
243 
247  void StartSendInitialMetadata() { stream_->SendInitialMetadata(); }
248 
253  void StartRead(Request* req) { stream_->Read(req); }
254 
260  void StartWrite(const Response* resp) { StartWrite(resp, WriteOptions()); }
261 
268  void StartWrite(const Response* resp, WriteOptions options) {
269  stream_->Write(resp, std::move(options));
270  }
271 
285  void StartWriteAndFinish(const Response* resp, WriteOptions options,
286  Status s) {
287  stream_->WriteAndFinish(resp, std::move(options), std::move(s));
288  }
289 
298  void StartWriteLast(const Response* resp, WriteOptions options) {
299  StartWrite(resp, std::move(options.set_last_message()));
300  }
301 
308  void Finish(Status s) { stream_->Finish(std::move(s)); }
309 
316  virtual void OnStarted(::grpc_impl::ServerContext* context) {}
317 
324  virtual void OnSendInitialMetadataDone(bool ok) {}
325 
330  virtual void OnReadDone(bool ok) {}
331 
337  virtual void OnWriteDone(bool ok) {}
338 
342  void OnDone() override {}
343 
347  void OnCancel() override {}
348 
349  private:
350  friend class ServerCallbackReaderWriter<Request, Response>;
351  virtual void BindStream(
353  stream_ = stream;
354  }
355 
357 };
358 
360 template <class Request, class Response>
362  public:
363  ~ServerReadReactor() = default;
364 
366  void StartSendInitialMetadata() { reader_->SendInitialMetadata(); }
367  void StartRead(Request* req) { reader_->Read(req); }
368  void Finish(Status s) { reader_->Finish(std::move(s)); }
369 
376  virtual void OnStarted(::grpc_impl::ServerContext* context, Response* resp) {}
377 
379  virtual void OnSendInitialMetadataDone(bool ok) {}
380  virtual void OnReadDone(bool ok) {}
381  void OnDone() override {}
382  void OnCancel() override {}
383 
384  private:
385  friend class ServerCallbackReader<Request>;
386  virtual void BindReader(ServerCallbackReader<Request>* reader) {
387  reader_ = reader;
388  }
389 
391 };
392 
394 template <class Request, class Response>
396  public:
397  ~ServerWriteReactor() = default;
398 
400  void StartSendInitialMetadata() { writer_->SendInitialMetadata(); }
401  void StartWrite(const Response* resp) { StartWrite(resp, WriteOptions()); }
402  void StartWrite(const Response* resp, WriteOptions options) {
403  writer_->Write(resp, std::move(options));
404  }
405  void StartWriteAndFinish(const Response* resp, WriteOptions options,
406  Status s) {
407  writer_->WriteAndFinish(resp, std::move(options), std::move(s));
408  }
409  void StartWriteLast(const Response* resp, WriteOptions options) {
410  StartWrite(resp, std::move(options.set_last_message()));
411  }
412  void Finish(Status s) { writer_->Finish(std::move(s)); }
413 
419  virtual void OnStarted(::grpc_impl::ServerContext* context,
420  const Request* req) {}
421 
423  virtual void OnSendInitialMetadataDone(bool ok) {}
424  virtual void OnWriteDone(bool ok) {}
425  void OnDone() override {}
426  void OnCancel() override {}
427 
428  private:
429  friend class ServerCallbackWriter<Response>;
430  virtual void BindWriter(ServerCallbackWriter<Response>* writer) {
431  writer_ = writer;
432  }
433 
435 };
436 
437 } // namespace experimental
438 
439 namespace internal {
440 
441 template <class Request, class Response>
443  : public experimental::ServerReadReactor<Request, Response> {
444  public:
445  void OnDone() override { delete this; }
446  void OnStarted(::grpc_impl::ServerContext*, Response*) override {
447  this->Finish(Status(StatusCode::UNIMPLEMENTED, ""));
448  }
449 };
450 
451 template <class Request, class Response>
453  : public experimental::ServerWriteReactor<Request, Response> {
454  public:
455  void OnDone() override { delete this; }
456  void OnStarted(::grpc_impl::ServerContext*, const Request*) override {
457  this->Finish(Status(StatusCode::UNIMPLEMENTED, ""));
458  }
459 };
460 
461 template <class Request, class Response>
463  : public experimental::ServerBidiReactor<Request, Response> {
464  public:
465  void OnDone() override { delete this; }
467  this->Finish(Status(StatusCode::UNIMPLEMENTED, ""));
468  }
469 };
470 
471 template <class RequestType, class ResponseType>
472 class CallbackUnaryHandler : public MethodHandler {
473  public:
475  std::function<void(::grpc_impl::ServerContext*, const RequestType*,
476  ResponseType*,
478  func)
479  : func_(func) {}
480 
483  allocator_ = allocator;
484  }
485 
486  void RunHandler(const HandlerParameter& param) final {
487  // Arena allocate a controller structure (that includes request/response)
488  g_core_codegen_interface->grpc_call_ref(param.call->call());
489  auto* allocator_state =
491  param.internal_data);
492  auto* controller = new (g_core_codegen_interface->grpc_call_arena_alloc(
493  param.call->call(), sizeof(ServerCallbackRpcControllerImpl)))
494  ServerCallbackRpcControllerImpl(param.server_context, param.call,
495  allocator_state,
496  std::move(param.call_requester));
497  Status status = param.status;
498  if (status.ok()) {
499  // Call the actual function handler and expect the user to call finish
500  CatchingCallback(func_, param.server_context, controller->request(),
501  controller->response(), controller);
502  } else {
503  // if deserialization failed, we need to fail the call
504  controller->Finish(status);
505  }
506  }
507 
508  void* Deserialize(grpc_call* call, grpc_byte_buffer* req, Status* status,
509  void** handler_data) final {
510  ByteBuffer buf;
511  buf.set_buffer(req);
512  RequestType* request = nullptr;
514  nullptr;
515  if (allocator_ != nullptr) {
516  allocator_state = allocator_->AllocateMessages();
517  } else {
518  allocator_state = new (g_core_codegen_interface->grpc_call_arena_alloc(
521  }
522  *handler_data = allocator_state;
523  request = allocator_state->request();
524  *status = SerializationTraits<RequestType>::Deserialize(&buf, request);
525  buf.Release();
526  if (status->ok()) {
527  return request;
528  }
529  // Clean up on deserialization failure.
530  allocator_state->Release();
531  return nullptr;
532  }
533 
534  private:
535  std::function<void(::grpc_impl::ServerContext*, const RequestType*,
537  func_;
539  nullptr;
540 
541  // The implementation class of ServerCallbackRpcController is a private member
542  // of CallbackUnaryHandler since it is never exposed anywhere, and this allows
543  // it to take advantage of CallbackUnaryHandler's friendships.
544  class ServerCallbackRpcControllerImpl
546  public:
547  void Finish(Status s) override {
548  finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
549  &finish_ops_);
550  if (!ctx_->sent_initial_metadata_) {
551  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
552  ctx_->initial_metadata_flags());
553  if (ctx_->compression_level_set()) {
554  finish_ops_.set_compression_level(ctx_->compression_level());
555  }
556  ctx_->sent_initial_metadata_ = true;
557  }
558  // The response is dropped if the status is not OK.
559  if (s.ok()) {
560  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
561  finish_ops_.SendMessagePtr(response()));
562  } else {
563  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
564  }
565  finish_ops_.set_core_cq_tag(&finish_tag_);
566  call_.PerformOps(&finish_ops_);
567  }
568 
569  void SendInitialMetadata(std::function<void(bool)> f) override {
570  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
571  callbacks_outstanding_++;
572  // TODO(vjpai): Consider taking f as a move-capture if we adopt C++14
573  // and if performance of this operation matters
574  meta_tag_.Set(call_.call(),
575  [this, f](bool ok) {
576  f(ok);
577  MaybeDone();
578  },
579  &meta_ops_);
580  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
581  ctx_->initial_metadata_flags());
582  if (ctx_->compression_level_set()) {
583  meta_ops_.set_compression_level(ctx_->compression_level());
584  }
585  ctx_->sent_initial_metadata_ = true;
586  meta_ops_.set_core_cq_tag(&meta_tag_);
587  call_.PerformOps(&meta_ops_);
588  }
589 
590  // Neither SetCancelCallback nor ClearCancelCallback should affect the
591  // callbacks_outstanding_ count since they are paired and both must precede
592  // the invocation of Finish (if they are used at all)
593  void SetCancelCallback(std::function<void()> callback) override {
594  ctx_->SetCancelCallback(std::move(callback));
595  }
596 
597  void ClearCancelCallback() override { ctx_->ClearCancelCallback(); }
598 
599  experimental::RpcAllocatorState* GetRpcAllocatorState() override {
600  return allocator_state_;
601  }
602 
603  private:
604  friend class CallbackUnaryHandler<RequestType, ResponseType>;
605 
606  ServerCallbackRpcControllerImpl(
607  ::grpc_impl::ServerContext* ctx, Call* call,
609  std::function<void()> call_requester)
610  : ctx_(ctx),
611  call_(*call),
612  allocator_state_(allocator_state),
613  call_requester_(std::move(call_requester)) {
614  ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, nullptr);
615  }
616 
617  const RequestType* request() { return allocator_state_->request(); }
618  ResponseType* response() { return allocator_state_->response(); }
619 
620  void MaybeDone() {
621  if (--callbacks_outstanding_ == 0) {
622  grpc_call* call = call_.call();
623  auto call_requester = std::move(call_requester_);
624  allocator_state_->Release();
625  this->~ServerCallbackRpcControllerImpl(); // explicitly call destructor
627  call_requester();
628  }
629  }
630 
632  CallbackWithSuccessTag meta_tag_;
635  finish_ops_;
636  CallbackWithSuccessTag finish_tag_;
637 
639  Call call_;
641  allocator_state_;
642  std::function<void()> call_requester_;
643  std::atomic_int callbacks_outstanding_{
644  2}; // reserve for Finish and CompletionOp
645  };
646 };
647 
648 template <class RequestType, class ResponseType>
650  public:
652  std::function<
654  func)
655  : func_(std::move(func)) {}
656  void RunHandler(const HandlerParameter& param) final {
657  // Arena allocate a reader structure (that includes response)
658  g_core_codegen_interface->grpc_call_ref(param.call->call());
659 
661  param.status.ok()
664  func_)
665  : nullptr;
666 
667  if (reactor == nullptr) {
668  // if deserialization or reactor creator failed, we need to fail the call
670  }
671 
672  auto* reader = new (g_core_codegen_interface->grpc_call_arena_alloc(
673  param.call->call(), sizeof(ServerCallbackReaderImpl)))
674  ServerCallbackReaderImpl(param.server_context, param.call,
675  std::move(param.call_requester), reactor);
676 
677  reader->BindReactor(reactor);
678  reactor->OnStarted(param.server_context, reader->response());
679  // The earliest that OnCancel can be called is after OnStarted is done.
680  reactor->MaybeCallOnCancel();
681  reader->MaybeDone();
682  }
683 
684  private:
685  std::function<experimental::ServerReadReactor<RequestType, ResponseType>*()>
686  func_;
687 
688  class ServerCallbackReaderImpl
689  : public experimental::ServerCallbackReader<RequestType> {
690  public:
691  void Finish(Status s) override {
692  finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
693  &finish_ops_);
694  if (!ctx_->sent_initial_metadata_) {
695  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
696  ctx_->initial_metadata_flags());
697  if (ctx_->compression_level_set()) {
698  finish_ops_.set_compression_level(ctx_->compression_level());
699  }
700  ctx_->sent_initial_metadata_ = true;
701  }
702  // The response is dropped if the status is not OK.
703  if (s.ok()) {
704  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
705  finish_ops_.SendMessagePtr(&resp_));
706  } else {
707  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
708  }
709  finish_ops_.set_core_cq_tag(&finish_tag_);
710  call_.PerformOps(&finish_ops_);
711  }
712 
713  void SendInitialMetadata() override {
714  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
715  callbacks_outstanding_++;
716  meta_tag_.Set(call_.call(),
717  [this](bool ok) {
718  reactor_->OnSendInitialMetadataDone(ok);
719  MaybeDone();
720  },
721  &meta_ops_);
722  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
723  ctx_->initial_metadata_flags());
724  if (ctx_->compression_level_set()) {
725  meta_ops_.set_compression_level(ctx_->compression_level());
726  }
727  ctx_->sent_initial_metadata_ = true;
728  meta_ops_.set_core_cq_tag(&meta_tag_);
729  call_.PerformOps(&meta_ops_);
730  }
731 
732  void Read(RequestType* req) override {
733  callbacks_outstanding_++;
734  read_ops_.RecvMessage(req);
735  call_.PerformOps(&read_ops_);
736  }
737 
738  private:
739  friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
740 
741  ServerCallbackReaderImpl(
742  ::grpc_impl::ServerContext* ctx, Call* call,
743  std::function<void()> call_requester,
745  : ctx_(ctx),
746  call_(*call),
747  call_requester_(std::move(call_requester)),
748  reactor_(reactor) {
749  ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
750  read_tag_.Set(call_.call(),
751  [this](bool ok) {
752  reactor_->OnReadDone(ok);
753  MaybeDone();
754  },
755  &read_ops_);
756  read_ops_.set_core_cq_tag(&read_tag_);
757  }
758 
759  ~ServerCallbackReaderImpl() {}
760 
761  ResponseType* response() { return &resp_; }
762 
763  void MaybeDone() {
764  if (--callbacks_outstanding_ == 0) {
765  reactor_->OnDone();
766  grpc_call* call = call_.call();
767  auto call_requester = std::move(call_requester_);
768  this->~ServerCallbackReaderImpl(); // explicitly call destructor
770  call_requester();
771  }
772  }
773 
775  CallbackWithSuccessTag meta_tag_;
778  finish_ops_;
779  CallbackWithSuccessTag finish_tag_;
781  CallbackWithSuccessTag read_tag_;
782 
784  Call call_;
785  ResponseType resp_;
786  std::function<void()> call_requester_;
788  std::atomic_int callbacks_outstanding_{
789  3}; // reserve for OnStarted, Finish, and CompletionOp
790  };
791 };
792 
793 template <class RequestType, class ResponseType>
795  public:
797  std::function<
799  func)
800  : func_(std::move(func)) {}
801  void RunHandler(const HandlerParameter& param) final {
802  // Arena allocate a writer structure
803  g_core_codegen_interface->grpc_call_ref(param.call->call());
804 
806  param.status.ok()
809  func_)
810  : nullptr;
811 
812  if (reactor == nullptr) {
813  // if deserialization or reactor creator failed, we need to fail the call
815  }
816 
817  auto* writer = new (g_core_codegen_interface->grpc_call_arena_alloc(
818  param.call->call(), sizeof(ServerCallbackWriterImpl)))
819  ServerCallbackWriterImpl(param.server_context, param.call,
820  static_cast<RequestType*>(param.request),
821  std::move(param.call_requester), reactor);
822  writer->BindReactor(reactor);
823  reactor->OnStarted(param.server_context, writer->request());
824  // The earliest that OnCancel can be called is after OnStarted is done.
825  reactor->MaybeCallOnCancel();
826  writer->MaybeDone();
827  }
828 
829  void* Deserialize(grpc_call* call, grpc_byte_buffer* req, Status* status,
830  void** handler_data) final {
831  ByteBuffer buf;
832  buf.set_buffer(req);
833  auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc(
834  call, sizeof(RequestType))) RequestType();
835  *status = SerializationTraits<RequestType>::Deserialize(&buf, request);
836  buf.Release();
837  if (status->ok()) {
838  return request;
839  }
840  request->~RequestType();
841  return nullptr;
842  }
843 
844  private:
845  std::function<experimental::ServerWriteReactor<RequestType, ResponseType>*()>
846  func_;
847 
848  class ServerCallbackWriterImpl
849  : public experimental::ServerCallbackWriter<ResponseType> {
850  public:
851  void Finish(Status s) override {
852  finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
853  &finish_ops_);
854  finish_ops_.set_core_cq_tag(&finish_tag_);
855 
856  if (!ctx_->sent_initial_metadata_) {
857  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
858  ctx_->initial_metadata_flags());
859  if (ctx_->compression_level_set()) {
860  finish_ops_.set_compression_level(ctx_->compression_level());
861  }
862  ctx_->sent_initial_metadata_ = true;
863  }
864  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
865  call_.PerformOps(&finish_ops_);
866  }
867 
868  void SendInitialMetadata() override {
869  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
870  callbacks_outstanding_++;
871  meta_tag_.Set(call_.call(),
872  [this](bool ok) {
873  reactor_->OnSendInitialMetadataDone(ok);
874  MaybeDone();
875  },
876  &meta_ops_);
877  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
878  ctx_->initial_metadata_flags());
879  if (ctx_->compression_level_set()) {
880  meta_ops_.set_compression_level(ctx_->compression_level());
881  }
882  ctx_->sent_initial_metadata_ = true;
883  meta_ops_.set_core_cq_tag(&meta_tag_);
884  call_.PerformOps(&meta_ops_);
885  }
886 
887  void Write(const ResponseType* resp, WriteOptions options) override {
888  callbacks_outstanding_++;
889  if (options.is_last_message()) {
890  options.set_buffer_hint();
891  }
892  if (!ctx_->sent_initial_metadata_) {
893  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
894  ctx_->initial_metadata_flags());
895  if (ctx_->compression_level_set()) {
896  write_ops_.set_compression_level(ctx_->compression_level());
897  }
898  ctx_->sent_initial_metadata_ = true;
899  }
900  // TODO(vjpai): don't assert
901  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
902  call_.PerformOps(&write_ops_);
903  }
904 
905  void WriteAndFinish(const ResponseType* resp, WriteOptions options,
906  Status s) override {
907  // This combines the write into the finish callback
908  // Don't send any message if the status is bad
909  if (s.ok()) {
910  // TODO(vjpai): don't assert
911  GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
912  }
913  Finish(std::move(s));
914  }
915 
916  private:
917  friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
918 
919  ServerCallbackWriterImpl(
920  ::grpc_impl::ServerContext* ctx, Call* call, const RequestType* req,
921  std::function<void()> call_requester,
923  : ctx_(ctx),
924  call_(*call),
925  req_(req),
926  call_requester_(std::move(call_requester)),
927  reactor_(reactor) {
928  ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
929  write_tag_.Set(call_.call(),
930  [this](bool ok) {
931  reactor_->OnWriteDone(ok);
932  MaybeDone();
933  },
934  &write_ops_);
935  write_ops_.set_core_cq_tag(&write_tag_);
936  }
937  ~ServerCallbackWriterImpl() { req_->~RequestType(); }
938 
939  const RequestType* request() { return req_; }
940 
941  void MaybeDone() {
942  if (--callbacks_outstanding_ == 0) {
943  reactor_->OnDone();
944  grpc_call* call = call_.call();
945  auto call_requester = std::move(call_requester_);
946  this->~ServerCallbackWriterImpl(); // explicitly call destructor
948  call_requester();
949  }
950  }
951 
953  CallbackWithSuccessTag meta_tag_;
956  finish_ops_;
957  CallbackWithSuccessTag finish_tag_;
959  CallbackWithSuccessTag write_tag_;
960 
962  Call call_;
963  const RequestType* req_;
964  std::function<void()> call_requester_;
966  std::atomic_int callbacks_outstanding_{
967  3}; // reserve for OnStarted, Finish, and CompletionOp
968  };
969 };
970 
971 template <class RequestType, class ResponseType>
972 class CallbackBidiHandler : public MethodHandler {
973  public:
975  std::function<
977  func)
978  : func_(std::move(func)) {}
979  void RunHandler(const HandlerParameter& param) final {
980  g_core_codegen_interface->grpc_call_ref(param.call->call());
981 
983  param.status.ok()
986  func_)
987  : nullptr;
988 
989  if (reactor == nullptr) {
990  // if deserialization or reactor creator failed, we need to fail the call
992  }
993 
994  auto* stream = new (g_core_codegen_interface->grpc_call_arena_alloc(
995  param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
996  ServerCallbackReaderWriterImpl(param.server_context, param.call,
997  std::move(param.call_requester),
998  reactor);
999 
1000  stream->BindReactor(reactor);
1001  reactor->OnStarted(param.server_context);
1002  // The earliest that OnCancel can be called is after OnStarted is done.
1003  reactor->MaybeCallOnCancel();
1004  stream->MaybeDone();
1005  }
1006 
1007  private:
1008  std::function<experimental::ServerBidiReactor<RequestType, ResponseType>*()>
1009  func_;
1010 
1011  class ServerCallbackReaderWriterImpl
1012  : public experimental::ServerCallbackReaderWriter<RequestType,
1013  ResponseType> {
1014  public:
1015  void Finish(Status s) override {
1016  finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
1017  &finish_ops_);
1018  finish_ops_.set_core_cq_tag(&finish_tag_);
1019 
1020  if (!ctx_->sent_initial_metadata_) {
1021  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
1022  ctx_->initial_metadata_flags());
1023  if (ctx_->compression_level_set()) {
1024  finish_ops_.set_compression_level(ctx_->compression_level());
1025  }
1026  ctx_->sent_initial_metadata_ = true;
1027  }
1028  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
1029  call_.PerformOps(&finish_ops_);
1030  }
1031 
1032  void SendInitialMetadata() override {
1033  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
1034  callbacks_outstanding_++;
1035  meta_tag_.Set(call_.call(),
1036  [this](bool ok) {
1037  reactor_->OnSendInitialMetadataDone(ok);
1038  MaybeDone();
1039  },
1040  &meta_ops_);
1041  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
1042  ctx_->initial_metadata_flags());
1043  if (ctx_->compression_level_set()) {
1044  meta_ops_.set_compression_level(ctx_->compression_level());
1045  }
1046  ctx_->sent_initial_metadata_ = true;
1047  meta_ops_.set_core_cq_tag(&meta_tag_);
1048  call_.PerformOps(&meta_ops_);
1049  }
1050 
1051  void Write(const ResponseType* resp, WriteOptions options) override {
1052  callbacks_outstanding_++;
1053  if (options.is_last_message()) {
1054  options.set_buffer_hint();
1055  }
1056  if (!ctx_->sent_initial_metadata_) {
1057  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
1058  ctx_->initial_metadata_flags());
1059  if (ctx_->compression_level_set()) {
1060  write_ops_.set_compression_level(ctx_->compression_level());
1061  }
1062  ctx_->sent_initial_metadata_ = true;
1063  }
1064  // TODO(vjpai): don't assert
1065  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
1066  call_.PerformOps(&write_ops_);
1067  }
1068 
1069  void WriteAndFinish(const ResponseType* resp, WriteOptions options,
1070  Status s) override {
1071  // Don't send any message if the status is bad
1072  if (s.ok()) {
1073  // TODO(vjpai): don't assert
1074  GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
1075  }
1076  Finish(std::move(s));
1077  }
1078 
1079  void Read(RequestType* req) override {
1080  callbacks_outstanding_++;
1081  read_ops_.RecvMessage(req);
1082  call_.PerformOps(&read_ops_);
1083  }
1084 
1085  private:
1086  friend class CallbackBidiHandler<RequestType, ResponseType>;
1087 
1088  ServerCallbackReaderWriterImpl(
1089  ::grpc_impl::ServerContext* ctx, Call* call,
1090  std::function<void()> call_requester,
1092  : ctx_(ctx),
1093  call_(*call),
1094  call_requester_(std::move(call_requester)),
1095  reactor_(reactor) {
1096  ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
1097  write_tag_.Set(call_.call(),
1098  [this](bool ok) {
1099  reactor_->OnWriteDone(ok);
1100  MaybeDone();
1101  },
1102  &write_ops_);
1103  write_ops_.set_core_cq_tag(&write_tag_);
1104  read_tag_.Set(call_.call(),
1105  [this](bool ok) {
1106  reactor_->OnReadDone(ok);
1107  MaybeDone();
1108  },
1109  &read_ops_);
1110  read_ops_.set_core_cq_tag(&read_tag_);
1111  }
1112  ~ServerCallbackReaderWriterImpl() {}
1113 
1114  void MaybeDone() {
1115  if (--callbacks_outstanding_ == 0) {
1116  reactor_->OnDone();
1117  grpc_call* call = call_.call();
1118  auto call_requester = std::move(call_requester_);
1119  this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor
1121  call_requester();
1122  }
1123  }
1124 
1126  CallbackWithSuccessTag meta_tag_;
1129  finish_ops_;
1130  CallbackWithSuccessTag finish_tag_;
1132  CallbackWithSuccessTag write_tag_;
1134  CallbackWithSuccessTag read_tag_;
1135 
1137  Call call_;
1138  std::function<void()> call_requester_;
1140  std::atomic_int callbacks_outstanding_{
1141  3}; // reserve for OnStarted, Finish, and CompletionOp
1142  };
1143 };
1144 
1145 } // namespace internal
1146 
1147 } // namespace grpc
1148 
1149 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:70
void OnDone() override
Definition: server_callback.h:455
void Release() override
Definition: server_callback.h:88
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call_op_set.h:125
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:145
virtual void grpc_call_ref(grpc_call *call)=0
Definition: server_callback.h:462
void StartWriteAndFinish(const Response *resp, WriteOptions options, Status s)
Initiate a write operation with specified options and final RPC Status, which also causes any trailin...
Definition: server_callback.h:285
void BindReactor(ServerBidiReactor< Request, Response > *reactor)
Definition: server_callback.h:220
virtual void OnSendInitialMetadataDone(bool ok)
Notifies the application that an explicit StartSendInitialMetadata operation completed.
Definition: server_callback.h:324
Definition: server_callback.h:47
void StartWrite(const Response *resp, WriteOptions options)
Initiate a write operation with specified options.
Definition: server_callback.h:268
virtual ~ServerCallbackWriter()
Definition: server_callback.h:184
Definition: server_callback.h:442
void OnStarted(::grpc_impl::ServerContext *, const Request *) override
Similar to ServerBidiReactor::OnStarted, except that this also provides the request object sent by th...
Definition: server_callback.h:456
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:164
virtual void OnReadDone(bool ok)
Definition: server_callback.h:380
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:821
void RunHandler(const HandlerParameter &param) final
Definition: server_callback.h:801
void StartRead(Request *req)
Definition: server_callback.h:367
virtual void OnSendInitialMetadataDone(bool ok)
The following notifications are exactly like ServerBidiReactor.
Definition: server_callback.h:423
virtual void grpc_call_unref(grpc_call *call)=0
CallbackUnaryHandler(std::function< void(::grpc_impl::ServerContext *, const RequestType *, ResponseType *, experimental::ServerCallbackRpcController *)> func)
Definition: server_callback.h:474
DefaultMessageHolder()
Definition: server_callback.h:84
virtual void SendInitialMetadata(std::function< void(bool)>)=0
Definition: async_unary_call.h:303
void OnStarted(::grpc_impl::ServerContext *, Response *) override
Similar to ServerBidiReactor::OnStarted, except that this also provides the response object that the ...
Definition: server_callback.h:446
void StartWriteAndFinish(const Response *resp, WriteOptions options, Status s)
Definition: server_callback.h:405
void OnDone() override
Definition: server_callback.h:425
Definition: grpc_types.h:40
virtual void OnWriteDone(bool ok)
Notifies the application that a StartWrite (or StartWriteLast) operation completed.
Definition: server_callback.h:337
grpc_call * call() const
Definition: call.h:72
Definition: server_callback.h:204
void StartWrite(const Response *resp)
Initiate a write operation.
Definition: server_callback.h:260
ServerReadReactor is the interface for a client-streaming RPC.
Definition: server_callback.h:104
::grpc_impl::ServerContext ServerContext
Definition: server_context.h:25
A ServerContext allows the person implementing a service handler to:
Definition: server_context_impl.h:114
void BindReactor(ServerWriteReactor< Request, Response > *reactor)
Definition: server_callback.h:198
void StartSendInitialMetadata()
Do NOT call any operation initiation method (names that start with Start) until after the library has...
Definition: server_callback.h:247
::google::protobuf::util::Status Status
Definition: config_protobuf.h:96
void OnCancel() override
Notifies the application that this RPC has been cancelled.
Definition: server_callback.h:347
CallbackBidiHandler(std::function< experimental::ServerBidiReactor< RequestType, ResponseType > *()> func)
Definition: server_callback.h:974
void RunHandler(const HandlerParameter &param) final
Definition: server_callback.h:979
void Finish(Status s)
Definition: server_callback.h:368
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
void StartWriteLast(const Response *resp, WriteOptions options)
Definition: server_callback.h:409
ResponseT * response()
Definition: message_allocator.h:47
Definition: call_op_set.h:629
RequestT * request()
Definition: message_allocator.h:46
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, Status *status, void **handler_data) final
Definition: server_callback.h:829
Definition: call_op_set.h:218
CallbackServerStreamingHandler(std::function< experimental::ServerWriteReactor< RequestType, ResponseType > *()> func)
Definition: server_callback.h:796
void SetMessageAllocator(experimental::MessageAllocator< RequestType, ResponseType > *allocator)
Definition: server_callback.h:481
void StartWrite(const Response *resp, WriteOptions options)
Definition: server_callback.h:402
void Release()
Forget underlying byte buffer without destroying Use this only for un-owned byte buffers.
Definition: byte_buffer.h:144
void OnDone() override
Definition: server_callback.h:445
void StartRead(Request *req)
Initiate a read operation.
Definition: server_callback.h:253
Definition: call_op_set.h:288
virtual void OnStarted(::grpc_impl::ServerContext *context, Response *resp)
Similar to ServerBidiReactor::OnStarted, except that this also provides the response object that the ...
Definition: server_callback.h:376
Definition: server_callback.h:49
This header provides an object that reads bytes directly from a grpc::ByteBuffer, via the ZeroCopyInp...
Definition: alarm.h:24
void Finish(Status s)
Indicate that the stream is to be finished and the trailing metadata and RPC status are to be sent...
Definition: server_callback.h:308
ServerBidiReactor is the interface for a bidirectional streaming RPC.
Definition: server_callback.h:108
void OnCancel() override
Definition: server_callback.h:382
CoreCodegenInterface * g_core_codegen_interface
Definition: call_op_set.h:51
Definition: message_allocator.h:40
Definition: server_callback.h:167
ReturnType * CatchingReactorCreator(Func &&func, Args &&... args)
Definition: callback_common.h:51
Definition: rpc_service_method.h:44
CallbackClientStreamingHandler(std::function< experimental::ServerReadReactor< RequestType, ResponseType > *()> func)
Definition: server_callback.h:651
void OnStarted(::grpc_impl::ServerContext *) override
Notify the application that a streaming RPC has started and that it is now ok to call any operation i...
Definition: server_callback.h:466
Per-message write options.
Definition: call_op_set.h:85
void RunHandler(const HandlerParameter &param) final
Definition: server_callback.h:656
Definition: byte_buffer.h:49
Definition: server_callback.h:81
void OnCancel() override
Definition: server_callback.h:426
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
virtual void OnWriteDone(bool ok)
Definition: server_callback.h:424
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming o...
Definition: callback_common.h:132
Definition: server_callback.h:182
bool ok() const
Is the status OK?
Definition: status.h:118
virtual void OnStarted(::grpc_impl::ServerContext *context)
Notify the application that a streaming RPC has started and that it is now ok to call any operation i...
Definition: server_callback.h:316
Base class for running an RPC handler.
Definition: rpc_service_method.h:41
Definition: server_callback.h:43
void StartWriteLast(const Response *resp, WriteOptions options)
Inform system of a planned write operation with specified options, but allow the library to schedule ...
Definition: server_callback.h:298
virtual void OnStarted(::grpc_impl::ServerContext *context, const Request *req)
Similar to ServerBidiReactor::OnStarted, except that this also provides the request object sent by th...
Definition: server_callback.h:419
Did it work? If it didn&#39;t, why?
Definition: status.h:31
Operation is not implemented or not supported/enabled in this service.
Definition: status_code_enum.h:115
void CatchingCallback(Func &&func, Args &&... args)
An exception-safe way of invoking a user-specified callback function.
Definition: callback_common.h:38
Definition: server_callback.h:112
void StartSendInitialMetadata()
The following operation initiations are exactly like ServerBidiReactor.
Definition: server_callback.h:366
virtual void OnSendInitialMetadataDone(bool ok)
The following notifications are exactly like ServerBidiReactor.
Definition: server_callback.h:379
ServerWriteReactor is the interface for a server-streaming RPC.
Definition: server_callback.h:106
Definition: message_allocator.h:27
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:189
virtual ~ServerCallbackReader()
Definition: server_callback.h:169
void OnDone() override
Notifies the application that all operations associated with this RPC have completed.
Definition: server_callback.h:342
void StartWrite(const Response *resp)
Definition: server_callback.h:401
void StartSendInitialMetadata()
The following operation initiations are exactly like ServerBidiReactor.
Definition: server_callback.h:400
void Finish(Status s)
Definition: server_callback.h:412
void BindReactor(ServerReadReactor< Request, Response > *reactor)
Definition: server_callback.h:176
void OnDone() override
Definition: server_callback.h:381
virtual void OnReadDone(bool ok)
Notifies the application that a StartRead operation completed.
Definition: server_callback.h:330
A sequence of bytes.
Definition: byte_buffer.h:65
void OnDone() override
Notifies the application that all operations associated with this RPC have completed.
Definition: server_callback.h:465
virtual void WriteAndFinish(const Response *msg, WriteOptions options, Status s)
Definition: server_callback.h:189
void RunHandler(const HandlerParameter &param) final
Definition: server_callback.h:486
Straightforward wrapping of the C call object.
Definition: call.h:38
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, Status *status, void **handler_data) final
Definition: server_callback.h:508
virtual void WriteAndFinish(const Response *msg, WriteOptions options, Status s)
Definition: server_callback.h:212
virtual ~ServerReactor()=default
Definition: server_callback.h:452
virtual ~ServerCallbackReaderWriter()
Definition: server_callback.h:206