GRPC C++  1.17.0
completion_queue.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015-2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
32 #ifndef GRPCPP_IMPL_CODEGEN_COMPLETION_QUEUE_H
33 #define GRPCPP_IMPL_CODEGEN_COMPLETION_QUEUE_H
34 
35 #include <grpc/impl/codegen/atm.h>
41 
43 
44 namespace grpc {
45 
46 template <class R>
47 class ClientReader;
48 template <class W>
49 class ClientWriter;
50 template <class W, class R>
51 class ClientReaderWriter;
52 template <class R>
54 template <class W>
56 namespace internal {
57 template <class W, class R>
59 } // namespace internal
60 
61 class Channel;
62 class ChannelInterface;
63 class ClientContext;
64 class CompletionQueue;
65 class Server;
66 class ServerBuilder;
67 class ServerContext;
68 class ServerInterface;
69 
70 namespace internal {
71 class CompletionQueueTag;
72 class RpcMethod;
73 template <class ServiceType, class RequestType, class ResponseType>
74 class RpcMethodHandler;
75 template <class ServiceType, class RequestType, class ResponseType>
77 template <class ServiceType, class RequestType, class ResponseType>
79 template <class ServiceType, class RequestType, class ResponseType>
81 template <class Streamer, bool WriteNeeded>
83 template <StatusCode code>
84 class ErrorMethodHandler;
85 template <class InputMessage, class OutputMessage>
87 } // namespace internal
88 
90 
96  public:
102  nullptr}) {}
103 
107  explicit CompletionQueue(grpc_completion_queue* take);
108 
111  g_core_codegen_interface->grpc_completion_queue_destroy(cq_);
112  }
113 
115  enum NextStatus {
     // Status value returned by AsyncNext / AsyncNextInternal.
     // NOTE(review): Next() compares against a SHUTDOWN enumerator that this
     // listing does not show (source line 116 is elided) -- confirm against
     // the real header before relying on the enumerator order.
117  GOT_EVENT,
118  TIMEOUT
120  };
121 
170  bool Next(void** tag, bool* ok) {
     // Blocking read of the next event: delegates to AsyncNextInternal with an
     // infinite-future realtime deadline. Returns true unless the reported
     // status is SHUTDOWN. `tag` and `ok` are forwarded to AsyncNextInternal
     // unchanged.
171  return (AsyncNextInternal(tag, ok,
172  g_core_codegen_interface->gpr_inf_future(
173  GPR_CLOCK_REALTIME)) != SHUTDOWN);
174  }
175 
187  template <typename T>
188  NextStatus AsyncNext(void** tag, bool* ok, const T& deadline) {
     // Deadline-bounded read. TimePoint<T> adapts the caller's deadline type;
     // its raw_time() supplies the gpr_timespec that AsyncNextInternal takes.
     // The NextStatus produced by AsyncNextInternal is returned unchanged.
189  TimePoint<T> deadline_tp(deadline);
190  return AsyncNextInternal(tag, ok, deadline_tp.raw_time());
191  }
192 
207  template <typename T, typename F>
208  NextStatus DoThenAsyncNext(F&& f, void** tag, bool* ok, const T& deadline) {
     // Runs f() and then reads from the queue (described as EXPERIMENTAL in the
     // generated documentation for this header).
     // The cache object is constructed before f() runs and lives until this
     // function returns.
209  CompletionQueueTLSCache cache = CompletionQueueTLSCache(this);
210  f();
     // A true result from Flush() short-circuits to GOT_EVENT without polling
     // the queue; presumably tag/ok were filled from an event captured while
     // f() ran -- TODO confirm against CompletionQueueTLSCache::Flush.
211  if (cache.Flush(tag, ok)) {
212  return GOT_EVENT;
213  } else {
     // Nothing flushed: fall back to an ordinary deadline-bounded read.
214  return AsyncNext(tag, ok, deadline);
215  }
216  }
217 
228  void Shutdown();
229 
235  grpc_completion_queue* cq() { return cq_; }  // Exposes the wrapped raw queue pointer; cq_ is declared as owned by this object.
236 
237  protected:
240  cq_ = g_core_codegen_interface->grpc_completion_queue_create(
241  g_core_codegen_interface->grpc_completion_queue_factory_lookup(
242  &attributes),
243  &attributes, NULL);
244  InitialAvalanching(); // reserve this for the future shutdown
245  }
246 
247  private:
248  // Friend synchronous wrappers so that they can access Pluck(), which is
249  // a semi-private API geared towards the synchronous implementation.
250  template <class R>
251  friend class ::grpc::ClientReader;
252  template <class W>
253  friend class ::grpc::ClientWriter;
254  template <class W, class R>
255  friend class ::grpc::ClientReaderWriter;
256  template <class R>
257  friend class ::grpc::ServerReader;
258  template <class W>
259  friend class ::grpc::ServerWriter;
260  template <class W, class R>
261  friend class ::grpc::internal::ServerReaderWriterBody;
262  template <class ServiceType, class RequestType, class ResponseType>
263  friend class ::grpc::internal::RpcMethodHandler;
264  template <class ServiceType, class RequestType, class ResponseType>
265  friend class ::grpc::internal::ClientStreamingHandler;
266  template <class ServiceType, class RequestType, class ResponseType>
267  friend class ::grpc::internal::ServerStreamingHandler;
268  template <class Streamer, bool WriteNeeded>
269  friend class ::grpc::internal::TemplatedBidiStreamingHandler;
270  template <StatusCode code>
271  friend class ::grpc::internal::ErrorMethodHandler;
272  friend class ::grpc::Server;
273  friend class ::grpc::ServerContext;
274  friend class ::grpc::ServerInterface;
275  template <class InputMessage, class OutputMessage>
276  friend class ::grpc::internal::BlockingUnaryCallImpl;
277 
278  // Friends that need access to constructor for callback CQ
279  friend class ::grpc::Channel;
280 
285  class CompletionQueueTLSCache {
     // Scoped helper used by DoThenAsyncNext: it is built from the enclosing
     // queue before the user callback runs and asked to Flush() afterwards.
     // Only declarations are visible in this listing; the member definitions
     // are not shown here.
286  public:
287  CompletionQueueTLSCache(CompletionQueue* cq);
288  ~CompletionQueueTLSCache();
     // When this returns true, DoThenAsyncNext reports GOT_EVENT using tag/ok
     // exactly as this call left them.
289  bool Flush(void** tag, bool* ok);
290 
291  private:
292  CompletionQueue* cq_;  // presumably the queue handed to the constructor -- TODO confirm
293  bool flushed_;  // NOTE(review): looks like it records whether Flush() has already run -- confirm
294  };
295 
296  NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline);
297 
300  bool Pluck(internal::CompletionQueueTag* tag) {
     // Waits, with an infinite-future realtime deadline, for `tag` on cq_ and
     // returns whether the plucked event's `success` field was non-zero.
301  auto deadline =
302  g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME);
     // Keep plucking until FinalizeResult() returns true; a false return sends
     // the loop around for another pluck on the same tag.
