GRPC C++  1.20.0
server_callback.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
20 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
21 
22 #include <atomic>
23 #include <functional>
24 #include <type_traits>
25 
34 
35 namespace grpc {
36 
37 // Declare base class of all reactors as internal
38 namespace internal {
39 
41  public:
42  virtual ~ServerReactor() = default;
43  virtual void OnDone() = 0;
44  virtual void OnCancel() = 0;
45 };
46 
47 } // namespace internal
48 
49 namespace experimental {
50 
51 // Forward declarations
52 template <class Request, class Response>
54 template <class Request, class Response>
56 template <class Request, class Response>
58 
59 // For unary RPCs, the exposed controller class is only an interface
60 // and the actual implementation is an internal class.
62  public:
63  virtual ~ServerCallbackRpcController() = default;
64 
65  // The method handler must call this function when it is done so that
66  // the library knows to free its resources
67  virtual void Finish(Status s) = 0;
68 
69  // Allow the method handler to push out the initial metadata before
70  // the response and status are ready
71  virtual void SendInitialMetadata(std::function<void(bool)>) = 0;
72 
104  virtual void SetCancelCallback(std::function<void()> callback) = 0;
105  virtual void ClearCancelCallback() = 0;
106 };
107 
108 // NOTE: The actual streaming object classes are provided
109 // as API only to support mocking. There are no implementations of
110 // these class interfaces in the API.
111 template <class Request>
113  public:
115  virtual void Finish(Status s) = 0;
116  virtual void SendInitialMetadata() = 0;
117  virtual void Read(Request* msg) = 0;
118 
119  protected:
120  template <class Response>
122  reactor->BindReader(this);
123  }
124 };
125 
126 template <class Response>
128  public:
130 
131  virtual void Finish(Status s) = 0;
132  virtual void SendInitialMetadata() = 0;
133  virtual void Write(const Response* msg, WriteOptions options) = 0;
134  virtual void WriteAndFinish(const Response* msg, WriteOptions options,
135  Status s) {
     // Writes one last message and then finishes the RPC with status s.
136  // Default implementation that can/should be overridden
     // This fallback issues the write and the finish as two separate calls to
     // the pure-virtual Write and Finish; it does not combine them.
137  Write(msg, std::move(options));
138  Finish(std::move(s));
139  }
140 
141  protected:
142  template <class Request>
144  reactor->BindWriter(this);
145  }
146 };
147 
148 template <class Request, class Response>
150  public:
152 
153  virtual void Finish(Status s) = 0;
154  virtual void SendInitialMetadata() = 0;
155  virtual void Read(Request* msg) = 0;
156  virtual void Write(const Response* msg, WriteOptions options) = 0;
157  virtual void WriteAndFinish(const Response* msg, WriteOptions options,
158  Status s) {
     // Writes one last message and then finishes the RPC with status s.
159  // Default implementation that can/should be overridden
     // This fallback issues the write and the finish as two separate calls to
     // the pure-virtual Write and Finish; it does not combine them.
160  Write(msg, std::move(options));
161  Finish(std::move(s));
162  }
163 
164  protected:
166  reactor->BindStream(this);
167  }
168 };
169 
170 // The following classes are the reactor interfaces that are to be implemented
171 // by the user, returned as the result of the method handler for a callback
172 // method, and activated by the call to OnStarted. The library guarantees that
173 // OnStarted will be called for any reactor that has been created using a
174 // method handler registered on a service. No operation initiation method may be
175 // called until after the call to OnStarted.
176 // Note that none of the classes are pure; all reactions have a default empty
177 // reaction so that the user class only needs to override those reactions that
178 // it cares about.
179 
181 template <class Request, class Response>
183  public:
184  ~ServerBidiReactor() = default;
185 
188 
192  void StartSendInitialMetadata() { stream_->SendInitialMetadata(); }
193 
198  void StartRead(Request* req) { stream_->Read(req); }
199 
205  void StartWrite(const Response* resp) { StartWrite(resp, WriteOptions()); }
206 
213  void StartWrite(const Response* resp, WriteOptions options) {
214  stream_->Write(resp, std::move(options));
215  }
216 
230  void StartWriteAndFinish(const Response* resp, WriteOptions options,
231  Status s) {
232  stream_->WriteAndFinish(resp, std::move(options), std::move(s));
233  }
234 
243  void StartWriteLast(const Response* resp, WriteOptions options) {
244  StartWrite(resp, std::move(options.set_last_message()));
245  }
246 
253  void Finish(Status s) { stream_->Finish(std::move(s)); }
254 
259  virtual void OnStarted(ServerContext* context) {}
260 
267  virtual void OnSendInitialMetadataDone(bool ok) {}
268 
273  virtual void OnReadDone(bool ok) {}
274 
280  virtual void OnWriteDone(bool ok) {}
281 
285  void OnDone() override {}
286 
290  void OnCancel() override {}
291 
292  private:
293  friend class ServerCallbackReaderWriter<Request, Response>;
294  void BindStream(ServerCallbackReaderWriter<Request, Response>* stream) {
295  stream_ = stream;
296  }
297 
299 };
300 
302 template <class Request, class Response>
304  public:
305  ~ServerReadReactor() = default;
306 
308  void StartSendInitialMetadata() { reader_->SendInitialMetadata(); }
309  void StartRead(Request* req) { reader_->Read(req); }
310  void Finish(Status s) { reader_->Finish(std::move(s)); }
311 
318  virtual void OnStarted(ServerContext* context, Response* resp) {}
319 
321  virtual void OnSendInitialMetadataDone(bool ok) {}
322  virtual void OnReadDone(bool ok) {}
323  void OnDone() override {}
324  void OnCancel() override {}
325 
326  private:
327  friend class ServerCallbackReader<Request>;
328  void BindReader(ServerCallbackReader<Request>* reader) { reader_ = reader; }
329 
331 };
332 
334 template <class Request, class Response>
336  public:
337  ~ServerWriteReactor() = default;
338 
340  void StartSendInitialMetadata() { writer_->SendInitialMetadata(); }
341  void StartWrite(const Response* resp) { StartWrite(resp, WriteOptions()); }
342  void StartWrite(const Response* resp, WriteOptions options) {
343  writer_->Write(resp, std::move(options));
344  }
345  void StartWriteAndFinish(const Response* resp, WriteOptions options,
346  Status s) {
347  writer_->WriteAndFinish(resp, std::move(options), std::move(s));
348  }
349  void StartWriteLast(const Response* resp, WriteOptions options) {
350  StartWrite(resp, std::move(options.set_last_message()));
351  }
352  void Finish(Status s) { writer_->Finish(std::move(s)); }
353 
359  virtual void OnStarted(ServerContext* context, const Request* req) {}
360 
362  virtual void OnSendInitialMetadataDone(bool ok) {}
363  virtual void OnWriteDone(bool ok) {}
364  void OnDone() override {}
365  void OnCancel() override {}
366 
367  private:
368  friend class ServerCallbackWriter<Response>;
369  void BindWriter(ServerCallbackWriter<Response>* writer) { writer_ = writer; }
370 
372 };
373 
374 } // namespace experimental
375 
376 namespace internal {
377 
378 template <class Request, class Response>
380  : public experimental::ServerReadReactor<Request, Response> {
381  public:
382  void OnDone() override { delete this; }
383  void OnStarted(ServerContext*, Response*) override {
     // Ignores both arguments and immediately finishes the RPC with an
     // UNIMPLEMENTED status and an empty error message.
384  this->Finish(Status(StatusCode::UNIMPLEMENTED, ""));
385  }
386 };
387 
388 template <class Request, class Response>
390  : public experimental::ServerWriteReactor<Request, Response> {
391  public:
392  void OnDone() override { delete this; }
393  void OnStarted(ServerContext*, const Request*) override {
     // Ignores both arguments and immediately finishes the RPC with an
     // UNIMPLEMENTED status and an empty error message.
394  this->Finish(Status(StatusCode::UNIMPLEMENTED, ""));
395  }
396 };
397 
398 template <class Request, class Response>
400  : public experimental::ServerBidiReactor<Request, Response> {
401  public:
402  void OnDone() override { delete this; }
403  void OnStarted(ServerContext*) override {
     // Ignores the context and immediately finishes the RPC with an
     // UNIMPLEMENTED status and an empty error message.
404  this->Finish(Status(StatusCode::UNIMPLEMENTED, ""));
405  }
406 };
407 
408 template <class RequestType, class ResponseType>
409 class CallbackUnaryHandler : public MethodHandler {
410  public:
412  std::function<void(ServerContext*, const RequestType*, ResponseType*,
414  func)
415  : func_(func) {}
416  void RunHandler(const HandlerParameter& param) final {
     // Entry point for one unary callback RPC: builds the per-call controller
     // and either invokes the user handler or fails the call.
     //
     // A reference is taken on the underlying call first. The controller is
     // then placement-constructed in storage obtained from the call's arena,
     // so it is never deleted; MaybeDone invokes its destructor explicitly.
417  // Arena allocate a controller structure (that includes request/response)
418  g_core_codegen_interface->grpc_call_ref(param.call->call());
419  auto* controller = new (g_core_codegen_interface->grpc_call_arena_alloc(
420  param.call->call(), sizeof(ServerCallbackRpcControllerImpl)))
421  ServerCallbackRpcControllerImpl(
422  param.server_context, param.call,
423  static_cast<RequestType*>(param.request),
     // NOTE(review): param is a const reference, so unless call_requester is
     // declared mutable this std::move yields a copy - confirm.
424  std::move(param.call_requester));
425  Status status = param.status;
426 
427  if (status.ok()) {
428  // Call the actual function handler and expect the user to call finish
     // The controller itself is passed as the last argument; the handler uses
     // it to call Finish.
429  CatchingCallback(func_, param.server_context, controller->request(),
430  controller->response(), controller);
431  } else {
432  // if deserialization failed, we need to fail the call
433  controller->Finish(status);
434  }
435  }
436 
438  Status* status) final {
439  ByteBuffer buf;
440  buf.set_buffer(req);
441  auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc(
442  call, sizeof(RequestType))) RequestType();
443  *status = SerializationTraits<RequestType>::Deserialize(&buf, request);
444  buf.Release();
445  if (status->ok()) {
446  return request;
447  }
448  request->~RequestType();
449  return nullptr;
450  }
451 
452  private:
453  std::function<void(ServerContext*, const RequestType*, ResponseType*,
455  func_;
456 
457  // The implementation class of ServerCallbackRpcController is a private member
458  // of CallbackUnaryHandler since it is never exposed anywhere, and this allows
459  // it to take advantage of CallbackUnaryHandler's friendships.
460  class ServerCallbackRpcControllerImpl
462  public:
463  void Finish(Status s) override {
     // Completes the unary RPC: queues initial metadata (if not already
     // sent), the response message (only for an OK status) and the final
     // status with trailing metadata on finish_ops_, then submits the batch.
     //
     // callbacks_outstanding_ is not incremented here; its initial value
     // already reserves a slot for Finish. The finish tag's callback releases
     // that slot through MaybeDone.
464  finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
465  &finish_ops_);
     // Piggyback initial metadata on the finish batch if nothing sent it
     // earlier, and record on the context that it has now been sent.
466  if (!ctx_->sent_initial_metadata_) {
467  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
468  ctx_->initial_metadata_flags());
469  if (ctx_->compression_level_set()) {
470  finish_ops_.set_compression_level(ctx_->compression_level());
471  }
472  ctx_->sent_initial_metadata_ = true;
473  }
474  // The response is dropped if the status is not OK.
475  if (s.ok()) {
     // The status sent is whatever SendMessagePtr returns for resp_, so a
     // failure to queue the response replaces the handler's OK status.
476  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
477  finish_ops_.SendMessagePtr(&resp_));
478  } else {
479  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
480  }
481  finish_ops_.set_core_cq_tag(&finish_tag_);
482  call_.PerformOps(&finish_ops_);
483  }
484 
485  void SendInitialMetadata(std::function<void(bool)> f) override {
     // Sends the initial metadata ahead of the response. f is invoked with
     // the operation's ok flag when the metadata batch completes.
     //
     // Asserts that initial metadata has not already been sent on this
     // context.
486  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
     // Account for the extra completion; the tag callback below releases it
     // through MaybeDone after running f.
487  callbacks_outstanding_++;
488  // TODO(vjpai): Consider taking f as a move-capture if we adopt C++14
489  // and if performance of this operation matters
     // (f is currently copied into the lambda by the by-value capture.)
490  meta_tag_.Set(call_.call(),
491  [this, f](bool ok) {
492  f(ok);
493  MaybeDone();
494  },
495  &meta_ops_);
496  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
497  ctx_->initial_metadata_flags());
498  if (ctx_->compression_level_set()) {
499  meta_ops_.set_compression_level(ctx_->compression_level());
500  }
     // Mark as sent so Finish does not queue the initial metadata again.
