GRPC C++  1.19.0-dev
server_callback.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
20 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
21 
22 #include <atomic>
23 #include <functional>
24 #include <type_traits>
25 
34 
35 namespace grpc {
36 
37 // Declare base class of all reactors as internal
38 namespace internal {
39 
41  public:
// Virtual destructor so that derived reactors can be destroyed through a
// pointer to this base.
42  virtual ~ServerReactor() = default;
// Hook with an empty default body. The stream implementations in this file
// call it from MaybeDone() once their outstanding-callback count reaches zero.
43  virtual void OnDone() {}
// Hook with an empty default body. It is not invoked anywhere in this file;
// presumably it is triggered through the completion op the reactor is
// registered with — TODO confirm against ServerContext.
44  virtual void OnCancel() {}
45 };
46 
47 } // namespace internal
48 
49 namespace experimental {
50 
51 // Forward declarations
52 template <class Request, class Response>
54 template <class Request, class Response>
56 template <class Request, class Response>
58 
59 // For unary RPCs, the exposed controller class is only an interface
60 // and the actual implementation is an internal class.
62  public:
// Virtual destructor: this is a pure interface implemented elsewhere in this
// file (ServerCallbackRpcControllerImpl inside CallbackUnaryHandler).
63  virtual ~ServerCallbackRpcController() = default;
64 
65  // The method handler must call this function when it is done so that
66  // the library knows to free its resources
// s is the final status of the RPC. In the implementation in this file the
// response message is sent only when s.ok(); otherwise only the status is sent.
67  virtual void Finish(Status s) = 0;
68 
69  // Allow the method handler to push out the initial metadata before
70  // the response and status are ready
// The std::function argument is a completion callback; the implementation in
// this file invokes it with the bool result of the metadata send operation.
71  virtual void SendInitialMetadata(std::function<void(bool)>) = 0;
72 };
73 
74 // NOTE: The actual streaming object classes are provided
75 // as API only to support mocking. There are no implementations of
76 // these class interfaces in the API.
77 template <class Request>
79  public:
// Virtual destructor for the polymorphic reader interface.
80  virtual ~ServerCallbackReader() {}
// The three primitive operations below are pure virtual. ServerReadReactor
// forwards its Finish / StartSendInitialMetadata / StartRead calls to them
// through the reader pointer installed by BindReader().
81  virtual void Finish(Status s) = 0;
82  virtual void SendInitialMetadata() = 0;
// msg is the caller-provided object the next incoming message is read into.
83  virtual void Read(Request* msg) = 0;
84 
85  protected:
86  template <class Response>
88  reactor->BindReader(this);
89  }
90 };
91 
92 template <class Response>
94  public:
// Virtual destructor for the polymorphic writer interface.
95  virtual ~ServerCallbackWriter() {}
96 
// Primitive operations supplied by the concrete writer. ServerWriteReactor
// forwards its Finish / StartSendInitialMetadata / StartWrite calls to them
// through the writer pointer installed by BindWriter().
97  virtual void Finish(Status s) = 0;
98  virtual void SendInitialMetadata() = 0;
99  virtual void Write(const Response* msg, WriteOptions options) = 0;
// Writes msg with the given options and then finishes the RPC with status s.
// This default simply issues the two primitive operations back to back; a
// concrete writer may override it to combine them.
100  virtual void WriteAndFinish(const Response* msg, WriteOptions options,
101  Status s) {
102  // Default implementation that can/should be overridden
103  Write(msg, std::move(options));
104  Finish(std::move(s));
105  }
106 
107  protected:
108  template <class Request>
110  reactor->BindWriter(this);
111  }
112 };
113 
114 template <class Request, class Response>
116  public:
118 
// Primitive operations supplied by the concrete stream. ServerBidiReactor
// forwards its Finish / StartSendInitialMetadata / StartRead / StartWrite
// calls to them through the stream pointer installed by BindStream().
119  virtual void Finish(Status s) = 0;
120  virtual void SendInitialMetadata() = 0;
121  virtual void Read(Request* msg) = 0;
122  virtual void Write(const Response* msg, WriteOptions options) = 0;
// Writes msg with the given options and then finishes the RPC with status s.
// This default simply issues the two primitive operations back to back; a
// concrete stream may override it to combine them.
123  virtual void WriteAndFinish(const Response* msg, WriteOptions options,
124  Status s) {
125  // Default implementation that can/should be overridden
126  Write(msg, std::move(options));
127  Finish(std::move(s));
128  }
129 
130  protected:
132  reactor->BindStream(this);
133  }
134 };
135 
136 // The following classes are reactors that are to be implemented
137 // by the user, returned as the result of the method handler for
138 // a callback method, and activated by the call to OnStarted
139 template <class Request, class Response>
141  public:
// NOTE(review): the destructor is not spelled `virtual` here and the
// base-class clause is missing from this listing, so whether it is implicitly
// virtual cannot be confirmed from here.
142  ~ServerBidiReactor() = default;
// User-overridable notification hooks; all have empty default bodies.
// OnStarted is called by the bidi handler's RunHandler right after the stream
// is bound. The other three are called from the completion tags of the
// corresponding operation with that operation's bool result.
143  virtual void OnStarted(ServerContext*) {}
144  virtual void OnSendInitialMetadataDone(bool ok) {}
145  virtual void OnReadDone(bool ok) {}
146  virtual void OnWriteDone(bool ok) {}
147 
// Operation starters: each forwards to the stream bound via BindStream().
// stream_ is only assigned in BindStream(), so they must not be called before
// the reactor has been bound.
148  void StartSendInitialMetadata() { stream_->SendInitialMetadata(); }
149  void StartRead(Request* msg) { stream_->Read(msg); }
// Convenience overload: writes with default-constructed WriteOptions.
150  void StartWrite(const Response* msg) { StartWrite(msg, WriteOptions()); }
151  void StartWrite(const Response* msg, WriteOptions options) {
152  stream_->Write(msg, std::move(options));
153  }
// Hands the final message and the status to the stream in a single call.
154  void StartWriteAndFinish(const Response* msg, WriteOptions options,
155  Status s) {
156  stream_->WriteAndFinish(msg, std::move(options), std::move(s));
157  }
// Same as StartWrite, but first sets the last-message bit on the options.
158  void StartWriteLast(const Response* msg, WriteOptions options) {
159  StartWrite(msg, std::move(options.set_last_message()));
160  }
// Ends the RPC with status s.
161  void Finish(Status s) { stream_->Finish(std::move(s)); }
162 
163  private:
164  friend class ServerCallbackReaderWriter<Request, Response>;
165  void BindStream(ServerCallbackReaderWriter<Request, Response>* stream) {
166  stream_ = stream;
167  }
168 
170 };
171 
172 template <class Request, class Response>
174  public:
// NOTE(review): the destructor is not spelled `virtual` here and the
// base-class clause is missing from this listing, so whether it is implicitly
// virtual cannot be confirmed from here.
175  ~ServerReadReactor() = default;
// User-overridable notification hooks; all have empty default bodies.
// OnStarted is called by the client-streaming handler's RunHandler with the
// server context and a pointer to the response object owned by the reader.
176  virtual void OnStarted(ServerContext*, Response* resp) {}
// Called from the metadata / read completion tags with the operation's result.
177  virtual void OnSendInitialMetadataDone(bool ok) {}
178  virtual void OnReadDone(bool ok) {}
179 
// Operation starters: each forwards to the reader bound via BindReader().
// reader_ is only assigned in BindReader(), so they must not be called before
// the reactor has been bound.
180  void StartSendInitialMetadata() { reader_->SendInitialMetadata(); }
181  void StartRead(Request* msg) { reader_->Read(msg); }
// Ends the RPC with status s.
182  void Finish(Status s) { reader_->Finish(std::move(s)); }
183 
184  private:
185  friend class ServerCallbackReader<Request>;
186  void BindReader(ServerCallbackReader<Request>* reader) { reader_ = reader; }
187 
189 };
190 
191 template <class Request, class Response>
193  public:
// NOTE(review): the destructor is not spelled `virtual` here and the
// base-class clause is missing from this listing, so whether it is implicitly
// virtual cannot be confirmed from here.
194  ~ServerWriteReactor() = default;
// User-overridable notification hooks; all have empty default bodies.
// OnStarted is called by the server-streaming handler's RunHandler with the
// server context and the request held by the writer.
195  virtual void OnStarted(ServerContext*, const Request* req) {}
// Called from the metadata / write completion tags with the operation's result.
196  virtual void OnSendInitialMetadataDone(bool ok) {}
197  virtual void OnWriteDone(bool ok) {}
198 
// Operation starters: each forwards to the writer bound via BindWriter().
// writer_ is only assigned in BindWriter(), so they must not be called before
// the reactor has been bound.
199  void StartSendInitialMetadata() { writer_->SendInitialMetadata(); }
// Convenience overload: writes with default-constructed WriteOptions.
200  void StartWrite(const Response* msg) { StartWrite(msg, WriteOptions()); }
201  void StartWrite(const Response* msg, WriteOptions options) {
202  writer_->Write(msg, std::move(options));
203  }
// Hands the final message and the status to the writer in a single call.
204  void StartWriteAndFinish(const Response* msg, WriteOptions options,
205  Status s) {
206  writer_->WriteAndFinish(msg, std::move(options), std::move(s));
207  }
// Same as StartWrite, but first sets the last-message bit on the options.
208  void StartWriteLast(const Response* msg, WriteOptions options) {
209  StartWrite(msg, std::move(options.set_last_message()));
210  }
// Ends the RPC with status s.
211  void Finish(Status s) { writer_->Finish(std::move(s)); }
212 
213  private:
214  friend class ServerCallbackWriter<Response>;
215  void BindWriter(ServerCallbackWriter<Response>* writer) { writer_ = writer; }
216 
218 };
219 
220 } // namespace experimental
221 
222 namespace internal {
223 
224 template <class Request, class Response>
226  : public experimental::ServerReadReactor<Request, Response> {
227  public:
// This reactor deletes itself when the library reports that the RPC is done,
// so instances must be heap-allocated with `new`.
228  void OnDone() override { delete this; }
// Fails the RPC immediately: finishes with UNIMPLEMENTED and an empty message
// without reading anything or touching the response.
229  void OnStarted(ServerContext*, Response*) override {
230  this->Finish(Status(StatusCode::UNIMPLEMENTED, ""));
231  }
232 };
233 
234 template <class Request, class Response>
236  : public experimental::ServerWriteReactor<Request, Response> {
237  public:
// This reactor deletes itself when the library reports that the RPC is done,
// so instances must be heap-allocated with `new`.
238  void OnDone() override { delete this; }
// Fails the RPC immediately: finishes with UNIMPLEMENTED and an empty message
// without writing anything or looking at the request.
239  void OnStarted(ServerContext*, const Request*) override {
240  this->Finish(Status(StatusCode::UNIMPLEMENTED, ""));
241  }
242 };
243 
244 template <class Request, class Response>
246  : public experimental::ServerBidiReactor<Request, Response> {
247  public:
// This reactor deletes itself when the library reports that the RPC is done,
// so instances must be heap-allocated with `new`.
248  void OnDone() override { delete this; }
// Fails the RPC immediately: finishes with UNIMPLEMENTED and an empty message
// without starting any read or write.
249  void OnStarted(ServerContext*) override {
250  this->Finish(Status(StatusCode::UNIMPLEMENTED, ""));
251  }
252 };
253 
254 template <class RequestType, class ResponseType>
255 class CallbackUnaryHandler : public MethodHandler {
256  public:
258  std::function<void(ServerContext*, const RequestType*, ResponseType*,
260  func)
// func is received by value (a sink parameter), so it is moved into func_
// rather than copied a second time.
261  : func_(std::move(func)) {}
// Entry point for one unary callback RPC. Builds the per-call controller and
// then either invokes the user's handler (on success) or fails the call with
// the status carried in param.
262  void RunHandler(const HandlerParameter& param) final {
263  // Arena allocate a controller structure (that includes request/response)
// Take a reference on the call first. The matching unref is not visible in
// this listing (MaybeDone has a line missing) — presumably released there.
264  g_core_codegen_interface->grpc_call_ref(param.call->call());
// Placement-new into memory obtained from the call's arena.
// NOTE(review): param is a const reference, so unless call_requester is
// declared mutable the std::move below degrades to a copy — confirm against
// the HandlerParameter definition.
265  auto* controller = new (g_core_codegen_interface->grpc_call_arena_alloc(
266  param.call->call(), sizeof(ServerCallbackRpcControllerImpl)))
267  ServerCallbackRpcControllerImpl(
268  param.server_context, param.call,
269  static_cast<RequestType*>(param.request),
270  std::move(param.call_requester));
271  Status status = param.status;
272 
273  if (status.ok()) {
274  // Call the actual function handler and expect the user to call finish
275  CatchingCallback(func_, param.server_context, controller->request(),
276  controller->response(), controller);
277  } else {
278  // if deserialization failed, we need to fail the call
279  controller->Finish(status);
280  }
281  }
282 
284  Status* status) final {
285  ByteBuffer buf;
286  buf.set_buffer(req);
287  auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc(
288  call, sizeof(RequestType))) RequestType();
289  *status = SerializationTraits<RequestType>::Deserialize(&buf, request);
290  buf.Release();
291  if (status->ok()) {
292  return request;
293  }
294  request->~RequestType();
295  return nullptr;
296  }
297 
298  private:
299  std::function<void(ServerContext*, const RequestType*, ResponseType*,
301  func_;
302 
303  // The implementation class of ServerCallbackRpcController is a private member
304  // of CallbackUnaryHandler since it is never exposed anywhere, and this allows
305  // it to take advantage of CallbackUnaryHandler's friendships.
306  class ServerCallbackRpcControllerImpl
308  public:
// Completes the unary RPC. Sends initial metadata if it has not gone out yet,
// then the status — plus the response message when s is OK — as one batch on
// finish_ops_.
309  void Finish(Status s) override {
// When the batch completes, MaybeDone() releases the outstanding-callback
// slot reserved for Finish.
310  finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
311  &finish_ops_);
// Fold initial metadata into this batch if it was never sent separately.
312  if (!ctx_->sent_initial_metadata_) {
313  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
314  ctx_->initial_metadata_flags());
315  if (ctx_->compression_level_set()) {
316  finish_ops_.set_compression_level(ctx_->compression_level());
317  }
318  ctx_->sent_initial_metadata_ = true;
319  }
320  // The response is dropped if the status is not OK.
321  if (s.ok()) {
// The status actually sent is the result of attaching resp_ to the batch.
322  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
323  finish_ops_.SendMessagePtr(&resp_));
324  } else {
325  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
326  }
327  finish_ops_.set_core_cq_tag(&finish_tag_);
328  call_.PerformOps(&finish_ops_);
329  }
330 
// Sends initial metadata ahead of the response/status as its own batch
// (meta_ops_) and reports the result through f.
331  void SendInitialMetadata(std::function<void(bool)> f) override {
// Initial metadata may be sent only once; Finish() checks the same flag.
332  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
// One more completion is now pending; balanced by MaybeDone() in the tag below.
333  callbacks_outstanding_++;
334  // TODO(vjpai): Consider taking f as a move-capture if we adopt C++14
335  // and if performance of this operation matters
// f is copied into the closure; it runs with the operation's bool result
// before the outstanding-callback count is released.
336  meta_tag_.Set(call_.call(),
337  [this, f](bool ok) {
338  f(ok);
339  MaybeDone();
340  },
341  &meta_ops_);
342  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
343  ctx_->initial_metadata_flags());
344  if (ctx_->compression_level_set()) {
345  meta_ops_.set_compression_level(ctx_->compression_level());
346  }
347  ctx_->sent_initial_metadata_ = true;
348  meta_ops_.set_core_cq_tag(&meta_tag_);
349  call_.PerformOps(&meta_ops_);
350  }
351 
352  private:
353  friend class CallbackUnaryHandler<RequestType, ResponseType>;
354 
// Private constructor (the enclosing handler is a friend). Stores the context,
// a copy of the Call wrapper, the request pointer and the call_requester
// functor.
355  ServerCallbackRpcControllerImpl(ServerContext* ctx, Call* call,
356  const RequestType* req,
357  std::function<void()> call_requester)
358  : ctx_(ctx),
359  call_(*call),
360  req_(req),
361  call_requester_(std::move(call_requester)) {
// Registers the completion op; its callback releases the outstanding-callback
// slot reserved for CompletionOp. The last argument is nullptr here, whereas
// the streaming implementations pass their reactor in that position.
362  ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, nullptr);
363  }
364 
365  ~ServerCallbackRpcControllerImpl() { req_->~RequestType(); }
366 
367  const RequestType* request() { return req_; }
368  ResponseType* response() { return &resp_; }
369 
370  void MaybeDone() {
371  if (--callbacks_outstanding_ == 0) {
372  grpc_call* call = call_.call();
373  auto call_requester = std::move(call_requester_);
374  this->~ServerCallbackRpcControllerImpl(); // explicitly call destructor
376  call_requester();
377  }
378  }
379 
381  CallbackWithSuccessTag meta_tag_;
384  finish_ops_;
385  CallbackWithSuccessTag finish_tag_;
386 
387  ServerContext* ctx_;
388  Call call_;
389  const RequestType* req_;
390  ResponseType resp_;
391  std::function<void()> call_requester_;
392  std::atomic_int callbacks_outstanding_{
393  2}; // reserve for Finish and CompletionOp
394  };
395 };
396 
397 template <class RequestType, class ResponseType>
399  public:
401  std::function<
403  func)
404  : func_(std::move(func)) {}
405  void RunHandler(const HandlerParameter& param) final {
406  // Arena allocate a reader structure (that includes response)
407  g_core_codegen_interface->grpc_call_ref(param.call->call());
408 
410  param.status.ok()
413  func_)
414  : nullptr;
415 
416  if (reactor == nullptr) {
417  // if deserialization or reactor creator failed, we need to fail the call
419  }
420 
421  auto* reader = new (g_core_codegen_interface->grpc_call_arena_alloc(
422  param.call->call(), sizeof(ServerCallbackReaderImpl)))
423  ServerCallbackReaderImpl(param.server_context, param.call,
424  std::move(param.call_requester), reactor);
425 
426  reader->BindReactor(reactor);
427  reactor->OnStarted(param.server_context, reader->response());
428  reader->MaybeDone();
429  }
430 
431  private:
432  std::function<experimental::ServerReadReactor<RequestType, ResponseType>*()>
433  func_;
434 
435  class ServerCallbackReaderImpl
436  : public experimental::ServerCallbackReader<RequestType> {
437  public:
// Completes the client-streaming RPC. Sends initial metadata if it has not
// gone out yet, then the status — plus the single response message when s is
// OK — as one batch on finish_ops_.
438  void Finish(Status s) override {
// When the batch completes, MaybeDone() releases the outstanding-callback
// slot reserved for Finish.
439  finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
440  &finish_ops_);
// Fold initial metadata into this batch if it was never sent separately.
441  if (!ctx_->sent_initial_metadata_) {
442  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
443  ctx_->initial_metadata_flags());
444  if (ctx_->compression_level_set()) {
445  finish_ops_.set_compression_level(ctx_->compression_level());
446  }
447  ctx_->sent_initial_metadata_ = true;
448  }
449  // The response is dropped if the status is not OK.
450  if (s.ok()) {
// The status actually sent is the result of attaching resp_ to the batch.
451  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
452  finish_ops_.SendMessagePtr(&resp_));
453  } else {
454  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
455  }
456  finish_ops_.set_core_cq_tag(&finish_tag_);
457  call_.PerformOps(&finish_ops_);
458  }
459 
// Sends initial metadata as its own batch (meta_ops_). Completion is reported
// to the reactor through OnSendInitialMetadataDone.
460  void SendInitialMetadata() override {
// Initial metadata may be sent only once; Finish() checks the same flag.
461  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
// One more completion is now pending; balanced by MaybeDone() in the tag below.
462  callbacks_outstanding_++;
463  meta_tag_.Set(call_.call(),
464  [this](bool ok) {
465  reactor_->OnSendInitialMetadataDone(ok);
466  MaybeDone();
467  },
468  &meta_ops_);
469  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
470  ctx_->initial_metadata_flags());
471  if (ctx_->compression_level_set()) {
472  meta_ops_.set_compression_level(ctx_->compression_level());
473  }
474  ctx_->sent_initial_metadata_ = true;
475  meta_ops_.set_core_cq_tag(&meta_tag_);
476  call_.PerformOps(&meta_ops_);
477  }
478 
// Starts reading the next request message into req. Completion is delivered
// via read_tag_, which the constructor wires to reactor_->OnReadDone followed
// by MaybeDone().
// NOTE(review): read_ops_ is a single member reused for every read, so
// presumably only one Read may be in flight at a time — confirm.
479  void Read(RequestType* req) override {
// Account for the pending read completion.
480  callbacks_outstanding_++;
481  read_ops_.RecvMessage(req);
482  call_.PerformOps(&read_ops_);
483  }
484 
485  private:
486  friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
487 
488  ServerCallbackReaderImpl(
489  ServerContext* ctx, Call* call, std::function<void()> call_requester,
491  : ctx_(ctx),
492  call_(*call),
493  call_requester_(std::move(call_requester)),
494  reactor_(reactor) {
495  ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
496  read_tag_.Set(call_.call(),
497  [this](bool ok) {
498  reactor_->OnReadDone(ok);
499  MaybeDone();
500  },
501  &read_ops_);
502  read_ops_.set_core_cq_tag(&read_tag_);
503  }
504 
505  ~ServerCallbackReaderImpl() {}
506 
507  ResponseType* response() { return &resp_; }
508 
509  void MaybeDone() {
510  if (--callbacks_outstanding_ == 0) {
511  reactor_->OnDone();
512  grpc_call* call = call_.call();
513  auto call_requester = std::move(call_requester_);
514  this->~ServerCallbackReaderImpl(); // explicitly call destructor
516  call_requester();
517  }
518  }
519 
521  CallbackWithSuccessTag meta_tag_;
524  finish_ops_;
525  CallbackWithSuccessTag finish_tag_;
527  CallbackWithSuccessTag read_tag_;
528 
529  ServerContext* ctx_;
530  Call call_;
531  ResponseType resp_;
532  std::function<void()> call_requester_;
534  std::atomic_int callbacks_outstanding_{
535  3}; // reserve for OnStarted, Finish, and CompletionOp
536  };
537 };
538 
539 template <class RequestType, class ResponseType>
541  public:
543  std::function<
545  func)
546  : func_(std::move(func)) {}
547  void RunHandler(const HandlerParameter& param) final {
548  // Arena allocate a writer structure
549  g_core_codegen_interface->grpc_call_ref(param.call->call());
550 
552  param.status.ok()
555  func_)
556  : nullptr;
557 
558  if (reactor == nullptr) {
559  // if deserialization or reactor creator failed, we need to fail the call
561  }
562 
563  auto* writer = new (g_core_codegen_interface->grpc_call_arena_alloc(
564  param.call->call(), sizeof(ServerCallbackWriterImpl)))
565  ServerCallbackWriterImpl(param.server_context, param.call,
566  static_cast<RequestType*>(param.request),
567  std::move(param.call_requester), reactor);
568  writer->BindReactor(reactor);
569  reactor->OnStarted(param.server_context, writer->request());
570  writer->MaybeDone();
571  }
572 
574  Status* status) final {
575  ByteBuffer buf;
576  buf.set_buffer(req);
577  auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc(
578  call, sizeof(RequestType))) RequestType();
579  *status = SerializationTraits<RequestType>::Deserialize(&buf, request);
580  buf.Release();
581  if (status->ok()) {
582  return request;
583  }
584  request->~RequestType();
585  return nullptr;
586  }
587 
588  private:
589  std::function<experimental::ServerWriteReactor<RequestType, ResponseType>*()>
590  func_;
591 
592  class ServerCallbackWriterImpl
593  : public experimental::ServerCallbackWriter<ResponseType> {
594  public:
// Completes the server-streaming RPC. Sends initial metadata if it has not
// gone out yet, then status s, as one batch on finish_ops_. Any message that
// WriteAndFinish() attached to finish_ops_ beforehand goes out in this batch.
595  void Finish(Status s) override {
// When the batch completes, MaybeDone() releases the outstanding-callback
// slot reserved for Finish.
596  finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
597  &finish_ops_);
598  finish_ops_.set_core_cq_tag(&finish_tag_);
599 
// Fold initial metadata into this batch if it was never sent separately.
600  if (!ctx_->sent_initial_metadata_) {
601  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
602  ctx_->initial_metadata_flags());
603  if (ctx_->compression_level_set()) {
604  finish_ops_.set_compression_level(ctx_->compression_level());
605  }
606  ctx_->sent_initial_metadata_ = true;
607  }
608  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
609  call_.PerformOps(&finish_ops_);
610  }
611 
// Sends initial metadata as its own batch (meta_ops_). Completion is reported
// to the reactor through OnSendInitialMetadataDone.
612  void SendInitialMetadata() override {
// Initial metadata may be sent only once; Write() and Finish() check the same
// flag.
613  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
// One more completion is now pending; balanced by MaybeDone() in the tag below.
614  callbacks_outstanding_++;
615  meta_tag_.Set(call_.call(),
616  [this](bool ok) {
617  reactor_->OnSendInitialMetadataDone(ok);
618  MaybeDone();
619  },
620  &meta_ops_);
621  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
622  ctx_->initial_metadata_flags());
623  if (ctx_->compression_level_set()) {
624  meta_ops_.set_compression_level(ctx_->compression_level());
625  }
626  ctx_->sent_initial_metadata_ = true;
627  meta_ops_.set_core_cq_tag(&meta_tag_);
628  call_.PerformOps(&meta_ops_);
629  }
630 
// Starts writing one response message. Completion is delivered via write_tag_,
// which the constructor wires to reactor_->OnWriteDone followed by MaybeDone().
// NOTE(review): write_ops_ is a single member reused for every write, so
// presumably only one Write may be in flight at a time — confirm.
631  void Write(const ResponseType* resp, WriteOptions options) override {
// Account for the pending write completion.
632  callbacks_outstanding_++;
// A message flagged as last is also marked with the buffer hint (the write
// may be buffered and need not go out on the wire immediately).
633  if (options.is_last_message()) {
634  options.set_buffer_hint();
635  }
// Piggy-back initial metadata on the first write if it has not been sent yet.
636  if (!ctx_->sent_initial_metadata_) {
637  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
638  ctx_->initial_metadata_flags());
639  if (ctx_->compression_level_set()) {
640  write_ops_.set_compression_level(ctx_->compression_level());
641  }
642  ctx_->sent_initial_metadata_ = true;
643  }
644  // TODO(vjpai): don't assert
// The SendMessagePtr call inside the assertion is what attaches the message
// to the batch; a non-OK result from it trips the assertion.
645  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
646  call_.PerformOps(&write_ops_);
647  }
648 
// Sends a final message together with the status. The message is attached to
// finish_ops_ (not write_ops_), so it is sent by the batch that Finish()
// performs. callbacks_outstanding_ is not incremented here, and write_tag_
// (which calls OnWriteDone) is not involved in this path.
649  void WriteAndFinish(const ResponseType* resp, WriteOptions options,
650  Status s) override {
651  // This combines the write into the finish callback
652  // Don't send any message if the status is bad
653  if (s.ok()) {
654  // TODO(vjpai): don't assert
655  GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
656  }
657  Finish(std::move(s));
658  }
659 
660  private:
661  friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
662 
663  ServerCallbackWriterImpl(
664  ServerContext* ctx, Call* call, const RequestType* req,
665  std::function<void()> call_requester,
667  : ctx_(ctx),
668  call_(*call),
669  req_(req),
670  call_requester_(std::move(call_requester)),
671  reactor_(reactor) {
672  ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
673  write_tag_.Set(call_.call(),
674  [this](bool ok) {
675  reactor_->OnWriteDone(ok);
676  MaybeDone();
677  },
678  &write_ops_);
679  write_ops_.set_core_cq_tag(&write_tag_);
680  }
681  ~ServerCallbackWriterImpl() { req_->~RequestType(); }
682 
683  const RequestType* request() { return req_; }
684 
685  void MaybeDone() {
686  if (--callbacks_outstanding_ == 0) {
687  reactor_->OnDone();
688  grpc_call* call = call_.call();
689  auto call_requester = std::move(call_requester_);
690  this->~ServerCallbackWriterImpl(); // explicitly call destructor
692  call_requester();
693  }
694  }
695 
697  CallbackWithSuccessTag meta_tag_;
700  finish_ops_;
701  CallbackWithSuccessTag finish_tag_;
703  CallbackWithSuccessTag write_tag_;
704 
705  ServerContext* ctx_;
706  Call call_;
707  const RequestType* req_;
708  std::function<void()> call_requester_;
710  std::atomic_int callbacks_outstanding_{
711  3}; // reserve for OnStarted, Finish, and CompletionOp
712  };
713 };
714 
715 template <class RequestType, class ResponseType>
717  public:
719  std::function<
721  func)
722  : func_(std::move(func)) {}
723  void RunHandler(const HandlerParameter& param) final {
724  g_core_codegen_interface->grpc_call_ref(param.call->call());
725 
727  param.status.ok()
730  func_)
731  : nullptr;
732 
733  if (reactor == nullptr) {
734  // if deserialization or reactor creator failed, we need to fail the call
736  }
737 
738  auto* stream = new (g_core_codegen_interface->grpc_call_arena_alloc(
739  param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
740  ServerCallbackReaderWriterImpl(param.server_context, param.call,
741  std::move(param.call_requester),
742  reactor);
743 
744  stream->BindReactor(reactor);
745  reactor->OnStarted(param.server_context);
746  stream->MaybeDone();
747  }
748 
749  private:
750  std::function<experimental::ServerBidiReactor<RequestType, ResponseType>*()>
751  func_;
752 
753  class ServerCallbackReaderWriterImpl
754  : public experimental::ServerCallbackReaderWriter<RequestType,
755  ResponseType> {
756  public:
// Completes the bidirectional RPC. Sends initial metadata if it has not gone
// out yet, then status s, as one batch on finish_ops_. Any message that
// WriteAndFinish() attached to finish_ops_ beforehand goes out in this batch.
757  void Finish(Status s) override {
// When the batch completes, MaybeDone() releases the outstanding-callback
// slot reserved for Finish.
758  finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
759  &finish_ops_);
760  finish_ops_.set_core_cq_tag(&finish_tag_);
761 
// Fold initial metadata into this batch if it was never sent separately.
762  if (!ctx_->sent_initial_metadata_) {
763  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
764  ctx_->initial_metadata_flags());
765  if (ctx_->compression_level_set()) {
766  finish_ops_.set_compression_level(ctx_->compression_level());
767  }
768  ctx_->sent_initial_metadata_ = true;
769  }
770  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
771  call_.PerformOps(&finish_ops_);
772  }
773 
// Sends initial metadata as its own batch (meta_ops_). Completion is reported
// to the reactor through OnSendInitialMetadataDone.
774  void SendInitialMetadata() override {
// Initial metadata may be sent only once; Write() and Finish() check the same
// flag.
775  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
// One more completion is now pending; balanced by MaybeDone() in the tag below.
776  callbacks_outstanding_++;
777  meta_tag_.Set(call_.call(),
778  [this](bool ok) {
779  reactor_->OnSendInitialMetadataDone(ok);
780  MaybeDone();
781  },
782  &meta_ops_);
783  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
784  ctx_->initial_metadata_flags());
785  if (ctx_->compression_level_set()) {
786  meta_ops_.set_compression_level(ctx_->compression_level());
787  }
788  ctx_->sent_initial_metadata_ = true;
789  meta_ops_.set_core_cq_tag(&meta_tag_);
790  call_.PerformOps(&meta_ops_);
791  }
792 
// Starts writing one response message. Completion is delivered via write_tag_,
// which the constructor wires to reactor_->OnWriteDone followed by MaybeDone().
// NOTE(review): write_ops_ is a single member reused for every write, so
// presumably only one Write may be in flight at a time — confirm.
793  void Write(const ResponseType* resp, WriteOptions options) override {
// Account for the pending write completion.
794  callbacks_outstanding_++;
// A message flagged as last is also marked with the buffer hint (the write
// may be buffered and need not go out on the wire immediately).
795  if (options.is_last_message()) {
796  options.set_buffer_hint();
797  }
// Piggy-back initial metadata on the first write if it has not been sent yet.
798  if (!ctx_->sent_initial_metadata_) {
799  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
800  ctx_->initial_metadata_flags());
801  if (ctx_->compression_level_set()) {
802  write_ops_.set_compression_level(ctx_->compression_level());
803  }
804  ctx_->sent_initial_metadata_ = true;
805  }
806  // TODO(vjpai): don't assert
// The SendMessagePtr call inside the assertion is what attaches the message
// to the batch; a non-OK result from it trips the assertion.
807  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
808  call_.PerformOps(&write_ops_);
809  }
810 
// Sends a final message together with the status. The message is attached to
// finish_ops_ (not write_ops_), so it is sent by the batch that Finish()
// performs. callbacks_outstanding_ is not incremented here, and write_tag_
// (which calls OnWriteDone) is not involved in this path.
811  void WriteAndFinish(const ResponseType* resp, WriteOptions options,
812  Status s) override {
813  // Don't send any message if the status is bad
814  if (s.ok()) {
815  // TODO(vjpai): don't assert
816  GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
817  }
818  Finish(std::move(s));
819  }
820 
// Starts reading the next request message into req. Completion is delivered
// via read_tag_, which the constructor wires to reactor_->OnReadDone followed
// by MaybeDone().
// NOTE(review): read_ops_ is a single member reused for every read, so
// presumably only one Read may be in flight at a time — confirm.
821  void Read(RequestType* req) override {
// Account for the pending read completion.
822  callbacks_outstanding_++;
823  read_ops_.RecvMessage(req);
824  call_.PerformOps(&read_ops_);
825  }
826 
827  private:
828  friend class CallbackBidiHandler<RequestType, ResponseType>;
829 
830  ServerCallbackReaderWriterImpl(
831  ServerContext* ctx, Call* call, std::function<void()> call_requester,
833  : ctx_(ctx),
834  call_(*call),
835  call_requester_(std::move(call_requester)),
836  reactor_(reactor) {
837  ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
838  write_tag_.Set(call_.call(),
839  [this](bool ok) {
840  reactor_->OnWriteDone(ok);
841  MaybeDone();
842  },
843  &write_ops_);
844  write_ops_.set_core_cq_tag(&write_tag_);
845  read_tag_.Set(call_.call(),
846  [this](bool ok) {
847  reactor_->OnReadDone(ok);
848  MaybeDone();
849  },
850  &read_ops_);
851  read_ops_.set_core_cq_tag(&read_tag_);
852  }
853  ~ServerCallbackReaderWriterImpl() {}
854 
855  void MaybeDone() {
856  if (--callbacks_outstanding_ == 0) {
857  reactor_->OnDone();
858  grpc_call* call = call_.call();
859  auto call_requester = std::move(call_requester_);
860  this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor
862  call_requester();
863  }
864  }
865 
867  CallbackWithSuccessTag meta_tag_;
870  finish_ops_;
871  CallbackWithSuccessTag finish_tag_;
873  CallbackWithSuccessTag write_tag_;
875  CallbackWithSuccessTag read_tag_;
876 
877  ServerContext* ctx_;
878  Call call_;
879  std::function<void()> call_requester_;
881  std::atomic_int callbacks_outstanding_{
882  3}; // reserve for OnStarted, Finish, and CompletionOp
883  };
884 };
885 
886 } // namespace internal
887 
888 } // namespace grpc
889 
890 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:70
void OnDone() override
Definition: server_callback.h:238
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call_op_set.h:125
void StartRead(Request *msg)
Definition: server_callback.h:149
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:141
virtual void grpc_call_ref(grpc_call *call)=0
Definition: server_callback.h:245
void StartWriteLast(const Response *msg, WriteOptions options)
Definition: server_callback.h:158
void BindReactor(ServerBidiReactor< Request, Response > *reactor)
Definition: server_callback.h:131
virtual void OnSendInitialMetadataDone(bool ok)
Definition: server_callback.h:144
Definition: server_callback.h:716
virtual ~ServerCallbackWriter()
Definition: server_callback.h:95
Definition: server_callback.h:225
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:164
virtual void OnReadDone(bool ok)
Definition: server_callback.h:178
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:827
void RunHandler(const HandlerParameter &param) final
Definition: server_callback.h:547
virtual void OnSendInitialMetadataDone(bool ok)
Definition: server_callback.h:196
virtual void grpc_call_unref(grpc_call *call)=0
virtual void SendInitialMetadata(std::function< void(bool)>)=0
Definition: async_unary_call.h:304
void StartWrite(const Response *msg)
Definition: server_callback.h:150
Definition: grpc_types.h:40
virtual void OnStarted(ServerContext *, const Request *req)
Definition: server_callback.h:195
virtual void OnWriteDone(bool ok)
Definition: server_callback.h:146
grpc_call * call() const
Definition: call.h:70
Definition: server_callback.h:115
Definition: server_callback.h:53
void StartWrite(const Response *msg)
Definition: server_callback.h:200
void BindReactor(ServerWriteReactor< Request, Response > *reactor)
Definition: server_callback.h:109
void StartSendInitialMetadata()
Definition: server_callback.h:148
void StartWriteAndFinish(const Response *msg, WriteOptions options, Status s)
Definition: server_callback.h:204
::google::protobuf::util::Status Status
Definition: config_protobuf.h:93
CallbackBidiHandler(std::function< experimental::ServerBidiReactor< RequestType, ResponseType > *()> func)
Definition: server_callback.h:718
void RunHandler(const HandlerParameter &param) final
Definition: server_callback.h:723
void Finish(Status s)
Definition: server_callback.h:182
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
Definition: call_op_set.h:635
Definition: call_op_set.h:223
CallbackServerStreamingHandler(std::function< experimental::ServerWriteReactor< RequestType, ResponseType > *()> func)
Definition: server_callback.h:542
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, Status *status) final
Definition: server_callback.h:573
void Release()
Forget underlying byte buffer without destroying. Use this only for un-owned byte buffers.
Definition: byte_buffer.h:134
virtual void OnDone()
Definition: server_callback.h:43
void OnDone() override
Definition: server_callback.h:228
Definition: call_op_set.h:293
Definition: server_callback.h:40
This header provides an object that reads bytes directly from a grpc::ByteBuffer, via the ZeroCopyInp...
Definition: alarm.h:24
void Finish(Status s)
Definition: server_callback.h:161
Definition: server_callback.h:57
CoreCodegenInterface * g_core_codegen_interface
Definition: call_op_set.h:50
void StartWriteAndFinish(const Response *msg, WriteOptions options, Status s)
Definition: server_callback.h:154
Definition: server_callback.h:78
void StartRead(Request *msg)
Definition: server_callback.h:181
ReturnType * CatchingReactorCreator(Func &&func, Args &&... args)
Definition: callback_common.h:51
Definition: rpc_service_method.h:42
CallbackClientStreamingHandler(std::function< experimental::ServerReadReactor< RequestType, ResponseType > *()> func)
Definition: server_callback.h:400
A ServerContext allows the person implementing a service handler to:
Definition: server_context.h:109
Per-message write options.
Definition: call_op_set.h:85
void RunHandler(const HandlerParameter &param) final
Definition: server_callback.h:405
Definition: byte_buffer.h:49
void StartWrite(const Response *msg, WriteOptions options)
Definition: server_callback.h:151
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
virtual void OnWriteDone(bool ok)
Definition: server_callback.h:197
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming o...
Definition: callback_common.h:132
Definition: server_callback.h:93
bool ok() const
Is the status OK?
Definition: status.h:118
virtual void OnStarted(ServerContext *)
Definition: server_callback.h:143
Base class for running an RPC handler.
Definition: rpc_service_method.h:39
Definition: server_callback.h:398
void OnStarted(ServerContext *, Response *) override
Definition: server_callback.h:229
Did it work? If it didn't, why?
Definition: status.h:31
Operation is not implemented or not supported/enabled in this service.
Definition: status_code_enum.h:115
void CatchingCallback(Func &&func, Args &&... args)
An exception-safe way of invoking a user-specified callback function.
Definition: callback_common.h:38
void StartWriteLast(const Response *msg, WriteOptions options)
Definition: server_callback.h:208
Definition: server_callback.h:61
void StartSendInitialMetadata()
Definition: server_callback.h:180
virtual void OnStarted(ServerContext *, Response *resp)
Definition: server_callback.h:176
virtual void OnSendInitialMetadataDone(bool ok)
Definition: server_callback.h:177
Definition: server_callback.h:55
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:189
void OnStarted(ServerContext *) override
Definition: server_callback.h:249
virtual ~ServerCallbackReader()
Definition: server_callback.h:80
void StartSendInitialMetadata()
Definition: server_callback.h:199
void Finish(Status s)
Definition: server_callback.h:211
void BindReactor(ServerReadReactor< Request, Response > *reactor)
Definition: server_callback.h:87
virtual void OnReadDone(bool ok)
Definition: server_callback.h:145
A sequence of bytes.
Definition: byte_buffer.h:64
void OnDone() override
Definition: server_callback.h:248
virtual void WriteAndFinish(const Response *msg, WriteOptions options, Status s)
Definition: server_callback.h:100
void StartWrite(const Response *msg, WriteOptions options)
Definition: server_callback.h:201
void RunHandler(const HandlerParameter &param) final
Definition: server_callback.h:262
virtual void OnCancel()
Definition: server_callback.h:44
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, Status *status) final
Definition: server_callback.h:283
Straightforward wrapping of the C call object.
Definition: call.h:36
virtual void WriteAndFinish(const Response *msg, WriteOptions options, Status s)
Definition: server_callback.h:123
virtual ~ServerReactor()=default
void OnStarted(ServerContext *, const Request *) override
Definition: server_callback.h:239
Definition: server_callback.h:235
virtual ~ServerCallbackReaderWriter()
Definition: server_callback.h:117
CallbackUnaryHandler(std::function< void(ServerContext *, const RequestType *, ResponseType *, experimental::ServerCallbackRpcController *)> func)
Definition: server_callback.h:257