18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
30 template <
class RequestType,
class ResponseType>
35 const RequestType*, ResponseType*)>
37 : get_reactor_(
std::move(get_reactor)) {}
42 allocator_ = allocator;
48 auto* allocator_state =
static_cast<
53 param.call->call(),
sizeof(ServerCallbackUnaryImpl)))
54 ServerCallbackUnaryImpl(
56 param.call, allocator_state, std::move(param.call_requester));
57 param.server_context->BeginCompletionOp(
58 param.call, [call](
bool) { call->MaybeDone(); }, call);
61 if (param.status.ok()) {
62 reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
68 if (reactor ==
nullptr) {
77 call->SetupReactor(reactor);
84 RequestType* request =
nullptr;
86 allocator_state =
nullptr;
87 if (allocator_ !=
nullptr) {
95 *handler_data = allocator_state;
96 request = allocator_state->
request();
110 const RequestType*, ResponseType*)>
113 allocator_ =
nullptr;
128 reactor_.load(std::memory_order_relaxed)->InternalInlineable());
131 finish_ops_.set_core_cq_tag(&finish_tag_);
133 if (!ctx_->sent_initial_metadata_) {
135 ctx_->initial_metadata_flags());
136 if (ctx_->compression_level_set()) {
137 finish_ops_.set_compression_level(ctx_->compression_level());
139 ctx_->sent_initial_metadata_ =
true;
143 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
144 finish_ops_.SendMessagePtr(response()));
146 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
148 finish_ops_.set_core_cq_tag(&finish_tag_);
149 call_.PerformOps(&finish_ops_);
152 void SendInitialMetadata()
override {
160 meta_tag_.Set(call_.call(),
162 ServerUnaryReactor* reactor =
163 reactor_.load(std::memory_order_relaxed);
164 reactor->OnSendInitialMetadataDone(ok);
165 this->MaybeDone(true);
168 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
169 ctx_->initial_metadata_flags());
170 if (ctx_->compression_level_set()) {
171 meta_ops_.set_compression_level(ctx_->compression_level());
173 ctx_->sent_initial_metadata_ =
true;
174 meta_ops_.set_core_cq_tag(&meta_tag_);
175 call_.PerformOps(&meta_ops_);
181 ServerCallbackUnaryImpl(
185 std::function<
void()> call_requester)
188 allocator_state_(allocator_state),
189 call_requester_(
std::move(call_requester)) {
190 ctx_->set_message_allocator_state(allocator_state);
198 reactor_.store(reactor, std::memory_order_relaxed);
201 this->
MaybeDone(reactor->InternalInlineable());
// Accessor for the request message of this unary call.
// Simply forwards to allocator_state_->request(); allocator_state_ is the
// pointer handed to the constructor (and also registered on the context via
// set_message_allocator_state()).
204 const RequestType* request() {
return allocator_state_->request(); }
// Accessor for the (mutable) response message of this unary call.
// Forwards to allocator_state_->response(); the finish path sends this same
// pointer via finish_ops_.SendMessagePtr(response()).
205 ResponseType* response() {
return allocator_state_->response(); }
207 void CallOnDone()
override {
208 reactor_.load(std::memory_order_relaxed)->OnDone();
210 auto call_requester = std::move(call_requester_);
211 allocator_state_->Release();
212 this->~ServerCallbackUnaryImpl();
217 ServerReactor* reactor()
override {
218 return reactor_.load(std::memory_order_relaxed);
234 std::function<void()> call_requester_;
245 std::atomic<ServerUnaryReactor*> reactor_;
247 std::atomic<intptr_t> callbacks_outstanding_{
252 template <
class RequestType,
class ResponseType>
259 : get_reactor_(
std::move(get_reactor)) {}
265 param.call->call(),
sizeof(ServerCallbackReaderImpl)))
266 ServerCallbackReaderImpl(
268 param.call, std::move(param.call_requester));
272 param.server_context->BeginCompletionOp(
274 [reader](
bool) { reader->MaybeDone(
false); },
278 if (param.status.ok()) {
286 if (reactor ==
nullptr) {
294 reader->SetupReactor(reactor);
308 finish_tag_.Set(call_.call(),
313 this->MaybeDone(false);
316 if (!ctx_->sent_initial_metadata_) {
318 ctx_->initial_metadata_flags());
319 if (ctx_->compression_level_set()) {
320 finish_ops_.set_compression_level(ctx_->compression_level());
322 ctx_->sent_initial_metadata_ =
true;
326 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
327 finish_ops_.SendMessagePtr(&resp_));
329 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
331 finish_ops_.set_core_cq_tag(&finish_tag_);
332 call_.PerformOps(&finish_ops_);
335 void SendInitialMetadata()
override {
341 meta_tag_.Set(call_.call(),
343 ServerReadReactor<RequestType>* reactor =
344 reactor_.load(std::memory_order_relaxed);
345 reactor->OnSendInitialMetadataDone(ok);
346 this->MaybeDone(true);
349 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
350 ctx_->initial_metadata_flags());
351 if (ctx_->compression_level_set()) {
352 meta_ops_.set_compression_level(ctx_->compression_level());
354 ctx_->sent_initial_metadata_ =
true;
355 meta_ops_.set_core_cq_tag(&meta_tag_);
356 call_.PerformOps(&meta_ops_);
359 void Read(RequestType* req)
override {
361 read_ops_.RecvMessage(req);
362 call_.PerformOps(&read_ops_);
370 std::function<
void()> call_requester)
371 : ctx_(ctx), call_(*call), call_requester_(
std::move(call_requester)) {}
373 void SetupReactor(ServerReadReactor<RequestType>* reactor) {
374 reactor_.store(reactor, std::memory_order_relaxed);
378 read_tag_.Set(call_.call(),
379 [
this, reactor](
bool ok) {
380 reactor->OnReadDone(ok);
381 this->MaybeDone(true);
384 read_ops_.set_core_cq_tag(&read_tag_);
// Intentionally empty destructor. It is not reached through `delete`:
// CallOnDone() invokes it explicitly (this->~ServerCallbackReaderImpl()).
// NOTE(review): the object appears to be placement-constructed in call-owned
// memory (see the sizeof(ServerCallbackReaderImpl) allocation) — confirm.
393 ~ServerCallbackReaderImpl() {}
// Returns a pointer to the response message stored inline in this object
// (resp_). The returned pointer is therefore only valid while this object is
// alive; the finish path sends the same object via SendMessagePtr(&resp_).
395 ResponseType* response() {
return &resp_; }
397 void CallOnDone()
override {
398 reactor_.load(std::memory_order_relaxed)->OnDone();
400 auto call_requester = std::move(call_requester_);
401 this->~ServerCallbackReaderImpl();
406 ServerReactor* reactor()
override {
407 return reactor_.load(std::memory_order_relaxed);
426 std::function<void()> call_requester_;
428 std::atomic<ServerReadReactor<RequestType>*> reactor_;
430 std::atomic<intptr_t> callbacks_outstanding_{
435 template <
class RequestType,
class ResponseType>
442 : get_reactor_(
std::move(get_reactor)) {}
448 param.call->call(),
sizeof(ServerCallbackWriterImpl)))
449 ServerCallbackWriterImpl(
451 param.call,
static_cast<RequestType*
>(param.request),
452 std::move(param.call_requester));
456 param.server_context->BeginCompletionOp(
458 [writer](
bool) { writer->MaybeDone(
false); },
462 if (param.status.ok()) {
469 if (reactor ==
nullptr) {
477 writer->SetupReactor(reactor);
486 call,
sizeof(RequestType))) RequestType();
493 request->~RequestType();
498 std::function<ServerWriteReactor<ResponseType>*(
508 finish_tag_.Set(call_.call(),
513 this->MaybeDone(false);
516 finish_ops_.set_core_cq_tag(&finish_tag_);
518 if (!ctx_->sent_initial_metadata_) {
520 ctx_->initial_metadata_flags());
521 if (ctx_->compression_level_set()) {
522 finish_ops_.set_compression_level(ctx_->compression_level());
524 ctx_->sent_initial_metadata_ =
true;
526 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
527 call_.PerformOps(&finish_ops_);
530 void SendInitialMetadata()
override {
536 meta_tag_.Set(call_.call(),
538 ServerWriteReactor<ResponseType>* reactor =
539 reactor_.load(std::memory_order_relaxed);
540 reactor->OnSendInitialMetadataDone(ok);
541 this->MaybeDone(true);
544 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
545 ctx_->initial_metadata_flags());
546 if (ctx_->compression_level_set()) {
547 meta_ops_.set_compression_level(ctx_->compression_level());
549 ctx_->sent_initial_metadata_ =
true;
550 meta_ops_.set_core_cq_tag(&meta_tag_);
551 call_.PerformOps(&meta_ops_);
554 void Write(
const ResponseType* resp,
560 if (!ctx_->sent_initial_metadata_) {
561 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
562 ctx_->initial_metadata_flags());
563 if (ctx_->compression_level_set()) {
564 write_ops_.set_compression_level(ctx_->compression_level());
566 ctx_->sent_initial_metadata_ =
true;
570 call_.PerformOps(&write_ops_);
578 Finish(std::move(s));
586 const RequestType* req,
587 std::function<
void()> call_requester)
591 call_requester_(
std::move(call_requester)) {}
593 void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
594 reactor_.store(reactor, std::memory_order_relaxed);
598 write_tag_.Set(call_.call(),
599 [
this, reactor](
bool ok) {
600 reactor->OnWriteDone(ok);
601 this->MaybeDone(true);
604 write_ops_.set_core_cq_tag(&write_tag_);
612 ~ServerCallbackWriterImpl() { req_->~RequestType(); }
// Accessor for the request message of this server-streaming call.
// Returns the stored req_ pointer unchanged; the pointee is destroyed by this
// object's destructor, so the pointer must not be used after that.
614 const RequestType* request() {
return req_; }
616 void CallOnDone()
override {
617 reactor_.load(std::memory_order_relaxed)->OnDone();
619 auto call_requester = std::move(call_requester_);
620 this->~ServerCallbackWriterImpl();
625 ServerReactor* reactor()
override {
626 return reactor_.load(std::memory_order_relaxed);
644 const RequestType* req_;
645 std::function<void()> call_requester_;
647 std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
649 std::atomic<intptr_t> callbacks_outstanding_{
654 template <
class RequestType,
class ResponseType>
661 : get_reactor_(
std::move(get_reactor)) {}
666 param.call->call(),
sizeof(ServerCallbackReaderWriterImpl)))
667 ServerCallbackReaderWriterImpl(
669 param.call, std::move(param.call_requester));
673 param.server_context->BeginCompletionOp(
675 [stream](
bool) { stream->MaybeDone(
false); },
679 if (param.status.ok()) {
686 if (reactor ==
nullptr) {
695 stream->SetupReactor(reactor);
699 std::function<ServerBidiReactor<RequestType, ResponseType>*(
703 class ServerCallbackReaderWriterImpl
710 finish_tag_.Set(call_.call(),
715 this->MaybeDone(false);
718 finish_ops_.set_core_cq_tag(&finish_tag_);
720 if (!ctx_->sent_initial_metadata_) {
722 ctx_->initial_metadata_flags());
723 if (ctx_->compression_level_set()) {
724 finish_ops_.set_compression_level(ctx_->compression_level());
726 ctx_->sent_initial_metadata_ =
true;
728 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
729 call_.PerformOps(&finish_ops_);
732 void SendInitialMetadata()
override {
738 meta_tag_.Set(call_.call(),
740 ServerBidiReactor<RequestType, ResponseType>* reactor =
741 reactor_.load(std::memory_order_relaxed);
742 reactor->OnSendInitialMetadataDone(ok);
743 this->MaybeDone(true);
746 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
747 ctx_->initial_metadata_flags());
748 if (ctx_->compression_level_set()) {
749 meta_ops_.set_compression_level(ctx_->compression_level());
751 ctx_->sent_initial_metadata_ =
true;
752 meta_ops_.set_core_cq_tag(&meta_tag_);
753 call_.PerformOps(&meta_ops_);
756 void Write(
const ResponseType* resp,
762 if (!ctx_->sent_initial_metadata_) {
763 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
764 ctx_->initial_metadata_flags());
765 if (ctx_->compression_level_set()) {
766 write_ops_.set_compression_level(ctx_->compression_level());
768 ctx_->sent_initial_metadata_ =
true;
772 call_.PerformOps(&write_ops_);
779 Finish(std::move(s));
782 void Read(RequestType* req)
override {
784 read_ops_.RecvMessage(req);
785 call_.PerformOps(&read_ops_);
793 std::function<
void()> call_requester)
794 : ctx_(ctx), call_(*call), call_requester_(
std::move(call_requester)) {}
796 void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
797 reactor_.store(reactor, std::memory_order_relaxed);
801 write_tag_.Set(call_.call(),
802 [
this, reactor](
bool ok) {
803 reactor->OnWriteDone(ok);
804 this->MaybeDone(true);
807 write_ops_.set_core_cq_tag(&write_tag_);
808 read_tag_.Set(call_.call(),
809 [
this, reactor](
bool ok) {
810 reactor->OnReadDone(ok);
811 this->MaybeDone(true);
814 read_ops_.set_core_cq_tag(&read_tag_);
823 void CallOnDone()
override {
824 reactor_.load(std::memory_order_relaxed)->OnDone();
826 auto call_requester = std::move(call_requester_);
827 this->~ServerCallbackReaderWriterImpl();
832 ServerReactor* reactor()
override {
833 return reactor_.load(std::memory_order_relaxed);
855 std::function<void()> call_requester_;
857 std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
859 std::atomic<intptr_t> callbacks_outstanding_{
867 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H