19 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H 20 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H 24 #include <type_traits> 49 namespace experimental {
52 template <
class Request,
class Response>
54 template <
class Request,
class Response>
56 template <
class Request,
class Response>
67 virtual void Finish(
Status s) = 0;
71 virtual void SendInitialMetadata(std::function<
void(
bool)>) = 0;
77 template <
class Request>
81 virtual void Finish(
Status s) = 0;
82 virtual void SendInitialMetadata() = 0;
83 virtual void Read(Request* msg) = 0;
86 template <
class Response>
88 reactor->BindReader(
this);
92 template <
class Response>
97 virtual void Finish(
Status s) = 0;
98 virtual void SendInitialMetadata() = 0;
99 virtual void Write(
const Response* msg,
WriteOptions options) = 0;
103 Write(msg, std::move(options));
104 Finish(std::move(s));
108 template <
class Request>
110 reactor->BindWriter(
this);
114 template <
class Request,
class Response>
119 virtual void Finish(
Status s) = 0;
120 virtual void SendInitialMetadata() = 0;
121 virtual void Read(Request* msg) = 0;
122 virtual void Write(
const Response* msg,
WriteOptions options) = 0;
126 Write(msg, std::move(options));
127 Finish(std::move(s));
132 reactor->BindStream(
this);
139 template <
class Request,
class Response>
152 stream_->Write(msg, std::move(options));
156 stream_->WriteAndFinish(msg, std::move(options), std::move(s));
172 template <
class Request,
class Response>
191 template <
class Request,
class Response>
202 writer_->Write(msg, std::move(options));
206 writer_->WriteAndFinish(msg, std::move(options), std::move(s));
224 template <
class Request,
class Response>
234 template <
class Request,
class Response>
244 template <
class Request,
class Response>
254 template <
class RequestType,
class ResponseType>
258 std::function<
void(
ServerContext*,
const RequestType*, ResponseType*,
266 param.call->call(),
sizeof(ServerCallbackRpcControllerImpl)))
267 ServerCallbackRpcControllerImpl(
268 param.server_context, param.call,
269 static_cast<RequestType*>(param.request),
270 std::move(param.call_requester));
271 Status status = param.status;
276 controller->response(), controller);
279 controller->Finish(status);
288 call,
sizeof(RequestType))) RequestType();
294 request->~RequestType();
299 std::function<void(
ServerContext*,
const RequestType*, ResponseType*,
306 class ServerCallbackRpcControllerImpl
309 void Finish(
Status s)
override {
310 finish_tag_.Set(call_.call(), [
this](bool) { MaybeDone(); },
312 if (!ctx_->sent_initial_metadata_) {
314 ctx_->initial_metadata_flags());
315 if (ctx_->compression_level_set()) {
316 finish_ops_.set_compression_level(ctx_->compression_level());
318 ctx_->sent_initial_metadata_ =
true;
322 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
323 finish_ops_.SendMessagePtr(&resp_));
325 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
327 finish_ops_.set_core_cq_tag(&finish_tag_);
328 call_.PerformOps(&finish_ops_);
331 void SendInitialMetadata(std::function<
void(
bool)> f)
override {
333 callbacks_outstanding_++;
336 meta_tag_.Set(call_.call(),
342 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
343 ctx_->initial_metadata_flags());
344 if (ctx_->compression_level_set()) {
345 meta_ops_.set_compression_level(ctx_->compression_level());
347 ctx_->sent_initial_metadata_ =
true;
348 meta_ops_.set_core_cq_tag(&meta_tag_);
349 call_.PerformOps(&meta_ops_);
356 const RequestType* req,
357 std::function<
void()> call_requester)
361 call_requester_(std::move(call_requester)) {
362 ctx_->BeginCompletionOp(call, [
this](
bool) { MaybeDone(); },
nullptr);
// Destructor: explicitly invokes the destructor of the request object that
// req_ points to. No memory is released here.
// NOTE(review): presumably the request was placement-constructed in call-arena
// memory by Deserialize, which would explain destroy-without-free — confirm.
365 ~ServerCallbackRpcControllerImpl() { req_->~RequestType(); }
// Returns the stored pointer (req_) to the read-only request message.
367 const RequestType* request() {
return req_; }
// Returns the address of resp_, the response message that Finish() passes to
// SendMessagePtr; the caller may modify it through the returned pointer.
368 ResponseType* response() {
return &resp_; }
371 if (--callbacks_outstanding_ == 0) {
373 auto call_requester = std::move(call_requester_);
374 this->~ServerCallbackRpcControllerImpl();
389 const RequestType* req_;
391 std::function<void()> call_requester_;
392 std::atomic_int callbacks_outstanding_{
397 template <
class RequestType,
class ResponseType>
404 : func_(
std::move(func)) {}
416 if (reactor ==
nullptr) {
422 param.call->call(),
sizeof(ServerCallbackReaderImpl)))
423 ServerCallbackReaderImpl(param.server_context, param.call,
424 std::move(param.call_requester), reactor);
426 reader->BindReactor(reactor);
427 reactor->
OnStarted(param.server_context, reader->response());
432 std::function<experimental::ServerReadReactor<RequestType, ResponseType>*()>
435 class ServerCallbackReaderImpl
438 void Finish(
Status s)
override {
439 finish_tag_.Set(call_.call(), [
this](bool) { MaybeDone(); },
441 if (!ctx_->sent_initial_metadata_) {
443 ctx_->initial_metadata_flags());
444 if (ctx_->compression_level_set()) {
445 finish_ops_.set_compression_level(ctx_->compression_level());
447 ctx_->sent_initial_metadata_ =
true;
451 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
452 finish_ops_.SendMessagePtr(&resp_));
454 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
456 finish_ops_.set_core_cq_tag(&finish_tag_);
457 call_.PerformOps(&finish_ops_);
460 void SendInitialMetadata()
override {
462 callbacks_outstanding_++;
463 meta_tag_.Set(call_.call(),
465 reactor_->OnSendInitialMetadataDone(ok);
469 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
470 ctx_->initial_metadata_flags());
471 if (ctx_->compression_level_set()) {
472 meta_ops_.set_compression_level(ctx_->compression_level());
474 ctx_->sent_initial_metadata_ =
true;
475 meta_ops_.set_core_cq_tag(&meta_tag_);
476 call_.PerformOps(&meta_ops_);
479 void Read(RequestType* req)
override {
480 callbacks_outstanding_++;
481 read_ops_.RecvMessage(req);
482 call_.PerformOps(&read_ops_);
488 ServerCallbackReaderImpl(
493 call_requester_(std::move(call_requester)),
495 ctx_->BeginCompletionOp(call, [
this](
bool) { MaybeDone(); }, reactor);
496 read_tag_.Set(call_.call(),
498 reactor_->OnReadDone(ok);
502 read_ops_.set_core_cq_tag(&read_tag_);
// Destructor with an empty body: unlike the unary controller and the
// server-streaming writer, no request object is destroyed manually here.
505 ~ServerCallbackReaderImpl() {}
// Returns the address of resp_, the response message that Finish() passes to
// SendMessagePtr; RunHandler hands this pointer to the reactor's OnStarted.
507 ResponseType* response() {
return &resp_; }
510 if (--callbacks_outstanding_ == 0) {
513 auto call_requester = std::move(call_requester_);
514 this->~ServerCallbackReaderImpl();
532 std::function<void()> call_requester_;
534 std::atomic_int callbacks_outstanding_{
539 template <
class RequestType,
class ResponseType>
546 : func_(
std::move(func)) {}
558 if (reactor ==
nullptr) {
564 param.call->call(),
sizeof(ServerCallbackWriterImpl)))
565 ServerCallbackWriterImpl(param.server_context, param.call,
566 static_cast<RequestType*>(param.request),
567 std::move(param.call_requester), reactor);
568 writer->BindReactor(reactor);
569 reactor->
OnStarted(param.server_context, writer->request());
578 call,
sizeof(RequestType))) RequestType();
584 request->~RequestType();
589 std::function<experimental::ServerWriteReactor<RequestType, ResponseType>*()>
592 class ServerCallbackWriterImpl
595 void Finish(
Status s)
override {
596 finish_tag_.Set(call_.call(), [
this](bool) { MaybeDone(); },
598 finish_ops_.set_core_cq_tag(&finish_tag_);
600 if (!ctx_->sent_initial_metadata_) {
602 ctx_->initial_metadata_flags());
603 if (ctx_->compression_level_set()) {
604 finish_ops_.set_compression_level(ctx_->compression_level());
606 ctx_->sent_initial_metadata_ =
true;
608 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
609 call_.PerformOps(&finish_ops_);
612 void SendInitialMetadata()
override {
614 callbacks_outstanding_++;
615 meta_tag_.Set(call_.call(),
617 reactor_->OnSendInitialMetadataDone(ok);
621 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
622 ctx_->initial_metadata_flags());
623 if (ctx_->compression_level_set()) {
624 meta_ops_.set_compression_level(ctx_->compression_level());
626 ctx_->sent_initial_metadata_ =
true;
627 meta_ops_.set_core_cq_tag(&meta_tag_);
628 call_.PerformOps(&meta_ops_);
631 void Write(
const ResponseType* resp,
WriteOptions options)
override {
632 callbacks_outstanding_++;
636 if (!ctx_->sent_initial_metadata_) {
637 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
638 ctx_->initial_metadata_flags());
639 if (ctx_->compression_level_set()) {
640 write_ops_.set_compression_level(ctx_->compression_level());
642 ctx_->sent_initial_metadata_ =
true;
646 call_.PerformOps(&write_ops_);
649 void WriteAndFinish(
const ResponseType* resp,
WriteOptions options,
657 Finish(std::move(s));
663 ServerCallbackWriterImpl(
665 std::function<
void()> call_requester,
670 call_requester_(std::move(call_requester)),
672 ctx_->BeginCompletionOp(call, [
this](
bool) { MaybeDone(); }, reactor);
673 write_tag_.Set(call_.call(),
675 reactor_->OnWriteDone(ok);
679 write_ops_.set_core_cq_tag(&write_tag_);
// Destructor: explicitly invokes the destructor of the request object that
// req_ points to. No memory is released here.
// NOTE(review): presumably the request was placement-constructed in call-arena
// memory by Deserialize, which would explain destroy-without-free — confirm.
681 ~ServerCallbackWriterImpl() { req_->~RequestType(); }
// Returns the stored pointer (req_) to the read-only request message;
// RunHandler hands this pointer to the reactor's OnStarted.
683 const RequestType* request() {
return req_; }
686 if (--callbacks_outstanding_ == 0) {
689 auto call_requester = std::move(call_requester_);
690 this->~ServerCallbackWriterImpl();
707 const RequestType* req_;
708 std::function<void()> call_requester_;
710 std::atomic_int callbacks_outstanding_{
715 template <
class RequestType,
class ResponseType>
722 : func_(
std::move(func)) {}
733 if (reactor ==
nullptr) {
739 param.call->call(),
sizeof(ServerCallbackReaderWriterImpl)))
740 ServerCallbackReaderWriterImpl(param.server_context, param.call,
741 std::move(param.call_requester),
744 stream->BindReactor(reactor);
745 reactor->
OnStarted(param.server_context);
750 std::function<experimental::ServerBidiReactor<RequestType, ResponseType>*()>
753 class ServerCallbackReaderWriterImpl
757 void Finish(
Status s)
override {
758 finish_tag_.Set(call_.call(), [
this](bool) { MaybeDone(); },
760 finish_ops_.set_core_cq_tag(&finish_tag_);
762 if (!ctx_->sent_initial_metadata_) {
764 ctx_->initial_metadata_flags());
765 if (ctx_->compression_level_set()) {
766 finish_ops_.set_compression_level(ctx_->compression_level());
768 ctx_->sent_initial_metadata_ =
true;
770 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
771 call_.PerformOps(&finish_ops_);
774 void SendInitialMetadata()
override {
776 callbacks_outstanding_++;
777 meta_tag_.Set(call_.call(),
779 reactor_->OnSendInitialMetadataDone(ok);
783 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
784 ctx_->initial_metadata_flags());
785 if (ctx_->compression_level_set()) {
786 meta_ops_.set_compression_level(ctx_->compression_level());
788 ctx_->sent_initial_metadata_ =
true;
789 meta_ops_.set_core_cq_tag(&meta_tag_);
790 call_.PerformOps(&meta_ops_);
793 void Write(
const ResponseType* resp,
WriteOptions options)
override {
794 callbacks_outstanding_++;
798 if (!ctx_->sent_initial_metadata_) {
799 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
800 ctx_->initial_metadata_flags());
801 if (ctx_->compression_level_set()) {
802 write_ops_.set_compression_level(ctx_->compression_level());
804 ctx_->sent_initial_metadata_ =
true;
808 call_.PerformOps(&write_ops_);
811 void WriteAndFinish(
const ResponseType* resp,
WriteOptions options,
818 Finish(std::move(s));
821 void Read(RequestType* req)
override {
822 callbacks_outstanding_++;
823 read_ops_.RecvMessage(req);
824 call_.PerformOps(&read_ops_);
830 ServerCallbackReaderWriterImpl(
835 call_requester_(std::move(call_requester)),
837 ctx_->BeginCompletionOp(call, [
this](
bool) { MaybeDone(); }, reactor);
838 write_tag_.Set(call_.call(),
840 reactor_->OnWriteDone(ok);
844 write_ops_.set_core_cq_tag(&write_tag_);
845 read_tag_.Set(call_.call(),
847 reactor_->OnReadDone(ok);
851 read_ops_.set_core_cq_tag(&read_tag_);
// Destructor with an empty body: unlike the unary controller and the
// server-streaming writer, no request object is destroyed manually here.
853 ~ServerCallbackReaderWriterImpl() {}
856 if (--callbacks_outstanding_ == 0) {
859 auto call_requester = std::move(call_requester_);
860 this->~ServerCallbackReaderWriterImpl();
879 std::function<void()> call_requester_;
881 std::atomic_int callbacks_outstanding_{
890 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:70
void OnDone() override
Definition: server_callback.h:238
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call_op_set.h:126
void StartRead(Request *msg)
Definition: server_callback.h:149
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:142
virtual void grpc_call_ref(grpc_call *call)=0
Definition: server_callback.h:245
void StartWriteLast(const Response *msg, WriteOptions options)
Definition: server_callback.h:158
void BindReactor(ServerBidiReactor< Request, Response > *reactor)
Definition: server_callback.h:131
virtual void OnSendInitialMetadataDone(bool ok)
Definition: server_callback.h:144
Definition: server_callback.h:716
virtual ~ServerCallbackWriter()
Definition: server_callback.h:95
Definition: server_callback.h:225
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream. Client-side: makes Write the equival...
Definition: call_op_set.h:165
virtual void OnReadDone(bool ok)
Definition: server_callback.h:178
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:828
void RunHandler(const HandlerParameter &param) final
Definition: server_callback.h:547
virtual void OnSendInitialMetadataDone(bool ok)
Definition: server_callback.h:196
virtual void grpc_call_unref(grpc_call *call)=0
virtual void SendInitialMetadata(std::function< void(bool)>)=0
Definition: async_unary_call.h:304
void StartWrite(const Response *msg)
Definition: server_callback.h:150
Definition: grpc_types.h:40
virtual void OnStarted(ServerContext *, const Request *req)
Definition: server_callback.h:195
virtual void OnWriteDone(bool ok)
Definition: server_callback.h:146
grpc_call * call() const
Definition: call.h:70
Definition: server_callback.h:115
Definition: server_callback.h:53
void StartWrite(const Response *msg)
Definition: server_callback.h:200
void BindReactor(ServerWriteReactor< Request, Response > *reactor)
Definition: server_callback.h:109
void StartSendInitialMetadata()
Definition: server_callback.h:148
void StartWriteAndFinish(const Response *msg, WriteOptions options, Status s)
Definition: server_callback.h:204
::google::protobuf::util::Status Status
Definition: config_protobuf.h:93
CallbackBidiHandler(std::function< experimental::ServerBidiReactor< RequestType, ResponseType > *()> func)
Definition: server_callback.h:718
void RunHandler(const HandlerParameter &param) final
Definition: server_callback.h:723
void Finish(Status s)
Definition: server_callback.h:182
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
Definition: call_op_set.h:636
CallbackServerStreamingHandler(std::function< experimental::ServerWriteReactor< RequestType, ResponseType > *()> func)
Definition: server_callback.h:542
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, Status *status) final
Definition: server_callback.h:573
void Release()
Forget underlying byte buffer without destroying. Use this only for un-owned byte buffers.
Definition: byte_buffer.h:134
virtual void OnDone()
Definition: server_callback.h:43
void OnDone() override
Definition: server_callback.h:228
Definition: call_op_set.h:294
Definition: server_callback.h:40
This header provides an object that reads bytes directly from a grpc::ByteBuffer, via the ZeroCopyInp...
Definition: alarm.h:24
void Finish(Status s)
Definition: server_callback.h:161
Definition: server_callback.h:57
CoreCodegenInterface * g_core_codegen_interface
Definition: call_op_set.h:51
void StartWriteAndFinish(const Response *msg, WriteOptions options, Status s)
Definition: server_callback.h:154
Definition: server_callback.h:78
void StartRead(Request *msg)
Definition: server_callback.h:181
ReturnType * CatchingReactorCreator(Func &&func, Args &&... args)
Definition: callback_common.h:51
Definition: rpc_service_method.h:42
CallbackClientStreamingHandler(std::function< experimental::ServerReadReactor< RequestType, ResponseType > *()> func)
Definition: server_callback.h:400
A ServerContext allows the person implementing a service handler to:
Definition: server_context.h:109
Per-message write options.
Definition: call_op_set.h:86
void RunHandler(const HandlerParameter &param) final
Definition: server_callback.h:405
Definition: byte_buffer.h:49
void StartWrite(const Response *msg, WriteOptions options)
Definition: server_callback.h:151
virtual void SendInitialMetadata()=0
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
virtual void OnWriteDone(bool ok)
Definition: server_callback.h:197
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming o...
Definition: callback_common.h:132
Definition: server_callback.h:93
bool ok() const
Is the status OK?
Definition: status.h:118
virtual void OnStarted(ServerContext *)
Definition: server_callback.h:143
Base class for running an RPC handler.
Definition: rpc_service_method.h:39
Definition: server_callback.h:398
void OnStarted(ServerContext *, Response *) override
Definition: server_callback.h:229
virtual void SendInitialMetadata()=0
Did it work? If it didn't, why?
Definition: status.h:31
Operation is not implemented or not supported/enabled in this service.
Definition: status_code_enum.h:115
void CatchingCallback(Func &&func, Args &&... args)
An exception-safe way of invoking a user-specified callback function.
Definition: callback_common.h:38
void StartWriteLast(const Response *msg, WriteOptions options)
Definition: server_callback.h:208
Definition: server_callback.h:61
void StartSendInitialMetadata()
Definition: server_callback.h:180
virtual void OnStarted(ServerContext *, Response *resp)
Definition: server_callback.h:176
virtual void OnSendInitialMetadataDone(bool ok)
Definition: server_callback.h:177
Definition: server_callback.h:55
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:190
void OnStarted(ServerContext *) override
Definition: server_callback.h:249
virtual ~ServerCallbackReader()
Definition: server_callback.h:80
Definition: byte_buffer.h:51
void StartSendInitialMetadata()
Definition: server_callback.h:199
void Finish(Status s)
Definition: server_callback.h:211
void BindReactor(ServerReadReactor< Request, Response > *reactor)
Definition: server_callback.h:87
virtual void OnReadDone(bool ok)
Definition: server_callback.h:145
A sequence of bytes.
Definition: byte_buffer.h:64
void OnDone() override
Definition: server_callback.h:248
virtual void WriteAndFinish(const Response *msg, WriteOptions options, Status s)
Definition: server_callback.h:100
void StartWrite(const Response *msg, WriteOptions options)
Definition: server_callback.h:201
virtual void SendInitialMetadata()=0
void RunHandler(const HandlerParameter &param) final
Definition: server_callback.h:262
virtual void OnCancel()
Definition: server_callback.h:44
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, Status *status) final
Definition: server_callback.h:283
Straightforward wrapping of the C call object.
Definition: call.h:36
virtual void WriteAndFinish(const Response *msg, WriteOptions options, Status s)
Definition: server_callback.h:123
virtual ~ServerReactor()=default
void OnStarted(ServerContext *, const Request *) override
Definition: server_callback.h:239
Definition: server_callback.h:235
virtual ~ServerCallbackReaderWriter()
Definition: server_callback.h:117
CallbackUnaryHandler(std::function< void(ServerContext *, const RequestType *, ResponseType *, experimental::ServerCallbackRpcController *)> func)
Definition: server_callback.h:257