GRPC C++  1.22.0-dev
server_callback.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
20 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
21 
22 #include <atomic>
23 #include <functional>
24 #include <type_traits>
25 
35 
36 namespace grpc {
37 
38 // Declare base class of all reactors as internal
39 namespace internal {
40 
41 // Forward declarations
42 template <class Request, class Response>
44 template <class Request, class Response>
46 template <class Request, class Response>
48 
50  public:
51  virtual ~ServerReactor() = default;
52  virtual void OnDone() = 0;
53  virtual void OnCancel() = 0;
54 
55  private:
56  friend class ::grpc::ServerContext;
57  template <class Request, class Response>
59  template <class Request, class Response>
61  template <class Request, class Response>
62  friend class CallbackBidiHandler;
63 
64  // The ServerReactor is responsible for tracking when it is safe to call
65  // OnCancel. OnCancel must not run until after OnStarted is done and the RPC
66  // has completed with a cancellation. This is tracked by counting how many of
67  // these conditions are still unmet (the counter starts at 2) and calling
68  // OnCancel from whichever MaybeCallOnCancel invocation meets the last one.
69 
70  void MaybeCallOnCancel() {
// fetch_sub returns the value held before the decrement, so a result of 1
// means this invocation just brought the counter down to zero.
71  if (on_cancel_conditions_remaining_.fetch_sub(
72  1, std::memory_order_acq_rel) == 1) {
73  OnCancel();
74  }
75  }
76 
77  std::atomic_int on_cancel_conditions_remaining_{2};
78 };
79 
80 template <class Request, class Response>
82  : public experimental::MessageHolder<Request, Response> {
83  public:
85  this->set_request(&request_obj_);
86  this->set_response(&response_obj_);
87  }
88  void Release() override {
89  // the object is allocated in the call arena.
91  }
92 
93  private:
94  Request request_obj_;
95  Response response_obj_;
96 };
97 
98 } // namespace internal
99 
100 namespace experimental {
101 
102 // Forward declarations
103 template <class Request, class Response>
105 template <class Request, class Response>
107 template <class Request, class Response>
109 
110 // For unary RPCs, the exposed controller class is only an interface
111 // and the actual implementation is an internal class.
113  public:
114  virtual ~ServerCallbackRpcController() = default;
115 
116  // The method handler must call this function when it is done so that
117  // the library knows to free its resources
118  virtual void Finish(Status s) = 0;
119 
120  // Allow the method handler to push out the initial metadata before
121  // the response and status are ready
122  virtual void SendInitialMetadata(std::function<void(bool)>) = 0;
123 
155  virtual void SetCancelCallback(std::function<void()> callback) = 0;
156  virtual void ClearCancelCallback() = 0;
157 
158  // NOTE: This is an API for advanced users who need custom allocators.
159  // Get and maybe mutate the allocator state associated with the current RPC.
160  virtual RpcAllocatorState* GetRpcAllocatorState() = 0;
161 };
162 
163 // NOTE: The actual streaming object classes are provided
164 // as API only to support mocking. There are no implementations of
165 // these class interfaces in the API.
166 template <class Request>
168  public:
170  virtual void Finish(Status s) = 0;
171  virtual void SendInitialMetadata() = 0;
172  virtual void Read(Request* msg) = 0;
173 
174  protected:
175  template <class Response>
177  reactor->BindReader(this);
178  }
179 };
180 
181 template <class Response>
183  public:
185 
186  virtual void Finish(Status s) = 0;
187  virtual void SendInitialMetadata() = 0;
188  virtual void Write(const Response* msg, WriteOptions options) = 0;
// Writes msg with the given options and then finishes the RPC with status s.
// The writer implementation later in this header overrides this so that the
// message is queued on the finish batch instead of being sent separately.
189  virtual void WriteAndFinish(const Response* msg, WriteOptions options,
190  Status s) {
191  // Default implementation that can/should be overridden
// Fallback behaviour: an ordinary Write immediately followed by Finish.
192  Write(msg, std::move(options));
193  Finish(std::move(s));
194  }
195 
196  protected:
197  template <class Request>
199  reactor->BindWriter(this);
200  }
201 };
202 
203 template <class Request, class Response>
205  public:
207 
208  virtual void Finish(Status s) = 0;
209  virtual void SendInitialMetadata() = 0;
210  virtual void Read(Request* msg) = 0;
211  virtual void Write(const Response* msg, WriteOptions options) = 0;
// Writes msg with the given options and then finishes the RPC with status s.
// The reader-writer implementation later in this header overrides this so
// that the message is queued on the finish batch instead of being sent
// separately.
212  virtual void WriteAndFinish(const Response* msg, WriteOptions options,
213  Status s) {
214  // Default implementation that can/should be overridden
// Fallback behaviour: an ordinary Write immediately followed by Finish.
215  Write(msg, std::move(options));
216  Finish(std::move(s));
217  }
218 
219  protected:
221  reactor->BindStream(this);
222  }
223 };
224 
225 // The following classes are the reactor interfaces that are to be implemented
226 // by the user, returned as the result of the method handler for a callback
227 // method, and activated by the call to OnStarted. The library guarantees that
228 // OnStarted will be called for any reactor that has been created using a
229 // method handler registered on a service. No operation initiation method may be
230 // called until after the call to OnStarted.
231 // Note that none of the classes are pure; all reactions have a default empty
232 // reaction so that the user class only needs to override those reactions that
233 // it cares about.
234 
236 template <class Request, class Response>
238  public:
239  ~ServerBidiReactor() = default;
240 
243 
// Operation-initiation methods. Each one forwards to the stream object stored
// in stream_ (set through BindStream). Per this header's contract, none of
// them may be called until after OnStarted has been called.
//
// Explicitly sends the initial metadata.
247  void StartSendInitialMetadata() { stream_->SendInitialMetadata(); }
248 
// Initiates a read of the next request message into *req.
253  void StartRead(Request* req) { stream_->Read(req); }
254 
// Initiates a write of *resp with default-constructed WriteOptions.
260  void StartWrite(const Response* resp) { StartWrite(resp, WriteOptions()); }
261 
// Initiates a write of *resp with the specified options.
268  void StartWrite(const Response* resp, WriteOptions options) {
269  stream_->Write(resp, std::move(options));
270  }
271 
// Initiates a write of *resp with the specified options together with the
// final RPC status s.
285  void StartWriteAndFinish(const Response* resp, WriteOptions options,
286  Status s) {
287  stream_->WriteAndFinish(resp, std::move(options), std::move(s));
288  }
289 
// Same as StartWrite(resp, options) except that the last-message bit is set on
// the options first.
298  void StartWriteLast(const Response* resp, WriteOptions options) {
299  StartWrite(resp, std::move(options.set_last_message()));
300  }
301 
// Finishes the RPC with status s.
308  void Finish(Status s) { stream_->Finish(std::move(s)); }
309 
// Reaction hooks. Every hook has an empty default body, so a subclass only
// needs to override the reactions it cares about.
//
// Called by the bidi method handler once the stream has been bound to this
// reactor; context is the ServerContext of the RPC.
316  virtual void OnStarted(ServerContext* context) {}
317 
// Notifies the application that an explicit StartSendInitialMetadata operation
// completed. ok is the flag delivered with that completion.
324  virtual void OnSendInitialMetadataDone(bool ok) {}
325 
// Called from the read completion callback with that completion's flag.
330  virtual void OnReadDone(bool ok) {}
331 
// Notifies the application that a StartWrite (or StartWriteLast) operation
// completed.
337  virtual void OnWriteDone(bool ok) {}
338 
// Called by the stream implementation when its count of outstanding callbacks
// reaches zero.
342  void OnDone() override {}
343 
// Called through ServerReactor::MaybeCallOnCancel once both of its conditions
// have been met.
347  void OnCancel() override {}
348 
349  private:
350  friend class ServerCallbackReaderWriter<Request, Response>;
// Records the stream object that the Start* methods and Finish forward to.
// Private: reached through the ServerCallbackReaderWriter friendship (its
// BindReactor calls this with itself as the argument).
351  void BindStream(ServerCallbackReaderWriter<Request, Response>* stream) {
352  stream_ = stream;
353  }
354 
356 };
357 
359 template <class Request, class Response>
361  public:
362  ~ServerReadReactor() = default;
363 
// Operation-initiation methods; each forwards to the reader stored in reader_
// (set through BindReader). Per this header's contract, none of them may be
// called until after OnStarted has been called.
//
// Explicitly sends the initial metadata.
365  void StartSendInitialMetadata() { reader_->SendInitialMetadata(); }
// Initiates a read of the next request message into *req.
366  void StartRead(Request* req) { reader_->Read(req); }
// Finishes the RPC with status s.
367  void Finish(Status s) { reader_->Finish(std::move(s)); }
368 
// Reaction hooks, all with empty default bodies.
//
// Called by the client-streaming method handler after the reader is bound.
// resp is the response object owned by the reader implementation; that same
// object is what the reader's Finish sends when the status is OK.
375  virtual void OnStarted(ServerContext* context, Response* resp) {}
376 
// Called when an explicit StartSendInitialMetadata operation completes.
378  virtual void OnSendInitialMetadataDone(bool ok) {}
// Called from the read completion callback with that completion's flag.
379  virtual void OnReadDone(bool ok) {}
// Called by the reader implementation when its count of outstanding callbacks
// reaches zero.
380  void OnDone() override {}
// Called through ServerReactor::MaybeCallOnCancel once both of its conditions
// have been met.
381  void OnCancel() override {}
382 
383  private:
384  friend class ServerCallbackReader<Request>;
// Records the reader object that the Start* methods and Finish forward to.
// Private: reached through the ServerCallbackReader friendship.
385  void BindReader(ServerCallbackReader<Request>* reader) { reader_ = reader; }
386 
388 };
389 
391 template <class Request, class Response>
393  public:
394  ~ServerWriteReactor() = default;
395 
// Operation-initiation methods; each forwards to the writer stored in writer_
// (set through BindWriter). Per this header's contract, none of them may be
// called until after OnStarted has been called.
//
// Explicitly sends the initial metadata.
397  void StartSendInitialMetadata() { writer_->SendInitialMetadata(); }
// Initiates a write of *resp with default-constructed WriteOptions.
398  void StartWrite(const Response* resp) { StartWrite(resp, WriteOptions()); }
// Initiates a write of *resp with the specified options.
399  void StartWrite(const Response* resp, WriteOptions options) {
400  writer_->Write(resp, std::move(options));
401  }
// Initiates a write of *resp with the specified options together with the
// final RPC status s.
402  void StartWriteAndFinish(const Response* resp, WriteOptions options,
403  Status s) {
404  writer_->WriteAndFinish(resp, std::move(options), std::move(s));
405  }
// Same as StartWrite(resp, options) except that the last-message bit is set on
// the options first.
406  void StartWriteLast(const Response* resp, WriteOptions options) {
407  StartWrite(resp, std::move(options.set_last_message()));
408  }
// Finishes the RPC with status s.
409  void Finish(Status s) { writer_->Finish(std::move(s)); }
410 
// Reaction hooks, all with empty default bodies.
//
// Called by the server-streaming method handler after the writer is bound.
// req is the request pointer held by the writer implementation.
416  virtual void OnStarted(ServerContext* context, const Request* req) {}
417 
// Called when an explicit StartSendInitialMetadata operation completes.
419  virtual void OnSendInitialMetadataDone(bool ok) {}
// Called from the write completion callback with that completion's flag.
420  virtual void OnWriteDone(bool ok) {}
// Called by the writer implementation when its count of outstanding callbacks
// reaches zero.
421  void OnDone() override {}
// Called through ServerReactor::MaybeCallOnCancel once both of its conditions
// have been met.
422  void OnCancel() override {}
423 
424  private:
425  friend class ServerCallbackWriter<Response>;
// Records the writer object that the Start* methods and Finish forward to.
// Private: reached through the ServerCallbackWriter friendship.
426  void BindWriter(ServerCallbackWriter<Response>* writer) { writer_ = writer; }
427 
429 };
430 
431 } // namespace experimental
432 
433 namespace internal {
434 
435 template <class Request, class Response>
437  : public experimental::ServerReadReactor<Request, Response> {
438  public:
// This reactor destroys itself when the RPC is done.
// NOTE(review): `delete this` presumes the object was created with `new`; the
// creation site is not visible in this listing - confirm.
439  void OnDone() override { delete this; }
// Ignores both arguments and immediately finishes the RPC with an
// UNIMPLEMENTED status carrying an empty message.
440  void OnStarted(ServerContext*, Response*) override {
441  this->Finish(Status(StatusCode::UNIMPLEMENTED, ""));
442  }
443 };
444 
445 template <class Request, class Response>
447  : public experimental::ServerWriteReactor<Request, Response> {
448  public:
// This reactor destroys itself when the RPC is done.
// NOTE(review): `delete this` presumes the object was created with `new`; the
// creation site is not visible in this listing - confirm.
449  void OnDone() override { delete this; }
// Ignores both arguments and immediately finishes the RPC with an
// UNIMPLEMENTED status carrying an empty message.
450  void OnStarted(ServerContext*, const Request*) override {
451  this->Finish(Status(StatusCode::UNIMPLEMENTED, ""));
452  }
453 };
454 
455 template <class Request, class Response>
457  : public experimental::ServerBidiReactor<Request, Response> {
458  public:
// This reactor destroys itself when the RPC is done.
// NOTE(review): `delete this` presumes the object was created with `new`; the
// creation site is not visible in this listing - confirm.
459  void OnDone() override { delete this; }
// Ignores the context and immediately finishes the RPC with an UNIMPLEMENTED
// status carrying an empty message.
460  void OnStarted(ServerContext*) override {
461  this->Finish(Status(StatusCode::UNIMPLEMENTED, ""));
462  }
463 };
464 
465 template <class RequestType, class ResponseType>
466 class CallbackUnaryHandler : public MethodHandler {
467  public:
469  std::function<void(ServerContext*, const RequestType*, ResponseType*,
471  func)
472  : func_(func) {}
473 
476  allocator_ = allocator;
477  }
478 
479  void RunHandler(const HandlerParameter& param) final {
480  // Arena allocate a controller structure (that includes request/response)
481  g_core_codegen_interface->grpc_call_ref(param.call->call());
482  auto* allocator_state =
484  param.internal_data);
485  auto* controller = new (g_core_codegen_interface->grpc_call_arena_alloc(
486  param.call->call(), sizeof(ServerCallbackRpcControllerImpl)))
487  ServerCallbackRpcControllerImpl(param.server_context, param.call,
488  allocator_state,
489  std::move(param.call_requester));
490  Status status = param.status;
491  if (status.ok()) {
492  // Call the actual function handler and expect the user to call finish
493  CatchingCallback(func_, param.server_context, controller->request(),
494  controller->response(), controller);
495  } else {
496  // if deserialization failed, we need to fail the call
497  controller->Finish(status);
498  }
499  }
500 
501  void* Deserialize(grpc_call* call, grpc_byte_buffer* req, Status* status,
502  void** handler_data) final {
503  ByteBuffer buf;
504  buf.set_buffer(req);
505  RequestType* request = nullptr;
507  nullptr;
508  if (allocator_ != nullptr) {
509  allocator_state = allocator_->AllocateMessages();
510  } else {
511  allocator_state = new (g_core_codegen_interface->grpc_call_arena_alloc(
514  }
515  *handler_data = allocator_state;
516  request = allocator_state->request();
517  *status = SerializationTraits<RequestType>::Deserialize(&buf, request);
518  buf.Release();
519  if (status->ok()) {
520  return request;
521  }
522  // Clean up on deserialization failure.
523  allocator_state->Release();
524  return nullptr;
525  }
526 
527  private:
528  std::function<void(ServerContext*, const RequestType*, ResponseType*,
530  func_;
532  nullptr;
533 
534  // The implementation class of ServerCallbackRpcController is a private member
535  // of CallbackUnaryHandler since it is never exposed anywhere, and this allows
536  // it to take advantage of CallbackUnaryHandler's friendships.
537  class ServerCallbackRpcControllerImpl
539  public:
// Completes the unary RPC with status s. Builds the final batch on
// finish_ops_ and hands it to the call; the batch's completion callback is
// MaybeDone, which accounts for one of the reserved outstanding callbacks.
540  void Finish(Status s) override {
541  finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
542  &finish_ops_);
// Initial metadata that has not gone out yet is attached to this batch, and
// the context is marked so it is not sent a second time.
543  if (!ctx_->sent_initial_metadata_) {
544  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
545  ctx_->initial_metadata_flags());
546  if (ctx_->compression_level_set()) {
547  finish_ops_.set_compression_level(ctx_->compression_level());
548  }
549  ctx_->sent_initial_metadata_ = true;
550  }
551  // The response is dropped if the status is not OK.
552  if (s.ok()) {
// The response is queued on the batch, and whatever SendMessagePtr returns is
// the status that gets sent.
553  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
554  finish_ops_.SendMessagePtr(response()));
555  } else {
556  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
557  }
558  finish_ops_.set_core_cq_tag(&finish_tag_);
559  call_.PerformOps(&finish_ops_);
560  }
561 
// Sends the initial metadata ahead of the response and status. f is invoked
// with the flag delivered by the metadata batch's completion. Asserts that the
// initial metadata has not been sent already.
562  void SendInitialMetadata(std::function<void(bool)> f) override {
563  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
// One more completion is now pending; the tag callback below balances this
// increment with its call to MaybeDone.
564  callbacks_outstanding_++;
565  // TODO(vjpai): Consider taking f as a move-capture if we adopt C++14
566  // and if performance of this operation matters
567  meta_tag_.Set(call_.call(),
568  [this, f](bool ok) {
569  f(ok);
570  MaybeDone();
571  },
572  &meta_ops_);
573  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
574  ctx_->initial_metadata_flags());
575  if (ctx_->compression_level_set()) {
576  meta_ops_.set_compression_level(ctx_->compression_level());
577  }
// Mark as sent so that Finish does not attach the metadata again.
578  ctx_->sent_initial_metadata_ = true;
579  meta_ops_.set_core_cq_tag(&meta_tag_);
580  call_.PerformOps(&meta_ops_);
581  }
582 
583  // Neither SetCancelCallback nor ClearCancelCallback should affect the
584  // callbacks_outstanding_ count since they are paired and both must precede
585  // the invocation of Finish (if they are used at all)
// Both methods simply delegate to the ServerContext of this RPC.
586  void SetCancelCallback(std::function<void()> callback) override {
587  ctx_->SetCancelCallback(std::move(callback));
588  }
589 
590  void ClearCancelCallback() override { ctx_->ClearCancelCallback(); }
591 
// Returns the allocator state pointer this controller was constructed with.
// The same object supplies the request and response messages and is released
// when the controller is done.
592  experimental::RpcAllocatorState* GetRpcAllocatorState() override {
593  return allocator_state_;
594  }
595 
596  private:
597  friend class CallbackUnaryHandler<RequestType, ResponseType>;
598 
599  ServerCallbackRpcControllerImpl(
600  ServerContext* ctx, Call* call,
602  std::function<void()> call_requester)
603  : ctx_(ctx),
604  call_(*call),
605  allocator_state_(allocator_state),
606  call_requester_(std::move(call_requester)) {
607  ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, nullptr);
608  }
609 
// Accessors for the request and response messages; both live in the
// allocator state object rather than in the controller itself.
610  const RequestType* request() { return allocator_state_->request(); }
611  ResponseType* response() { return allocator_state_->response(); }
612 
613  void MaybeDone() {
614  if (--callbacks_outstanding_ == 0) {
615  grpc_call* call = call_.call();
616  auto call_requester = std::move(call_requester_);
617  allocator_state_->Release();
618  this->~ServerCallbackRpcControllerImpl(); // explicitly call destructor
620  call_requester();
621  }
622  }
623 
625  CallbackWithSuccessTag meta_tag_;
628  finish_ops_;
629  CallbackWithSuccessTag finish_tag_;
630 
631  ServerContext* ctx_;
632  Call call_;
634  allocator_state_;
635  std::function<void()> call_requester_;
636  std::atomic_int callbacks_outstanding_{
637  2}; // reserve for Finish and CompletionOp
638  };
639 };
640 
641 template <class RequestType, class ResponseType>
643  public:
645  std::function<
647  func)
648  : func_(std::move(func)) {}
649  void RunHandler(const HandlerParameter& param) final {
650  // Arena allocate a reader structure (that includes response)
651  g_core_codegen_interface->grpc_call_ref(param.call->call());
652 
654  param.status.ok()
657  func_)
658  : nullptr;
659 
660  if (reactor == nullptr) {
661  // if deserialization or reactor creator failed, we need to fail the call
663  }
664 
665  auto* reader = new (g_core_codegen_interface->grpc_call_arena_alloc(
666  param.call->call(), sizeof(ServerCallbackReaderImpl)))
667  ServerCallbackReaderImpl(param.server_context, param.call,
668  std::move(param.call_requester), reactor);
669 
670  reader->BindReactor(reactor);
671  reactor->OnStarted(param.server_context, reader->response());
672  // The earliest that OnCancel can be called is after OnStarted is done.
673  reactor->MaybeCallOnCancel();
674  reader->MaybeDone();
675  }
676 
677  private:
678  std::function<experimental::ServerReadReactor<RequestType, ResponseType>*()>
679  func_;
680 
681  class ServerCallbackReaderImpl
682  : public experimental::ServerCallbackReader<RequestType> {
683  public:
// Completes the client-streaming RPC with status s. Builds the final batch on
// finish_ops_ and hands it to the call; the batch's completion callback is
// MaybeDone, which accounts for one of the reserved outstanding callbacks.
684  void Finish(Status s) override {
685  finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
686  &finish_ops_);
// Initial metadata that has not gone out yet is attached to this batch, and
// the context is marked so it is not sent a second time.
687  if (!ctx_->sent_initial_metadata_) {
688  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
689  ctx_->initial_metadata_flags());
690  if (ctx_->compression_level_set()) {
691  finish_ops_.set_compression_level(ctx_->compression_level());
692  }
693  ctx_->sent_initial_metadata_ = true;
694  }
695  // The response is dropped if the status is not OK.
696  if (s.ok()) {
// resp_ (the object handed to the reactor in OnStarted) is queued on the
// batch, and whatever SendMessagePtr returns is the status that gets sent.
697  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
698  finish_ops_.SendMessagePtr(&resp_));
699  } else {
700  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
701  }
702  finish_ops_.set_core_cq_tag(&finish_tag_);
703  call_.PerformOps(&finish_ops_);
704  }
705 
// Sends the initial metadata as its own batch. On completion the reactor's
// OnSendInitialMetadataDone is called with the completion flag. Asserts that
// the initial metadata has not been sent already.
706  void SendInitialMetadata() override {
707  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
// One more completion is now pending; the tag callback below balances this
// increment with its call to MaybeDone.
708  callbacks_outstanding_++;
709  meta_tag_.Set(call_.call(),
710  [this](bool ok) {
711  reactor_->OnSendInitialMetadataDone(ok);
712  MaybeDone();
713  },
714  &meta_ops_);
715  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
716  ctx_->initial_metadata_flags());
717  if (ctx_->compression_level_set()) {
718  meta_ops_.set_compression_level(ctx_->compression_level());
719  }
// Mark as sent so that Finish does not attach the metadata again.
720  ctx_->sent_initial_metadata_ = true;
721  meta_ops_.set_core_cq_tag(&meta_tag_);
722  call_.PerformOps(&meta_ops_);
723  }
724 
// Starts receiving the next request message into *req. The completion is
// delivered through read_tag_, whose callback (installed in the constructor)
// calls the reactor's OnReadDone and then MaybeDone.
725  void Read(RequestType* req) override {
// Balanced by the MaybeDone call in the read tag's callback.
726  callbacks_outstanding_++;
727  read_ops_.RecvMessage(req);
728  call_.PerformOps(&read_ops_);
729  }
730 
731  private:
732  friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
733 
734  ServerCallbackReaderImpl(
735  ServerContext* ctx, Call* call, std::function<void()> call_requester,
737  : ctx_(ctx),
738  call_(*call),
739  call_requester_(std::move(call_requester)),
740  reactor_(reactor) {
741  ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
742  read_tag_.Set(call_.call(),
743  [this](bool ok) {
744  reactor_->OnReadDone(ok);
745  MaybeDone();
746  },
747  &read_ops_);
748  read_ops_.set_core_cq_tag(&read_tag_);
749  }
750 
// Intentionally empty. The object is placement-constructed in call-arena
// memory by the handler, so MaybeDone invokes this destructor explicitly
// instead of using delete.
751  ~ServerCallbackReaderImpl() {}
752 
// Returns the response object embedded in this reader; the handler passes it
// to the reactor's OnStarted and Finish sends it when the status is OK.
753  ResponseType* response() { return &resp_; }
754 
755  void MaybeDone() {
756  if (--callbacks_outstanding_ == 0) {
757  reactor_->OnDone();
758  grpc_call* call = call_.call();
759  auto call_requester = std::move(call_requester_);
760  this->~ServerCallbackReaderImpl(); // explicitly call destructor
762  call_requester();
763  }
764  }
765 
767  CallbackWithSuccessTag meta_tag_;
770  finish_ops_;
771  CallbackWithSuccessTag finish_tag_;
773  CallbackWithSuccessTag read_tag_;
774 
775  ServerContext* ctx_;
776  Call call_;
777  ResponseType resp_;
778  std::function<void()> call_requester_;
780  std::atomic_int callbacks_outstanding_{
781  3}; // reserve for OnStarted, Finish, and CompletionOp
782  };
783 };
784 
785 template <class RequestType, class ResponseType>
787  public:
789  std::function<
791  func)
792  : func_(std::move(func)) {}
793  void RunHandler(const HandlerParameter& param) final {
794  // Arena allocate a writer structure
795  g_core_codegen_interface->grpc_call_ref(param.call->call());
796 
798  param.status.ok()
801  func_)
802  : nullptr;
803 
804  if (reactor == nullptr) {
805  // if deserialization or reactor creator failed, we need to fail the call
807  }
808 
809  auto* writer = new (g_core_codegen_interface->grpc_call_arena_alloc(
810  param.call->call(), sizeof(ServerCallbackWriterImpl)))
811  ServerCallbackWriterImpl(param.server_context, param.call,
812  static_cast<RequestType*>(param.request),
813  std::move(param.call_requester), reactor);
814  writer->BindReactor(reactor);
815  reactor->OnStarted(param.server_context, writer->request());
816  // The earliest that OnCancel can be called is after OnStarted is done.
817  reactor->MaybeCallOnCancel();
818  writer->MaybeDone();
819  }
820 
// Deserializes the request message of a server-streaming call.
// The RequestType object is placement-constructed in memory obtained from the
// call arena. Returns the request on success. On failure the object is
// destroyed in place, the error is left in *status and nullptr is returned.
// handler_data is not used by this handler.
821  void* Deserialize(grpc_call* call, grpc_byte_buffer* req, Status* status,
822  void** handler_data) final {
823  ByteBuffer buf;
824  buf.set_buffer(req);
825  auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc(
826  call, sizeof(RequestType))) RequestType();
827  *status = SerializationTraits<RequestType>::Deserialize(&buf, request);
// NOTE(review): presumably detaches req from buf so that buf's destructor
// does not free a buffer it does not own - confirm against ByteBuffer.
828  buf.Release();
829  if (status->ok()) {
830  return request;
831  }
// Arena memory is not freed individually; only the destructor needs to run.
832  request->~RequestType();
833  return nullptr;
834  }
835 
836  private:
837  std::function<experimental::ServerWriteReactor<RequestType, ResponseType>*()>
838  func_;
839 
840  class ServerCallbackWriterImpl
841  : public experimental::ServerCallbackWriter<ResponseType> {
842  public:
// Completes the server-streaming RPC with status s. The batch is built on
// finish_ops_; its completion callback is MaybeDone. No message is added
// here, but a message queued on finish_ops_ by WriteAndFinish beforehand goes
// out with this same batch.
843  void Finish(Status s) override {
844  finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
845  &finish_ops_);
846  finish_ops_.set_core_cq_tag(&finish_tag_);
847 
// Initial metadata that has not gone out yet is attached to this batch, and
// the context is marked so it is not sent a second time.
848  if (!ctx_->sent_initial_metadata_) {
849  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
850  ctx_->initial_metadata_flags());
851  if (ctx_->compression_level_set()) {
852  finish_ops_.set_compression_level(ctx_->compression_level());
853  }
854  ctx_->sent_initial_metadata_ = true;
855  }
856  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
857  call_.PerformOps(&finish_ops_);
858  }
859 
// Sends the initial metadata as its own batch. On completion the reactor's
// OnSendInitialMetadataDone is called with the completion flag. Asserts that
// the initial metadata has not been sent already.
860  void SendInitialMetadata() override {
861  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
// One more completion is now pending; the tag callback below balances this
// increment with its call to MaybeDone.
862  callbacks_outstanding_++;
863  meta_tag_.Set(call_.call(),
864  [this](bool ok) {
865  reactor_->OnSendInitialMetadataDone(ok);
866  MaybeDone();
867  },
868  &meta_ops_);
869  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
870  ctx_->initial_metadata_flags());
871  if (ctx_->compression_level_set()) {
872  meta_ops_.set_compression_level(ctx_->compression_level());
873  }
// Mark as sent so that Write and Finish do not attach the metadata again.
874  ctx_->sent_initial_metadata_ = true;
875  meta_ops_.set_core_cq_tag(&meta_tag_);
876  call_.PerformOps(&meta_ops_);
877  }
878 
// Starts writing *resp with the given options. The completion is delivered
// through write_tag_, whose callback (installed in the constructor) calls the
// reactor's OnWriteDone and then MaybeDone.
879  void Write(const ResponseType* resp, WriteOptions options) override {
// Balanced by the MaybeDone call in the write tag's callback.
880  callbacks_outstanding_++;
// A message flagged as the last one is additionally marked with the buffer
// hint.
881  if (options.is_last_message()) {
882  options.set_buffer_hint();
883  }
// Unsent initial metadata rides along with this write batch.
884  if (!ctx_->sent_initial_metadata_) {
885  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
886  ctx_->initial_metadata_flags());
887  if (ctx_->compression_level_set()) {
888  write_ops_.set_compression_level(ctx_->compression_level());
889  }
890  ctx_->sent_initial_metadata_ = true;
891  }
892  // TODO(vjpai): don't assert
// A non-OK result from SendMessagePtr currently trips the assertion.
893  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
894  call_.PerformOps(&write_ops_);
895  }
896 
// Sends *resp and the final status s in a single batch. Unlike Write, this
// does not touch callbacks_outstanding_ or write_ops_: the message is queued
// on finish_ops_, so only the finish completion fires.
897  void WriteAndFinish(const ResponseType* resp, WriteOptions options,
898  Status s) override {
899  // This combines the write into the finish callback
900  // Don't send any message if the status is bad
901  if (s.ok()) {
902  // TODO(vjpai): don't assert
// A non-OK result from SendMessagePtr currently trips the assertion.
903  GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
904  }
905  Finish(std::move(s));
906  }
907 
908  private:
909  friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
910 
911  ServerCallbackWriterImpl(
912  ServerContext* ctx, Call* call, const RequestType* req,
913  std::function<void()> call_requester,
915  : ctx_(ctx),
916  call_(*call),
917  req_(req),
918  call_requester_(std::move(call_requester)),
919  reactor_(reactor) {
920  ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
921  write_tag_.Set(call_.call(),
922  [this](bool ok) {
923  reactor_->OnWriteDone(ok);
924  MaybeDone();
925  },
926  &write_ops_);
927  write_ops_.set_core_cq_tag(&write_tag_);
928  }
// Runs the request object's destructor explicitly; no memory is freed here.
// NOTE(review): presumably req_ is the arena-constructed object produced by
// this handler's Deserialize - confirm against the caller of RunHandler.
929  ~ServerCallbackWriterImpl() { req_->~RequestType(); }
930 
// Returns the request pointer supplied at construction; the handler passes
// it to the reactor's OnStarted.
931  const RequestType* request() { return req_; }
932 
933  void MaybeDone() {
934  if (--callbacks_outstanding_ == 0) {
935  reactor_->OnDone();
936  grpc_call* call = call_.call();
937  auto call_requester = std::move(call_requester_);
938  this->~ServerCallbackWriterImpl(); // explicitly call destructor
940  call_requester();
941  }
942  }
943 
945  CallbackWithSuccessTag meta_tag_;
948  finish_ops_;
949  CallbackWithSuccessTag finish_tag_;
951  CallbackWithSuccessTag write_tag_;
952 
953  ServerContext* ctx_;
954  Call call_;
955  const RequestType* req_;
956  std::function<void()> call_requester_;
958  std::atomic_int callbacks_outstanding_{
959  3}; // reserve for OnStarted, Finish, and CompletionOp
960  };
961 };
962 
963 template <class RequestType, class ResponseType>
964 class CallbackBidiHandler : public MethodHandler {
965  public:
967  std::function<
969  func)
970  : func_(std::move(func)) {}
971  void RunHandler(const HandlerParameter& param) final {
972  g_core_codegen_interface->grpc_call_ref(param.call->call());
973 
975  param.status.ok()
978  func_)
979  : nullptr;
980 
981  if (reactor == nullptr) {
982  // if deserialization or reactor creator failed, we need to fail the call
984  }
985 
986  auto* stream = new (g_core_codegen_interface->grpc_call_arena_alloc(
987  param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
988  ServerCallbackReaderWriterImpl(param.server_context, param.call,
989  std::move(param.call_requester),
990  reactor);
991 
992  stream->BindReactor(reactor);
993  reactor->OnStarted(param.server_context);
994  // The earliest that OnCancel can be called is after OnStarted is done.
995  reactor->MaybeCallOnCancel();
996  stream->MaybeDone();
997  }
998 
999  private:
1000  std::function<experimental::ServerBidiReactor<RequestType, ResponseType>*()>
1001  func_;
1002 
1003  class ServerCallbackReaderWriterImpl
1004  : public experimental::ServerCallbackReaderWriter<RequestType,
1005  ResponseType> {
1006  public:
// Completes the bidi-streaming RPC with status s. The batch is built on
// finish_ops_; its completion callback is MaybeDone. No message is added
// here, but a message queued on finish_ops_ by WriteAndFinish beforehand goes
// out with this same batch.
1007  void Finish(Status s) override {
1008  finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
1009  &finish_ops_);
1010  finish_ops_.set_core_cq_tag(&finish_tag_);
1011 
// Initial metadata that has not gone out yet is attached to this batch, and
// the context is marked so it is not sent a second time.
1012  if (!ctx_->sent_initial_metadata_) {
1013  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
1014  ctx_->initial_metadata_flags());
1015  if (ctx_->compression_level_set()) {
1016  finish_ops_.set_compression_level(ctx_->compression_level());
1017  }
1018  ctx_->sent_initial_metadata_ = true;
1019  }
1020  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
1021  call_.PerformOps(&finish_ops_);
1022  }
1023 
// Sends the initial metadata as its own batch. On completion the reactor's
// OnSendInitialMetadataDone is called with the completion flag. Asserts that
// the initial metadata has not been sent already.
1024  void SendInitialMetadata() override {
1025  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
// One more completion is now pending; the tag callback below balances this
// increment with its call to MaybeDone.
1026  callbacks_outstanding_++;
1027  meta_tag_.Set(call_.call(),
1028  [this](bool ok) {
1029  reactor_->OnSendInitialMetadataDone(ok);
1030  MaybeDone();
1031  },
1032  &meta_ops_);
1033  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
1034  ctx_->initial_metadata_flags());
1035  if (ctx_->compression_level_set()) {
1036  meta_ops_.set_compression_level(ctx_->compression_level());
1037  }
// Mark as sent so that Write and Finish do not attach the metadata again.
1038  ctx_->sent_initial_metadata_ = true;
1039  meta_ops_.set_core_cq_tag(&meta_tag_);
1040  call_.PerformOps(&meta_ops_);
1041  }
1042 
// Starts writing *resp with the given options. The completion is delivered
// through write_tag_, whose callback (installed in the constructor) calls the
// reactor's OnWriteDone and then MaybeDone.
1043  void Write(const ResponseType* resp, WriteOptions options) override {
// Balanced by the MaybeDone call in the write tag's callback.
1044  callbacks_outstanding_++;
// A message flagged as the last one is additionally marked with the buffer
// hint.
1045  if (options.is_last_message()) {
1046  options.set_buffer_hint();
1047  }
// Unsent initial metadata rides along with this write batch.
1048  if (!ctx_->sent_initial_metadata_) {
1049  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
1050  ctx_->initial_metadata_flags());
1051  if (ctx_->compression_level_set()) {
1052  write_ops_.set_compression_level(ctx_->compression_level());
1053  }
1054  ctx_->sent_initial_metadata_ = true;
1055  }
1056  // TODO(vjpai): don't assert
// A non-OK result from SendMessagePtr currently trips the assertion.
1057  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
1058  call_.PerformOps(&write_ops_);
1059  }
1060 
// Sends *resp and the final status s in a single batch. Unlike Write, this
// does not touch callbacks_outstanding_ or write_ops_: the message is queued
// on finish_ops_, so only the finish completion fires.
1061  void WriteAndFinish(const ResponseType* resp, WriteOptions options,
1062  Status s) override {
1063  // Don't send any message if the status is bad
1064  if (s.ok()) {
1065  // TODO(vjpai): don't assert
// A non-OK result from SendMessagePtr currently trips the assertion.
1066  GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
1067  }
1068  Finish(std::move(s));
1069  }
1070 
// Starts receiving the next request message into *req. The completion is
// delivered through read_tag_, whose callback (installed in the constructor)
// calls the reactor's OnReadDone and then MaybeDone.
1071  void Read(RequestType* req) override {
// Balanced by the MaybeDone call in the read tag's callback.
1072  callbacks_outstanding_++;
1073  read_ops_.RecvMessage(req);
1074  call_.PerformOps(&read_ops_);
1075  }
1076 
1077  private:
1078  friend class CallbackBidiHandler<RequestType, ResponseType>;
1079 
1080  ServerCallbackReaderWriterImpl(
1081  ServerContext* ctx, Call* call, std::function<void()> call_requester,
1083  : ctx_(ctx),
1084  call_(*call),
1085  call_requester_(std::move(call_requester)),
1086  reactor_(reactor) {
1087  ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
1088  write_tag_.Set(call_.call(),
1089  [this](bool ok) {
1090  reactor_->OnWriteDone(ok);
1091  MaybeDone();
1092  },
1093  &write_ops_);
1094  write_ops_.set_core_cq_tag(&write_tag_);
1095  read_tag_.Set(call_.call(),
1096  [this](bool ok) {
1097  reactor_->OnReadDone(ok);
1098  MaybeDone();
1099  },
1100  &read_ops_);
1101  read_ops_.set_core_cq_tag(&read_tag_);
1102  }
// Intentionally empty. The object is placement-constructed in call-arena
// memory by the handler, so MaybeDone invokes this destructor explicitly
// instead of using delete.
1103  ~ServerCallbackReaderWriterImpl() {}
1104 
1105  void MaybeDone() {
1106  if (--callbacks_outstanding_ == 0) {
1107  reactor_->OnDone();
1108  grpc_call* call = call_.call();
1109  auto call_requester = std::move(call_requester_);
1110  this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor
1112  call_requester();
1113  }
1114  }
1115 
1117  CallbackWithSuccessTag meta_tag_;
1120  finish_ops_;
1121  CallbackWithSuccessTag finish_tag_;
1123  CallbackWithSuccessTag write_tag_;
1125  CallbackWithSuccessTag read_tag_;
1126 
1127  ServerContext* ctx_;
1128  Call call_;
1129  std::function<void()> call_requester_;
1131  std::atomic_int callbacks_outstanding_{
1132  3}; // reserve for OnStarted, Finish, and CompletionOp
1133  };
1134 };
1135 
1136 } // namespace internal
1137 
1138 } // namespace grpc
1139 
1140 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:70
void OnDone() override
Definition: server_callback.h:449
void Release() override
Definition: server_callback.h:88
virtual void OnStarted(ServerContext *context, Response *resp)
Similar to ServerBidiReactor::OnStarted, except that this also provides the response object that the ...
Definition: server_callback.h:375
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call_op_set.h:125
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:145
virtual void grpc_call_ref(grpc_call *call)=0
Definition: server_callback.h:456
void StartWriteAndFinish(const Response *resp, WriteOptions options, Status s)
Initiate a write operation with specified options and final RPC Status, which also causes any trailin...
Definition: server_callback.h:285
void BindReactor(ServerBidiReactor< Request, Response > *reactor)
Definition: server_callback.h:220
virtual void OnSendInitialMetadataDone(bool ok)
Notifies the application that an explicit StartSendInitialMetadata operation completed.
Definition: server_callback.h:324
Definition: server_callback.h:47
void StartWrite(const Response *resp, WriteOptions options)
Initiate a write operation with specified options.
Definition: server_callback.h:268
virtual ~ServerCallbackWriter()
Definition: server_callback.h:184
Definition: server_callback.h:436
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:164
virtual void OnReadDone(bool ok)
Definition: server_callback.h:379
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:826
void RunHandler(const HandlerParameter &param) final
Definition: server_callback.h:793
void StartRead(Request *req)
Definition: server_callback.h:366
virtual void OnSendInitialMetadataDone(bool ok)
The following notifications are exactly like ServerBidiReactor.
Definition: server_callback.h:419
virtual void grpc_call_unref(grpc_call *call)=0
DefaultMessageHolder()
Definition: server_callback.h:84
virtual void SendInitialMetadata(std::function< void(bool)>)=0
Definition: async_unary_call.h:303
void StartWriteAndFinish(const Response *resp, WriteOptions options, Status s)
Definition: server_callback.h:402
void OnDone() override
Definition: server_callback.h:421
Definition: grpc_types.h:40
virtual void OnWriteDone(bool ok)
Notifies the application that a StartWrite (or StartWriteLast) operation completed.
Definition: server_callback.h:337
grpc_call * call() const
Definition: call.h:72
Definition: server_callback.h:204
void StartWrite(const Response *resp)
Initiate a write operation.
Definition: server_callback.h:260
ServerReadReactor is the interface for a client-streaming RPC.
Definition: server_callback.h:104
void BindReactor(ServerWriteReactor< Request, Response > *reactor)
Definition: server_callback.h:198
void StartSendInitialMetadata()
Do NOT call any operation initiation method (names that start with Start) until after the library has...
Definition: server_callback.h:247
::google::protobuf::util::Status Status
Definition: config_protobuf.h:96
void OnCancel() override
Notifies the application that this RPC has been cancelled.
Definition: server_callback.h:347
CallbackBidiHandler(std::function< experimental::ServerBidiReactor< RequestType, ResponseType > *()> func)
Definition: server_callback.h:966
virtual void OnStarted(ServerContext *context, const Request *req)
Similar to ServerBidiReactor::OnStarted, except that this also provides the request object sent by th...
Definition: server_callback.h:416
void RunHandler(const HandlerParameter &param) final
Definition: server_callback.h:971
void Finish(Status s)
Definition: server_callback.h:367
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
virtual void OnStarted(ServerContext *context)
Notify the application that a streaming RPC has started and that it is now ok to call any operation i...
Definition: server_callback.h:316
void StartWriteLast(const Response *resp, WriteOptions options)
Definition: server_callback.h:406
ResponseT * response()
Definition: message_allocator.h:47
Definition: call_op_set.h:634
RequestT * request()
Definition: message_allocator.h:46
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, Status *status, void **handler_data) final
Definition: server_callback.h:821
Definition: call_op_set.h:223
CallbackServerStreamingHandler(std::function< experimental::ServerWriteReactor< RequestType, ResponseType > *()> func)
Definition: server_callback.h:788
void SetMessageAllocator(experimental::MessageAllocator< RequestType, ResponseType > *allocator)
Definition: server_callback.h:474
void StartWrite(const Response *resp, WriteOptions options)
Definition: server_callback.h:399
void Release()
Forget underlying byte buffer without destroying. Use this only for un-owned byte buffers.
Definition: byte_buffer.h:144
void OnDone() override
Definition: server_callback.h:439
void StartRead(Request *req)
Initiate a read operation.
Definition: server_callback.h:253
Definition: call_op_set.h:293
Definition: server_callback.h:49
This header provides an object that reads bytes directly from a grpc::ByteBuffer, via the ZeroCopyInp...
Definition: alarm.h:24
void Finish(Status s)
Indicate that the stream is to be finished and the trailing metadata and RPC status are to be sent...
Definition: server_callback.h:308
ServerBidiReactor is the interface for a bidirectional streaming RPC.
Definition: server_callback.h:108
void OnCancel() override
Definition: server_callback.h:381
CoreCodegenInterface * g_core_codegen_interface
Definition: call_op_set.h:51
Definition: message_allocator.h:40
Definition: server_callback.h:167
ReturnType * CatchingReactorCreator(Func &&func, Args &&... args)
Definition: callback_common.h:51
Definition: rpc_service_method.h:42
CallbackClientStreamingHandler(std::function< experimental::ServerReadReactor< RequestType, ResponseType > *()> func)
Definition: server_callback.h:644
A ServerContext allows the person implementing a service handler to:
Definition: server_context.h:114
Per-message write options.
Definition: call_op_set.h:85
void RunHandler(const HandlerParameter &param) final
Definition: server_callback.h:649
Definition: byte_buffer.h:49
Definition: server_callback.h:81
void OnCancel() override
Definition: server_callback.h:422
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
virtual void OnWriteDone(bool ok)
Definition: server_callback.h:420
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming o...
Definition: callback_common.h:132
Definition: server_callback.h:182
bool ok() const
Is the status OK?
Definition: status.h:118
Base class for running an RPC handler.
Definition: rpc_service_method.h:39
Definition: server_callback.h:43
void StartWriteLast(const Response *resp, WriteOptions options)
Inform system of a planned write operation with specified options, but allow the library to schedule ...
Definition: server_callback.h:298
void OnStarted(ServerContext *, Response *) override
Similar to ServerBidiReactor::OnStarted, except that this also provides the response object that the ...
Definition: server_callback.h:440
Did it work? If it didn't, why?
Definition: status.h:31
Operation is not implemented or not supported/enabled in this service.
Definition: status_code_enum.h:115
void CatchingCallback(Func &&func, Args &&... args)
An exception-safe way of invoking a user-specified callback function.
Definition: callback_common.h:38
Definition: server_callback.h:112
void StartSendInitialMetadata()
The following operation initiations are exactly like ServerBidiReactor.
Definition: server_callback.h:365
virtual void OnSendInitialMetadataDone(bool ok)
The following notifications are exactly like ServerBidiReactor.
Definition: server_callback.h:378
ServerWriteReactor is the interface for a server-streaming RPC.
Definition: server_callback.h:106
Definition: message_allocator.h:27
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:189
void OnStarted(ServerContext *) override
Notify the application that a streaming RPC has started and that it is now ok to call any operation i...
Definition: server_callback.h:460
virtual ~ServerCallbackReader()
Definition: server_callback.h:169
void OnDone() override
Notifies the application that all operations associated with this RPC have completed.
Definition: server_callback.h:342
void StartWrite(const Response *resp)
Definition: server_callback.h:398
void StartSendInitialMetadata()
The following operation initiations are exactly like ServerBidiReactor.
Definition: server_callback.h:397
void Finish(Status s)
Definition: server_callback.h:409
void BindReactor(ServerReadReactor< Request, Response > *reactor)
Definition: server_callback.h:176
void OnDone() override
Definition: server_callback.h:380
virtual void OnReadDone(bool ok)
Notifies the application that a StartRead operation completed.
Definition: server_callback.h:330
A sequence of bytes.
Definition: byte_buffer.h:65
void OnDone() override
Notifies the application that all operations associated with this RPC have completed.
Definition: server_callback.h:459
virtual void WriteAndFinish(const Response *msg, WriteOptions options, Status s)
Definition: server_callback.h:189
void RunHandler(const HandlerParameter &param) final
Definition: server_callback.h:479
Straightforward wrapping of the C call object.
Definition: call.h:38
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, Status *status, void **handler_data) final
Definition: server_callback.h:501
virtual void WriteAndFinish(const Response *msg, WriteOptions options, Status s)
Definition: server_callback.h:212
virtual ~ServerReactor()=default
void OnStarted(ServerContext *, const Request *) override
Similar to ServerBidiReactor::OnStarted, except that this also provides the request object sent by th...
Definition: server_callback.h:450
Definition: server_callback.h:446
virtual ~ServerCallbackReaderWriter()
Definition: server_callback.h:206
CallbackUnaryHandler(std::function< void(ServerContext *, const RequestType *, ResponseType *, experimental::ServerCallbackRpcController *)> func)
Definition: server_callback.h:468