GRPC C++  1.20.0
server.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_SERVER_H
20 #define GRPCPP_SERVER_H
21 
22 #include <condition_variable>
23 #include <list>
24 #include <memory>
25 #include <mutex>
26 #include <vector>
27 
28 #include <grpc/compression.h>
29 #include <grpc/support/atm.h>
31 #include <grpcpp/impl/call.h>
38 #include <grpcpp/support/config.h>
39 #include <grpcpp/support/status.h>
40 
41 struct grpc_server;
42 
43 namespace grpc {
44 
45 class AsyncGenericService;
46 class HealthCheckServiceInterface;
47 class ServerContext;
48 class ServerInitializer;
49 
54 class Server : public ServerInterface, private GrpcLibraryCodegen {
55  public:
56  ~Server();
57 
62  void Wait() override;
63 
71  public:
72  virtual ~GlobalCallbacks() {}
74  virtual void UpdateArguments(ChannelArguments* args) {}
76  virtual void PreSynchronousRequest(ServerContext* context) = 0;
78  virtual void PostSynchronousRequest(ServerContext* context) = 0;
80  virtual void PreServerStart(Server* server) {}
82  virtual void AddPort(Server* server, const grpc::string& addr,
83  ServerCredentials* creds, int port) {}
84  };
90  static void SetGlobalCallbacks(GlobalCallbacks* callbacks);
91 
95 
98  return health_check_service_.get();
99  }
100 
102  std::shared_ptr<Channel> InProcessChannel(const ChannelArguments& args);
103 
108  public:
109  explicit experimental_type(Server* server) : server_(server) {}
110 
113  std::shared_ptr<Channel> InProcessChannelWithInterceptors(
114  const ChannelArguments& args,
115  std::vector<
116  std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
117  interceptor_creators);
118 
119  private:
120  Server* server_;
121  };
122 
127 
128  protected:
131  bool RegisterService(const grpc::string* host, Service* service) override;
132 
146  int AddListeningPort(const grpc::string& addr,
147  ServerCredentials* creds) override;
148 
174  Server(int max_message_size, ChannelArguments* args,
175  std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
176  sync_server_cqs,
177  int min_pollers, int max_pollers, int sync_cq_timeout_msec,
178  grpc_resource_quota* server_rq = nullptr,
179  std::vector<
180  std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
181  interceptor_creators = std::vector<std::unique_ptr<
183 
190  void Start(ServerCompletionQueue** cqs, size_t num_cqs) override;
191 
192  grpc_server* server() override { return server_; }
193 
194  private:
195  std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>*
196  interceptor_creators() override {
197  return &interceptor_creators_;
198  }
199 
200  friend class AsyncGenericService;
201  friend class ServerBuilder;
202  friend class ServerInitializer;
203 
204  class SyncRequest;
205  class CallbackRequestBase;
206  template <class ServerContextType>
207  class CallbackRequest;
208  class UnimplementedAsyncRequest;
209  class UnimplementedAsyncResponse;
210 
215  class SyncRequestThreadManager;
216 
219  void RegisterAsyncGenericService(AsyncGenericService* service) override;
220 
225  class experimental_registration_type final
227  public:
228  explicit experimental_registration_type(Server* server) : server_(server) {}
229  void RegisterCallbackGenericService(
230  experimental::CallbackGenericService* service) override {
231  server_->RegisterCallbackGenericService(service);
232  }
233 
234  private:
235  Server* server_;
236  };
237 
239  void RegisterCallbackGenericService(
241 
245  experimental_registration_interface* experimental_registration() override {
246  return &experimental_registration_;
247  }
248 
249  void PerformOpsOnCall(internal::CallOpSetInterface* ops,
250  internal::Call* call) override;
251 
252  void ShutdownInternal(gpr_timespec deadline) override;
253 
254  int max_receive_message_size() const override {
255  return max_receive_message_size_;
256  }
257 
258  CompletionQueue* CallbackCQ() override;
259 
260  ServerInitializer* initializer();
261 
262  // A vector of interceptor factory objects.
263  // This should be destroyed after health_check_service_ and this requirement
264  // is satisfied by declaring interceptor_creators_ before
265  // health_check_service_. (C++ mandates that member objects be destroyed in
266  // the reverse order of initialization.)
267  std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
268  interceptor_creators_;
269 
270  const int max_receive_message_size_;
271 
275  std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
276  sync_server_cqs_;
277 
280  std::vector<std::unique_ptr<SyncRequestThreadManager>> sync_req_mgrs_;
281 
282  // Outstanding unmatched callback requests, indexed by method.
283  // NOTE: Using a gpr_atm rather than atomic_int because atomic_int isn't
284  // copyable or movable and thus will cause compilation errors. We
285  // actually only want to extend the vector before the threaded use
286  // starts, but this is still a limitation.
287  std::vector<gpr_atm> callback_unmatched_reqs_count_;
288 
289  // List of callback requests to start when server actually starts.
290  std::list<CallbackRequestBase*> callback_reqs_to_start_;
291 
292  // For registering experimental callback generic service; remove when that
293 // method is no longer experimental
294  experimental_registration_type experimental_registration_{this};
295 
296  // Server status
297  std::mutex mu_;
298  bool started_;
299  bool shutdown_;
300  bool shutdown_notified_; // Was notify called on the shutdown_cv_
301 
302  std::condition_variable shutdown_cv_;
303 
304 // It is ok (but not required) to nest callback_reqs_mu_ under mu_.
305  // Incrementing callback_reqs_outstanding_ is ok without a lock but it must be
306  // decremented under the lock in case it is the last request and enables the
307  // server shutdown. The increment is performance-critical since it happens
308  // during periods of increasing load; the decrement happens only when memory
309  // is maxed out, during server shutdown, or (possibly in a future version)
310  // during decreasing load, so it is less performance-critical.
311  std::mutex callback_reqs_mu_;
312  std::condition_variable callback_reqs_done_cv_;
313  std::atomic_int callback_reqs_outstanding_{0};
314 
315  std::shared_ptr<GlobalCallbacks> global_callbacks_;
316 
317  std::vector<grpc::string> services_;
318  bool has_async_generic_service_{false};
319  bool has_callback_generic_service_{false};
320 
321  // Pointer to the wrapped grpc_server.
322  grpc_server* server_;
323 
324  std::unique_ptr<ServerInitializer> server_initializer_;
325 
326  std::unique_ptr<HealthCheckServiceInterface> health_check_service_;
327  bool health_check_service_disabled_;
328 
329  // When appropriate, use a default callback generic service to handle
330  // unimplemented methods
331  std::unique_ptr<experimental::CallbackGenericService> unimplemented_service_;
332 
333  // A special handler for resource exhausted in sync case
334  std::unique_ptr<internal::MethodHandler> resource_exhausted_handler_;
335 
336  // Handler for callback generic service, if any
337  std::unique_ptr<internal::MethodHandler> generic_handler_;
338 
339  // callback_cq_ references the callbackable completion queue associated
340  // with this server (if any). It is set on the first call to CallbackCQ().
341  // It is _not owned_ by the server; ownership belongs with its internal
342  // shutdown callback tag (invoked when the CQ is fully shutdown).
343  // It is protected by mu_
344  CompletionQueue* callback_cq_ = nullptr;
345 };
346 
347 } // namespace grpc
348 
349 #endif // GRPCPP_SERVER_H
bool RegisterService(const grpc::string *host, Service *service) override
Register a service.
std::string string
Definition: config.h:35
NOTE: class experimental_registration_interface is not part of the public API of this class TODO(vjpa...
Definition: server_interface.h:125
void Start(ServerCompletionQueue **cqs, size_t num_cqs) override
Start the server.
void Wait() override
Block until the server shuts down.
virtual void AddPort(Server *server, const grpc::string &addr, ServerCredentials *creds, int port)
Called after a server port is added.
Definition: server.h:82
Options for channel creation.
Definition: channel_arguments.h:39
Descriptor of an RPC service and its various RPC methods.
Definition: service_type.h:58
Definition: server_initializer.h:32
struct grpc_server grpc_server
A server listens to some port and responds to request calls.
Definition: grpc_types.h:65
virtual ~GlobalCallbacks()
Definition: server.h:72
Definition: async_generic_service.h:72
static void SetGlobalCallbacks(GlobalCallbacks *callbacks)
Set the global callback object.
Classes that require gRPC to be initialized should inherit from this class.
Definition: grpc_library.h:38
grpc_server * server() override
Definition: server.h:192
The gRPC server uses this interface to expose the health checking service without depending on protob...
Definition: health_check_service_interface.h:31
virtual void PreSynchronousRequest(ServerContext *context)=0
Called before application callback for each synchronous server request.
virtual void PreServerStart(Server *server)
Called before server is started.
Definition: server.h:80
int AddListeningPort(const grpc::string &addr, ServerCredentials *creds) override
Try binding the server to the given addr endpoint (port, and optionally including IP address to bind ...
Wrapper around grpc_server_credentials, a way to authenticate a server.
Definition: server_credentials.h:35
Represents a gRPC server.
Definition: server.h:54
This header provides an object that reads bytes directly from a grpc::ByteBuffer, via the ZeroCopyInp...
Definition: alarm.h:24
NOTE: class experimental_type is not part of the public API of this class.
Definition: server.h:107
A ServerContext allows the person implementing a service handler to:
Definition: server_context.h:110
Definition: server_interface.h:54
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue.h).
Definition: completion_queue.h:97
CallbackGenericService is the base class for generic services implemented using the callback API and ...
Definition: async_generic_service.h:125
experimental_type(Server *server)
Definition: server.h:109
An abstract collection of call ops, used to generate the grpc_call_op structure to pass down to the l...
Definition: call_op_set_interface.h:34
experimental_type experimental()
NOTE: The function experimental() is not stable public API.
Definition: server.h:126
Global callbacks are a set of hooks that are called when server events occur.
Definition: server.h:70
struct grpc_resource_quota grpc_resource_quota
Definition: grpc_types.h:649
Analogous to struct timespec.
Definition: gpr_types.h:47
grpc_server * c_server()
Returns a raw pointer to the underlying grpc_server instance.
virtual void UpdateArguments(ChannelArguments *args)
Called before server is created.
Definition: server.h:74
A specific type of completion queue used by the processing of notifications by servers.
Definition: completion_queue.h:384
virtual void PostSynchronousRequest(ServerContext *context)=0
Called after application callback for each synchronous server request.
A builder class for the creation and startup of grpc::Server instances.
Definition: server_builder.h:57
std::shared_ptr< Channel > InProcessChannel(const ChannelArguments &args)
Establish a channel for in-process communication.
Server(int max_message_size, ChannelArguments *args, std::shared_ptr< std::vector< std::unique_ptr< ServerCompletionQueue >>> sync_server_cqs, int min_pollers, int max_pollers, int sync_cq_timeout_msec, grpc_resource_quota *server_rq=nullptr, std::vector< std::unique_ptr< experimental::ServerInterceptorFactoryInterface >> interceptor_creators=std::vector< std::unique_ptr< experimental::ServerInterceptorFactoryInterface >>())
NOTE: This is NOT a public API.
Straightforward wrapping of the C call object.
Definition: call.h:36
HealthCheckServiceInterface * GetHealthCheckService() const
Returns the health check service.
Definition: server.h:97