501  ctx_->sent_initial_metadata_ = true;
502  meta_ops_.set_core_cq_tag(&meta_tag_);
503  call_.PerformOps(&meta_ops_);
504  }
505 
506  // Neither SetCancelCallback nor ClearCancelCallback should affect the
507  // callbacks_outstanding_ count since they are paired and both must precede
508  // the invocation of Finish (if they are used at all)
509  void SetCancelCallback(std::function<void()> callback) override {
510  ctx_->SetCancelCallback(std::move(callback));
511  }
512 
513  void ClearCancelCallback() override { ctx_->ClearCancelCallback(); }
514 
515  private:
516  friend class CallbackUnaryHandler<RequestType, ResponseType>;
517 
518  ServerCallbackRpcControllerImpl(ServerContext* ctx, Call* call,
519  const RequestType* req,
520  std::function<void()> call_requester)
     // Private constructor (the enclosing handler is a friend). Stores the
     // context and request pointers, copies *call into call_, and takes
     // ownership of call_requester by move.
521  : ctx_(ctx),
522  call_(*call),
523  req_(req),
524  call_requester_(std::move(call_requester)) {
     // Registers the completion op on the context. Its callback releases one
     // of the slots reserved in callbacks_outstanding_ via MaybeDone. The
     // third argument is nullptr here, whereas the streaming impls in this
     // file pass their reactor.
525  ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, nullptr);
526  }
527 
528  ~ServerCallbackRpcControllerImpl() { req_->~RequestType(); }
529 
530  const RequestType* request() { return req_; }
531  ResponseType* response() { return &resp_; }
532 
533  void MaybeDone() {
534  if (--callbacks_outstanding_ == 0) {
535  grpc_call* call = call_.call();
536  auto call_requester = std::move(call_requester_);
537  this->~ServerCallbackRpcControllerImpl(); // explicitly call destructor
539  call_requester();
540  }
541  }
542 
544  CallbackWithSuccessTag meta_tag_;
547  finish_ops_;
548  CallbackWithSuccessTag finish_tag_;
549 
550  ServerContext* ctx_;
551  Call call_;
552  const RequestType* req_;
553  ResponseType resp_;
554  std::function<void()> call_requester_;
555  std::atomic_int callbacks_outstanding_{
556  2}; // reserve for Finish and CompletionOp
557  };
558 };
559 
560 template <class RequestType, class ResponseType>
562  public:
564  std::function<
566  func)
567  : func_(std::move(func)) {}
568  void RunHandler(const HandlerParameter& param) final {
569  // Arena allocate a reader structure (that includes response)
570  g_core_codegen_interface->grpc_call_ref(param.call->call());
571 
573  param.status.ok()
576  func_)
577  : nullptr;
578 
579  if (reactor == nullptr) {
580  // if deserialization or reactor creator failed, we need to fail the call
582  }
583 
584  auto* reader = new (g_core_codegen_interface->grpc_call_arena_alloc(
585  param.call->call(), sizeof(ServerCallbackReaderImpl)))
586  ServerCallbackReaderImpl(param.server_context, param.call,
587  std::move(param.call_requester), reactor);
588 
589  reader->BindReactor(reactor);
590  reactor->OnStarted(param.server_context, reader->response());
591  reader->MaybeDone();
592  }
593 
594  private:
595  std::function<experimental::ServerReadReactor<RequestType, ResponseType>*()>
596  func_;
597 
598  class ServerCallbackReaderImpl
599  : public experimental::ServerCallbackReader<RequestType> {
600  public:
601  void Finish(Status s) override {
     // Completes the client-streaming RPC: queues initial metadata (if not
     // already sent), the single response message (only for an OK status)
     // and the final status with trailing metadata, then submits the batch.
     //
     // callbacks_outstanding_ is not incremented here; its initial value
     // already reserves a slot for Finish. The finish tag's callback releases
     // that slot through MaybeDone.
602  finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
603  &finish_ops_);
     // Piggyback initial metadata on the finish batch if nothing sent it
     // earlier, and record on the context that it has now been sent.
604  if (!ctx_->sent_initial_metadata_) {
605  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
606  ctx_->initial_metadata_flags());
607  if (ctx_->compression_level_set()) {
608  finish_ops_.set_compression_level(ctx_->compression_level());
609  }
610  ctx_->sent_initial_metadata_ = true;
611  }
612  // The response is dropped if the status is not OK.
613  if (s.ok()) {
     // The status sent is whatever SendMessagePtr returns for resp_, so a
     // failure to queue the response replaces the reactor's OK status.
614  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
615  finish_ops_.SendMessagePtr(&resp_));
616  } else {
617  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
618  }
619  finish_ops_.set_core_cq_tag(&finish_tag_);
620  call_.PerformOps(&finish_ops_);
621  }
622 
623  void SendInitialMetadata() override {
     // Sends the initial metadata as its own batch. When the batch completes
     // the reactor's OnSendInitialMetadataDone is called with the ok flag.
     //
     // Asserts that initial metadata has not already been sent on this
     // context.
624  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
     // Account for the extra completion; the tag callback below releases it
     // through MaybeDone after notifying the reactor.
625  callbacks_outstanding_++;
626  meta_tag_.Set(call_.call(),
627  [this](bool ok) {
628  reactor_->OnSendInitialMetadataDone(ok);
629  MaybeDone();
630  },
631  &meta_ops_);
632  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
633  ctx_->initial_metadata_flags());
634  if (ctx_->compression_level_set()) {
635  meta_ops_.set_compression_level(ctx_->compression_level());
636  }
     // Mark as sent so Finish does not queue the initial metadata again.
