18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
30 template <
class RequestType,
class ResponseType>
35 const RequestType*, ResponseType*)>
37 : get_reactor_(
std::move(get_reactor)) {}
42 allocator_ = allocator;
48 auto* allocator_state =
static_cast<
53 param.call->call(),
sizeof(ServerCallbackUnaryImpl)))
54 ServerCallbackUnaryImpl(
56 param.call, allocator_state, std::move(param.call_requester));
57 param.server_context->BeginCompletionOp(
58 param.call, [call](
bool) { call->MaybeDone(); }, call);
61 if (param.status.ok()) {
62 reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
68 if (reactor ==
nullptr) {
77 call->SetupReactor(reactor);
84 RequestType* request =
nullptr;
86 allocator_state =
nullptr;
87 if (allocator_ !=
nullptr) {
95 *handler_data = allocator_state;
96 request = allocator_state->
request();
110 const RequestType*, ResponseType*)>
113 allocator_ =
nullptr;
128 reactor_.load(std::memory_order_relaxed)->InternalInlineable());
131 finish_ops_.set_core_cq_tag(&finish_tag_);
133 if (!ctx_->sent_initial_metadata_) {
135 ctx_->initial_metadata_flags());
136 if (ctx_->compression_level_set()) {
137 finish_ops_.set_compression_level(ctx_->compression_level());
139 ctx_->sent_initial_metadata_ =
true;
143 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
144 finish_ops_.SendMessagePtr(response()));
146 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
148 finish_ops_.set_core_cq_tag(&finish_tag_);
149 call_.PerformOps(&finish_ops_);
152 void SendInitialMetadata()
override {
163 ServerUnaryReactor* reactor =
164 reactor_.load(std::memory_order_relaxed);
165 reactor->OnSendInitialMetadataDone(ok);
166 this->MaybeDone(true);
169 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
170 ctx_->initial_metadata_flags());
171 if (ctx_->compression_level_set()) {
172 meta_ops_.set_compression_level(ctx_->compression_level());
174 ctx_->sent_initial_metadata_ =
true;
175 meta_ops_.set_core_cq_tag(&meta_tag_);
176 call_.PerformOps(&meta_ops_);
182 ServerCallbackUnaryImpl(
186 std::function<
void()> call_requester)
189 allocator_state_(allocator_state),
190 call_requester_(
std::move(call_requester)) {
191 ctx_->set_message_allocator_state(allocator_state);
199 reactor_.store(reactor, std::memory_order_relaxed);
202 this->
MaybeDone(reactor->InternalInlineable());
205 const RequestType* request() {
return allocator_state_->request(); }
206 ResponseType* response() {
return allocator_state_->response(); }
208 void CallOnDone()
override {
209 reactor_.load(std::memory_order_relaxed)->OnDone();
211 auto call_requester = std::move(call_requester_);
212 allocator_state_->Release();
213 this->~ServerCallbackUnaryImpl();
218 ServerReactor* reactor()
override {
219 return reactor_.load(std::memory_order_relaxed);
235 std::function<void()> call_requester_;
246 std::atomic<ServerUnaryReactor*> reactor_;
248 std::atomic<intptr_t> callbacks_outstanding_{
253 template <
class RequestType,
class ResponseType>
260 : get_reactor_(
std::move(get_reactor)) {}
266 param.call->call(),
sizeof(ServerCallbackReaderImpl)))
267 ServerCallbackReaderImpl(
269 param.call, std::move(param.call_requester));
273 param.server_context->BeginCompletionOp(
275 [reader](
bool) { reader->MaybeDone(
false); },
279 if (param.status.ok()) {
287 if (reactor ==
nullptr) {
295 reader->SetupReactor(reactor);
315 this->MaybeDone(false);
318 if (!ctx_->sent_initial_metadata_) {
320 ctx_->initial_metadata_flags());
321 if (ctx_->compression_level_set()) {
322 finish_ops_.set_compression_level(ctx_->compression_level());
324 ctx_->sent_initial_metadata_ =
true;
328 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
329 finish_ops_.SendMessagePtr(&resp_));
331 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
333 finish_ops_.set_core_cq_tag(&finish_tag_);
334 call_.PerformOps(&finish_ops_);
337 void SendInitialMetadata()
override {
346 ServerReadReactor<RequestType>* reactor =
347 reactor_.load(std::memory_order_relaxed);
348 reactor->OnSendInitialMetadataDone(ok);
349 this->MaybeDone(true);
352 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
353 ctx_->initial_metadata_flags());
354 if (ctx_->compression_level_set()) {
355 meta_ops_.set_compression_level(ctx_->compression_level());
357 ctx_->sent_initial_metadata_ =
true;
358 meta_ops_.set_core_cq_tag(&meta_tag_);
359 call_.PerformOps(&meta_ops_);
362 void Read(RequestType* req)
override {
364 read_ops_.RecvMessage(req);
365 call_.PerformOps(&read_ops_);
373 std::function<
void()> call_requester)
374 : ctx_(ctx), call_(*call), call_requester_(
std::move(call_requester)) {}
376 void SetupReactor(ServerReadReactor<RequestType>* reactor) {
377 reactor_.store(reactor, std::memory_order_relaxed);
383 [
this, reactor](
bool ok) {
384 reactor->OnReadDone(ok);
385 this->MaybeDone(true);
388 read_ops_.set_core_cq_tag(&read_tag_);
397 ~ServerCallbackReaderImpl() {}
399 ResponseType* response() {
return &resp_; }
401 void CallOnDone()
override {
402 reactor_.load(std::memory_order_relaxed)->OnDone();
404 auto call_requester = std::move(call_requester_);
405 this->~ServerCallbackReaderImpl();
410 ServerReactor* reactor()
override {
411 return reactor_.load(std::memory_order_relaxed);
430 std::function<void()> call_requester_;
432 std::atomic<ServerReadReactor<RequestType>*> reactor_;
434 std::atomic<intptr_t> callbacks_outstanding_{
439 template <
class RequestType,
class ResponseType>
446 : get_reactor_(
std::move(get_reactor)) {}
452 param.call->call(),
sizeof(ServerCallbackWriterImpl)))
453 ServerCallbackWriterImpl(
455 param.call,
static_cast<RequestType*
>(param.request),
456 std::move(param.call_requester));
460 param.server_context->BeginCompletionOp(
462 [writer](
bool) { writer->MaybeDone(
false); },
466 if (param.status.ok()) {
473 if (reactor ==
nullptr) {
481 writer->SetupReactor(reactor);
490 call,
sizeof(RequestType))) RequestType();
497 request->~RequestType();
502 std::function<ServerWriteReactor<ResponseType>*(
518 this->MaybeDone(false);
521 finish_ops_.set_core_cq_tag(&finish_tag_);
523 if (!ctx_->sent_initial_metadata_) {
525 ctx_->initial_metadata_flags());
526 if (ctx_->compression_level_set()) {
527 finish_ops_.set_compression_level(ctx_->compression_level());
529 ctx_->sent_initial_metadata_ =
true;
531 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
532 call_.PerformOps(&finish_ops_);
535 void SendInitialMetadata()
override {
544 ServerWriteReactor<ResponseType>* reactor =
545 reactor_.load(std::memory_order_relaxed);
546 reactor->OnSendInitialMetadataDone(ok);
547 this->MaybeDone(true);
550 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
551 ctx_->initial_metadata_flags());
552 if (ctx_->compression_level_set()) {
553 meta_ops_.set_compression_level(ctx_->compression_level());
555 ctx_->sent_initial_metadata_ =
true;
556 meta_ops_.set_core_cq_tag(&meta_tag_);
557 call_.PerformOps(&meta_ops_);
560 void Write(
const ResponseType* resp,
566 if (!ctx_->sent_initial_metadata_) {
567 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
568 ctx_->initial_metadata_flags());
569 if (ctx_->compression_level_set()) {
570 write_ops_.set_compression_level(ctx_->compression_level());
572 ctx_->sent_initial_metadata_ =
true;
576 call_.PerformOps(&write_ops_);
584 Finish(std::move(s));
592 const RequestType* req,
593 std::function<
void()> call_requester)
597 call_requester_(
std::move(call_requester)) {}
599 void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
600 reactor_.store(reactor, std::memory_order_relaxed);
606 [
this, reactor](
bool ok) {
607 reactor->OnWriteDone(ok);
608 this->MaybeDone(true);
611 write_ops_.set_core_cq_tag(&write_tag_);
619 ~ServerCallbackWriterImpl() { req_->~RequestType(); }
621 const RequestType* request() {
return req_; }
623 void CallOnDone()
override {
624 reactor_.load(std::memory_order_relaxed)->OnDone();
626 auto call_requester = std::move(call_requester_);
627 this->~ServerCallbackWriterImpl();
632 ServerReactor* reactor()
override {
633 return reactor_.load(std::memory_order_relaxed);
651 const RequestType* req_;
652 std::function<void()> call_requester_;
654 std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
656 std::atomic<intptr_t> callbacks_outstanding_{
661 template <
class RequestType,
class ResponseType>
668 : get_reactor_(
std::move(get_reactor)) {}
673 param.call->call(),
sizeof(ServerCallbackReaderWriterImpl)))
674 ServerCallbackReaderWriterImpl(
676 param.call, std::move(param.call_requester));
680 param.server_context->BeginCompletionOp(
682 [stream](
bool) { stream->MaybeDone(
false); },
686 if (param.status.ok()) {
693 if (reactor ==
nullptr) {
702 stream->SetupReactor(reactor);
706 std::function<ServerBidiReactor<RequestType, ResponseType>*(
710 class ServerCallbackReaderWriterImpl
723 this->MaybeDone(false);
726 finish_ops_.set_core_cq_tag(&finish_tag_);
728 if (!ctx_->sent_initial_metadata_) {
730 ctx_->initial_metadata_flags());
731 if (ctx_->compression_level_set()) {
732 finish_ops_.set_compression_level(ctx_->compression_level());
734 ctx_->sent_initial_metadata_ =
true;
736 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
737 call_.PerformOps(&finish_ops_);
740 void SendInitialMetadata()
override {
749 ServerBidiReactor<RequestType, ResponseType>* reactor =
750 reactor_.load(std::memory_order_relaxed);
751 reactor->OnSendInitialMetadataDone(ok);
752 this->MaybeDone(true);
755 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
756 ctx_->initial_metadata_flags());
757 if (ctx_->compression_level_set()) {
758 meta_ops_.set_compression_level(ctx_->compression_level());
760 ctx_->sent_initial_metadata_ =
true;
761 meta_ops_.set_core_cq_tag(&meta_tag_);
762 call_.PerformOps(&meta_ops_);
765 void Write(
const ResponseType* resp,
771 if (!ctx_->sent_initial_metadata_) {
772 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
773 ctx_->initial_metadata_flags());
774 if (ctx_->compression_level_set()) {
775 write_ops_.set_compression_level(ctx_->compression_level());
777 ctx_->sent_initial_metadata_ =
true;
781 call_.PerformOps(&write_ops_);
788 Finish(std::move(s));
791 void Read(RequestType* req)
override {
793 read_ops_.RecvMessage(req);
794 call_.PerformOps(&read_ops_);
802 std::function<
void()> call_requester)
803 : ctx_(ctx), call_(*call), call_requester_(
std::move(call_requester)) {}
805 void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
806 reactor_.store(reactor, std::memory_order_relaxed);
812 [
this, reactor](
bool ok) {
813 reactor->OnWriteDone(ok);
814 this->MaybeDone(true);
817 write_ops_.set_core_cq_tag(&write_tag_);
820 [
this, reactor](
bool ok) {
821 reactor->OnReadDone(ok);
822 this->MaybeDone(true);
825 read_ops_.set_core_cq_tag(&read_tag_);
834 void CallOnDone()
override {
835 reactor_.load(std::memory_order_relaxed)->OnDone();
837 auto call_requester = std::move(call_requester_);
838 this->~ServerCallbackReaderWriterImpl();
843 ServerReactor* reactor()
override {
844 return reactor_.load(std::memory_order_relaxed);
866 std::function<void()> call_requester_;
868 std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
870 std::atomic<intptr_t> callbacks_outstanding_{
878 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H