303  while (true) {
304  auto ev = g_core_codegen_interface->grpc_completion_queue_pluck(
305  cq_, tag, deadline, nullptr);
306  bool ok = ev.success != 0;
     // FinalizeResult receives pointers to both locals and may rewrite them;
     // `ignored` exists so the assertion below can check that the tag pointer
     // came back unchanged.
307  void* ignored = tag;
308  if (tag->FinalizeResult(&ignored, &ok)) {
309  GPR_CODEGEN_ASSERT(ignored == tag);
310  // Ignore mutations by FinalizeResult: Pluck returns the C API status
311  return ev.success != 0;
312  }
313  }
314  }
315 
324  void TryPluck(internal::CompletionQueueTag* tag) {
     // Makes a single pluck attempt for `tag`, using gpr_time_0 as the deadline
     // (presumably "do not wait" -- confirm gpr_time_0 semantics).
325  auto deadline = g_core_codegen_interface->gpr_time_0(GPR_CLOCK_REALTIME);
326  auto ev = g_core_codegen_interface->grpc_completion_queue_pluck(
327  cq_, tag, deadline, nullptr);
     // Timed out: return without calling FinalizeResult on the tag.
328  if (ev.type == GRPC_QUEUE_TIMEOUT) return;
     // NOTE(review): unlike the deadline-taking overload of TryPluck,
     // GRPC_QUEUE_SHUTDOWN is not filtered out here -- confirm that is
     // intentional.
329  bool ok = ev.success != 0;
330  void* ignored = tag;
331  // the tag must be swallowed if using TryPluck
     // i.e. FinalizeResult() is required to return false; the assertion fires
     // if it returns true.
332  GPR_CODEGEN_ASSERT(!tag->FinalizeResult(&ignored, &ok));
333  }
334 
340  void TryPluck(internal::CompletionQueueTag* tag, gpr_timespec deadline) {
     // Makes a single pluck attempt for `tag`, bounded by the caller-supplied
     // deadline.
341  auto ev = g_core_codegen_interface->grpc_completion_queue_pluck(
342  cq_, tag, deadline, nullptr);
     // Neither a timeout nor a shut-down queue yields an event to finalize, so
     // both cases return without calling FinalizeResult on the tag.
343  if (ev.type == GRPC_QUEUE_TIMEOUT || ev.type == GRPC_QUEUE_SHUTDOWN) {
344  return;
345  }
346 
347  bool ok = ev.success != 0;
348  void* ignored = tag;
     // FinalizeResult() is required to return false here; the assertion fires
     // if it returns true.
349  GPR_CODEGEN_ASSERT(!tag->FinalizeResult(&ignored, &ok));
350  }
351 
358  void InitialAvalanching() {
     // Sets avalanches_in_flight_ to 1 using a release store. The constructor
     // body visible in this header calls it with the remark "reserve this for
     // the future shutdown".
359  gpr_atm_rel_store(&avalanches_in_flight_, static_cast<gpr_atm>(1));
360  }
361  void RegisterAvalanching() {
     // Adds 1 to avalanches_in_flight_ via the no-barrier fetch-add; the value
     // the fetch-add returns is discarded.
362  gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_,
363  static_cast<gpr_atm>(1));
364  }
365  void CompleteAvalanching();
366 
367  grpc_completion_queue* cq_; // owned
368 
369  gpr_atm avalanches_in_flight_;
370 };
371 
375  public:
376  bool IsFrequentlyPolled() { return polling_type_ != GRPC_CQ_NON_LISTENING; }  // True unless the stored polling type is GRPC_CQ_NON_LISTENING.
377 
378  protected:
381 
382  private:
390  grpc_cq_polling_type polling_type,
393  GRPC_CQ_CURRENT_VERSION, completion_type, polling_type,
394  shutdown_cb}),
395  polling_type_(polling_type) {}
396 
397  grpc_cq_polling_type polling_type_;
398  friend class ServerBuilder;
399  friend class Server;
400 };
401 
402 } // namespace grpc
403 
404 #endif // GRPCPP_IMPL_CODEGEN_COMPLETION_QUEUE_H
A wrapper class of an application provided server streaming handler.
Definition: byte_buffer.h:47
virtual const grpc_completion_queue_factory * grpc_completion_queue_factory_lookup(const grpc_completion_queue_attributes *attributes)=0
A wrapper class of an application provided bidi-streaming handler.
Definition: completion_queue.h:82
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:141
virtual gpr_timespec gpr_inf_future(gpr_clock_type type)=0
Definition: completion_queue.h:58
An interface allowing implementors to process and filter event tags.
Definition: completion_queue_tag.h:26
virtual bool FinalizeResult(void **tag, bool *status)=0
FinalizeResult must be called before informing user code that the operation bound to the underlying c...
No event before timeout.
Definition: grpc_types.h:458
int success
If the grpc_completion_type is GRPC_OP_COMPLETE, this field indicates whether the operation was succe...
Definition: grpc_types.h:474
#define GRPC_CQ_CURRENT_VERSION
Definition: grpc_types.h:683
The completion queue has been shutdown and fully-drained.
Definition: completion_queue.h:116
Events are popped out by calling grpc_completion_queue_next() API ONLY.
Definition: grpc_types.h:659
gpr_timespec raw_time()
Definition: time.h:43
bool IsFrequentlyPolled()
Definition: completion_queue.h:376
virtual void grpc_completion_queue_destroy(grpc_completion_queue *cq)=0
Classes that require gRPC to be initialized should inherit from this class.
Definition: grpc_library.h:38
If you are trying to use CompletionQueue::AsyncNext with a time class that isn't either gpr_timespec ...
Definition: time.h:40
Definition: grpc_types.h:685
grpc_completion_queue * cq()
Returns a raw pointer to the underlying grpc_completion_queue instance.
Definition: completion_queue.h:235
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:165
Synchronous (blocking) server-side API for doing client-streaming RPCs, where the incoming message st...
Definition: completion_queue.h:53
ServerCompletionQueue()
Default constructor.
Definition: completion_queue.h:380
bool Next(void **tag, bool *ok)
Read from the queue, blocking until an event is available or the queue is shutting down...
Definition: completion_queue.h:170
Descriptor of an RPC method.
Definition: rpc_method.h:29
NextStatus DoThenAsyncNext(F &&f, void **tag, bool *ok, const T &deadline)
EXPERIMENTAL First executes F, then reads from the queue, blocking up to deadline (or the queue's shu...
Definition: completion_queue.h:208
grpc_cq_completion_type
Specifies the type of APIs to use to pop events from the completion queue.
Definition: grpc_types.h:657
grpc_cq_polling_type
Completion queues internally MAY maintain a set of file descriptors in a structure called 'pollset'...
Definition: grpc_types.h:639
CompletionQueue(const grpc_completion_queue_attributes &attributes)
Private constructor of CompletionQueue only visible to friend classes.
Definition: completion_queue.h:239
Shutting down.
Definition: grpc_types.h:456
#define gpr_atm_rel_store(p, value)
Definition: atm_gcc_atomic.h:52
NextStatus AsyncNext(void **tag, bool *ok, const T &deadline)
Read from the queue, blocking up to deadline (or the queue's shutdown).
Definition: completion_queue.h:188
~CompletionQueue()
Destructor. Destroys the owned wrapped completion queue / instance.
Definition: completion_queue.h:110
Represents a gRPC server.
Definition: server.h:53
An Alarm posts the user provided tag to its associated completion queue upon expiry or cancellation...
Definition: alarm.h:33
virtual grpc_completion_queue * grpc_completion_queue_create(const grpc_completion_queue_factory *factory, const grpc_completion_queue_attributes *attributes, void *reserved)=0
A wrapper class of an application provided client streaming handler.
Definition: completion_queue.h:76
#define gpr_atm_no_barrier_fetch_add(p, delta)
Definition: atm_gcc_atomic.h:57
Codegen interface for grpc::Channel.
Definition: channel_interface.h:60
A wrapper class of an application provided rpc method handler.
Definition: byte_buffer.h:45
CoreCodegenInterface * g_core_codegen_interface
Definition: call_op_set.h:50
NextStatus
Tri-state return for AsyncNext: SHUTDOWN, GOT_EVENT, TIMEOUT.
Definition: completion_queue.h:115
The completion queue will have an associated pollset and there is no restriction on the type of file ...
Definition: grpc_types.h:642
intptr_t gpr_atm
Definition: atm_gcc_atomic.h:30
Similar to GRPC_CQ_DEFAULT_POLLING except that the completion queues will not contain any 'listening ...
Definition: grpc_types.h:647
CompletionQueue()
Default constructor.
Definition: completion_queue.h:99
A ServerContext allows the person implementing a service handler to:
Definition: server_context.h:102
Definition: server_interface.h:50
Realtime clock.
Definition: gpr_types.h:36
Synchronous (blocking) server-side API for doing server-streaming RPCs, where the outgoing message stream coming from the server has messages of type W.
Definition: completion_queue.h:55
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue.h).
Definition: completion_queue.h:95
Definition: channel_interface.h:45
Interface between the codegen library and the minimal subset of core features required by the generat...
Definition: core_codegen_interface.h:37
General method handler class for errors that prevent real method use e.g., handle unknown method by r...
Definition: byte_buffer.h:51
Analogous to struct timespec.
Definition: gpr_types.h:47
A specific type of completion queue used by the processing of notifications by servers.
Definition: completion_queue.h:374
Definition: completion_queue.h:80
struct grpc_completion_queue grpc_completion_queue
Completion Queues enable notification of the completion of asynchronous actions.
Definition: grpc_types.h:56
virtual gpr_timespec gpr_time_0(gpr_clock_type type)=0
EXPERIMENTAL: Specifies an interface class to be used as a tag for callback-based completion queues...
Definition: grpc_types.h:673
A builder class for the creation and startup of grpc::Server instances.
Definition: server_builder.h:53
virtual grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved)=0
Channels represent a connection to an endpoint. Created by CreateChannel.
Definition: channel.h:44