637  ctx_->sent_initial_metadata_ = true;
638  meta_ops_.set_core_cq_tag(&meta_tag_);
639  call_.PerformOps(&meta_ops_);
640  }
641 
642  void Read(RequestType* req) override {
     // Starts receiving the next message into *req. Completion arrives
     // through read_tag_, which the constructor wires to
     // reactor_->OnReadDone followed by MaybeDone; the increment below is the
     // count that MaybeDone call releases.
643  callbacks_outstanding_++;
644  read_ops_.RecvMessage(req);
645  call_.PerformOps(&read_ops_);
646  }
647 
648  private:
649  friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
650 
651  ServerCallbackReaderImpl(
652  ServerContext* ctx, Call* call, std::function<void()> call_requester,
654  : ctx_(ctx),
655  call_(*call),
656  call_requester_(std::move(call_requester)),
657  reactor_(reactor) {
658  ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
659  read_tag_.Set(call_.call(),
660  [this](bool ok) {
661  reactor_->OnReadDone(ok);
662  MaybeDone();
663  },
664  &read_ops_);
665  read_ops_.set_core_cq_tag(&read_tag_);
666  }
667 
668  ~ServerCallbackReaderImpl() {}
669 
670  ResponseType* response() { return &resp_; }
671 
672  void MaybeDone() {
673  if (--callbacks_outstanding_ == 0) {
674  reactor_->OnDone();
675  grpc_call* call = call_.call();
676  auto call_requester = std::move(call_requester_);
677  this->~ServerCallbackReaderImpl(); // explicitly call destructor
679  call_requester();
680  }
681  }
682 
684  CallbackWithSuccessTag meta_tag_;
687  finish_ops_;
688  CallbackWithSuccessTag finish_tag_;
690  CallbackWithSuccessTag read_tag_;
691 
692  ServerContext* ctx_;
693  Call call_;
694  ResponseType resp_;
695  std::function<void()> call_requester_;
697  std::atomic_int callbacks_outstanding_{
698  3}; // reserve for OnStarted, Finish, and CompletionOp
699  };
700 };
701 
702 template <class RequestType, class ResponseType>
704  public:
706  std::function<
708  func)
709  : func_(std::move(func)) {}
710  void RunHandler(const HandlerParameter& param) final {
711  // Arena allocate a writer structure
712  g_core_codegen_interface->grpc_call_ref(param.call->call());
713 
715  param.status.ok()
718  func_)
719  : nullptr;
720 
721  if (reactor == nullptr) {
722  // if deserialization or reactor creator failed, we need to fail the call
724  }
725 
726  auto* writer = new (g_core_codegen_interface->grpc_call_arena_alloc(
727  param.call->call(), sizeof(ServerCallbackWriterImpl)))
728  ServerCallbackWriterImpl(param.server_context, param.call,
729  static_cast<RequestType*>(param.request),
730  std::move(param.call_requester), reactor);
731  writer->BindReactor(reactor);
732  reactor->OnStarted(param.server_context, writer->request());
733  writer->MaybeDone();
734  }
735 
737  Status* status) final {
738  ByteBuffer buf;
739  buf.set_buffer(req);
740  auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc(
741  call, sizeof(RequestType))) RequestType();
742  *status = SerializationTraits<RequestType>::Deserialize(&buf, request);
743  buf.Release();
744  if (status->ok()) {
745  return request;
746  }
747  request->~RequestType();
748  return nullptr;
749  }
750 
751  private:
752  std::function<experimental::ServerWriteReactor<RequestType, ResponseType>*()>
753  func_;
754 
755  class ServerCallbackWriterImpl
756  : public experimental::ServerCallbackWriter<ResponseType> {
757  public:
758  void Finish(Status s) override {
     // Completes the server-streaming RPC: queues initial metadata (if not
     // already sent) and the final status with trailing metadata on
     // finish_ops_, then submits the batch. No message is added here;
     // WriteAndFinish may have queued one on finish_ops_ before calling this.
     //
     // callbacks_outstanding_ is not incremented; its initial value already
     // reserves a slot for Finish. The finish tag's callback releases that
     // slot through MaybeDone.
759  finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
760  &finish_ops_);
761  finish_ops_.set_core_cq_tag(&finish_tag_);
762 
     // Piggyback initial metadata on the finish batch if nothing sent it
     // earlier, and record on the context that it has now been sent.
763  if (!ctx_->sent_initial_metadata_) {
764  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
765  ctx_->initial_metadata_flags());
766  if (ctx_->compression_level_set()) {
767  finish_ops_.set_compression_level(ctx_->compression_level());
768  }
769  ctx_->sent_initial_metadata_ = true;
770  }
771  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
772  call_.PerformOps(&finish_ops_);
773  }
774 
775  void SendInitialMetadata() override {
     // Sends the initial metadata as its own batch. When the batch completes
     // the reactor's OnSendInitialMetadataDone is called with the ok flag.
     //
     // Asserts that initial metadata has not already been sent on this
     // context (Write and Finish also send it if it is still pending).
776  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
     // Account for the extra completion; the tag callback below releases it
     // through MaybeDone after notifying the reactor.
777  callbacks_outstanding_++;
778  meta_tag_.Set(call_.call(),
779  [this](bool ok) {
780  reactor_->OnSendInitialMetadataDone(ok);
781  MaybeDone();
782  },
783  &meta_ops_);
784  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
785  ctx_->initial_metadata_flags());
786  if (ctx_->compression_level_set()) {
787  meta_ops_.set_compression_level(ctx_->compression_level());
788  }
     // Mark as sent so Write and Finish do not queue the metadata again.
