GRPC C++  1.32.0
server_callback_handlers.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
20 
26 
27 namespace grpc_impl {
28 namespace internal {
29 
30 template <class RequestType, class ResponseType>
31 class CallbackUnaryHandler : public ::grpc::internal::MethodHandler {
32  public:
35  const RequestType*, ResponseType*)>
36  get_reactor)
37  : get_reactor_(std::move(get_reactor)) {}
38 
41  allocator) {
42  allocator_ = allocator;
43  }
44 
45  void RunHandler(const HandlerParameter& param) final {
46  // Arena allocate a controller structure (that includes request/response)
48  auto* allocator_state = static_cast<
50  param.internal_data);
51 
53  param.call->call(), sizeof(ServerCallbackUnaryImpl)))
54  ServerCallbackUnaryImpl(
55  static_cast<::grpc_impl::CallbackServerContext*>(
56  param.server_context),
57  param.call, allocator_state, std::move(param.call_requester));
58  param.server_context->BeginCompletionOp(
59  param.call, [call](bool) { call->MaybeDone(); }, call);
60 
61  ServerUnaryReactor* reactor = nullptr;
62  if (param.status.ok()) {
63  reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
64  get_reactor_,
65  static_cast<::grpc_impl::CallbackServerContext*>(
66  param.server_context),
67  call->request(), call->response());
68  }
69 
70  if (reactor == nullptr) {
71  // if deserialization or reactor creator failed, we need to fail the call
73  param.call->call(), sizeof(UnimplementedUnaryReactor)))
76  }
77 
79  call->SetupReactor(reactor);
80  }
81 
83  ::grpc::Status* status, void** handler_data) final {
85  buf.set_buffer(req);
86  RequestType* request = nullptr;
88  allocator_state = nullptr;
89  if (allocator_ != nullptr) {
90  allocator_state = allocator_->AllocateMessages();
91  } else {
92  allocator_state =
96  }
97  *handler_data = allocator_state;
98  request = allocator_state->request();
99  *status =
101  buf.Release();
102  if (status->ok()) {
103  return request;
104  }
105  // Clean up on deserialization failure.
106  allocator_state->Release();
107  return nullptr;
108  }
109 
110  private:
112  const RequestType*, ResponseType*)>
113  get_reactor_;
115  allocator_ = nullptr;
116 
117  class ServerCallbackUnaryImpl : public ServerCallbackUnary {
118  public:
119  void Finish(::grpc::Status s) override {
120  // A callback that only contains a call to MaybeDone can be run as an
121  // inline callback regardless of whether or not OnDone is inlineable
122  // because if the actual OnDone callback needs to be scheduled, MaybeDone
123  // is responsible for dispatching to an executor thread if needed. Thus,
124  // when setting up the finish_tag_, we can set its own callback to
125  // inlineable.
126  finish_tag_.Set(
127  call_.call(),
128  [this](bool) {
129  this->MaybeDone(
130  reactor_.load(std::memory_order_relaxed)->InternalInlineable());
131  },
132  &finish_ops_, /*can_inline=*/true);
133  finish_ops_.set_core_cq_tag(&finish_tag_);
134 
135  if (!ctx_->sent_initial_metadata_) {
136  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
137  ctx_->initial_metadata_flags());
138  if (ctx_->compression_level_set()) {
139  finish_ops_.set_compression_level(ctx_->compression_level());
140  }
141  ctx_->sent_initial_metadata_ = true;
142  }
143  // The response is dropped if the status is not OK.
144  if (s.ok()) {
145  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
146  finish_ops_.SendMessagePtr(response()));
147  } else {
148  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
149  }
150  finish_ops_.set_core_cq_tag(&finish_tag_);
151  call_.PerformOps(&finish_ops_);
152  }
153 
154  void SendInitialMetadata() override {
155  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
156  this->Ref();
157  // The callback for this function should not be marked inline because it
158  // is directly invoking a user-controlled reaction
159  // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor
160  // thread. However, any OnDone needed after that can be inlined because it
161  // is already running on an executor thread.
162  meta_tag_.Set(call_.call(),
163  [this](bool ok) {
164  ServerUnaryReactor* reactor =
165  reactor_.load(std::memory_order_relaxed);
166  reactor->OnSendInitialMetadataDone(ok);
167  this->MaybeDone(/*inlineable_ondone=*/true);
168  },
169  &meta_ops_, /*can_inline=*/false);
170  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
171  ctx_->initial_metadata_flags());
172  if (ctx_->compression_level_set()) {
173  meta_ops_.set_compression_level(ctx_->compression_level());
174  }
175  ctx_->sent_initial_metadata_ = true;
176  meta_ops_.set_core_cq_tag(&meta_tag_);
177  call_.PerformOps(&meta_ops_);
178  }
179 
180  private:
181  friend class CallbackUnaryHandler<RequestType, ResponseType>;
182 
183  ServerCallbackUnaryImpl(
186  allocator_state,
187  std::function<void()> call_requester)
188  : ctx_(ctx),
189  call_(*call),
190  allocator_state_(allocator_state),
191  call_requester_(std::move(call_requester)) {
192  ctx_->set_message_allocator_state(allocator_state);
193  }
194 
// Installs `reactor` as the reactor for this call.
// The reactor pointer is stored first (relaxed ordering, see the note on
// reactor_), then the reactor is bound to this call object, a cancellation
// check is made, and finally MaybeDone is called with the reactor's own
// inlineability setting.
// NOTE(review): the closing MaybeDone presumably releases the "start"
// reference reserved in callbacks_outstanding_ -- confirm.
199  void SetupReactor(ServerUnaryReactor* reactor) {
200  reactor_.store(reactor, std::memory_order_relaxed);
201  this->BindReactor(reactor);
202  this->MaybeCallOnCancel(reactor);
203  this->MaybeDone(reactor->InternalInlineable());
204  }
205 
206  const RequestType* request() { return allocator_state_->request(); }
207  ResponseType* response() { return allocator_state_->response(); }
208 
209  void CallOnDone() override {
210  reactor_.load(std::memory_order_relaxed)->OnDone();
211  grpc_call* call = call_.call();
212  auto call_requester = std::move(call_requester_);
213  allocator_state_->Release();
214  this->~ServerCallbackUnaryImpl(); // explicitly call destructor
216  call_requester();
217  }
218 
219  ServerReactor* reactor() override {
220  return reactor_.load(std::memory_order_relaxed);
221  }
222 
224  meta_ops_;
229  finish_ops_;
231 
235  allocator_state_;
236  std::function<void()> call_requester_;
237  // reactor_ can always be loaded/stored with relaxed memory ordering because
238  // its value is only set once, independently of other data in the object,
239  // and the loads that use it will always actually come provably later even
240  // though they are from different threads since they are triggered by
241  // actions initiated only by the setting up of the reactor_ variable. In
242  // a sense, it's a delayed "const": it gets its value from the SetupReactor
243  // method (not the constructor, so it's not a true const), but it doesn't
244  // change after that and it only gets used by actions caused, directly or
245  // indirectly, by that setup. This comment also applies to the reactor_
246  // variables of the other streaming objects in this file.
247  std::atomic<ServerUnaryReactor*> reactor_;
248  // callbacks_outstanding_ follows a refcount pattern
249  std::atomic<intptr_t> callbacks_outstanding_{
250  3}; // reserve for start, Finish, and CompletionOp
251  };
252 };
253 
254 template <class RequestType, class ResponseType>
256  public:
258  std::function<ServerReadReactor<RequestType>*(
259  ::grpc_impl::CallbackServerContext*, ResponseType*)>
260  get_reactor)
261  : get_reactor_(std::move(get_reactor)) {}
262  void RunHandler(const HandlerParameter& param) final {
263  // Arena allocate a reader structure (that includes response)
264  ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
265 
267  param.call->call(), sizeof(ServerCallbackReaderImpl)))
268  ServerCallbackReaderImpl(
269  static_cast<::grpc_impl::CallbackServerContext*>(
270  param.server_context),
271  param.call, std::move(param.call_requester));
272  // Inlineable OnDone can be false in the CompletionOp callback because there
273  // is no read reactor that has an inlineable OnDone; this only applies to
274  // the DefaultReactor (which is unary).
275  param.server_context->BeginCompletionOp(
276  param.call,
277  [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); },
278  reader);
279 
280  ServerReadReactor<RequestType>* reactor = nullptr;
281  if (param.status.ok()) {
284  get_reactor_,
285  static_cast<::grpc_impl::CallbackServerContext*>(
286  param.server_context),
287  reader->response());
288  }
289 
290  if (reactor == nullptr) {
291  // if deserialization or reactor creator failed, we need to fail the call
293  param.call->call(), sizeof(UnimplementedReadReactor<RequestType>)))
296  }
297 
298  reader->SetupReactor(reactor);
299  }
300 
301  private:
302  std::function<ServerReadReactor<RequestType>*(
303  ::grpc_impl::CallbackServerContext*, ResponseType*)>
304  get_reactor_;
305 
306  class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
307  public:
308  void Finish(::grpc::Status s) override {
309  // A finish tag with only MaybeDone can have its callback inlined
310  // regardless even if OnDone is not inlineable because this callback just
311  // checks a ref and then decides whether or not to dispatch OnDone.
312  finish_tag_.Set(call_.call(),
313  [this](bool) {
314  // Inlineable OnDone can be false here because there is
315  // no read reactor that has an inlineable OnDone; this
316  // only applies to the DefaultReactor (which is unary).
317  this->MaybeDone(/*inlineable_ondone=*/false);
318  },
319  &finish_ops_, /*can_inline=*/true);
320  if (!ctx_->sent_initial_metadata_) {
321  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
322  ctx_->initial_metadata_flags());
323  if (ctx_->compression_level_set()) {
324  finish_ops_.set_compression_level(ctx_->compression_level());
325  }
326  ctx_->sent_initial_metadata_ = true;
327  }
328  // The response is dropped if the status is not OK.
329  if (s.ok()) {
330  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
331  finish_ops_.SendMessagePtr(&resp_));
332  } else {
333  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
334  }
335  finish_ops_.set_core_cq_tag(&finish_tag_);
336  call_.PerformOps(&finish_ops_);
337  }
338 
339  void SendInitialMetadata() override {
340  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
341  this->Ref();
342  // The callback for this function should not be inlined because it invokes
343  // a user-controlled reaction, but any resulting OnDone can be inlined in
344  // the executor to which this callback is dispatched.
345  meta_tag_.Set(call_.call(),
346  [this](bool ok) {
347  ServerReadReactor<RequestType>* reactor =
348  reactor_.load(std::memory_order_relaxed);
349  reactor->OnSendInitialMetadataDone(ok);
350  this->MaybeDone(/*inlineable_ondone=*/true);
351  },
352  &meta_ops_, /*can_inline=*/false);
353  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
354  ctx_->initial_metadata_flags());
355  if (ctx_->compression_level_set()) {
356  meta_ops_.set_compression_level(ctx_->compression_level());
357  }
358  ctx_->sent_initial_metadata_ = true;
359  meta_ops_.set_core_cq_tag(&meta_tag_);
360  call_.PerformOps(&meta_ops_);
361  }
362 
// Starts a read of the next request message into `req`.
// Completion is reported through read_tag_, whose callback (installed by
// SetupReactor) calls the reactor's OnReadDone and then MaybeDone.
// NOTE(review): the Ref() taken here is presumably the reference that this
// MaybeDone call gives back -- confirm.
363  void Read(RequestType* req) override {
364  this->Ref();
365  read_ops_.RecvMessage(req);
366  call_.PerformOps(&read_ops_);
367  }
368 
369  private:
370  friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
371 
372  ServerCallbackReaderImpl(::grpc_impl::CallbackServerContext* ctx,
373  ::grpc::internal::Call* call,
374  std::function<void()> call_requester)
375  : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
376 
377  void SetupReactor(ServerReadReactor<RequestType>* reactor) {
378  reactor_.store(reactor, std::memory_order_relaxed);
379  // The callback for this function should not be inlined because it invokes
380  // a user-controlled reaction, but any resulting OnDone can be inlined in
381  // the executor to which this callback is dispatched.
382  read_tag_.Set(call_.call(),
383  [this, reactor](bool ok) {
384  reactor->OnReadDone(ok);
385  this->MaybeDone(/*inlineable_ondone=*/true);
386  },
387  &read_ops_, /*can_inline=*/false);
388  read_ops_.set_core_cq_tag(&read_tag_);
389  this->BindReactor(reactor);
390  this->MaybeCallOnCancel(reactor);
391  // Inlineable OnDone can be false here because there is no read
392  // reactor that has an inlineable OnDone; this only applies to the
393  // DefaultReactor (which is unary).
394  this->MaybeDone(/*inlineable_ondone=*/false);
395  }
396 
397  ~ServerCallbackReaderImpl() {}
398 
399  ResponseType* response() { return &resp_; }
400 
401  void CallOnDone() override {
402  reactor_.load(std::memory_order_relaxed)->OnDone();
403  grpc_call* call = call_.call();
404  auto call_requester = std::move(call_requester_);
405  this->~ServerCallbackReaderImpl(); // explicitly call destructor
407  call_requester();
408  }
409 
410  ServerReactor* reactor() override {
411  return reactor_.load(std::memory_order_relaxed);
412  }
413 
415  meta_ops_;
420  finish_ops_;
424  read_ops_;
426 
429  ResponseType resp_;
430  std::function<void()> call_requester_;
431  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
432  std::atomic<ServerReadReactor<RequestType>*> reactor_;
433  // callbacks_outstanding_ follows a refcount pattern
434  std::atomic<intptr_t> callbacks_outstanding_{
435  3}; // reserve for OnStarted, Finish, and CompletionOp
436  };
437 };
438 
439 template <class RequestType, class ResponseType>
440 class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler {
441  public:
443  std::function<ServerWriteReactor<ResponseType>*(
444  ::grpc_impl::CallbackServerContext*, const RequestType*)>
445  get_reactor)
446  : get_reactor_(std::move(get_reactor)) {}
447  void RunHandler(const HandlerParameter& param) final {
448  // Arena allocate a writer structure
449  ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
450 
452  param.call->call(), sizeof(ServerCallbackWriterImpl)))
453  ServerCallbackWriterImpl(
454  static_cast<::grpc_impl::CallbackServerContext*>(
455  param.server_context),
456  param.call, static_cast<RequestType*>(param.request),
457  std::move(param.call_requester));
458  // Inlineable OnDone can be false in the CompletionOp callback because there
459  // is no write reactor that has an inlineable OnDone; this only applies to
460  // the DefaultReactor (which is unary).
461  param.server_context->BeginCompletionOp(
462  param.call,
463  [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); },
464  writer);
465 
466  ServerWriteReactor<ResponseType>* reactor = nullptr;
467  if (param.status.ok()) {
470  get_reactor_,
471  static_cast<::grpc_impl::CallbackServerContext*>(
472  param.server_context),
473  writer->request());
474  }
475  if (reactor == nullptr) {
476  // if deserialization or reactor creator failed, we need to fail the call
478  param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>)))
481  }
482 
483  writer->SetupReactor(reactor);
484  }
485 
487  ::grpc::Status* status, void** /*handler_data*/) final {
488  ::grpc::ByteBuffer buf;
489  buf.set_buffer(req);
490  auto* request =
492  call, sizeof(RequestType))) RequestType();
493  *status =
495  buf.Release();
496  if (status->ok()) {
497  return request;
498  }
499  request->~RequestType();
500  return nullptr;
501  }
502 
503  private:
504  std::function<ServerWriteReactor<ResponseType>*(
505  ::grpc_impl::CallbackServerContext*, const RequestType*)>
506  get_reactor_;
507 
508  class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
509  public:
510  void Finish(::grpc::Status s) override {
511  // A finish tag with only MaybeDone can have its callback inlined
512  // regardless even if OnDone is not inlineable because this callback just
513  // checks a ref and then decides whether or not to dispatch OnDone.
514  finish_tag_.Set(call_.call(),
515  [this](bool) {
516  // Inlineable OnDone can be false here because there is
517  // no write reactor that has an inlineable OnDone; this
518  // only applies to the DefaultReactor (which is unary).
519  this->MaybeDone(/*inlineable_ondone=*/false);
520  },
521  &finish_ops_, /*can_inline=*/true);
522  finish_ops_.set_core_cq_tag(&finish_tag_);
523 
524  if (!ctx_->sent_initial_metadata_) {
525  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
526  ctx_->initial_metadata_flags());
527  if (ctx_->compression_level_set()) {
528  finish_ops_.set_compression_level(ctx_->compression_level());
529  }
530  ctx_->sent_initial_metadata_ = true;
531  }
532  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
533  call_.PerformOps(&finish_ops_);
534  }
535 
536  void SendInitialMetadata() override {
537  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
538  this->Ref();
539  // The callback for this function should not be inlined because it invokes
540  // a user-controlled reaction, but any resulting OnDone can be inlined in
541  // the executor to which this callback is dispatched.
542  meta_tag_.Set(call_.call(),
543  [this](bool ok) {
544  ServerWriteReactor<ResponseType>* reactor =
545  reactor_.load(std::memory_order_relaxed);
546  reactor->OnSendInitialMetadataDone(ok);
547  this->MaybeDone(/*inlineable_ondone=*/true);
548  },
549  &meta_ops_, /*can_inline=*/false);
550  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
551  ctx_->initial_metadata_flags());
552  if (ctx_->compression_level_set()) {
553  meta_ops_.set_compression_level(ctx_->compression_level());
554  }
555  ctx_->sent_initial_metadata_ = true;
556  meta_ops_.set_core_cq_tag(&meta_tag_);
557  call_.PerformOps(&meta_ops_);
558  }
559 
560  void Write(const ResponseType* resp,
561  ::grpc::WriteOptions options) override {
562  this->Ref();
563  if (options.is_last_message()) {
564  options.set_buffer_hint();
565  }
566  if (!ctx_->sent_initial_metadata_) {
567  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
568  ctx_->initial_metadata_flags());
569  if (ctx_->compression_level_set()) {
570  write_ops_.set_compression_level(ctx_->compression_level());
571  }
572  ctx_->sent_initial_metadata_ = true;
573  }
574  // TODO(vjpai): don't assert
575  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
576  call_.PerformOps(&write_ops_);
577  }
578 
579  void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
580  ::grpc::Status s) override {
581  // This combines the write into the finish callback
582  // TODO(vjpai): don't assert
583  GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
584  Finish(std::move(s));
585  }
586 
587  private:
588  friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
589 
590  ServerCallbackWriterImpl(::grpc_impl::CallbackServerContext* ctx,
591  ::grpc::internal::Call* call,
592  const RequestType* req,
593  std::function<void()> call_requester)
594  : ctx_(ctx),
595  call_(*call),
596  req_(req),
597  call_requester_(std::move(call_requester)) {}
598 
599  void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
600  reactor_.store(reactor, std::memory_order_relaxed);
601  // The callback for this function should not be inlined because it invokes
602  // a user-controlled reaction, but any resulting OnDone can be inlined in
603  // the executor to which this callback is dispatched.
604  write_tag_.Set(call_.call(),
605  [this, reactor](bool ok) {
606  reactor->OnWriteDone(ok);
607  this->MaybeDone(/*inlineable_ondone=*/true);
608  },
609  &write_ops_, /*can_inline=*/false);
610  write_ops_.set_core_cq_tag(&write_tag_);
611  this->BindReactor(reactor);
612  this->MaybeCallOnCancel(reactor);
613  // Inlineable OnDone can be false here because there is no write
614  // reactor that has an inlineable OnDone; this only applies to the
615  // DefaultReactor (which is unary).
616  this->MaybeDone(/*inlineable_ondone=*/false);
617  }
618  ~ServerCallbackWriterImpl() { req_->~RequestType(); }
619 
620  const RequestType* request() { return req_; }
621 
622  void CallOnDone() override {
623  reactor_.load(std::memory_order_relaxed)->OnDone();
624  grpc_call* call = call_.call();
625  auto call_requester = std::move(call_requester_);
626  this->~ServerCallbackWriterImpl(); // explicitly call destructor
628  call_requester();
629  }
630 
631  ServerReactor* reactor() override {
632  return reactor_.load(std::memory_order_relaxed);
633  }
634 
636  meta_ops_;
641  finish_ops_;
645  write_ops_;
647 
650  const RequestType* req_;
651  std::function<void()> call_requester_;
652  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
653  std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
654  // callbacks_outstanding_ follows a refcount pattern
655  std::atomic<intptr_t> callbacks_outstanding_{
656  3}; // reserve for OnStarted, Finish, and CompletionOp
657  };
658 };
659 
660 template <class RequestType, class ResponseType>
662  public:
666  get_reactor)
667  : get_reactor_(std::move(get_reactor)) {}
668  void RunHandler(const HandlerParameter& param) final {
669  ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
670 
672  param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
673  ServerCallbackReaderWriterImpl(
674  static_cast<::grpc_impl::CallbackServerContext*>(
675  param.server_context),
676  param.call, std::move(param.call_requester));
677  // Inlineable OnDone can be false in the CompletionOp callback because there
678  // is no bidi reactor that has an inlineable OnDone; this only applies to
679  // the DefaultReactor (which is unary).
680  param.server_context->BeginCompletionOp(
681  param.call,
682  [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); },
683  stream);
684 
686  if (param.status.ok()) {
689  get_reactor_, static_cast<::grpc_impl::CallbackServerContext*>(
690  param.server_context));
691  }
692 
693  if (reactor == nullptr) {
694  // if deserialization or reactor creator failed, we need to fail the call
696  param.call->call(),
700  }
701 
702  stream->SetupReactor(reactor);
703  }
704 
705  private:
706  std::function<ServerBidiReactor<RequestType, ResponseType>*(
708  get_reactor_;
709 
710  class ServerCallbackReaderWriterImpl
711  : public ServerCallbackReaderWriter<RequestType, ResponseType> {
712  public:
713  void Finish(::grpc::Status s) override {
714  // A finish tag with only MaybeDone can have its callback inlined
715  // regardless even if OnDone is not inlineable because this callback just
716  // checks a ref and then decides whether or not to dispatch OnDone.
717  finish_tag_.Set(call_.call(),
718  [this](bool) {
719  // Inlineable OnDone can be false here because there is
720  // no bidi reactor that has an inlineable OnDone; this
721  // only applies to the DefaultReactor (which is unary).
722  this->MaybeDone(/*inlineable_ondone=*/false);
723  },
724  &finish_ops_, /*can_inline=*/true);
725  finish_ops_.set_core_cq_tag(&finish_tag_);
726 
727  if (!ctx_->sent_initial_metadata_) {
728  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
729  ctx_->initial_metadata_flags());
730  if (ctx_->compression_level_set()) {
731  finish_ops_.set_compression_level(ctx_->compression_level());
732  }
733  ctx_->sent_initial_metadata_ = true;
734  }
735  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
736  call_.PerformOps(&finish_ops_);
737  }
738 
739  void SendInitialMetadata() override {
740  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
741  this->Ref();
742  // The callback for this function should not be inlined because it invokes
743  // a user-controlled reaction, but any resulting OnDone can be inlined in
744  // the executor to which this callback is dispatched.
745  meta_tag_.Set(call_.call(),
746  [this](bool ok) {
747  ServerBidiReactor<RequestType, ResponseType>* reactor =
748  reactor_.load(std::memory_order_relaxed);
749  reactor->OnSendInitialMetadataDone(ok);
750  this->MaybeDone(/*inlineable_ondone=*/true);
751  },
752  &meta_ops_, /*can_inline=*/false);
753  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
754  ctx_->initial_metadata_flags());
755  if (ctx_->compression_level_set()) {
756  meta_ops_.set_compression_level(ctx_->compression_level());
757  }
758  ctx_->sent_initial_metadata_ = true;
759  meta_ops_.set_core_cq_tag(&meta_tag_);
760  call_.PerformOps(&meta_ops_);
761  }
762 
763  void Write(const ResponseType* resp,
764  ::grpc::WriteOptions options) override {
765  this->Ref();
766  if (options.is_last_message()) {
767  options.set_buffer_hint();
768  }
769  if (!ctx_->sent_initial_metadata_) {
770  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
771  ctx_->initial_metadata_flags());
772  if (ctx_->compression_level_set()) {
773  write_ops_.set_compression_level(ctx_->compression_level());
774  }
775  ctx_->sent_initial_metadata_ = true;
776  }
777  // TODO(vjpai): don't assert
778  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
779  call_.PerformOps(&write_ops_);
780  }
781 
782  void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
783  ::grpc::Status s) override {
784  // TODO(vjpai): don't assert
785  GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
786  Finish(std::move(s));
787  }
788 
// Starts a read of the next request message into `req`.
// Completion is reported through read_tag_, whose callback (installed by
// SetupReactor) calls the reactor's OnReadDone and then MaybeDone.
// NOTE(review): the Ref() taken here is presumably the reference that this
// MaybeDone call gives back -- confirm.
789  void Read(RequestType* req) override {
790  this->Ref();
791  read_ops_.RecvMessage(req);
792  call_.PerformOps(&read_ops_);
793  }
794 
795  private:
796  friend class CallbackBidiHandler<RequestType, ResponseType>;
797 
798  ServerCallbackReaderWriterImpl(::grpc_impl::CallbackServerContext* ctx,
799  ::grpc::internal::Call* call,
800  std::function<void()> call_requester)
801  : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
802 
803  void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
804  reactor_.store(reactor, std::memory_order_relaxed);
805  // The callbacks for these functions should not be inlined because they
806  // invoke user-controlled reactions, but any resulting OnDones can be
807  // inlined in the executor to which a callback is dispatched.
808  write_tag_.Set(call_.call(),
809  [this, reactor](bool ok) {
810  reactor->OnWriteDone(ok);
811  this->MaybeDone(/*inlineable_ondone=*/true);
812  },
813  &write_ops_, /*can_inline=*/false);
814  write_ops_.set_core_cq_tag(&write_tag_);
815  read_tag_.Set(call_.call(),
816  [this, reactor](bool ok) {
817  reactor->OnReadDone(ok);
818  this->MaybeDone(/*inlineable_ondone=*/true);
819  },
820  &read_ops_, /*can_inline=*/false);
821  read_ops_.set_core_cq_tag(&read_tag_);
822  this->BindReactor(reactor);
823  this->MaybeCallOnCancel(reactor);
824  // Inlineable OnDone can be false here because there is no bidi
825  // reactor that has an inlineable OnDone; this only applies to the
826  // DefaultReactor (which is unary).
827  this->MaybeDone(/*inlineable_ondone=*/false);
828  }
829 
830  void CallOnDone() override {
831  reactor_.load(std::memory_order_relaxed)->OnDone();
832  grpc_call* call = call_.call();
833  auto call_requester = std::move(call_requester_);
834  this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor
836  call_requester();
837  }
838 
839  ServerReactor* reactor() override {
840  return reactor_.load(std::memory_order_relaxed);
841  }
842 
844  meta_ops_;
849  finish_ops_;
853  write_ops_;
857  read_ops_;
859 
862  std::function<void()> call_requester_;
863  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
864  std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
865  // callbacks_outstanding_ follows a refcount pattern
866  std::atomic<intptr_t> callbacks_outstanding_{
867  3}; // reserve for OnStarted, Finish, and CompletionOp
868  };
869 };
870 
871 } // namespace internal
872 } // namespace grpc_impl
873 
874 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
grpc::internal::CallbackWithSuccessTag
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming operations.
Definition: callback_common.h:136
grpc_impl::internal::CallbackBidiHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:668
grpc::internal::CallOpServerSendStatus
Definition: call_op_set.h:653
rpc_service_method.h
grpc::internal::CallOpSet
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:850
status.h
grpc::CoreCodegenInterface::grpc_call_ref
virtual void grpc_call_ref(grpc_call *call)=0
grpc::internal::CallOpSendMessage
Definition: call_op_set.h:287
grpc_impl::internal::CallbackUnaryHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:45
grpc::internal::MethodHandler::HandlerParameter
Definition: rpc_service_method.h:44
grpc_impl::ServerCallbackUnary::BindReactor
void BindReactor(Reactor *reactor)
Definition: server_callback_impl.h:201
grpc::CoreCodegenInterface::grpc_call_arena_alloc
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
grpc_impl::internal::CallbackUnaryHandler::Deserialize
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, ::grpc::Status *status, void **handler_data) final
Definition: server_callback_handlers.h:82
grpc_impl::internal::CallbackServerStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:447
grpc_impl::ServerUnaryReactor
Definition: server_callback_impl.h:693
server_callback_impl.h
grpc_impl::internal::CallbackBidiHandler::CallbackBidiHandler
CallbackBidiHandler(std::function< ServerBidiReactor< RequestType, ResponseType > *(::grpc_impl::CallbackServerContext *)> get_reactor)
Definition: server_callback_handlers.h:663
grpc_impl::internal::CallbackUnaryHandler::SetMessageAllocator
void SetMessageAllocator(::grpc::experimental::MessageAllocator< RequestType, ResponseType > *allocator)
Definition: server_callback_handlers.h:39
grpc::internal::Call
Straightforward wrapping of the C call object.
Definition: call.h:35
grpc_impl::ServerBidiReactor
ServerBidiReactor is the interface for a bidirectional streaming RPC.
Definition: server_callback_impl.h:186
grpc::experimental::MessageAllocator< RequestType, ResponseType >
grpc::internal::CallOpSendInitialMetadata
Definition: call_op_set.h:217
grpc_impl::CallbackServerContext
Definition: server_context_impl.h:560
grpc_impl::internal::CallbackClientStreamingHandler::CallbackClientStreamingHandler
CallbackClientStreamingHandler(std::function< ServerReadReactor< RequestType > *(::grpc_impl::CallbackServerContext *, ResponseType *)> get_reactor)
Definition: server_callback_handlers.h:257
grpc_impl::internal::UnimplementedUnaryReactor
FinishOnlyReactor< ServerUnaryReactor > UnimplementedUnaryReactor
Definition: server_callback_impl.h:770
grpc::Status::ok
bool ok() const
Is the status OK?
Definition: status.h:118
grpc_impl::internal::ServerCallbackCall::MaybeDone
void MaybeDone()
Definition: server_callback_impl.h:92
grpc_impl::ServerCallbackWriter
Definition: server_callback_impl.h:221
grpc_impl::ServerReadReactor
ServerReadReactor is the interface for a client-streaming RPC.
Definition: server_callback_impl.h:182
grpc_impl::ServerCallbackReader::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::Status
Did it work? If it didn't, why?
Definition: status.h:31
grpc_impl::internal::DefaultMessageHolder
Definition: server_callback_impl.h:160
grpc_impl::internal::ServerCallbackCall::MaybeCallOnCancel
void MaybeCallOnCancel(ServerReactor *reactor)
Definition: server_callback_impl.h:106
grpc_impl::ServerCallbackReader< RequestType >::BindReactor
void BindReactor(ServerReadReactor< RequestType > *reactor)
Definition: server_callback_impl.h:215
grpc_impl::internal::CallbackBidiHandler
Definition: server_callback_handlers.h:661
grpc_impl::ServerCallbackWriter< ResponseType >::BindReactor
void BindReactor(ServerWriteReactor< ResponseType > *reactor)
Definition: server_callback_impl.h:232
grpc_call
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:70
grpc_byte_buffer
Definition: grpc_types.h:40
grpc::ByteBuffer
A sequence of bytes.
Definition: byte_buffer.h:67
grpc_impl::internal::CallbackClientStreamingHandler
Definition: server_callback_handlers.h:255
grpc_impl::ServerCallbackUnary::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::experimental::ServerUnaryReactor
::grpc_impl::ServerUnaryReactor ServerUnaryReactor
Definition: server_callback.h:51
grpc::SerializationTraits
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
grpc_impl::ServerCallbackReaderWriter
Definition: server_callback_impl.h:238
grpc::internal::MethodHandler
Base class for running an RPC handler.
Definition: rpc_service_method.h:41
grpc::CoreCodegenInterface::grpc_call_unref
virtual void grpc_call_unref(grpc_call *call)=0
grpc_impl::ServerWriteReactor
ServerWriteReactor is the interface for a server-streaming RPC.
Definition: server_callback_impl.h:184
grpc::WriteOptions
Per-message write options.
Definition: call_op_set.h:79
server_context_impl.h
grpc_impl::internal::CallbackServerStreamingHandler::CallbackServerStreamingHandler
CallbackServerStreamingHandler(std::function< ServerWriteReactor< ResponseType > *(::grpc_impl::CallbackServerContext *, const RequestType *)> get_reactor)
Definition: server_callback_handlers.h:442
grpc::UNIMPLEMENTED
@ UNIMPLEMENTED
Operation is not implemented or not supported/enabled in this service.
Definition: status_code_enum.h:115
grpc_impl::ServerCallbackUnary
Definition: server_callback_impl.h:191
grpc::experimental::MessageHolder::response
ResponseT * response()
Definition: message_allocator.h:49
grpc::internal::Call::call
grpc_call * call() const
Definition: call.h:69
std
Definition: async_unary_call_impl.h:301
grpc_impl::internal::ServerCallbackCall::Ref
void Ref()
Increases the reference count.
Definition: server_callback_impl.h:124
grpc_impl::internal::CallbackUnaryHandler::CallbackUnaryHandler
CallbackUnaryHandler(std::function< ServerUnaryReactor *(::grpc_impl::CallbackServerContext *, const RequestType *, ResponseType *)> get_reactor)
Definition: server_callback_handlers.h:33
grpc::WriteOptions::set_buffer_hint
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call_op_set.h:122
grpc_impl::ServerCallbackReaderWriter::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc_impl
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided function on expiry or cancellation.
Definition: alarm_impl.h:33
grpc_impl::internal::CallbackClientStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:262
grpc_impl::ServerCallbackReader
Definition: server_callback_impl.h:207
grpc::g_core_codegen_interface
CoreCodegenInterface * g_core_codegen_interface
Definition: completion_queue.h:93
GPR_CODEGEN_ASSERT
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:146
grpc::WriteOptions::is_last_message
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailing metadata.
Definition: call_op_set.h:186
grpc::experimental::MessageAllocator::AllocateMessages
virtual MessageHolder< RequestT, ResponseT > * AllocateMessages()=0
grpc::internal::CallOpRecvMessage
Definition: byte_buffer.h:58
grpc::ByteBuffer::Release
void Release()
Forget the underlying byte buffer without destroying it. Use this only for un-owned byte buffers.
Definition: byte_buffer.h:146
grpc_impl::ServerCallbackReaderWriter< RequestType, ResponseType >::BindReactor
void BindReactor(ServerBidiReactor< RequestType, ResponseType > *reactor)
Definition: server_callback_impl.h:250
grpc_impl::ServerCallbackWriter::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::experimental::MessageHolder::Release
virtual void Release()=0
grpc::internal::CatchingReactorGetter
Reactor * CatchingReactorGetter(Func &&func, Args &&... args)
Definition: callback_common.h:51
grpc_impl::internal::CallbackServerStreamingHandler::Deserialize
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, ::grpc::Status *status, void **) final
Definition: server_callback_handlers.h:486
message_allocator.h
grpc::experimental::MessageHolder< RequestType, ResponseType >
grpc::experimental::MessageHolder::request
RequestT * request()
Definition: message_allocator.h:48
grpc_impl::internal::FinishOnlyReactor
Definition: server_callback_impl.h:764