GRPC C++  0.12.0.0
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
server.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015, Google Inc.
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  * * Redistributions of source code must retain the above copyright
11  * notice, this list of conditions and the following disclaimer.
12  * * Redistributions in binary form must reproduce the above
13  * copyright notice, this list of conditions and the following disclaimer
14  * in the documentation and/or other materials provided with the
15  * distribution.
16  * * Neither the name of Google Inc. nor the names of its
17  * contributors may be used to endorse or promote products derived from
18  * this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  *
32  */
33 
34 #ifndef GRPCXX_SERVER_H
35 #define GRPCXX_SERVER_H
36 
37 #include <list>
38 #include <memory>
39 
41 #include <grpc++/impl/call.h>
43 #include <grpc++/impl/sync.h>
46 #include <grpc++/support/config.h>
47 #include <grpc++/support/status.h>
48 #include <grpc/compression.h>
49 
50 struct grpc_server;
51 
52 namespace grpc {
53 
54 class AsynchronousService;
55 class GenericServerContext;
56 class AsyncGenericService;
57 class RpcService;
58 class RpcServiceMethod;
59 class ServerAsyncStreamingInterface;
60 class ServerContext;
61 class ThreadPoolInterface;
62 
66 class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
67  public:
// Destructor. Declared here only; the definition is not part of this header.
68  ~Server();
69 
// Shut down the server, blocking until all rpc processing finishes.
//
// `deadline` is wrapped in TimePoint<T> and the resulting raw_time() is
// forwarded to ShutdownInternal(), so T can be any type TimePoint<T> accepts.
// NOTE(review): what happens to calls still in flight once the deadline
// passes is decided inside ShutdownInternal(), which is not visible here.
75  template <class T>
76  void Shutdown(const T& deadline) {
77  ShutdownInternal(TimePoint<T>(deadline).raw_time());
78  }
79 
// Shut down the server, waiting for all rpc processing to finish.
// Same path as the deadline overload, with
// gpr_inf_future(GPR_CLOCK_MONOTONIC) passed as the deadline.
81  void Shutdown() { ShutdownInternal(gpr_inf_future(GPR_CLOCK_MONOTONIC)); }
82 
// Block waiting for all work to complete.
// NOTE(review): presumably only returns once the server has been shut down
// (possibly from another thread) -- confirm against the implementation.
87  void Wait();
88 
// Global callbacks: an abstract hook interface whose methods are invoked
// around each synchronous server request. An implementation is installed
// with Server::SetGlobalCallbacks().
93  class GlobalCallbacks {
94  public:
95  virtual ~GlobalCallbacks() {}
// Called before the application callback for each synchronous server request.
97  virtual void PreSynchronousRequest(ServerContext* context) = 0;
// Called after the application callback for each synchronous server request.
99  virtual void PostSynchronousRequest(ServerContext* context) = 0;
100  };
// Set the global callback object.
// NOTE(review): ownership of `callbacks`, its required lifetime, and whether
// this may be called more than once are not visible in this header --
// confirm against the implementation before relying on any of them.
104  static void SetGlobalCallbacks(GlobalCallbacks* callbacks);
105 
106  private:
// Classes granted access to Server's private members. ServerBuilder is the
// builder class for the creation and startup of grpc::Server instances.
107  friend class AsyncGenericService;
108  friend class AsynchronousService;
109  friend class ServerBuilder;
110 
// Nested helper types; only declared here, defined outside this header.
111  class SyncRequest;
112  class AsyncRequest;
113  class ShutdownRequest;
114 
// Private constructor: a Server cannot be constructed directly by user code;
// the friend classes above (e.g. ServerBuilder) can reach it.
//   thread_pool        pool stored in thread_pool_.
//   thread_pool_owned  whether the thread pool is created and owned by the
//                      server.
//   max_message_size   stored in max_message_size_ and used as the limit
//                      passed to message deserialization.
//   args               options for channel creation.
121  Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
122  int max_message_size, const ChannelArguments& args);
123 
// Register a synchronous service, optionally scoped to `host`.
// NOTE(review): the meaning of the bool result, the handling of a null
// `host`, and ownership of `service` are not visible here -- confirm.
126  bool RegisterService(const grpc::string* host, RpcService* service);
127 
// Asynchronous counterpart of RegisterService(); same open questions apply.
130  bool RegisterAsyncService(const grpc::string* host,
131  AsynchronousService* service);
132 
// Register the generic asynchronous service.
// NOTE(review): presumably what sets has_generic_service_ -- confirm.
135  void RegisterAsyncGenericService(AsyncGenericService* service);
136 
// Add a listening port for `addr` using credentials `creds`.
// NOTE(review): the int result is presumably the bound port (with some
// failure sentinel) -- confirm against the implementation.
148  int AddListeningPort(const grpc::string& addr, ServerCredentials* creds);
149 
// Start the server. `cqs` is an array of `num_cqs` server completion queues.
// NOTE(review): lifetime requirements on the queues and the meaning of the
// bool result are not visible here -- confirm.
158  bool Start(ServerCompletionQueue** cqs, size_t num_cqs);
159 
160  void HandleQueueClosed();
161 
163  void RunRpc();
164 
166  void ScheduleCallback();
167 
// Overrides the hook inherited from the private CallHook base. `ops` is an
// abstract collection of call ops to perform on `call`.
168  void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE;
169 
// Common implementation behind both public Shutdown() overloads, which only
// differ in the deadline they pass in.
170  void ShutdownInternal(gpr_timespec deadline);
171 
// Common base of the asynchronous request objects below. It is a
// CompletionQueueTag (an interface allowing implementors to process and
// filter event tags) and holds the state shared by every async request.
172  class BaseAsyncRequest : public CompletionQueueTag {
173  public:
// Each pointer argument is stored in the same-named const member below.
// `delete_on_finalize` is stored in delete_on_finalize_.
174  BaseAsyncRequest(Server* server, ServerContext* context,
175  ServerAsyncStreamingInterface* stream,
176  CompletionQueue* call_cq, void* tag,
177  bool delete_on_finalize);
178  virtual ~BaseAsyncRequest();
179 
// CompletionQueueTag hook.
// NOTE(review): presumably hands the user's tag back through `tag` and may
// delete this object when delete_on_finalize_ is set -- confirm in server.cc.
180  bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
181 
182  protected:
183  Server* const server_;
184  ServerContext* const context_;
185  ServerAsyncStreamingInterface* const stream_;
186  CompletionQueue* const call_cq_;
187  void* const tag_;
188  const bool delete_on_finalize_;
// Not const, unlike the members above: not supplied through the constructor.
189  grpc_call* call_;
190  grpc_metadata_array initial_metadata_array_;
191  };
192 
// Base for async requests issued against a registered method. Adds
// IssueRequest() on top of BaseAsyncRequest and keeps its FinalizeResult().
193  class RegisteredAsyncRequest : public BaseAsyncRequest {
194  public:
// Takes (server, context, stream, call_cq, tag), matching the five arguments
// the derived classes below pass from their initializer lists.
195  RegisteredAsyncRequest(Server* server, ServerContext* context,
196  ServerAsyncStreamingInterface* stream,
197  CompletionQueue* call_cq, void* tag);
198 
199  // uses BaseAsyncRequest::FinalizeResult
200 
201  protected:
// Issues the request for `registered_method`. `payload` is where the
// incoming message buffer is to be placed; NoPayloadAsyncRequest passes
// nullptr, PayloadAsyncRequest passes the address of its payload_ member.
// `notification_cq` is the server completion queue used for notification.
202  void IssueRequest(void* registered_method, grpc_byte_buffer** payload,
203  ServerCompletionQueue* notification_cq);
204  };
205 
// Registered async request that carries no request message: the constructor
// forwards its arguments to RegisteredAsyncRequest and immediately issues the
// request with a null payload pointer.
206  class NoPayloadAsyncRequest GRPC_FINAL : public RegisteredAsyncRequest {
207  public:
208  NoPayloadAsyncRequest(void* registered_method, Server* server,
209  ServerContext* context,
210  ServerAsyncStreamingInterface* stream,
211  CompletionQueue* call_cq,
212  ServerCompletionQueue* notification_cq, void* tag)
213  : RegisteredAsyncRequest(server, context, stream, call_cq, tag) {
// nullptr: there is no byte buffer to receive for this kind of request.
214  IssueRequest(registered_method, nullptr, notification_cq);
215  }
216 
217  // uses RegisteredAsyncRequest::FinalizeResult
218  };
219 
// Registered async request that also receives one request message of type
// Message. The raw byte buffer lands in payload_ and is deserialized into the
// caller-supplied `request` object when the request is finalized.
220  template <class Message>
221  class PayloadAsyncRequest GRPC_FINAL : public RegisteredAsyncRequest {
222  public:
223  PayloadAsyncRequest(void* registered_method, Server* server,
224  ServerContext* context,
225  ServerAsyncStreamingInterface* stream,
226  CompletionQueue* call_cq,
227  ServerCompletionQueue* notification_cq, void* tag,
228  Message* request)
229  : RegisteredAsyncRequest(server, context, stream, call_cq, tag),
230  request_(request) {
// payload_ is not initialized here; its address is handed to IssueRequest().
// NOTE(review): presumably filled in before FinalizeResult() runs -- confirm.
231  IssueRequest(registered_method, &payload_, notification_cq);
232  }
233 
// Deserializes the payload, then runs the base-class finalization.
// Deserialization is attempted only if the incoming *status is true and
// payload_ is non-null; it is bounded by the server's max_message_size_.
// On return *status is true only if deserialization succeeded AND the base
// FinalizeResult() left *status true. The return value is the base's.
234  bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
235  bool serialization_status =
236  *status && payload_ &&
237  SerializationTraits<Message>::Deserialize(
238  payload_, request_, server_->max_message_size_).ok();
239  bool ret = RegisteredAsyncRequest::FinalizeResult(tag, status);
// Reads as: serialization_status && *status (the spacing is a formatter quirk).
240  *status = serialization_status&&* status;
241  return ret;
242  }
243 
244  private:
245  grpc_byte_buffer* payload_;
246  Message* const request_;
247  };
248 
// Async request for the generic service. Unlike the registered variants it
// takes a GenericServerContext, no registered-method handle, and an explicit
// `delete_on_finalize` flag; it also keeps a grpc_call_details record.
249  class GenericAsyncRequest : public BaseAsyncRequest {
250  public:
251  GenericAsyncRequest(Server* server, GenericServerContext* context,
252  ServerAsyncStreamingInterface* stream,
253  CompletionQueue* call_cq,
254  ServerCompletionQueue* notification_cq, void* tag,
255  bool delete_on_finalize);
256 
// Overrides BaseAsyncRequest::FinalizeResult; defined outside this header.
257  bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
258 
259  private:
260  grpc_call_details call_details_;
261  };
262 
// Nested helper types; only declared here, defined outside this header.
263  class UnimplementedAsyncRequestContext;
264  class UnimplementedAsyncRequest;
265  class UnimplementedAsyncResponse;
266 
// Request an async call on `registered_method` that delivers one request
// message into `message`. Heap-allocates a PayloadAsyncRequest<Message>,
// whose constructor issues the request.
// NOTE(review): the pointer returned by `new` is intentionally dropped; the
// object presumably deletes itself during finalization -- confirm in the
// BaseAsyncRequest implementation.
267  template <class Message>
268  void RequestAsyncCall(void* registered_method, ServerContext* context,
269  ServerAsyncStreamingInterface* stream,
270  CompletionQueue* call_cq,
271  ServerCompletionQueue* notification_cq, void* tag,
272  Message* message) {
273  new PayloadAsyncRequest<Message>(registered_method, this, context, stream,
274  call_cq, notification_cq, tag, message);
275  }
276 
// Overload for calls that deliver no request message: heap-allocates a
// NoPayloadAsyncRequest, whose constructor issues the request.
// NOTE(review): the pointer returned by `new` is intentionally dropped; the
// object presumably deletes itself during finalization -- confirm.
277  void RequestAsyncCall(void* registered_method, ServerContext* context,
278  ServerAsyncStreamingInterface* stream,
279  CompletionQueue* call_cq,
280  ServerCompletionQueue* notification_cq, void* tag) {
281  new NoPayloadAsyncRequest(registered_method, this, context, stream, call_cq,
282  notification_cq, tag);
283  }
284 
// Request an async call for the generic service: heap-allocates a
// GenericAsyncRequest with delete_on_finalize set to true.
// NOTE(review): the pointer returned by `new` is intentionally dropped; with
// delete_on_finalize == true the object presumably deletes itself when it is
// finalized -- confirm in the implementation.
285  void RequestAsyncGenericCall(GenericServerContext* context,
286  ServerAsyncStreamingInterface* stream,
287  CompletionQueue* call_cq,
288  ServerCompletionQueue* notification_cq,
289  void* tag) {
290  new GenericAsyncRequest(this, context, stream, call_cq, notification_cq,
291  tag, true);
292  }
293 
// Limit handed to SerializationTraits<Message>::Deserialize() when an async
// request payload is decoded (see PayloadAsyncRequest::FinalizeResult).
294  const int max_message_size_;
295 
296  // Completion queue.
297  CompletionQueue cq_;
298 
299  // Server status
// NOTE(review): mu_ presumably guards the flags and counter below, with
// callback_cv_ signalled as num_running_cb_ changes -- confirm in server.cc.
300  grpc::mutex mu_;
301  bool started_;
302  bool shutdown_;
303  // The number of threads which are running callbacks.
304  int num_running_cb_;
305  grpc::condition_variable callback_cv_;
306 
// Raw pointer to the list of synchronous requests; who allocates and frees
// the list is not visible in this header.
307  std::list<SyncRequest>* sync_methods_;
308  std::unique_ptr<RpcServiceMethod> unknown_method_;
309  bool has_generic_service_;
310 
311  // Pointer to the c grpc server.
312  grpc_server* const server_;
313 
314  ThreadPoolInterface* thread_pool_;
315  // Whether the thread pool is created and owned by the server.
316  bool thread_pool_owned_;
317 };
318 
319 } // namespace grpc
320 
321 #endif // GRPCXX_SERVER_H
An interface allowing implementors to process and filter event tags.
Definition: completion_queue.h:192
void Shutdown()
Shut down the server, waiting for all rpc processing to finish.
Definition: server.h:81
std::string string
Definition: config.h:112
An abstract collection of call ops, used to generate the grpc_call_op structure to pass down to the lower layers.
Definition: call.h:477
Definition: service_type.h:68
Options for channel creation.
Definition: channel_arguments.h:52
void Shutdown(const T &deadline)
Shut down the server, blocking until all rpc processing finishes.
Definition: server.h:76
Definition: sync_no_cxx11.h:45
Definition: service_type.h:57
virtual ~GlobalCallbacks()
Definition: server.h:95
#define GRPC_FINAL
Definition: config.h:71
Definition: async_generic_service.h:59
static void SetGlobalCallbacks(GlobalCallbacks *callbacks)
Set the global callback object.
Definition: time.h:54
Definition: sync_no_cxx11.h:87
virtual void PreSynchronousRequest(ServerContext *context)=0
Called before application callback for each synchronous server request.
Definition: grpc_library.h:41
Definition: rpc_service_method.h:249
Definition: server_credentials.h:49
Models a gRPC server.
Definition: server.h:66
Definition: call.h:559
void Wait()
Block waiting for all work to complete.
Definition: server_context.h:89
A thin wrapper around grpc_completion_queue (see src/core/surface/completion_queue.h).
Definition: completion_queue.h:81
Global Callbacks.
Definition: server.h:93
A specific type of completion queue used by the processing of notifications by servers.
Definition: completion_queue.h:204
virtual void PostSynchronousRequest(ServerContext *context)=0
Called after application callback for each synchronous server request.
#define GRPC_OVERRIDE
Definition: config.h:77
Definition: call.h:552
A builder class for the creation and startup of grpc::Server instances.
Definition: server_builder.h:57
::google::protobuf::Message Message
Definition: config_protobuf.h:60