789  ctx_->sent_initial_metadata_ = true;
790  meta_ops_.set_core_cq_tag(&meta_tag_);
791  call_.PerformOps(&meta_ops_);
792  }
793 
794  void Write(const ResponseType* resp, WriteOptions options) override {
     // Starts sending *resp with the given options. Completion arrives
     // through write_tag_, which the constructor wires to
     // reactor_->OnWriteDone followed by MaybeDone; the increment below is
     // the count that MaybeDone call releases.
795  callbacks_outstanding_++;
     // A message flagged as last also gets the buffer hint, i.e. it may be
     // buffered rather than sent immediately - presumably so it can go out
     // together with the final status; confirm.
796  if (options.is_last_message()) {
797  options.set_buffer_hint();
798  }
     // Piggyback initial metadata on the first write if nothing sent it
     // earlier, and record on the context that it has now been sent.
799  if (!ctx_->sent_initial_metadata_) {
800  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
801  ctx_->initial_metadata_flags());
802  if (ctx_->compression_level_set()) {
803  write_ops_.set_compression_level(ctx_->compression_level());
804  }
805  ctx_->sent_initial_metadata_ = true;
806  }
807  // TODO(vjpai): don't assert
     // A non-OK result from SendMessagePtr currently trips the assertion
     // instead of being reported to the reactor.
