18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
30 template <
class RequestType,
class ResponseType>
35 const RequestType*, ResponseType*)>
37 : get_reactor_(
std::move(get_reactor)) {}
42 allocator_ = allocator;
48 auto* allocator_state =
static_cast<
53 param.call->call(),
sizeof(ServerCallbackUnaryImpl)))
54 ServerCallbackUnaryImpl(
56 param.call, allocator_state, param.call_requester);
57 param.server_context->BeginCompletionOp(
58 param.call, [call](
bool) { call->MaybeDone(); }, call);
61 if (param.status.ok()) {
62 reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
68 if (reactor ==
nullptr) {
77 call->SetupReactor(reactor);
84 RequestType* request =
nullptr;
86 allocator_state =
nullptr;
87 if (allocator_ !=
nullptr) {
95 *handler_data = allocator_state;
96 request = allocator_state->
request();
110 const RequestType*, ResponseType*)>
113 allocator_ =
nullptr;
128 reactor_.load(std::memory_order_relaxed)->InternalInlineable());
131 finish_ops_.set_core_cq_tag(&finish_tag_);
133 if (!ctx_->sent_initial_metadata_) {
135 ctx_->initial_metadata_flags());
136 if (ctx_->compression_level_set()) {
137 finish_ops_.set_compression_level(ctx_->compression_level());
139 ctx_->sent_initial_metadata_ =
true;
143 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
144 finish_ops_.SendMessagePtr(response()));
146 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
148 finish_ops_.set_core_cq_tag(&finish_tag_);
149 call_.PerformOps(&finish_ops_);
152 void SendInitialMetadata()
override {
163 ServerUnaryReactor* reactor =
164 reactor_.load(std::memory_order_relaxed);
165 reactor->OnSendInitialMetadataDone(ok);
166 this->MaybeDone(true);
169 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
170 ctx_->initial_metadata_flags());
171 if (ctx_->compression_level_set()) {
172 meta_ops_.set_compression_level(ctx_->compression_level());
174 ctx_->sent_initial_metadata_ =
true;
175 meta_ops_.set_core_cq_tag(&meta_tag_);
176 call_.PerformOps(&meta_ops_);
182 ServerCallbackUnaryImpl(
186 std::function<
void()> call_requester)
189 allocator_state_(allocator_state),
190 call_requester_(
std::move(call_requester)) {
191 ctx_->set_message_allocator_state(allocator_state);
199 reactor_.store(reactor, std::memory_order_relaxed);
202 this->
MaybeDone(reactor->InternalInlineable());
205 const RequestType* request() {
return allocator_state_->request(); }
206 ResponseType* response() {
return allocator_state_->response(); }
208 void CallOnDone()
override {
209 reactor_.load(std::memory_order_relaxed)->OnDone();
211 auto call_requester = std::move(call_requester_);
212 allocator_state_->Release();
213 if (ctx_->context_allocator() !=
nullptr) {
214 ctx_->context_allocator()->Release(ctx_);
216 this->~ServerCallbackUnaryImpl();
221 ServerReactor* reactor()
override {
222 return reactor_.load(std::memory_order_relaxed);
238 std::function<void()> call_requester_;
249 std::atomic<ServerUnaryReactor*> reactor_;
251 std::atomic<intptr_t> callbacks_outstanding_{
256 template <
class RequestType,
class ResponseType>
263 : get_reactor_(
std::move(get_reactor)) {}
269 param.call->call(),
sizeof(ServerCallbackReaderImpl)))
270 ServerCallbackReaderImpl(
272 param.call, param.call_requester);
276 param.server_context->BeginCompletionOp(
278 [reader](
bool) { reader->MaybeDone(
false); },
282 if (param.status.ok()) {
290 if (reactor ==
nullptr) {
298 reader->SetupReactor(reactor);
318 this->MaybeDone(false);
321 if (!ctx_->sent_initial_metadata_) {
323 ctx_->initial_metadata_flags());
324 if (ctx_->compression_level_set()) {
325 finish_ops_.set_compression_level(ctx_->compression_level());
327 ctx_->sent_initial_metadata_ =
true;
331 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
332 finish_ops_.SendMessagePtr(&resp_));
334 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
336 finish_ops_.set_core_cq_tag(&finish_tag_);
337 call_.PerformOps(&finish_ops_);
340 void SendInitialMetadata()
override {
349 ServerReadReactor<RequestType>* reactor =
350 reactor_.load(std::memory_order_relaxed);
351 reactor->OnSendInitialMetadataDone(ok);
352 this->MaybeDone(true);
355 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
356 ctx_->initial_metadata_flags());
357 if (ctx_->compression_level_set()) {
358 meta_ops_.set_compression_level(ctx_->compression_level());
360 ctx_->sent_initial_metadata_ =
true;
361 meta_ops_.set_core_cq_tag(&meta_tag_);
362 call_.PerformOps(&meta_ops_);
365 void Read(RequestType* req)
override {
367 read_ops_.RecvMessage(req);
368 call_.PerformOps(&read_ops_);
376 std::function<
void()> call_requester)
377 : ctx_(ctx), call_(*call), call_requester_(
std::move(call_requester)) {}
379 void SetupReactor(ServerReadReactor<RequestType>* reactor) {
380 reactor_.store(reactor, std::memory_order_relaxed);
386 [
this, reactor](
bool ok) {
387 reactor->OnReadDone(ok);
388 this->MaybeDone(true);
391 read_ops_.set_core_cq_tag(&read_tag_);
400 ~ServerCallbackReaderImpl() {}
402 ResponseType* response() {
return &resp_; }
404 void CallOnDone()
override {
405 reactor_.load(std::memory_order_relaxed)->OnDone();
407 auto call_requester = std::move(call_requester_);
408 if (ctx_->context_allocator() !=
nullptr) {
409 ctx_->context_allocator()->Release(ctx_);
411 this->~ServerCallbackReaderImpl();
416 ServerReactor* reactor()
override {
417 return reactor_.load(std::memory_order_relaxed);
436 std::function<void()> call_requester_;
438 std::atomic<ServerReadReactor<RequestType>*> reactor_;
440 std::atomic<intptr_t> callbacks_outstanding_{
445 template <
class RequestType,
class ResponseType>
452 : get_reactor_(
std::move(get_reactor)) {}
458 param.call->call(),
sizeof(ServerCallbackWriterImpl)))
459 ServerCallbackWriterImpl(
461 param.call,
static_cast<RequestType*
>(param.request),
462 param.call_requester);
466 param.server_context->BeginCompletionOp(
468 [writer](
bool) { writer->MaybeDone(
false); },
472 if (param.status.ok()) {
479 if (reactor ==
nullptr) {
487 writer->SetupReactor(reactor);
496 call,
sizeof(RequestType))) RequestType();
503 request->~RequestType();
508 std::function<ServerWriteReactor<ResponseType>*(
524 this->MaybeDone(false);
527 finish_ops_.set_core_cq_tag(&finish_tag_);
529 if (!ctx_->sent_initial_metadata_) {
531 ctx_->initial_metadata_flags());
532 if (ctx_->compression_level_set()) {
533 finish_ops_.set_compression_level(ctx_->compression_level());
535 ctx_->sent_initial_metadata_ =
true;
537 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
538 call_.PerformOps(&finish_ops_);
541 void SendInitialMetadata()
override {
550 ServerWriteReactor<ResponseType>* reactor =
551 reactor_.load(std::memory_order_relaxed);
552 reactor->OnSendInitialMetadataDone(ok);
553 this->MaybeDone(true);
556 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
557 ctx_->initial_metadata_flags());
558 if (ctx_->compression_level_set()) {
559 meta_ops_.set_compression_level(ctx_->compression_level());
561 ctx_->sent_initial_metadata_ =
true;
562 meta_ops_.set_core_cq_tag(&meta_tag_);
563 call_.PerformOps(&meta_ops_);
566 void Write(
const ResponseType* resp,
572 if (!ctx_->sent_initial_metadata_) {
573 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
574 ctx_->initial_metadata_flags());
575 if (ctx_->compression_level_set()) {
576 write_ops_.set_compression_level(ctx_->compression_level());
578 ctx_->sent_initial_metadata_ =
true;
582 call_.PerformOps(&write_ops_);
590 Finish(std::move(s));
598 const RequestType* req,
599 std::function<
void()> call_requester)
603 call_requester_(
std::move(call_requester)) {}
605 void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
606 reactor_.store(reactor, std::memory_order_relaxed);
612 [
this, reactor](
bool ok) {
613 reactor->OnWriteDone(ok);
614 this->MaybeDone(true);
617 write_ops_.set_core_cq_tag(&write_tag_);
625 ~ServerCallbackWriterImpl() {
626 if (req_ !=
nullptr) {
627 req_->~RequestType();
631 const RequestType* request() {
return req_; }
633 void CallOnDone()
override {
634 reactor_.load(std::memory_order_relaxed)->OnDone();
636 auto call_requester = std::move(call_requester_);
637 if (ctx_->context_allocator() !=
nullptr) {
638 ctx_->context_allocator()->Release(ctx_);
640 this->~ServerCallbackWriterImpl();
645 ServerReactor* reactor()
override {
646 return reactor_.load(std::memory_order_relaxed);
664 const RequestType* req_;
665 std::function<void()> call_requester_;
667 std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
669 std::atomic<intptr_t> callbacks_outstanding_{
674 template <
class RequestType,
class ResponseType>
681 : get_reactor_(
std::move(get_reactor)) {}
686 param.call->call(),
sizeof(ServerCallbackReaderWriterImpl)))
687 ServerCallbackReaderWriterImpl(
689 param.call, param.call_requester);
693 param.server_context->BeginCompletionOp(
695 [stream](
bool) { stream->MaybeDone(
false); },
699 if (param.status.ok()) {
706 if (reactor ==
nullptr) {
715 stream->SetupReactor(reactor);
719 std::function<ServerBidiReactor<RequestType, ResponseType>*(
723 class ServerCallbackReaderWriterImpl
736 this->MaybeDone(false);
739 finish_ops_.set_core_cq_tag(&finish_tag_);
741 if (!ctx_->sent_initial_metadata_) {
743 ctx_->initial_metadata_flags());
744 if (ctx_->compression_level_set()) {
745 finish_ops_.set_compression_level(ctx_->compression_level());
747 ctx_->sent_initial_metadata_ =
true;
749 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
750 call_.PerformOps(&finish_ops_);
753 void SendInitialMetadata()
override {
762 ServerBidiReactor<RequestType, ResponseType>* reactor =
763 reactor_.load(std::memory_order_relaxed);
764 reactor->OnSendInitialMetadataDone(ok);
765 this->MaybeDone(true);
768 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
769 ctx_->initial_metadata_flags());
770 if (ctx_->compression_level_set()) {
771 meta_ops_.set_compression_level(ctx_->compression_level());
773 ctx_->sent_initial_metadata_ =
true;
774 meta_ops_.set_core_cq_tag(&meta_tag_);
775 call_.PerformOps(&meta_ops_);
778 void Write(
const ResponseType* resp,
784 if (!ctx_->sent_initial_metadata_) {
785 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
786 ctx_->initial_metadata_flags());
787 if (ctx_->compression_level_set()) {
788 write_ops_.set_compression_level(ctx_->compression_level());
790 ctx_->sent_initial_metadata_ =
true;
794 call_.PerformOps(&write_ops_);
801 Finish(std::move(s));
804 void Read(RequestType* req)
override {
806 read_ops_.RecvMessage(req);
807 call_.PerformOps(&read_ops_);
815 std::function<
void()> call_requester)
816 : ctx_(ctx), call_(*call), call_requester_(
std::move(call_requester)) {}
818 void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
819 reactor_.store(reactor, std::memory_order_relaxed);
825 [
this, reactor](
bool ok) {
826 reactor->OnWriteDone(ok);
827 this->MaybeDone(true);
830 write_ops_.set_core_cq_tag(&write_tag_);
833 [
this, reactor](
bool ok) {
834 reactor->OnReadDone(ok);
835 this->MaybeDone(true);
838 read_ops_.set_core_cq_tag(&read_tag_);
847 void CallOnDone()
override {
848 reactor_.load(std::memory_order_relaxed)->OnDone();
850 auto call_requester = std::move(call_requester_);
851 if (ctx_->context_allocator() !=
nullptr) {
852 ctx_->context_allocator()->Release(ctx_);
854 this->~ServerCallbackReaderWriterImpl();
859 ServerReactor* reactor()
override {
860 return reactor_.load(std::memory_order_relaxed);
882 std::function<void()> call_requester_;
884 std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
886 std::atomic<intptr_t> callbacks_outstanding_{
894 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H