18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
30 template <
class RequestType,
class ResponseType>
35 const RequestType*, ResponseType*)>
37 : get_reactor_(
std::move(get_reactor)) {}
42 allocator_ = allocator;
48 auto* allocator_state =
static_cast<
53 param.call->call(),
sizeof(ServerCallbackUnaryImpl)))
54 ServerCallbackUnaryImpl(
56 param.server_context),
57 param.call, allocator_state, std::move(param.call_requester));
58 param.server_context->BeginCompletionOp(
59 param.call, [call](
bool) { call->MaybeDone(); }, call);
62 if (param.status.ok()) {
63 reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
66 param.server_context),
70 if (reactor ==
nullptr) {
79 call->SetupReactor(reactor);
86 RequestType* request =
nullptr;
88 allocator_state =
nullptr;
89 if (allocator_ !=
nullptr) {
97 *handler_data = allocator_state;
98 request = allocator_state->
request();
112 const RequestType*, ResponseType*)>
115 allocator_ =
nullptr;
130 reactor_.load(std::memory_order_relaxed)->InternalInlineable());
133 finish_ops_.set_core_cq_tag(&finish_tag_);
135 if (!ctx_->sent_initial_metadata_) {
137 ctx_->initial_metadata_flags());
138 if (ctx_->compression_level_set()) {
139 finish_ops_.set_compression_level(ctx_->compression_level());
141 ctx_->sent_initial_metadata_ =
true;
145 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
146 finish_ops_.SendMessagePtr(response()));
148 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
150 finish_ops_.set_core_cq_tag(&finish_tag_);
151 call_.PerformOps(&finish_ops_);
154 void SendInitialMetadata()
override {
162 meta_tag_.Set(call_.call(),
164 ServerUnaryReactor* reactor =
165 reactor_.load(std::memory_order_relaxed);
166 reactor->OnSendInitialMetadataDone(ok);
167 this->MaybeDone(true);
170 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
171 ctx_->initial_metadata_flags());
172 if (ctx_->compression_level_set()) {
173 meta_ops_.set_compression_level(ctx_->compression_level());
175 ctx_->sent_initial_metadata_ =
true;
176 meta_ops_.set_core_cq_tag(&meta_tag_);
177 call_.PerformOps(&meta_ops_);
183 ServerCallbackUnaryImpl(
187 std::function<
void()> call_requester)
190 allocator_state_(allocator_state),
191 call_requester_(
std::move(call_requester)) {
192 ctx_->set_message_allocator_state(allocator_state);
200 reactor_.store(reactor, std::memory_order_relaxed);
203 this->
MaybeDone(reactor->InternalInlineable());
206 const RequestType* request() {
return allocator_state_->request(); }
207 ResponseType* response() {
return allocator_state_->response(); }
209 void CallOnDone()
override {
210 reactor_.load(std::memory_order_relaxed)->OnDone();
212 auto call_requester = std::move(call_requester_);
213 allocator_state_->Release();
214 this->~ServerCallbackUnaryImpl();
219 ServerReactor* reactor()
override {
220 return reactor_.load(std::memory_order_relaxed);
236 std::function<void()> call_requester_;
247 std::atomic<ServerUnaryReactor*> reactor_;
249 std::atomic<intptr_t> callbacks_outstanding_{
254 template <
class RequestType,
class ResponseType>
261 : get_reactor_(
std::move(get_reactor)) {}
267 param.call->call(),
sizeof(ServerCallbackReaderImpl)))
268 ServerCallbackReaderImpl(
270 param.server_context),
271 param.call, std::move(param.call_requester));
275 param.server_context->BeginCompletionOp(
277 [reader](
bool) { reader->MaybeDone(
false); },
281 if (param.status.ok()) {
286 param.server_context),
290 if (reactor ==
nullptr) {
298 reader->SetupReactor(reactor);
302 std::function<ServerReadReactor<RequestType>*(
312 finish_tag_.Set(call_.call(),
317 this->MaybeDone(false);
320 if (!ctx_->sent_initial_metadata_) {
322 ctx_->initial_metadata_flags());
323 if (ctx_->compression_level_set()) {
324 finish_ops_.set_compression_level(ctx_->compression_level());
326 ctx_->sent_initial_metadata_ =
true;
330 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
331 finish_ops_.SendMessagePtr(&resp_));
333 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
335 finish_ops_.set_core_cq_tag(&finish_tag_);
336 call_.PerformOps(&finish_ops_);
339 void SendInitialMetadata()
override {
345 meta_tag_.Set(call_.call(),
347 ServerReadReactor<RequestType>* reactor =
348 reactor_.load(std::memory_order_relaxed);
349 reactor->OnSendInitialMetadataDone(ok);
350 this->MaybeDone(true);
353 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
354 ctx_->initial_metadata_flags());
355 if (ctx_->compression_level_set()) {
356 meta_ops_.set_compression_level(ctx_->compression_level());
358 ctx_->sent_initial_metadata_ =
true;
359 meta_ops_.set_core_cq_tag(&meta_tag_);
360 call_.PerformOps(&meta_ops_);
363 void Read(RequestType* req)
override {
365 read_ops_.RecvMessage(req);
366 call_.PerformOps(&read_ops_);
374 std::function<
void()> call_requester)
375 : ctx_(ctx), call_(*call), call_requester_(
std::move(call_requester)) {}
377 void SetupReactor(ServerReadReactor<RequestType>* reactor) {
378 reactor_.store(reactor, std::memory_order_relaxed);
382 read_tag_.Set(call_.call(),
383 [
this, reactor](
bool ok) {
384 reactor->OnReadDone(ok);
385 this->MaybeDone(true);
388 read_ops_.set_core_cq_tag(&read_tag_);
397 ~ServerCallbackReaderImpl() {}
399 ResponseType* response() {
return &resp_; }
401 void CallOnDone()
override {
402 reactor_.load(std::memory_order_relaxed)->OnDone();
404 auto call_requester = std::move(call_requester_);
405 this->~ServerCallbackReaderImpl();
410 ServerReactor* reactor()
override {
411 return reactor_.load(std::memory_order_relaxed);
430 std::function<void()> call_requester_;
432 std::atomic<ServerReadReactor<RequestType>*> reactor_;
434 std::atomic<intptr_t> callbacks_outstanding_{
439 template <
class RequestType,
class ResponseType>
446 : get_reactor_(
std::move(get_reactor)) {}
452 param.call->call(),
sizeof(ServerCallbackWriterImpl)))
453 ServerCallbackWriterImpl(
455 param.server_context),
456 param.call,
static_cast<RequestType*
>(param.request),
457 std::move(param.call_requester));
461 param.server_context->BeginCompletionOp(
463 [writer](
bool) { writer->MaybeDone(
false); },
467 if (param.status.ok()) {
472 param.server_context),
475 if (reactor ==
nullptr) {
483 writer->SetupReactor(reactor);
492 call,
sizeof(RequestType))) RequestType();
499 request->~RequestType();
504 std::function<ServerWriteReactor<ResponseType>*(
514 finish_tag_.Set(call_.call(),
519 this->MaybeDone(false);
522 finish_ops_.set_core_cq_tag(&finish_tag_);
524 if (!ctx_->sent_initial_metadata_) {
526 ctx_->initial_metadata_flags());
527 if (ctx_->compression_level_set()) {
528 finish_ops_.set_compression_level(ctx_->compression_level());
530 ctx_->sent_initial_metadata_ =
true;
532 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
533 call_.PerformOps(&finish_ops_);
536 void SendInitialMetadata()
override {
542 meta_tag_.Set(call_.call(),
544 ServerWriteReactor<ResponseType>* reactor =
545 reactor_.load(std::memory_order_relaxed);
546 reactor->OnSendInitialMetadataDone(ok);
547 this->MaybeDone(true);
550 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
551 ctx_->initial_metadata_flags());
552 if (ctx_->compression_level_set()) {
553 meta_ops_.set_compression_level(ctx_->compression_level());
555 ctx_->sent_initial_metadata_ =
true;
556 meta_ops_.set_core_cq_tag(&meta_tag_);
557 call_.PerformOps(&meta_ops_);
560 void Write(
const ResponseType* resp,
566 if (!ctx_->sent_initial_metadata_) {
567 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
568 ctx_->initial_metadata_flags());
569 if (ctx_->compression_level_set()) {
570 write_ops_.set_compression_level(ctx_->compression_level());
572 ctx_->sent_initial_metadata_ =
true;
576 call_.PerformOps(&write_ops_);
584 Finish(std::move(s));
592 const RequestType* req,
593 std::function<
void()> call_requester)
597 call_requester_(
std::move(call_requester)) {}
599 void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
600 reactor_.store(reactor, std::memory_order_relaxed);
604 write_tag_.Set(call_.call(),
605 [
this, reactor](
bool ok) {
606 reactor->OnWriteDone(ok);
607 this->MaybeDone(true);
610 write_ops_.set_core_cq_tag(&write_tag_);
618 ~ServerCallbackWriterImpl() { req_->~RequestType(); }
620 const RequestType* request() {
return req_; }
622 void CallOnDone()
override {
623 reactor_.load(std::memory_order_relaxed)->OnDone();
625 auto call_requester = std::move(call_requester_);
626 this->~ServerCallbackWriterImpl();
631 ServerReactor* reactor()
override {
632 return reactor_.load(std::memory_order_relaxed);
650 const RequestType* req_;
651 std::function<void()> call_requester_;
653 std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
655 std::atomic<intptr_t> callbacks_outstanding_{
660 template <
class RequestType,
class ResponseType>
667 : get_reactor_(
std::move(get_reactor)) {}
672 param.call->call(),
sizeof(ServerCallbackReaderWriterImpl)))
673 ServerCallbackReaderWriterImpl(
675 param.server_context),
676 param.call, std::move(param.call_requester));
680 param.server_context->BeginCompletionOp(
682 [stream](
bool) { stream->MaybeDone(
false); },
686 if (param.status.ok()) {
690 param.server_context));
693 if (reactor ==
nullptr) {
702 stream->SetupReactor(reactor);
706 std::function<ServerBidiReactor<RequestType, ResponseType>*(
710 class ServerCallbackReaderWriterImpl
717 finish_tag_.Set(call_.call(),
722 this->MaybeDone(false);
725 finish_ops_.set_core_cq_tag(&finish_tag_);
727 if (!ctx_->sent_initial_metadata_) {
729 ctx_->initial_metadata_flags());
730 if (ctx_->compression_level_set()) {
731 finish_ops_.set_compression_level(ctx_->compression_level());
733 ctx_->sent_initial_metadata_ =
true;
735 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
736 call_.PerformOps(&finish_ops_);
739 void SendInitialMetadata()
override {
745 meta_tag_.Set(call_.call(),
747 ServerBidiReactor<RequestType, ResponseType>* reactor =
748 reactor_.load(std::memory_order_relaxed);
749 reactor->OnSendInitialMetadataDone(ok);
750 this->MaybeDone(true);
753 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
754 ctx_->initial_metadata_flags());
755 if (ctx_->compression_level_set()) {
756 meta_ops_.set_compression_level(ctx_->compression_level());
758 ctx_->sent_initial_metadata_ =
true;
759 meta_ops_.set_core_cq_tag(&meta_tag_);
760 call_.PerformOps(&meta_ops_);
763 void Write(
const ResponseType* resp,
769 if (!ctx_->sent_initial_metadata_) {
770 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
771 ctx_->initial_metadata_flags());
772 if (ctx_->compression_level_set()) {
773 write_ops_.set_compression_level(ctx_->compression_level());
775 ctx_->sent_initial_metadata_ =
true;
779 call_.PerformOps(&write_ops_);
786 Finish(std::move(s));
789 void Read(RequestType* req)
override {
791 read_ops_.RecvMessage(req);
792 call_.PerformOps(&read_ops_);
800 std::function<
void()> call_requester)
801 : ctx_(ctx), call_(*call), call_requester_(
std::move(call_requester)) {}
803 void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
804 reactor_.store(reactor, std::memory_order_relaxed);
808 write_tag_.Set(call_.call(),
809 [
this, reactor](
bool ok) {
810 reactor->OnWriteDone(ok);
811 this->MaybeDone(true);
814 write_ops_.set_core_cq_tag(&write_tag_);
815 read_tag_.Set(call_.call(),
816 [
this, reactor](
bool ok) {
817 reactor->OnReadDone(ok);
818 this->MaybeDone(true);
821 read_ops_.set_core_cq_tag(&read_tag_);
830 void CallOnDone()
override {
831 reactor_.load(std::memory_order_relaxed)->OnDone();
833 auto call_requester = std::move(call_requester_);
834 this->~ServerCallbackReaderWriterImpl();
839 ServerReactor* reactor()
override {
840 return reactor_.load(std::memory_order_relaxed);
862 std::function<void()> call_requester_;
864 std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
866 std::atomic<intptr_t> callbacks_outstanding_{
874 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H