808  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
809  call_.PerformOps(&write_ops_);
810  }
811 
812  void WriteAndFinish(const ResponseType* resp, WriteOptions options,
813  Status s) override {
     // Sends a final message together with the status in one batch. The
     // message is queued on finish_ops_ (not write_ops_), so completion goes
     // through finish_tag_, whose callback only calls MaybeDone; no
     // OnWriteDone is issued for this message and callbacks_outstanding_ is
     // not incremented. Unlike Write, no buffer hint is applied to options.
814  // This combines the write into the finish callback
815  // Don't send any message if the status is bad
816  if (s.ok()) {
817  // TODO(vjpai): don't assert
818  GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
819  }
820  Finish(std::move(s));
821  }
822 
823  private:
824  friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
825 
826  ServerCallbackWriterImpl(
827  ServerContext* ctx, Call* call, const RequestType* req,
828  std::function<void()> call_requester,
830  : ctx_(ctx),
831  call_(*call),
832  req_(req),
833  call_requester_(std::move(call_requester)),
834  reactor_(reactor) {
835  ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
836  write_tag_.Set(call_.call(),
837  [this](bool ok) {
838  reactor_->OnWriteDone(ok);
839  MaybeDone();
840  },
841  &write_ops_);
842  write_ops_.set_core_cq_tag(&write_tag_);
843  }
844  ~ServerCallbackWriterImpl() { req_->~RequestType(); }
845 
846  const RequestType* request() { return req_; }
847 
848  void MaybeDone() {
849  if (--callbacks_outstanding_ == 0) {
850  reactor_->OnDone();
851  grpc_call* call = call_.call();
852  auto call_requester = std::move(call_requester_);
853  this->~ServerCallbackWriterImpl(); // explicitly call destructor
855  call_requester();
856  }
857  }
858 
860  CallbackWithSuccessTag meta_tag_;
863  finish_ops_;
864  CallbackWithSuccessTag finish_tag_;
866  CallbackWithSuccessTag write_tag_;
867 
868  ServerContext* ctx_;
869  Call call_;
870  const RequestType* req_;
871  std::function<void()> call_requester_;
873  std::atomic_int callbacks_outstanding_{
874  3}; // reserve for OnStarted, Finish, and CompletionOp
875  };
876 };
877 
878 template <class RequestType, class ResponseType>
880  public:
882  std::function<
884  func)
885  : func_(std::move(func)) {}
886  void RunHandler(const HandlerParameter& param) final {
887  g_core_codegen_interface->grpc_call_ref(param.call->call());
888 
890  param.status.ok()
893  func_)
894  : nullptr;
895 
896  if (reactor == nullptr) {
897  // if deserialization or reactor creator failed, we need to fail the call
899  }
900 
901  auto* stream = new (g_core_codegen_interface->grpc_call_arena_alloc(
902  param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
903  ServerCallbackReaderWriterImpl(param.server_context, param.call,
904  std::move(param.call_requester),
905  reactor);
906 
907  stream->BindReactor(reactor);
908  reactor->OnStarted(param.server_context);
909  stream->MaybeDone();
910  }
911 
912  private:
913  std::function<experimental::ServerBidiReactor<RequestType, ResponseType>*()>
914  func_;
915 
916  class ServerCallbackReaderWriterImpl
917  : public experimental::ServerCallbackReaderWriter<RequestType,
918  ResponseType> {
919  public:
920  void Finish(Status s) override {
     // Completes the bidirectional RPC: queues initial metadata (if not
     // already sent) and the final status with trailing metadata on
     // finish_ops_, then submits the batch. No message is added here;
     // WriteAndFinish may have queued one on finish_ops_ before calling this.
     //
     // callbacks_outstanding_ is not incremented; its initial value already
     // reserves a slot for Finish. The finish tag's callback releases that
     // slot through MaybeDone.
921  finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
922  &finish_ops_);
923  finish_ops_.set_core_cq_tag(&finish_tag_);
924 
     // Piggyback initial metadata on the finish batch if nothing sent it
     // earlier, and record on the context that it has now been sent.
925  if (!ctx_->sent_initial_metadata_) {
926  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
927  ctx_->initial_metadata_flags());
928  if (ctx_->compression_level_set()) {
929  finish_ops_.set_compression_level(ctx_->compression_level());
930  }
931  ctx_->sent_initial_metadata_ = true;
932  }
933  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
934  call_.PerformOps(&finish_ops_);
935  }
936 
937  void SendInitialMetadata() override {
     // Sends the initial metadata as its own batch. When the batch completes
     // the reactor's OnSendInitialMetadataDone is called with the ok flag.
     //
     // Asserts that initial metadata has not already been sent on this
     // context (Write and Finish also send it if it is still pending).
938  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
     // Account for the extra completion; the tag callback below releases it
     // through MaybeDone after notifying the reactor.
939  callbacks_outstanding_++;
940  meta_tag_.Set(call_.call(),
941  [this](bool ok) {
942  reactor_->OnSendInitialMetadataDone(ok);
943  MaybeDone();
944  },
945  &meta_ops_);
946  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
947  ctx_->initial_metadata_flags());
948  if (ctx_->compression_level_set()) {
949  meta_ops_.set_compression_level(ctx_->compression_level());
950  }
     // Mark as sent so Write and Finish do not queue the metadata again.
