19 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H 20 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H 24 #include <type_traits> 49 namespace experimental {
52 template <
class Request,
class Response>
54 template <
class Request,
class Response>
56 template <
class Request,
class Response>
67 virtual void Finish(
Status s) = 0;
71 virtual void SendInitialMetadata(std::function<
void(
bool)>) = 0;
104 virtual void SetCancelCallback(std::function<
void()> callback) = 0;
105 virtual void ClearCancelCallback() = 0;
111 template <
class Request>
115 virtual void Finish(
Status s) = 0;
116 virtual void SendInitialMetadata() = 0;
117 virtual void Read(Request* msg) = 0;
120 template <
class Response>
122 reactor->BindReader(
this);
126 template <
class Response>
131 virtual void Finish(
Status s) = 0;
132 virtual void SendInitialMetadata() = 0;
133 virtual void Write(
const Response* msg,
WriteOptions options) = 0;
137 Write(msg, std::move(options));
138 Finish(std::move(s));
142 template <
class Request>
144 reactor->BindWriter(
this);
148 template <
class Request,
class Response>
153 virtual void Finish(
Status s) = 0;
154 virtual void SendInitialMetadata() = 0;
155 virtual void Read(Request* msg) = 0;
156 virtual void Write(
const Response* msg,
WriteOptions options) = 0;
160 Write(msg, std::move(options));
161 Finish(std::move(s));
166 reactor->BindStream(
this);
181 template <
class Request,
class Response>
214 stream_->Write(resp, std::move(options));
232 stream_->WriteAndFinish(resp, std::move(options), std::move(s));
302 template <
class Request,
class Response>
334 template <
class Request,
class Response>
343 writer_->Write(resp, std::move(options));
347 writer_->WriteAndFinish(resp, std::move(options), std::move(s));
378 template <
class Request,
class Response>
388 template <
class Request,
class Response>
398 template <
class Request,
class Response>
408 template <
class RequestType,
class ResponseType>
412 std::function<
void(
ServerContext*,
const RequestType*, ResponseType*,
420 param.call->call(),
sizeof(ServerCallbackRpcControllerImpl)))
421 ServerCallbackRpcControllerImpl(
422 param.server_context, param.call,
423 static_cast<RequestType*>(param.request),
424 std::move(param.call_requester));
425 Status status = param.status;
430 controller->response(), controller);
433 controller->Finish(status);
442 call,
sizeof(RequestType))) RequestType();
448 request->~RequestType();
453 std::function<void(
ServerContext*,
const RequestType*, ResponseType*,
460 class ServerCallbackRpcControllerImpl
463 void Finish(
Status s)
override {
464 finish_tag_.Set(call_.call(), [
this](bool) { MaybeDone(); },
466 if (!ctx_->sent_initial_metadata_) {
468 ctx_->initial_metadata_flags());
469 if (ctx_->compression_level_set()) {
470 finish_ops_.set_compression_level(ctx_->compression_level());
472 ctx_->sent_initial_metadata_ =
true;
476 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
477 finish_ops_.SendMessagePtr(&resp_));
479 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
481 finish_ops_.set_core_cq_tag(&finish_tag_);
482 call_.PerformOps(&finish_ops_);
485 void SendInitialMetadata(std::function<
void(
bool)> f)
override {
487 callbacks_outstanding_++;
490 meta_tag_.Set(call_.call(),
496 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
497 ctx_->initial_metadata_flags());
498 if (ctx_->compression_level_set()) {
499 meta_ops_.set_compression_level(ctx_->compression_level());
501 ctx_->sent_initial_metadata_ =
true;
502 meta_ops_.set_core_cq_tag(&meta_tag_);
503 call_.PerformOps(&meta_ops_);
509 void SetCancelCallback(std::function<
void()> callback)
override {
510 ctx_->SetCancelCallback(std::move(callback));
513 void ClearCancelCallback()
override { ctx_->ClearCancelCallback(); }
519 const RequestType* req,
520 std::function<
void()> call_requester)
524 call_requester_(std::move(call_requester)) {
525 ctx_->BeginCompletionOp(call, [
this](
bool) { MaybeDone(); },
nullptr);
528 ~ServerCallbackRpcControllerImpl() { req_->~RequestType(); }
530 const RequestType* request() {
return req_; }
531 ResponseType* response() {
return &resp_; }
534 if (--callbacks_outstanding_ == 0) {
536 auto call_requester = std::move(call_requester_);
537 this->~ServerCallbackRpcControllerImpl();
552 const RequestType* req_;
554 std::function<void()> call_requester_;
555 std::atomic_int callbacks_outstanding_{
560 template <
class RequestType,
class ResponseType>
567 : func_(
std::move(func)) {}
579 if (reactor ==
nullptr) {
585 param.call->call(),
sizeof(ServerCallbackReaderImpl)))
586 ServerCallbackReaderImpl(param.server_context, param.call,
587 std::move(param.call_requester), reactor);
589 reader->BindReactor(reactor);
590 reactor->
OnStarted(param.server_context, reader->response());
595 std::function<experimental::ServerReadReactor<RequestType, ResponseType>*()>
598 class ServerCallbackReaderImpl
601 void Finish(
Status s)
override {
602 finish_tag_.Set(call_.call(), [
this](bool) { MaybeDone(); },
604 if (!ctx_->sent_initial_metadata_) {
606 ctx_->initial_metadata_flags());
607 if (ctx_->compression_level_set()) {
608 finish_ops_.set_compression_level(ctx_->compression_level());
610 ctx_->sent_initial_metadata_ =
true;
614 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
615 finish_ops_.SendMessagePtr(&resp_));
617 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
619 finish_ops_.set_core_cq_tag(&finish_tag_);
620 call_.PerformOps(&finish_ops_);
623 void SendInitialMetadata()
override {
625 callbacks_outstanding_++;
626 meta_tag_.Set(call_.call(),
628 reactor_->OnSendInitialMetadataDone(ok);
632 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
633 ctx_->initial_metadata_flags());
634 if (ctx_->compression_level_set()) {
635 meta_ops_.set_compression_level(ctx_->compression_level());
637 ctx_->sent_initial_metadata_ =
true;
638 meta_ops_.set_core_cq_tag(&meta_tag_);
639 call_.PerformOps(&meta_ops_);
642 void Read(RequestType* req)
override {
643 callbacks_outstanding_++;
644 read_ops_.RecvMessage(req);
645 call_.PerformOps(&read_ops_);
651 ServerCallbackReaderImpl(
656 call_requester_(std::move(call_requester)),
658 ctx_->BeginCompletionOp(call, [
this](
bool) { MaybeDone(); }, reactor);
659 read_tag_.Set(call_.call(),
661 reactor_->OnReadDone(ok);
665 read_ops_.set_core_cq_tag(&read_tag_);
668 ~ServerCallbackReaderImpl() {}
670 ResponseType* response() {
return &resp_; }
673 if (--callbacks_outstanding_ == 0) {
676 auto call_requester = std::move(call_requester_);
677 this->~ServerCallbackReaderImpl();
695 std::function<void()> call_requester_;
697 std::atomic_int callbacks_outstanding_{
702 template <
class RequestType,
class ResponseType>
709 : func_(
std::move(func)) {}
721 if (reactor ==
nullptr) {
727 param.call->call(),
sizeof(ServerCallbackWriterImpl)))
728 ServerCallbackWriterImpl(param.server_context, param.call,
729 static_cast<RequestType*>(param.request),
730 std::move(param.call_requester), reactor);
731 writer->BindReactor(reactor);
732 reactor->
OnStarted(param.server_context, writer->request());
741 call,
sizeof(RequestType))) RequestType();
747 request->~RequestType();
752 std::function<experimental::ServerWriteReactor<RequestType, ResponseType>*()>
755 class ServerCallbackWriterImpl
758 void Finish(
Status s)
override {
759 finish_tag_.Set(call_.call(), [
this](bool) { MaybeDone(); },
761 finish_ops_.set_core_cq_tag(&finish_tag_);
763 if (!ctx_->sent_initial_metadata_) {
765 ctx_->initial_metadata_flags());
766 if (ctx_->compression_level_set()) {
767 finish_ops_.set_compression_level(ctx_->compression_level());
769 ctx_->sent_initial_metadata_ =
true;
771 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
772 call_.PerformOps(&finish_ops_);
775 void SendInitialMetadata()
override {
777 callbacks_outstanding_++;
778 meta_tag_.Set(call_.call(),
780 reactor_->OnSendInitialMetadataDone(ok);
784 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
785 ctx_->initial_metadata_flags());
786 if (ctx_->compression_level_set()) {
787 meta_ops_.set_compression_level(ctx_->compression_level());
789 ctx_->sent_initial_metadata_ =
true;
790 meta_ops_.set_core_cq_tag(&meta_tag_);
791 call_.PerformOps(&meta_ops_);
794 void Write(
const ResponseType* resp,
WriteOptions options)
override {
795 callbacks_outstanding_++;
799 if (!ctx_->sent_initial_metadata_) {
800 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
801 ctx_->initial_metadata_flags());
802 if (ctx_->compression_level_set()) {
803 write_ops_.set_compression_level(ctx_->compression_level());
805 ctx_->sent_initial_metadata_ =
true;
809 call_.PerformOps(&write_ops_);
812 void WriteAndFinish(
const ResponseType* resp,
WriteOptions options,
820 Finish(std::move(s));
826 ServerCallbackWriterImpl(
828 std::function<
void()> call_requester,
833 call_requester_(std::move(call_requester)),
835 ctx_->BeginCompletionOp(call, [
this](
bool) { MaybeDone(); }, reactor);
836 write_tag_.Set(call_.call(),
838 reactor_->OnWriteDone(ok);
842 write_ops_.set_core_cq_tag(&write_tag_);
844 ~ServerCallbackWriterImpl() { req_->~RequestType(); }
846 const RequestType* request() {
return req_; }
849 if (--callbacks_outstanding_ == 0) {
852 auto call_requester = std::move(call_requester_);
853 this->~ServerCallbackWriterImpl();
870 const RequestType* req_;
871 std::function<void()> call_requester_;
873 std::atomic_int callbacks_outstanding_{
878 template <
class RequestType,
class ResponseType>
885 : func_(
std::move(func)) {}
896 if (reactor ==
nullptr) {
902 param.call->call(),
sizeof(ServerCallbackReaderWriterImpl)))
903 ServerCallbackReaderWriterImpl(param.server_context, param.call,
904 std::move(param.call_requester),
907 stream->BindReactor(reactor);
908 reactor->
OnStarted(param.server_context);
913 std::function<experimental::ServerBidiReactor<RequestType, ResponseType>*()>
916 class ServerCallbackReaderWriterImpl
920 void Finish(
Status s)
override {
921 finish_tag_.Set(call_.call(), [
this](bool) { MaybeDone(); },
923 finish_ops_.set_core_cq_tag(&finish_tag_);
925 if (!ctx_->sent_initial_metadata_) {
927 ctx_->initial_metadata_flags());
928 if (ctx_->compression_level_set()) {
929 finish_ops_.set_compression_level(ctx_->compression_level());
931 ctx_->sent_initial_metadata_ =
true;
933 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
934 call_.PerformOps(&finish_ops_);
937 void SendInitialMetadata()
override {
939 callbacks_outstanding_++;
940 meta_tag_.Set(call_.call(),
942 reactor_->OnSendInitialMetadataDone(ok);
946 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
947 ctx_->initial_metadata_flags());
948 if (ctx_->compression_level_set()) {
949 meta_ops_.set_compression_level(ctx_->compression_level());
951 ctx_->sent_initial_metadata_ =
true;
952 meta_ops_.set_core_cq_tag(&meta_tag_);
953 call_.PerformOps(&meta_ops_);
956 void Write(
const ResponseType* resp,
WriteOptions options)
override {
957 callbacks_outstanding_++;
961 if (!ctx_->sent_initial_metadata_) {
962 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
963 ctx_->initial_metadata_flags());
964 if (ctx_->compression_level_set()) {
965 write_ops_.set_compression_level(ctx_->compression_level());
967 ctx_->sent_initial_metadata_ =
true;
971 call_.PerformOps(&write_ops_);
974 void WriteAndFinish(
const ResponseType* resp,
WriteOptions options,
981 Finish(std::move(s));
984 void Read(RequestType* req)
override {
985 callbacks_outstanding_++;
986 read_ops_.RecvMessage(req);
987 call_.PerformOps(&read_ops_);
993 ServerCallbackReaderWriterImpl(
998 call_requester_(std::move(call_requester)),
1000 ctx_->BeginCompletionOp(call, [
this](
bool) { MaybeDone(); }, reactor);
1001 write_tag_.Set(call_.call(),
1003 reactor_->OnWriteDone(ok);
1007 write_ops_.set_core_cq_tag(&write_tag_);
1008 read_tag_.Set(call_.call(),
1010 reactor_->OnReadDone(ok);
1014 read_ops_.set_core_cq_tag(&read_tag_);
1016 ~ServerCallbackReaderWriterImpl() {}
1019 if (--callbacks_outstanding_ == 0) {
1022 auto call_requester = std::move(call_requester_);
1023 this->~ServerCallbackReaderWriterImpl();
1042 std::function<void()> call_requester_;
1044 std::atomic_int callbacks_outstanding_{
1053 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:70
void OnDone() override
Definition: server_callback.h:392
virtual void OnCancel()=0
virtual void OnStarted(ServerContext *context, Response *resp)
Similar to ServerBidiReactor::OnStarted, except that this also provides the response object that the ...
Definition: server_callback.h:318
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call_op_set.h:126
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:144
virtual void grpc_call_ref(grpc_call *call)=0
Definition: server_callback.h:399
void StartWriteAndFinish(const Response *resp, WriteOptions options, Status s)
Initiate a write operation with specified options and final RPC Status, which also causes any trailin...
Definition: server_callback.h:230
void BindReactor(ServerBidiReactor< Request, Response > *reactor)
Definition: server_callback.h:165
virtual void OnSendInitialMetadataDone(bool ok)
Notifies the application that an explicit StartSendInitialMetadata operation completed.
Definition: server_callback.h:267
Definition: server_callback.h:879
void StartWrite(const Response *resp, WriteOptions options)
Initiate a write operation with specified options.
Definition: server_callback.h:213
virtual ~ServerCallbackWriter()
Definition: server_callback.h:129
Definition: server_callback.h:379
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:165
virtual void OnReadDone(bool ok)
Definition: server_callback.h:322
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:828
void RunHandler(const HandlerParameter &param) final
Definition: server_callback.h:710
void StartRead(Request *req)
Definition: server_callback.h:309
virtual void OnSendInitialMetadataDone(bool ok)
The following notifications are exactly like ServerBidiReactor.
Definition: server_callback.h:362
virtual void grpc_call_unref(grpc_call *call)=0
virtual void SendInitialMetadata(std::function< void(bool)>)=0
Definition: async_unary_call.h:304
void StartWriteAndFinish(const Response *resp, WriteOptions options, Status s)
Definition: server_callback.h:345
void OnDone() override
Definition: server_callback.h:364
Definition: grpc_types.h:40
virtual void OnWriteDone(bool ok)
Notifies the application that a StartWrite (or StartWriteLast) operation completed.
Definition: server_callback.h:280
grpc_call * call() const
Definition: call.h:70
Definition: server_callback.h:149
void StartWrite(const Response *resp)
Initiate a write operation.
Definition: server_callback.h:205
ServerReadReactor is the interface for a client-streaming RPC.
Definition: server_callback.h:53
void BindReactor(ServerWriteReactor< Request, Response > *reactor)
Definition: server_callback.h:143
void StartSendInitialMetadata()
Do NOT call any operation initiation method (names that start with Start) until after the library has...
Definition: server_callback.h:192
::google::protobuf::util::Status Status
Definition: config_protobuf.h:93
void OnCancel() override
Notifies the application that this RPC has been cancelled.
Definition: server_callback.h:290
CallbackBidiHandler(std::function< experimental::ServerBidiReactor< RequestType, ResponseType > *()> func)
Definition: server_callback.h:881
virtual void OnStarted(ServerContext *context, const Request *req)
Similar to ServerBidiReactor::OnStarted, except that this also provides the request object sent by th...
Definition: server_callback.h:359
void RunHandler(const HandlerParameter &param) final
Definition: server_callback.h:886
void Finish(Status s)
Definition: server_callback.h:310
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
virtual void OnStarted(ServerContext *context)
Notify the application that a streaming RPC has started and that it is now ok to call any operation i...
Definition: server_callback.h:259
void StartWriteLast(const Response *resp, WriteOptions options)
Definition: server_callback.h:349
Definition: call_op_set.h:636
CallbackServerStreamingHandler(std::function< experimental::ServerWriteReactor< RequestType, ResponseType > *()> func)
Definition: server_callback.h:705
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, Status *status) final
Definition: server_callback.h:736
void StartWrite(const Response *resp, WriteOptions options)
Definition: server_callback.h:342
void Release()
Forget the underlying byte buffer without destroying it. Use this only for un-owned byte buffers.
Definition: byte_buffer.h:143
void OnDone() override
Definition: server_callback.h:382
void StartRead(Request *req)
Initiate a read operation.
Definition: server_callback.h:198
Definition: call_op_set.h:294
Definition: server_callback.h:40
This header provides an object that reads bytes directly from a grpc::ByteBuffer, via the ZeroCopyInp...
Definition: alarm.h:24
void Finish(Status s)
Indicate that the stream is to be finished and the trailing metadata and RPC status are to be sent...
Definition: server_callback.h:253
ServerBidiReactor is the interface for a bidirectional streaming RPC.
Definition: server_callback.h:57
void OnCancel() override
Definition: server_callback.h:324
CoreCodegenInterface * g_core_codegen_interface
Definition: call_op_set.h:51
Definition: server_callback.h:112
ReturnType * CatchingReactorCreator(Func &&func, Args &&... args)
Definition: callback_common.h:51
Definition: rpc_service_method.h:42
CallbackClientStreamingHandler(std::function< experimental::ServerReadReactor< RequestType, ResponseType > *()> func)
Definition: server_callback.h:563
A ServerContext allows the person implementing a service handler to:
Definition: server_context.h:110
Per-message write options.
Definition: call_op_set.h:86
void RunHandler(const HandlerParameter &param) final
Definition: server_callback.h:568
Definition: byte_buffer.h:49
virtual void SendInitialMetadata()=0
void OnCancel() override
Definition: server_callback.h:365
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
virtual void OnWriteDone(bool ok)
Definition: server_callback.h:363
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming o...
Definition: callback_common.h:132
Definition: server_callback.h:127
bool ok() const
Is the status OK?
Definition: status.h:118
Base class for running an RPC handler.
Definition: rpc_service_method.h:39
Definition: server_callback.h:561
void StartWriteLast(const Response *resp, WriteOptions options)
Inform system of a planned write operation with specified options, but allow the library to schedule ...
Definition: server_callback.h:243
void OnStarted(ServerContext *, Response *) override
Similar to ServerBidiReactor::OnStarted, except that this also provides the response object that the ...
Definition: server_callback.h:383
virtual void SendInitialMetadata()=0
Did it work? If it didn't, why?
Definition: status.h:31
Operation is not implemented or not supported/enabled in this service.
Definition: status_code_enum.h:115
void CatchingCallback(Func &&func, Args &&... args)
An exception-safe way of invoking a user-specified callback function.
Definition: callback_common.h:38
Definition: server_callback.h:61
void StartSendInitialMetadata()
The following operation initiations are exactly like ServerBidiReactor.
Definition: server_callback.h:308
virtual void OnSendInitialMetadataDone(bool ok)
The following notifications are exactly like ServerBidiReactor.
Definition: server_callback.h:321
ServerWriteReactor is the interface for a server-streaming RPC.
Definition: server_callback.h:55
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:190
void OnStarted(ServerContext *) override
Notify the application that a streaming RPC has started and that it is now ok to call any operation i...
Definition: server_callback.h:403
virtual ~ServerCallbackReader()
Definition: server_callback.h:114
Definition: byte_buffer.h:51
void OnDone() override
Notifies the application that all operations associated with this RPC have completed.
Definition: server_callback.h:285
void StartWrite(const Response *resp)
Definition: server_callback.h:341
void StartSendInitialMetadata()
The following operation initiations are exactly like ServerBidiReactor.
Definition: server_callback.h:340
void Finish(Status s)
Definition: server_callback.h:352
void BindReactor(ServerReadReactor< Request, Response > *reactor)
Definition: server_callback.h:121
void OnDone() override
Definition: server_callback.h:323
virtual void OnReadDone(bool ok)
Notifies the application that a StartRead operation completed.
Definition: server_callback.h:273
A sequence of bytes.
Definition: byte_buffer.h:64
void OnDone() override
Notifies the application that all operations associated with this RPC have completed.
Definition: server_callback.h:402
virtual void WriteAndFinish(const Response *msg, WriteOptions options, Status s)
Definition: server_callback.h:134
virtual void SendInitialMetadata()=0
void RunHandler(const HandlerParameter &param) final
Definition: server_callback.h:416
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, Status *status) final
Definition: server_callback.h:437
Straightforward wrapping of the C call object.
Definition: call.h:36
virtual void WriteAndFinish(const Response *msg, WriteOptions options, Status s)
Definition: server_callback.h:157
virtual ~ServerReactor()=default
void OnStarted(ServerContext *, const Request *) override
Similar to ServerBidiReactor::OnStarted, except that this also provides the request object sent by th...
Definition: server_callback.h:393
Definition: server_callback.h:389
virtual ~ServerCallbackReaderWriter()
Definition: server_callback.h:151
CallbackUnaryHandler(std::function< void(ServerContext *, const RequestType *, ResponseType *, experimental::ServerCallbackRpcController *)> func)
Definition: server_callback.h:411