GRPC C++  1.36.1
server_callback_handlers.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
20 
26 
27 namespace grpc {
28 namespace internal {
29 
30 template <class RequestType, class ResponseType>
31 class CallbackUnaryHandler : public ::grpc::internal::MethodHandler {
32  public:
35  const RequestType*, ResponseType*)>
36  get_reactor)
37  : get_reactor_(std::move(get_reactor)) {}
38 
41  allocator) {
42  allocator_ = allocator;
43  }
44 
45  void RunHandler(const HandlerParameter& param) final {
46  // Arena allocate a controller structure (that includes request/response)
48  auto* allocator_state = static_cast<
50  param.internal_data);
51 
53  param.call->call(), sizeof(ServerCallbackUnaryImpl)))
54  ServerCallbackUnaryImpl(
55  static_cast<::grpc::CallbackServerContext*>(param.server_context),
56  param.call, allocator_state, param.call_requester);
57  param.server_context->BeginCompletionOp(
58  param.call, [call](bool) { call->MaybeDone(); }, call);
59 
60  ServerUnaryReactor* reactor = nullptr;
61  if (param.status.ok()) {
62  reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
63  get_reactor_,
64  static_cast<::grpc::CallbackServerContext*>(param.server_context),
65  call->request(), call->response());
66  }
67 
68  if (reactor == nullptr) {
69  // if deserialization or reactor creator failed, we need to fail the call
71  param.call->call(), sizeof(UnimplementedUnaryReactor)))
74  }
75 
77  call->SetupReactor(reactor);
78  }
79 
81  ::grpc::Status* status, void** handler_data) final {
83  buf.set_buffer(req);
84  RequestType* request = nullptr;
86  allocator_state = nullptr;
87  if (allocator_ != nullptr) {
88  allocator_state = allocator_->AllocateMessages();
89  } else {
90  allocator_state =
94  }
95  *handler_data = allocator_state;
96  request = allocator_state->request();
97  *status =
99  buf.Release();
100  if (status->ok()) {
101  return request;
102  }
103  // Clean up on deserialization failure.
104  allocator_state->Release();
105  return nullptr;
106  }
107 
108  private:
110  const RequestType*, ResponseType*)>
111  get_reactor_;
113  allocator_ = nullptr;
114 
115  class ServerCallbackUnaryImpl : public ServerCallbackUnary {
116  public:
// Finishes the unary RPC with status `s`.
// Arms finish_tag_ with a callback that only calls MaybeDone(), adds initial
// metadata to finish_ops_ if the context has not sent it yet, adds the status,
// and submits finish_ops_ through call_.PerformOps().
// When `s` is OK, the status that is sent is whatever
// SendMessagePtr(response()) returns; otherwise `s` is sent and no response
// message is attached.
117  void Finish(::grpc::Status s) override {
118  // A callback that only contains a call to MaybeDone can be run as an
119  // inline callback regardless of whether or not OnDone is inlineable
120  // because if the actual OnDone callback needs to be scheduled, MaybeDone
121  // is responsible for dispatching to an executor thread if needed. Thus,
122  // when setting up the finish_tag_, we can set its own callback to
123  // inlineable.
124  finish_tag_.Set(
125  call_.call(),
126  [this](bool) {
127  this->MaybeDone(
128  reactor_.load(std::memory_order_relaxed)->InternalInlineable());
129  },
130  &finish_ops_, /*can_inline=*/true);
131  finish_ops_.set_core_cq_tag(&finish_tag_);
132 
// Piggyback initial metadata (and the compression level, if one was set) on
// the finish batch when it has not been sent separately.
133  if (!ctx_->sent_initial_metadata_) {
134  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
135  ctx_->initial_metadata_flags());
136  if (ctx_->compression_level_set()) {
137  finish_ops_.set_compression_level(ctx_->compression_level());
138  }
139  ctx_->sent_initial_metadata_ = true;
140  }
141  // The response is dropped if the status is not OK.
142  if (s.ok()) {
143  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
144  finish_ops_.SendMessagePtr(response()));
145  } else {
146  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
147  }
// NOTE(review): repeats the set_core_cq_tag call made right after
// finish_tag_.Set() above with the same tag; looks redundant - confirm before
// removing.
148  finish_ops_.set_core_cq_tag(&finish_tag_);
149  call_.PerformOps(&finish_ops_);
150  }
151 
// Sends initial metadata as its own batch (meta_ops_).
// Asserts that the context has not already sent initial metadata, then calls
// Ref() before starting the batch. When the batch completes, the reactor's
// OnSendInitialMetadataDone(ok) is invoked, followed by MaybeDone().
// NOTE(review): the Ref() here is presumably balanced by that MaybeDone()
// call - confirm against ServerCallbackCall.
152  void SendInitialMetadata() override {
153  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
154  this->Ref();
155  // The callback for this function should not be marked inline because it
156  // is directly invoking a user-controlled reaction
157  // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor
158  // thread. However, any OnDone needed after that can be inlined because it
159  // is already running on an executor thread.
160  meta_tag_.Set(
161  call_.call(),
162  [this](bool ok) {
163  ServerUnaryReactor* reactor =
164  reactor_.load(std::memory_order_relaxed);
165  reactor->OnSendInitialMetadataDone(ok);
166  this->MaybeDone(/*inlineable_ondone=*/true);
167  },
168  &meta_ops_, /*can_inline=*/false);
169  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
170  ctx_->initial_metadata_flags());
171  if (ctx_->compression_level_set()) {
172  meta_ops_.set_compression_level(ctx_->compression_level());
173  }
// Record that metadata went out so Finish() does not add it a second time.
174  ctx_->sent_initial_metadata_ = true;
175  meta_ops_.set_core_cq_tag(&meta_tag_);
176  call_.PerformOps(&meta_ops_);
177  }
178 
179  private:
180  friend class CallbackUnaryHandler<RequestType, ResponseType>;
181 
182  ServerCallbackUnaryImpl(
185  allocator_state,
186  std::function<void()> call_requester)
187  : ctx_(ctx),
188  call_(*call),
189  allocator_state_(allocator_state),
190  call_requester_(std::move(call_requester)) {
191  ctx_->set_message_allocator_state(allocator_state);
192  }
193 
// Attaches `reactor` to this call.
// Stores the pointer in reactor_ (relaxed store), then calls BindReactor(),
// MaybeCallOnCancel() and MaybeDone() in that order. MaybeDone() is told
// whether OnDone may run inline via reactor->InternalInlineable().
// NOTE(review): this MaybeDone() presumably releases the "start" reservation
// counted in callbacks_outstanding_ - confirm.
198  void SetupReactor(ServerUnaryReactor* reactor) {
199  reactor_.store(reactor, std::memory_order_relaxed);
200  this->BindReactor(reactor);
201  this->MaybeCallOnCancel(reactor);
202  this->MaybeDone(reactor->InternalInlineable());
203  }
204 
// Returns the request message held by allocator_state_ (read-only view).
205  const RequestType* request() { return allocator_state_->request(); }
// Returns the mutable response message held by allocator_state_; Finish()
// passes this pointer to SendMessagePtr() when the status is OK.
206  ResponseType* response() { return allocator_state_->response(); }
207 
208  void CallOnDone() override {
209  reactor_.load(std::memory_order_relaxed)->OnDone();
210  grpc_call* call = call_.call();
211  auto call_requester = std::move(call_requester_);
212  allocator_state_->Release();
213  if (ctx_->context_allocator() != nullptr) {
214  ctx_->context_allocator()->Release(ctx_);
215  }
216  this->~ServerCallbackUnaryImpl(); // explicitly call destructor
218  call_requester();
219  }
220 
// Returns the reactor stored by SetupReactor() using a relaxed load.
221  ServerReactor* reactor() override {
222  return reactor_.load(std::memory_order_relaxed);
223  }
224 
226  meta_ops_;
231  finish_ops_;
233 
234  ::grpc::CallbackServerContext* const ctx_;
237  allocator_state_;
238  std::function<void()> call_requester_;
239  // reactor_ can always be loaded/stored with relaxed memory ordering because
240  // its value is only set once, independently of other data in the object,
241  // and the loads that use it will always actually come provably later even
242  // though they are from different threads since they are triggered by
243  // actions initiated only by the setting up of the reactor_ variable. In
244  // a sense, it's a delayed "const": it gets its value from the SetupReactor
245  // method (not the constructor, so it's not a true const), but it doesn't
246  // change after that and it only gets used by actions caused, directly or
247  // indirectly, by that setup. This comment also applies to the reactor_
248  // variables of the other streaming objects in this file.
249  std::atomic<ServerUnaryReactor*> reactor_;
250  // callbacks_outstanding_ follows a refcount pattern
251  std::atomic<intptr_t> callbacks_outstanding_{
252  3}; // reserve for start, Finish, and CompletionOp
253  };
254 };
255 
256 template <class RequestType, class ResponseType>
257 class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler {
258  public:
260  std::function<ServerReadReactor<RequestType>*(
261  ::grpc::CallbackServerContext*, ResponseType*)>
262  get_reactor)
263  : get_reactor_(std::move(get_reactor)) {}
264  void RunHandler(const HandlerParameter& param) final {
265  // Arena allocate a reader structure (that includes response)
266  ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
267 
269  param.call->call(), sizeof(ServerCallbackReaderImpl)))
270  ServerCallbackReaderImpl(
271  static_cast<::grpc::CallbackServerContext*>(param.server_context),
272  param.call, param.call_requester);
273  // Inlineable OnDone can be false in the CompletionOp callback because there
274  // is no read reactor that has an inlineable OnDone; this only applies to
275  // the DefaultReactor (which is unary).
276  param.server_context->BeginCompletionOp(
277  param.call,
278  [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); },
279  reader);
280 
281  ServerReadReactor<RequestType>* reactor = nullptr;
282  if (param.status.ok()) {
285  get_reactor_,
286  static_cast<::grpc::CallbackServerContext*>(param.server_context),
287  reader->response());
288  }
289 
290  if (reactor == nullptr) {
291  // if deserialization or reactor creator failed, we need to fail the call
293  param.call->call(), sizeof(UnimplementedReadReactor<RequestType>)))
296  }
297 
298  reader->SetupReactor(reactor);
299  }
300 
301  private:
302  std::function<ServerReadReactor<RequestType>*(::grpc::CallbackServerContext*,
303  ResponseType*)>
304  get_reactor_;
305 
306  class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
307  public:
// Finishes the client-streaming RPC with status `s`.
// Arms finish_tag_ with a callback that only calls MaybeDone(), adds initial
// metadata to finish_ops_ if it has not been sent yet, adds the status, and
// submits finish_ops_ through call_.PerformOps().
// When `s` is OK, the status that is sent is whatever SendMessagePtr(&resp_)
// returns; otherwise `s` is sent and no response message is attached.
308  void Finish(::grpc::Status s) override {
309  // A finish tag with only MaybeDone can have its callback inlined
310  // even if OnDone is not inlineable, because this callback just
311  // checks a ref and then decides whether or not to dispatch OnDone.
312  finish_tag_.Set(
313  call_.call(),
314  [this](bool) {
315  // Inlineable OnDone can be false here because there is
316  // no read reactor that has an inlineable OnDone; this
317  // only applies to the DefaultReactor (which is unary).
318  this->MaybeDone(/*inlineable_ondone=*/false);
319  },
320  &finish_ops_, /*can_inline=*/true);
// Piggyback initial metadata (and the compression level, if one was set) on
// the finish batch when it has not been sent separately.
321  if (!ctx_->sent_initial_metadata_) {
322  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
323  ctx_->initial_metadata_flags());
324  if (ctx_->compression_level_set()) {
325  finish_ops_.set_compression_level(ctx_->compression_level());
326  }
327  ctx_->sent_initial_metadata_ = true;
328  }
329  // The response is dropped if the status is not OK.
330  if (s.ok()) {
331  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
332  finish_ops_.SendMessagePtr(&resp_));
333  } else {
334  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
335  }
336  finish_ops_.set_core_cq_tag(&finish_tag_);
337  call_.PerformOps(&finish_ops_);
338  }
339 
// Sends initial metadata as its own batch (meta_ops_).
// Asserts that the context has not already sent initial metadata, then calls
// Ref() before starting the batch. When the batch completes, the reactor's
// OnSendInitialMetadataDone(ok) is invoked, followed by MaybeDone().
// NOTE(review): the Ref() here is presumably balanced by that MaybeDone()
// call - confirm against ServerCallbackCall.
340  void SendInitialMetadata() override {
341  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
342  this->Ref();
343  // The callback for this function should not be inlined because it invokes
344  // a user-controlled reaction, but any resulting OnDone can be inlined in
345  // the executor to which this callback is dispatched.
346  meta_tag_.Set(
347  call_.call(),
348  [this](bool ok) {
349  ServerReadReactor<RequestType>* reactor =
350  reactor_.load(std::memory_order_relaxed);
351  reactor->OnSendInitialMetadataDone(ok);
352  this->MaybeDone(/*inlineable_ondone=*/true);
353  },
354  &meta_ops_, /*can_inline=*/false);
355  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
356  ctx_->initial_metadata_flags());
357  if (ctx_->compression_level_set()) {
358  meta_ops_.set_compression_level(ctx_->compression_level());
359  }
// Record that metadata went out so Finish() does not add it a second time.
360  ctx_->sent_initial_metadata_ = true;
361  meta_ops_.set_core_cq_tag(&meta_tag_);
362  call_.PerformOps(&meta_ops_);
363  }
364 
// Starts reading one request message into `req`.
// Calls Ref(), points read_ops_ at `req` via RecvMessage(), and submits the
// batch. Completion is reported through read_tag_, which SetupReactor() wires
// to reactor->OnReadDone(ok) followed by MaybeDone().
365  void Read(RequestType* req) override {
366  this->Ref();
367  read_ops_.RecvMessage(req);
368  call_.PerformOps(&read_ops_);
369  }
370 
371  private:
372  friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
373 
// Private constructor (the enclosing handler is a friend).
// Stores `ctx`, initializes call_ from `*call`, and moves `call_requester`
// into the member that CallOnDone() later invokes. reactor_ is not set here;
// SetupReactor() stores it.
374  ServerCallbackReaderImpl(::grpc::CallbackServerContext* ctx,
375  ::grpc::internal::Call* call,
376  std::function<void()> call_requester)
377  : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
378 
// Attaches `reactor` to this call.
// Stores the pointer in reactor_ (relaxed store), wires read_tag_ so that a
// completed read calls reactor->OnReadDone(ok) and then MaybeDone(), and then
// calls BindReactor(), MaybeCallOnCancel() and MaybeDone() in that order.
// NOTE(review): the final MaybeDone() presumably releases the "OnStarted"
// reservation counted in callbacks_outstanding_ - confirm.
379  void SetupReactor(ServerReadReactor<RequestType>* reactor) {
380  reactor_.store(reactor, std::memory_order_relaxed);
381  // The callback for this function should not be inlined because it invokes
382  // a user-controlled reaction, but any resulting OnDone can be inlined in
383  // the executor to which this callback is dispatched.
384  read_tag_.Set(
385  call_.call(),
386  [this, reactor](bool ok) {
387  reactor->OnReadDone(ok);
388  this->MaybeDone(/*inlineable_ondone=*/true);
389  },
390  &read_ops_, /*can_inline=*/false);
391  read_ops_.set_core_cq_tag(&read_tag_);
392  this->BindReactor(reactor);
393  this->MaybeCallOnCancel(reactor);
394  // Inlineable OnDone can be false here because there is no read
395  // reactor that has an inlineable OnDone; this only applies to the
396  // DefaultReactor (which is unary).
397  this->MaybeDone(/*inlineable_ondone=*/false);
398  }
399 
// Empty user-declared destructor; CallOnDone() invokes it explicitly.
400  ~ServerCallbackReaderImpl() {}
401 
// Returns a pointer to the response message stored inside this object
// (resp_); Finish() sends that same object when the status is OK.
402  ResponseType* response() { return &resp_; }
403 
404  void CallOnDone() override {
405  reactor_.load(std::memory_order_relaxed)->OnDone();
406  grpc_call* call = call_.call();
407  auto call_requester = std::move(call_requester_);
408  if (ctx_->context_allocator() != nullptr) {
409  ctx_->context_allocator()->Release(ctx_);
410  }
411  this->~ServerCallbackReaderImpl(); // explicitly call destructor
413  call_requester();
414  }
415 
// Returns the reactor stored by SetupReactor() using a relaxed load.
416  ServerReactor* reactor() override {
417  return reactor_.load(std::memory_order_relaxed);
418  }
419 
421  meta_ops_;
426  finish_ops_;
430  read_ops_;
432 
433  ::grpc::CallbackServerContext* const ctx_;
435  ResponseType resp_;
436  std::function<void()> call_requester_;
437  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
438  std::atomic<ServerReadReactor<RequestType>*> reactor_;
439  // callbacks_outstanding_ follows a refcount pattern
440  std::atomic<intptr_t> callbacks_outstanding_{
441  3}; // reserve for OnStarted, Finish, and CompletionOp
442  };
443 };
444 
445 template <class RequestType, class ResponseType>
446 class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler {
447  public:
449  std::function<ServerWriteReactor<ResponseType>*(
450  ::grpc::CallbackServerContext*, const RequestType*)>
451  get_reactor)
452  : get_reactor_(std::move(get_reactor)) {}
453  void RunHandler(const HandlerParameter& param) final {
454  // Arena allocate a writer structure
455  ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
456 
458  param.call->call(), sizeof(ServerCallbackWriterImpl)))
459  ServerCallbackWriterImpl(
460  static_cast<::grpc::CallbackServerContext*>(param.server_context),
461  param.call, static_cast<RequestType*>(param.request),
462  param.call_requester);
463  // Inlineable OnDone can be false in the CompletionOp callback because there
464  // is no write reactor that has an inlineable OnDone; this only applies to
465  // the DefaultReactor (which is unary).
466  param.server_context->BeginCompletionOp(
467  param.call,
468  [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); },
469  writer);
470 
471  ServerWriteReactor<ResponseType>* reactor = nullptr;
472  if (param.status.ok()) {
475  get_reactor_,
476  static_cast<::grpc::CallbackServerContext*>(param.server_context),
477  writer->request());
478  }
479  if (reactor == nullptr) {
480  // if deserialization or reactor creator failed, we need to fail the call
482  param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>)))
485  }
486 
487  writer->SetupReactor(reactor);
488  }
489 
491  ::grpc::Status* status, void** /*handler_data*/) final {
492  ::grpc::ByteBuffer buf;
493  buf.set_buffer(req);
494  auto* request =
496  call, sizeof(RequestType))) RequestType();
497  *status =
499  buf.Release();
500  if (status->ok()) {
501  return request;
502  }
503  request->~RequestType();
504  return nullptr;
505  }
506 
507  private:
508  std::function<ServerWriteReactor<ResponseType>*(
509  ::grpc::CallbackServerContext*, const RequestType*)>
510  get_reactor_;
511 
512  class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
513  public:
// Finishes the server-streaming RPC with status `s`.
// Arms finish_tag_ with a callback that only calls MaybeDone(), adds initial
// metadata to finish_ops_ if it has not been sent yet, adds the status `s`,
// and submits finish_ops_ through call_.PerformOps(). Any message previously
// attached to finish_ops_ by WriteAndFinish() goes out in the same batch.
514  void Finish(::grpc::Status s) override {
515  // A finish tag with only MaybeDone can have its callback inlined
516  // even if OnDone is not inlineable, because this callback just
517  // checks a ref and then decides whether or not to dispatch OnDone.
518  finish_tag_.Set(
519  call_.call(),
520  [this](bool) {
521  // Inlineable OnDone can be false here because there is
522  // no write reactor that has an inlineable OnDone; this
523  // only applies to the DefaultReactor (which is unary).
524  this->MaybeDone(/*inlineable_ondone=*/false);
525  },
526  &finish_ops_, /*can_inline=*/true);
527  finish_ops_.set_core_cq_tag(&finish_tag_);
528 
// Piggyback initial metadata (and the compression level, if one was set) on
// the finish batch when it has not been sent separately.
529  if (!ctx_->sent_initial_metadata_) {
530  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
531  ctx_->initial_metadata_flags());
532  if (ctx_->compression_level_set()) {
533  finish_ops_.set_compression_level(ctx_->compression_level());
534  }
535  ctx_->sent_initial_metadata_ = true;
536  }
537  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
538  call_.PerformOps(&finish_ops_);
539  }
540 
// Sends initial metadata as its own batch (meta_ops_).
// Asserts that the context has not already sent initial metadata, then calls
// Ref() before starting the batch. When the batch completes, the reactor's
// OnSendInitialMetadataDone(ok) is invoked, followed by MaybeDone().
// NOTE(review): the Ref() here is presumably balanced by that MaybeDone()
// call - confirm against ServerCallbackCall.
541  void SendInitialMetadata() override {
542  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
543  this->Ref();
544  // The callback for this function should not be inlined because it invokes
545  // a user-controlled reaction, but any resulting OnDone can be inlined in
546  // the executor to which this callback is dispatched.
547  meta_tag_.Set(
548  call_.call(),
549  [this](bool ok) {
550  ServerWriteReactor<ResponseType>* reactor =
551  reactor_.load(std::memory_order_relaxed);
552  reactor->OnSendInitialMetadataDone(ok);
553  this->MaybeDone(/*inlineable_ondone=*/true);
554  },
555  &meta_ops_, /*can_inline=*/false);
556  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
557  ctx_->initial_metadata_flags());
558  if (ctx_->compression_level_set()) {
559  meta_ops_.set_compression_level(ctx_->compression_level());
560  }
// Record that metadata went out so Write()/Finish() do not add it again.
561  ctx_->sent_initial_metadata_ = true;
562  meta_ops_.set_core_cq_tag(&meta_tag_);
563  call_.PerformOps(&meta_ops_);
564  }
565 
// Starts writing one response message.
// Calls Ref(), adds the buffer hint to `options` when is_last_message() is
// set, attaches initial metadata to write_ops_ if it has not been sent yet,
// attaches `resp`, and submits write_ops_. Completion is reported through
// write_tag_, which SetupReactor() wires to reactor->OnWriteDone(ok).
// The status returned by SendMessagePtr() is asserted to be OK, so a failure
// there aborts rather than being reported to the reactor (see the TODO).
566  void Write(const ResponseType* resp,
567  ::grpc::WriteOptions options) override {
568  this->Ref();
569  if (options.is_last_message()) {
570  options.set_buffer_hint();
571  }
572  if (!ctx_->sent_initial_metadata_) {
573  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
574  ctx_->initial_metadata_flags());
575  if (ctx_->compression_level_set()) {
576  write_ops_.set_compression_level(ctx_->compression_level());
577  }
578  ctx_->sent_initial_metadata_ = true;
579  }
580  // TODO(vjpai): don't assert
581  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
582  call_.PerformOps(&write_ops_);
583  }
584 
// Sends a final message together with the status.
// Attaches `resp` to finish_ops_ (not write_ops_) and then delegates to
// Finish(s), so message and status are submitted as one batch. No Ref() is
// taken and write_tag_ is not involved; the finish_tag_ callback installed by
// Finish() only calls MaybeDone().
585  void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
586  ::grpc::Status s) override {
587  // This combines the write into the finish callback
588  // TODO(vjpai): don't assert
589  GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
590  Finish(std::move(s));
591  }
592 
593  private:
594  friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
595 
// Private constructor (the enclosing handler is a friend).
// Stores `ctx` and the request pointer `req`, initializes call_ from `*call`,
// and moves `call_requester` into the member that CallOnDone() later invokes.
// The destructor runs ~RequestType() on `req` when it is non-null.
596  ServerCallbackWriterImpl(::grpc::CallbackServerContext* ctx,
597  ::grpc::internal::Call* call,
598  const RequestType* req,
599  std::function<void()> call_requester)
600  : ctx_(ctx),
601  call_(*call),
602  req_(req),
603  call_requester_(std::move(call_requester)) {}
604 
// Attaches `reactor` to this call.
// Stores the pointer in reactor_ (relaxed store), wires write_tag_ so that a
// completed write calls reactor->OnWriteDone(ok) and then MaybeDone(), and
// then calls BindReactor(), MaybeCallOnCancel() and MaybeDone() in that order.
// NOTE(review): the final MaybeDone() presumably releases the "OnStarted"
// reservation counted in callbacks_outstanding_ - confirm.
605  void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
606  reactor_.store(reactor, std::memory_order_relaxed);
607  // The callback for this function should not be inlined because it invokes
608  // a user-controlled reaction, but any resulting OnDone can be inlined in
609  // the executor to which this callback is dispatched.
610  write_tag_.Set(
611  call_.call(),
612  [this, reactor](bool ok) {
613  reactor->OnWriteDone(ok);
614  this->MaybeDone(/*inlineable_ondone=*/true);
615  },
616  &write_ops_, /*can_inline=*/false);
617  write_ops_.set_core_cq_tag(&write_tag_);
618  this->BindReactor(reactor);
619  this->MaybeCallOnCancel(reactor);
620  // Inlineable OnDone can be false here because there is no write
621  // reactor that has an inlineable OnDone; this only applies to the
622  // DefaultReactor (which is unary).
623  this->MaybeDone(/*inlineable_ondone=*/false);
624  }
// Runs the request's destructor in place when a request was supplied; the
// storage itself is not freed here.
// NOTE(review): the request is presumably placement-constructed in call-arena
// memory by the handler's Deserialize() - confirm.
625  ~ServerCallbackWriterImpl() {
626  if (req_ != nullptr) {
627  req_->~RequestType();
628  }
629  }
630 
// Returns the request pointer passed to the constructor (may be null).
631  const RequestType* request() { return req_; }
632 
633  void CallOnDone() override {
634  reactor_.load(std::memory_order_relaxed)->OnDone();
635  grpc_call* call = call_.call();
636  auto call_requester = std::move(call_requester_);
637  if (ctx_->context_allocator() != nullptr) {
638  ctx_->context_allocator()->Release(ctx_);
639  }
640  this->~ServerCallbackWriterImpl(); // explicitly call destructor
642  call_requester();
643  }
644 
// Returns the reactor stored by SetupReactor() using a relaxed load.
645  ServerReactor* reactor() override {
646  return reactor_.load(std::memory_order_relaxed);
647  }
648 
650  meta_ops_;
655  finish_ops_;
659  write_ops_;
661 
662  ::grpc::CallbackServerContext* const ctx_;
664  const RequestType* req_;
665  std::function<void()> call_requester_;
666  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
667  std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
668  // callbacks_outstanding_ follows a refcount pattern
669  std::atomic<intptr_t> callbacks_outstanding_{
670  3}; // reserve for OnStarted, Finish, and CompletionOp
671  };
672 };
673 
674 template <class RequestType, class ResponseType>
675 class CallbackBidiHandler : public ::grpc::internal::MethodHandler {
676  public:
680  get_reactor)
681  : get_reactor_(std::move(get_reactor)) {}
682  void RunHandler(const HandlerParameter& param) final {
683  ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
684 
686  param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
687  ServerCallbackReaderWriterImpl(
688  static_cast<::grpc::CallbackServerContext*>(param.server_context),
689  param.call, param.call_requester);
690  // Inlineable OnDone can be false in the CompletionOp callback because there
691  // is no bidi reactor that has an inlineable OnDone; this only applies to
692  // the DefaultReactor (which is unary).
693  param.server_context->BeginCompletionOp(
694  param.call,
695  [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); },
696  stream);
697 
699  if (param.status.ok()) {
702  get_reactor_,
703  static_cast<::grpc::CallbackServerContext*>(param.server_context));
704  }
705 
706  if (reactor == nullptr) {
707  // if deserialization or reactor creator failed, we need to fail the call
709  param.call->call(),
713  }
714 
715  stream->SetupReactor(reactor);
716  }
717 
718  private:
719  std::function<ServerBidiReactor<RequestType, ResponseType>*(
721  get_reactor_;
722 
723  class ServerCallbackReaderWriterImpl
724  : public ServerCallbackReaderWriter<RequestType, ResponseType> {
725  public:
// Finishes the bidirectional-streaming RPC with status `s`.
// Arms finish_tag_ with a callback that only calls MaybeDone(), adds initial
// metadata to finish_ops_ if it has not been sent yet, adds the status `s`,
// and submits finish_ops_ through call_.PerformOps(). Any message previously
// attached to finish_ops_ by WriteAndFinish() goes out in the same batch.
726  void Finish(::grpc::Status s) override {
727  // A finish tag with only MaybeDone can have its callback inlined
728  // even if OnDone is not inlineable, because this callback just
729  // checks a ref and then decides whether or not to dispatch OnDone.
730  finish_tag_.Set(
731  call_.call(),
732  [this](bool) {
733  // Inlineable OnDone can be false here because there is
734  // no bidi reactor that has an inlineable OnDone; this
735  // only applies to the DefaultReactor (which is unary).
736  this->MaybeDone(/*inlineable_ondone=*/false);
737  },
738  &finish_ops_, /*can_inline=*/true);
739  finish_ops_.set_core_cq_tag(&finish_tag_);
740 
// Piggyback initial metadata (and the compression level, if one was set) on
// the finish batch when it has not been sent separately.
741  if (!ctx_->sent_initial_metadata_) {
742  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
743  ctx_->initial_metadata_flags());
744  if (ctx_->compression_level_set()) {
745  finish_ops_.set_compression_level(ctx_->compression_level());
746  }
747  ctx_->sent_initial_metadata_ = true;
748  }
749  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
750  call_.PerformOps(&finish_ops_);
751  }
752 
// Sends initial metadata as its own batch (meta_ops_).
// Asserts that the context has not already sent initial metadata, then calls
// Ref() before starting the batch. When the batch completes, the reactor's
// OnSendInitialMetadataDone(ok) is invoked, followed by MaybeDone().
// NOTE(review): the Ref() here is presumably balanced by that MaybeDone()
// call - confirm against ServerCallbackCall.
753  void SendInitialMetadata() override {
754  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
755  this->Ref();
756  // The callback for this function should not be inlined because it invokes
757  // a user-controlled reaction, but any resulting OnDone can be inlined in
758  // the executor to which this callback is dispatched.
759  meta_tag_.Set(
760  call_.call(),
761  [this](bool ok) {
762  ServerBidiReactor<RequestType, ResponseType>* reactor =
763  reactor_.load(std::memory_order_relaxed);
764  reactor->OnSendInitialMetadataDone(ok);
765  this->MaybeDone(/*inlineable_ondone=*/true);
766  },
767  &meta_ops_, /*can_inline=*/false);
768  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
769  ctx_->initial_metadata_flags());
770  if (ctx_->compression_level_set()) {
771  meta_ops_.set_compression_level(ctx_->compression_level());
772  }
// Record that metadata went out so Write()/Finish() do not add it again.
773  ctx_->sent_initial_metadata_ = true;
774  meta_ops_.set_core_cq_tag(&meta_tag_);
775  call_.PerformOps(&meta_ops_);
776  }
777 
// Starts writing one response message.
// Calls Ref(), adds the buffer hint to `options` when is_last_message() is
// set, attaches initial metadata to write_ops_ if it has not been sent yet,
// attaches `resp`, and submits write_ops_. Completion is reported through
// write_tag_, which SetupReactor() wires to reactor->OnWriteDone(ok).
// The status returned by SendMessagePtr() is asserted to be OK, so a failure
// there aborts rather than being reported to the reactor (see the TODO).
778  void Write(const ResponseType* resp,
779  ::grpc::WriteOptions options) override {
780  this->Ref();
781  if (options.is_last_message()) {
782  options.set_buffer_hint();
783  }
784  if (!ctx_->sent_initial_metadata_) {
785  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
786  ctx_->initial_metadata_flags());
787  if (ctx_->compression_level_set()) {
788  write_ops_.set_compression_level(ctx_->compression_level());
789  }
790  ctx_->sent_initial_metadata_ = true;
791  }
792  // TODO(vjpai): don't assert
793  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
794  call_.PerformOps(&write_ops_);
795  }
796 
// Sends a final message together with the status.
// Attaches `resp` to finish_ops_ (not write_ops_) and then delegates to
// Finish(s), so message and status are submitted as one batch. No Ref() is
// taken and write_tag_ is not involved; the finish_tag_ callback installed by
// Finish() only calls MaybeDone().
797  void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
798  ::grpc::Status s) override {
799  // TODO(vjpai): don't assert
800  GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
801  Finish(std::move(s));
802  }
803 
// Starts reading one request message into `req`.
// Calls Ref(), points read_ops_ at `req` via RecvMessage(), and submits the
// batch. Completion is reported through read_tag_, which SetupReactor() wires
// to reactor->OnReadDone(ok) followed by MaybeDone().
804  void Read(RequestType* req) override {
805  this->Ref();
806  read_ops_.RecvMessage(req);
807  call_.PerformOps(&read_ops_);
808  }
809 
810  private:
811  friend class CallbackBidiHandler<RequestType, ResponseType>;
812 
// Private constructor (the enclosing handler is a friend).
// Stores `ctx`, initializes call_ from `*call`, and moves `call_requester`
// into the member that CallOnDone() later invokes. reactor_ is not set here;
// SetupReactor() stores it.
813  ServerCallbackReaderWriterImpl(::grpc::CallbackServerContext* ctx,
814  ::grpc::internal::Call* call,
815  std::function<void()> call_requester)
816  : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
817 
// Attaches `reactor` to this call.
// Stores the pointer in reactor_ (relaxed store), wires write_tag_ to
// reactor->OnWriteDone(ok) and read_tag_ to reactor->OnReadDone(ok) (each
// followed by MaybeDone()), and then calls BindReactor(), MaybeCallOnCancel()
// and MaybeDone() in that order.
// NOTE(review): the final MaybeDone() presumably releases the "OnStarted"
// reservation counted in callbacks_outstanding_ - confirm.
818  void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
819  reactor_.store(reactor, std::memory_order_relaxed);
820  // The callbacks for these functions should not be inlined because they
821  // invoke user-controlled reactions, but any resulting OnDones can be
822  // inlined in the executor to which a callback is dispatched.
823  write_tag_.Set(
824  call_.call(),
825  [this, reactor](bool ok) {
826  reactor->OnWriteDone(ok);
827  this->MaybeDone(/*inlineable_ondone=*/true);
828  },
829  &write_ops_, /*can_inline=*/false);
830  write_ops_.set_core_cq_tag(&write_tag_);
831  read_tag_.Set(
832  call_.call(),
833  [this, reactor](bool ok) {
834  reactor->OnReadDone(ok);
835  this->MaybeDone(/*inlineable_ondone=*/true);
836  },
837  &read_ops_, /*can_inline=*/false);
838  read_ops_.set_core_cq_tag(&read_tag_);
839  this->BindReactor(reactor);
840  this->MaybeCallOnCancel(reactor);
841  // Inlineable OnDone can be false here because there is no bidi
842  // reactor that has an inlineable OnDone; this only applies to the
843  // DefaultReactor (which is unary).
844  this->MaybeDone(/*inlineable_ondone=*/false);
845  }
846 
847  void CallOnDone() override {
848  reactor_.load(std::memory_order_relaxed)->OnDone();
849  grpc_call* call = call_.call();
850  auto call_requester = std::move(call_requester_);
851  if (ctx_->context_allocator() != nullptr) {
852  ctx_->context_allocator()->Release(ctx_);
853  }
854  this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor
856  call_requester();
857  }
858 
// Returns the reactor stored by SetupReactor() using a relaxed load.
859  ServerReactor* reactor() override {
860  return reactor_.load(std::memory_order_relaxed);
861  }
862 
864  meta_ops_;
869  finish_ops_;
873  write_ops_;
877  read_ops_;
879 
880  ::grpc::CallbackServerContext* const ctx_;
882  std::function<void()> call_requester_;
883  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
884  std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
885  // callbacks_outstanding_ follows a refcount pattern
886  std::atomic<intptr_t> callbacks_outstanding_{
887  3}; // reserve for OnStarted, Finish, and CompletionOp
888  };
889 };
890 
891 } // namespace internal
892 } // namespace grpc
893 
894 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
grpc::ServerReadReactor
ServerReadReactor is the interface for a client-streaming RPC.
Definition: server_callback.h:182
grpc::internal::CallbackWithSuccessTag
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming operations.
Definition: callback_common.h:137
grpc::ServerCallbackReaderWriter::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::internal::CallbackUnaryHandler::Deserialize
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, ::grpc::Status *status, void **handler_data) final
Definition: server_callback_handlers.h:80
grpc::internal::CallOpServerSendStatus
Definition: call_op_set.h:654
rpc_service_method.h
grpc
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided function on expiry or cancellation.
Definition: alarm.h:33
grpc::internal::CallOpSet
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:852
status.h
grpc::CoreCodegenInterface::grpc_call_ref
virtual void grpc_call_ref(grpc_call *call)=0
grpc::CallbackServerContext
Definition: server_context.h:586
grpc::internal::CallOpSendMessage
Definition: call_op_set.h:282
grpc::CoreCodegenInterface::grpc_call_arena_alloc
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
grpc::internal::Call
Straightforward wrapping of the C call object.
Definition: call.h:35
grpc::internal::CallbackClientStreamingHandler::CallbackClientStreamingHandler
CallbackClientStreamingHandler(std::function< ServerReadReactor< RequestType > *(::grpc::CallbackServerContext *, ResponseType *)> get_reactor)
Definition: server_callback_handlers.h:259
grpc::experimental::MessageAllocator< RequestType, ResponseType >
grpc::internal::CallbackClientStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:264
grpc::ServerCallbackUnary::BindReactor
void BindReactor(Reactor *reactor)
Definition: server_callback.h:201
server_callback.h
grpc::internal::CallOpSendInitialMetadata
Definition: call_op_set.h:212
grpc::internal::ServerCallbackCall::MaybeCallOnCancel
void MaybeCallOnCancel(ServerReactor *reactor)
Definition: server_callback.h:106
grpc::Status::ok
bool ok() const
Is the status OK?
Definition: status.h:118
grpc::ServerCallbackReaderWriter
Definition: server_callback.h:238
grpc::internal::CallbackUnaryHandler::SetMessageAllocator
void SetMessageAllocator(::grpc::experimental::MessageAllocator< RequestType, ResponseType > *allocator)
Definition: server_callback_handlers.h:39
grpc::Status
Did it work? If it didn't, why?
Definition: status.h:31
grpc::internal::CallbackServerStreamingHandler::Deserialize
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, ::grpc::Status *status, void **) final
Definition: server_callback_handlers.h:490
grpc_call
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:70
grpc::ServerBidiReactor
ServerBidiReactor is the interface for a bidirectional streaming RPC.
Definition: server_callback.h:186
grpc_byte_buffer
Definition: grpc_types.h:40
grpc::ByteBuffer
A sequence of bytes.
Definition: byte_buffer.h:61
grpc::ServerCallbackWriter::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::internal::DefaultMessageHolder
Definition: server_callback.h:160
grpc::internal::CallbackBidiHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:682
grpc::SerializationTraits
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
grpc::internal::CallbackUnaryHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:45
grpc::ServerCallbackReader< RequestType >::BindReactor
void BindReactor(ServerReadReactor< RequestType > *reactor)
Definition: server_callback.h:215
grpc::ServerCallbackWriter< ResponseType >::BindReactor
void BindReactor(ServerWriteReactor< ResponseType > *reactor)
Definition: server_callback.h:232
grpc::internal::FinishOnlyReactor
Definition: server_callback.h:758
grpc::internal::CallbackUnaryHandler::CallbackUnaryHandler
CallbackUnaryHandler(std::function< ServerUnaryReactor *(::grpc::CallbackServerContext *, const RequestType *, ResponseType *)> get_reactor)
Definition: server_callback_handlers.h:33
grpc::internal::MethodHandler
Base class for running an RPC handler.
Definition: rpc_service_method.h:38
grpc::CoreCodegenInterface::grpc_call_unref
virtual void grpc_call_unref(grpc_call *call)=0
grpc::internal::CallbackServerStreamingHandler::CallbackServerStreamingHandler
CallbackServerStreamingHandler(std::function< ServerWriteReactor< ResponseType > *(::grpc::CallbackServerContext *, const RequestType *)> get_reactor)
Definition: server_callback_handlers.h:448
grpc::WriteOptions
Per-message write options.
Definition: call_op_set.h:79
grpc::ServerWriteReactor
ServerWriteReactor is the interface for a server-streaming RPC.
Definition: server_callback.h:184
grpc::internal::UnimplementedUnaryReactor
FinishOnlyReactor< ServerUnaryReactor > UnimplementedUnaryReactor
Definition: server_callback.h:764
grpc::UNIMPLEMENTED
@ UNIMPLEMENTED
Operation is not implemented or not supported/enabled in this service.
Definition: status_code_enum.h:115
grpc::ServerCallbackReaderWriter< RequestType, ResponseType >::BindReactor
void BindReactor(ServerBidiReactor< RequestType, ResponseType > *reactor)
Definition: server_callback.h:250
grpc::internal::CallbackBidiHandler::CallbackBidiHandler
CallbackBidiHandler(std::function< ServerBidiReactor< RequestType, ResponseType > *(::grpc::CallbackServerContext *)> get_reactor)
Definition: server_callback_handlers.h:677
grpc::ServerCallbackWriter
Definition: server_callback.h:221
grpc::experimental::MessageHolder::response
ResponseT * response()
Definition: message_allocator.h:49
grpc::internal::Call::call
grpc_call * call() const
Definition: call.h:69
std
Definition: async_unary_call.h:398
server_context.h
grpc::WriteOptions::set_buffer_hint
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call_op_set.h:117
grpc::ServerCallbackUnary::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::experimental::ServerUnaryReactor
::grpc::ServerUnaryReactor ServerUnaryReactor
Definition: server_callback.h:788
grpc::internal::ServerCallbackCall::Ref
void Ref()
Increases the reference count.
Definition: server_callback.h:124
grpc::ServerCallbackReader
Definition: server_callback.h:207
grpc::g_core_codegen_interface
CoreCodegenInterface * g_core_codegen_interface
Definition: completion_queue.h:96
GPR_CODEGEN_ASSERT
#define GPR_CODEGEN_ASSERT(x)
Codegen-specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:146
grpc::WriteOptions::is_last_message
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailing metadata.
Definition: call_op_set.h:181
grpc::experimental::MessageAllocator::AllocateMessages
virtual MessageHolder< RequestT, ResponseT > * AllocateMessages()=0
grpc::internal::CallOpRecvMessage
Definition: byte_buffer.h:52
grpc::ByteBuffer::Release
void Release()
Forget underlying byte buffer without destroying it. Use this only for un-owned byte buffers.
Definition: byte_buffer.h:140
grpc::experimental::MessageHolder::Release
virtual void Release()=0
grpc::internal::CatchingReactorGetter
Reactor * CatchingReactorGetter(Func &&func, Args &&... args)
Definition: callback_common.h:52
grpc::ServerCallbackUnary
Definition: server_callback.h:191
grpc::ServerUnaryReactor
Definition: server_callback.h:688
message_allocator.h
grpc::internal::CallbackServerStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:453
grpc::experimental::MessageHolder< RequestType, ResponseType >
grpc::experimental::MessageHolder::request
RequestT * request()
Definition: message_allocator.h:48
grpc::ServerCallbackReader::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::internal::ServerCallbackCall::MaybeDone
void MaybeDone()
Definition: server_callback.h:92