951  ctx_->sent_initial_metadata_ = true;
952  meta_ops_.set_core_cq_tag(&meta_tag_);
953  call_.PerformOps(&meta_ops_);
954  }
955 
956  void Write(const ResponseType* resp, WriteOptions options) override {
     // Starts sending *resp with the given options. Completion arrives
     // through write_tag_, which the constructor wires to
     // reactor_->OnWriteDone followed by MaybeDone; the increment below is
     // the count that MaybeDone call releases.
957  callbacks_outstanding_++;
     // A message flagged as last also gets the buffer hint, i.e. it may be
     // buffered rather than sent immediately - presumably so it can go out
     // together with the final status; confirm.
958  if (options.is_last_message()) {
959  options.set_buffer_hint();
960  }
     // Piggyback initial metadata on the first write if nothing sent it
     // earlier, and record on the context that it has now been sent.
961  if (!ctx_->sent_initial_metadata_) {
962  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
963  ctx_->initial_metadata_flags());
964  if (ctx_->compression_level_set()) {
965  write_ops_.set_compression_level(ctx_->compression_level());
966  }
967  ctx_->sent_initial_metadata_ = true;
968  }
969  // TODO(vjpai): don't assert
     // A non-OK result from SendMessagePtr currently trips the assertion
     // instead of being reported to the reactor.
970  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
971  call_.PerformOps(&write_ops_);
972  }
973 
974  void WriteAndFinish(const ResponseType* resp, WriteOptions options,
975  Status s) override {
     // Sends a final message together with the status in one batch. The
     // message is queued on finish_ops_ (not write_ops_), so completion goes
     // through finish_tag_, whose callback only calls MaybeDone; no
     // OnWriteDone is issued for this message and callbacks_outstanding_ is
     // not incremented. Unlike Write, no buffer hint is applied to options.
976  // Don't send any message if the status is bad
977  if (s.ok()) {
978  // TODO(vjpai): don't assert
979  GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
980  }
981  Finish(std::move(s));
982  }
983 
984  void Read(RequestType* req) override {
     // Starts receiving the next message into *req. Completion arrives
     // through read_tag_, which the constructor wires to
     // reactor_->OnReadDone followed by MaybeDone; the increment below is the
     // count that MaybeDone call releases.
985  callbacks_outstanding_++;
986  read_ops_.RecvMessage(req);
987  call_.PerformOps(&read_ops_);
988  }
989 
990  private:
991  friend class CallbackBidiHandler<RequestType, ResponseType>;
992 
993  ServerCallbackReaderWriterImpl(
994  ServerContext* ctx, Call* call, std::function<void()> call_requester,
996  : ctx_(ctx),
997  call_(*call),
998  call_requester_(std::move(call_requester)),
999  reactor_(reactor) {
1000  ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
1001  write_tag_.Set(call_.call(),
1002  [this](bool ok) {
1003  reactor_->OnWriteDone(ok);
1004  MaybeDone();
1005  },
1006  &write_ops_);
1007  write_ops_.set_core_cq_tag(&write_tag_);
1008  read_tag_.Set(call_.call(),
1009  [this](bool ok) {
1010  reactor_->OnReadDone(ok);
1011  MaybeDone();
1012  },
1013  &read_ops_);
1014  read_ops_.set_core_cq_tag(&read_tag_);
1015  }
1016  ~ServerCallbackReaderWriterImpl() {}
1017 
1018  void MaybeDone() {
1019  if (--callbacks_outstanding_ == 0) {
1020  reactor_->OnDone();
1021  grpc_call* call = call_.call();
1022  auto call_requester = std::move(call_requester_);
1023  this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor
1025  call_requester();
1026  }
1027  }
1028 
1030  CallbackWithSuccessTag meta_tag_;
1033  finish_ops_;
1034  CallbackWithSuccessTag finish_tag_;
1036  CallbackWithSuccessTag write_tag_;
1038  CallbackWithSuccessTag read_tag_;
1039 
1040  ServerContext* ctx_;
1041  Call call_;
1042  std::function<void()> call_requester_;
1044  std::atomic_int callbacks_outstanding_{
1045  3}; // reserve for OnStarted, Finish, and CompletionOp
1046  };
1047 };
1048 
1049 } // namespace internal
1050 
1051 } // namespace grpc
1052 
1053 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:70
void OnDone() override
Definition: server_callback.h:392
virtual void OnStarted(ServerContext *context, Response *resp)
Similar to ServerBidiReactor::OnStarted, except that this also provides the response object that the ...
Definition: server_callback.h:318
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call_op_set.h:126
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:144
virtual void grpc_call_ref(grpc_call *call)=0
Definition: server_callback.h:399
void StartWriteAndFinish(const Response *resp, WriteOptions options, Status s)
Initiate a write operation with specified options and final RPC Status, which also causes any trailin...
Definition: server_callback.h:230
void BindReactor(ServerBidiReactor< Request, Response > *reactor)
Definition: server_callback.h:165
virtual void OnSendInitialMetadataDone(bool ok)
Notifies the application that an explicit StartSendInitialMetadata operation completed.
Definition: server_callback.h:267
Definition: server_callback.h:879
void StartWrite(const Response *resp, WriteOptions options)
Initiate a write operation with specified options.
Definition: server_callback.h:213
virtual ~ServerCallbackWriter()
Definition: server_callback.h:129
Definition: server_callback.h:379
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:165
virtual void OnReadDone(bool ok)
Definition: server_callback.h:322
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:828
void RunHandler(const HandlerParameter &param) final
Definition: server_callback.h:710
void StartRead(Request *req)
Definition: server_callback.h:309
virtual void OnSendInitialMetadataDone(bool ok)
The following notifications are exactly like ServerBidiReactor.
Definition: server_callback.h:362
virtual void grpc_call_unref(grpc_call *call)=0
virtual void SendInitialMetadata(std::function< void(bool)>)=0
Definition: async_unary_call.h:304
void StartWriteAndFinish(const Response *resp, WriteOptions options, Status s)
Definition: server_callback.h:345
void OnDone() override
Definition: server_callback.h:364
Definition: grpc_types.h:40
virtual void OnWriteDone(bool ok)
Notifies the application that a StartWrite (or StartWriteLast) operation completed.
Definition: server_callback.h:280
grpc_call * call() const
Definition: call.h:70
Definition: server_callback.h:149
void StartWrite(const Response *resp)
Initiate a write operation.
Definition: server_callback.h:205
ServerReadReactor is the interface for a client-streaming RPC.
Definition: server_callback.h:53
void BindReactor(ServerWriteReactor< Request, Response > *reactor)
Definition: server_callback.h:143
void StartSendInitialMetadata()
Do NOT call any operation initiation method (names that start with Start) until after the library has...
Definition: server_callback.h:192
::google::protobuf::util::Status Status
Definition: config_protobuf.h:93
void OnCancel() override
Notifies the application that this RPC has been cancelled.
Definition: server_callback.h:290
CallbackBidiHandler(std::function< experimental::ServerBidiReactor< RequestType, ResponseType > *()> func)
Definition: server_callback.h:881
virtual void OnStarted(ServerContext *context, const Request *req)
Similar to ServerBidiReactor::OnStarted, except that this also provides the request object sent by th...
Definition: server_callback.h:359
void RunHandler(const HandlerParameter &param) final
Definition: server_callback.h:886
void Finish(Status s)
Definition: server_callback.h:310
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
virtual void OnStarted(ServerContext *context)
Notify the application that a streaming RPC has started and that it is now ok to call any operation i...
Definition: server_callback.h:259
void StartWriteLast(const Response *resp, WriteOptions options)
Definition: server_callback.h:349
Definition: call_op_set.h:636
Definition: call_op_set.h:224
CallbackServerStreamingHandler(std::function< experimental::ServerWriteReactor< RequestType, ResponseType > *()> func)
Definition: server_callback.h:705
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, Status *status) final
Definition: server_callback.h:736
void StartWrite(const Response *resp, WriteOptions options)
Definition: server_callback.h:342
void Release()
Forget the underlying byte buffer without destroying it. Use this only for un-owned byte buffers.
Definition: byte_buffer.h:143
void OnDone() override
Definition: server_callback.h:382
void StartRead(Request *req)
Initiate a read operation.
Definition: server_callback.h:198
Definition: call_op_set.h:294
Definition: server_callback.h:40
This header provides an object that reads bytes directly from a grpc::ByteBuffer, via the ZeroCopyInp...
Definition: alarm.h:24
void Finish(Status s)
Indicate that the stream is to be finished and the trailing metadata and RPC status are to be sent...
Definition: server_callback.h:253
ServerBidiReactor is the interface for a bidirectional streaming RPC.
Definition: server_callback.h:57
void OnCancel() override
Definition: server_callback.h:324
CoreCodegenInterface * g_core_codegen_interface
Definition: call_op_set.h:51
Definition: server_callback.h:112
ReturnType * CatchingReactorCreator(Func &&func, Args &&... args)
Definition: callback_common.h:51
Definition: rpc_service_method.h:42
CallbackClientStreamingHandler(std::function< experimental::ServerReadReactor< RequestType, ResponseType > *()> func)
Definition: server_callback.h:563
A ServerContext allows the person implementing a service handler to:
Definition: server_context.h:110
Per-message write options.
Definition: call_op_set.h:86
void RunHandler(const HandlerParameter &param) final
Definition: server_callback.h:568
Definition: byte_buffer.h:49
void OnCancel() override
Definition: server_callback.h:365
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
virtual void OnWriteDone(bool ok)
Definition: server_callback.h:363
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming o...
Definition: callback_common.h:132
Definition: server_callback.h:127
bool ok() const
Is the status OK?
Definition: status.h:118
Base class for running an RPC handler.
Definition: rpc_service_method.h:39
Definition: server_callback.h:561
void StartWriteLast(const Response *resp, WriteOptions options)
Inform system of a planned write operation with specified options, but allow the library to schedule ...
Definition: server_callback.h:243
void OnStarted(ServerContext *, Response *) override
Similar to ServerBidiReactor::OnStarted, except that this also provides the response object that the ...
Definition: server_callback.h:383
Did it work? If it didn't, why?
Definition: status.h:31
Operation is not implemented or not supported/enabled in this service.
Definition: status_code_enum.h:115
void CatchingCallback(Func &&func, Args &&... args)
An exception-safe way of invoking a user-specified callback function.
Definition: callback_common.h:38
Definition: server_callback.h:61
void StartSendInitialMetadata()
The following operation initiations are exactly like ServerBidiReactor.
Definition: server_callback.h:308
virtual void OnSendInitialMetadataDone(bool ok)
The following notifications are exactly like ServerBidiReactor.
Definition: server_callback.h:321
ServerWriteReactor is the interface for a server-streaming RPC.
Definition: server_callback.h:55
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:190
void OnStarted(ServerContext *) override
Notify the application that a streaming RPC has started and that it is now ok to call any operation i...
Definition: server_callback.h:403
virtual ~ServerCallbackReader()
Definition: server_callback.h:114
void OnDone() override
Notifies the application that all operations associated with this RPC have completed.
Definition: server_callback.h:285
void StartWrite(const Response *resp)
Definition: server_callback.h:341
void StartSendInitialMetadata()
The following operation initiations are exactly like ServerBidiReactor.
Definition: server_callback.h:340
void Finish(Status s)
Definition: server_callback.h:352
void BindReactor(ServerReadReactor< Request, Response > *reactor)
Definition: server_callback.h:121
void OnDone() override
Definition: server_callback.h:323
virtual void OnReadDone(bool ok)
Notifies the application that a StartRead operation completed.
Definition: server_callback.h:273
A sequence of bytes.
Definition: byte_buffer.h:64
void OnDone() override
Notifies the application that all operations associated with this RPC have completed.
Definition: server_callback.h:402
virtual void WriteAndFinish(const Response *msg, WriteOptions options, Status s)
Definition: server_callback.h:134
void RunHandler(const HandlerParameter &param) final
Definition: server_callback.h:416
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, Status *status) final
Definition: server_callback.h:437
Straightforward wrapping of the C call object.
Definition: call.h:36
virtual void WriteAndFinish(const Response *msg, WriteOptions options, Status s)
Definition: server_callback.h:157
virtual ~ServerReactor()=default
void OnStarted(ServerContext *, const Request *) override
Similar to ServerBidiReactor::OnStarted, except that this also provides the request object sent by th...
Definition: server_callback.h:393
Definition: server_callback.h:389
virtual ~ServerCallbackReaderWriter()
Definition: server_callback.h:151
CallbackUnaryHandler(std::function< void(ServerContext *, const RequestType *, ResponseType *, experimental::ServerCallbackRpcController *)> func)
Definition: server_callback.h:411