GRPC C++  1.36.1
completion_queue.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015-2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
32 #ifndef GRPCPP_IMPL_CODEGEN_COMPLETION_QUEUE_H
33 #define GRPCPP_IMPL_CODEGEN_COMPLETION_QUEUE_H
34 
35 #include <list>
36 
37 #include <grpc/impl/codegen/atm.h>
45 
47 
48 namespace grpc {
49 template <class R>
50 class ClientReader;
51 template <class W>
52 class ClientWriter;
53 template <class W, class R>
54 class ClientReaderWriter;
55 template <class R>
57 template <class W>
59 namespace internal {
60 template <class W, class R>
62 
63 template <class ResponseType>
65  const ::grpc::internal::MethodHandler::HandlerParameter&, ResponseType*,
66  ::grpc::Status&);
67 template <class ServiceType, class RequestType, class ResponseType,
68  class BaseRequestType, class BaseResponseType>
70 template <class ServiceType, class RequestType, class ResponseType>
72 template <class ServiceType, class RequestType, class ResponseType>
74 template <class Streamer, bool WriteNeeded>
76 template <::grpc::StatusCode code>
77 class ErrorMethodHandler;
78 } // namespace internal
79 
80 class Channel;
81 class ChannelInterface;
82 class Server;
83 class ServerBuilder;
84 class ServerContextBase;
85 class ServerInterface;
86 
87 namespace internal {
88 class CompletionQueueTag;
89 class RpcMethod;
90 template <class InputMessage, class OutputMessage>
92 template <class Op1, class Op2, class Op3, class Op4, class Op5, class Op6>
93 class CallOpSet;
94 } // namespace internal
95 
97 
103  public:
109  nullptr}) {}
110 
114  explicit CompletionQueue(grpc_completion_queue* take);
115 
117  ~CompletionQueue() override {
119  }
120 
122  enum NextStatus {
125  TIMEOUT
127  };
128 
177  bool Next(void** tag, bool* ok) {
178  return (AsyncNextInternal(tag, ok,
181  }
182 
194  template <typename T>
195  NextStatus AsyncNext(void** tag, bool* ok, const T& deadline) {
196  ::grpc::TimePoint<T> deadline_tp(deadline);
197  return AsyncNextInternal(tag, ok, deadline_tp.raw_time());
198  }
199 
214  template <typename T, typename F>
215  NextStatus DoThenAsyncNext(F&& f, void** tag, bool* ok, const T& deadline) {
216  CompletionQueueTLSCache cache = CompletionQueueTLSCache(this);
217  f();
218  if (cache.Flush(tag, ok)) {
219  return GOT_EVENT;
220  } else {
221  return AsyncNext(tag, ok, deadline);
222  }
223  }
224 
235  void Shutdown();
236 
242  grpc_completion_queue* cq() { return cq_; }
243 
244  protected:
246  explicit CompletionQueue(const grpc_completion_queue_attributes& attributes) {
249  &attributes),
250  &attributes, nullptr);
251  InitialAvalanching(); // reserve this for the future shutdown
252  }
253 
254  private:
255  // Friends for access to server registration lists that enable checking and
256  // logging on shutdown
257  friend class ::grpc::ServerBuilder;
258  friend class ::grpc::Server;
259 
260  // Friend synchronous wrappers so that they can access Pluck(), which is
261  // a semi-private API geared towards the synchronous implementation.
262  template <class R>
263  friend class ::grpc::ClientReader;
264  template <class W>
265  friend class ::grpc::ClientWriter;
266  template <class W, class R>
267  friend class ::grpc::ClientReaderWriter;
268  template <class R>
269  friend class ::grpc::ServerReader;
270  template <class W>
271  friend class ::grpc::ServerWriter;
272  template <class W, class R>
273  friend class ::grpc::internal::ServerReaderWriterBody;
274  template <class ResponseType>
276  const ::grpc::internal::MethodHandler::HandlerParameter&, ResponseType*,
277  ::grpc::Status&);
278  template <class ServiceType, class RequestType, class ResponseType>
279  friend class ::grpc::internal::ClientStreamingHandler;
280  template <class ServiceType, class RequestType, class ResponseType>
281  friend class ::grpc::internal::ServerStreamingHandler;
282  template <class Streamer, bool WriteNeeded>
283  friend class ::grpc::internal::TemplatedBidiStreamingHandler;
284  template <::grpc::StatusCode code>
285  friend class ::grpc::internal::ErrorMethodHandler;
287  friend class ::grpc::ServerInterface;
288  template <class InputMessage, class OutputMessage>
289  friend class ::grpc::internal::BlockingUnaryCallImpl;
290 
291  // Friends that need access to constructor for callback CQ
292  friend class ::grpc::Channel;
293 
294  // For access to Register/CompleteAvalanching
295  template <class Op1, class Op2, class Op3, class Op4, class Op5, class Op6>
296  friend class ::grpc::internal::CallOpSet;
297 
302  class CompletionQueueTLSCache {
303  public:
304  explicit CompletionQueueTLSCache(CompletionQueue* cq);
305  ~CompletionQueueTLSCache();
306  bool Flush(void** tag, bool* ok);
307 
308  private:
309  CompletionQueue* cq_;
310  bool flushed_;
311  };
312 
313  NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline);
314 
317  bool Pluck(::grpc::internal::CompletionQueueTag* tag) {
318  auto deadline =
320  while (true) {
322  cq_, tag, deadline, nullptr);
323  bool ok = ev.success != 0;
324  void* ignored = tag;
325  if (tag->FinalizeResult(&ignored, &ok)) {
326  GPR_CODEGEN_ASSERT(ignored == tag);
327  return ok;
328  }
329  }
330  }
331 
340  void TryPluck(::grpc::internal::CompletionQueueTag* tag) {
341  auto deadline =
344  cq_, tag, deadline, nullptr);
345  if (ev.type == GRPC_QUEUE_TIMEOUT) return;
346  bool ok = ev.success != 0;
347  void* ignored = tag;
348  // the tag must be swallowed if using TryPluck
349  GPR_CODEGEN_ASSERT(!tag->FinalizeResult(&ignored, &ok));
350  }
351 
357  void TryPluck(::grpc::internal::CompletionQueueTag* tag,
358  gpr_timespec deadline) {
360  cq_, tag, deadline, nullptr);
361  if (ev.type == GRPC_QUEUE_TIMEOUT || ev.type == GRPC_QUEUE_SHUTDOWN) {
362  return;
363  }
364 
365  bool ok = ev.success != 0;
366  void* ignored = tag;
367  GPR_CODEGEN_ASSERT(!tag->FinalizeResult(&ignored, &ok));
368  }
369 
376  void InitialAvalanching() {
377  gpr_atm_rel_store(&avalanches_in_flight_, static_cast<gpr_atm>(1));
378  }
379  void RegisterAvalanching() {
380  gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_,
381  static_cast<gpr_atm>(1));
382  }
383  void CompleteAvalanching() {
384  if (gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_,
385  static_cast<gpr_atm>(-1)) == 1) {
387  }
388  }
389 
390  void RegisterServer(const ::grpc::Server* server) {
391  (void)server;
392 #ifndef NDEBUG
393  grpc::internal::MutexLock l(&server_list_mutex_);
394  server_list_.push_back(server);
395 #endif
396  }
397  void UnregisterServer(const ::grpc::Server* server) {
398  (void)server;
399 #ifndef NDEBUG
400  grpc::internal::MutexLock l(&server_list_mutex_);
401  server_list_.remove(server);
402 #endif
403  }
404  bool ServerListEmpty() const {
405 #ifndef NDEBUG
406  grpc::internal::MutexLock l(&server_list_mutex_);
407  return server_list_.empty();
408 #endif
409  return true;
410  }
411 
412  grpc_completion_queue* cq_; // owned
413 
414  gpr_atm avalanches_in_flight_;
415 
416  // List of servers associated with this CQ. Even though this is only used with
417  // NDEBUG, instantiate it in all cases since otherwise the size will be
418  // inconsistent.
419  mutable grpc::internal::Mutex server_list_mutex_;
420  std::list<const ::grpc::Server*>
421  server_list_ /* GUARDED_BY(server_list_mutex_) */;
422 };
423 
427  public:
428  bool IsFrequentlyPolled() { return polling_type_ != GRPC_CQ_NON_LISTENING; }
429 
430  protected:
433 
434  private:
442  grpc_cq_polling_type polling_type,
445  GRPC_CQ_CURRENT_VERSION, completion_type, polling_type,
446  shutdown_cb}),
447  polling_type_(polling_type) {}
448 
449  grpc_cq_polling_type polling_type_;
450  friend class ::grpc::ServerBuilder;
451  friend class ::grpc::Server;
452 };
453 
454 } // namespace grpc
455 
456 #endif // GRPCPP_IMPL_CODEGEN_COMPLETION_QUEUE_H
grpc::GrpcLibraryCodegen
Classes that require gRPC to be initialized should inherit from this class.
Definition: grpc_library.h:38
atm.h
grpc::CompletionQueue::Shutdown
void Shutdown()
Request the shutdown of the queue.
grpc::CoreCodegenInterface::gpr_time_0
virtual gpr_timespec gpr_time_0(gpr_clock_type type)=0
grpc::CoreCodegenInterface::grpc_completion_queue_create
virtual grpc_completion_queue * grpc_completion_queue_create(const grpc_completion_queue_factory *factory, const grpc_completion_queue_attributes *attributes, void *reserved)=0
GRPC_CQ_NEXT
@ GRPC_CQ_NEXT
Events are popped out by calling grpc_completion_queue_next() API ONLY.
Definition: grpc_types.h:729
grpc::Server
Represents a gRPC server.
Definition: server.h:59
time.h
rpc_service_method.h
grpc
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
grpc::internal::CallOpSet
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:852
status.h
grpc_cq_polling_type
grpc_cq_polling_type
Completion queues internally MAY maintain a set of file descriptors in a structure called 'pollset'.
Definition: grpc_types.h:709
grpc::internal::ErrorMethodHandler
General method handler class for errors that prevent real method use e.g., handle unknown method by r...
Definition: byte_buffer.h:49
grpc::internal::RpcMethodHandler
A wrapper class of an application provided rpc method handler.
Definition: completion_queue.h:69
grpc::ServerWriter
Synchronous (blocking) server-side API for doing for doing a server-streaming RPCs,...
Definition: completion_queue.h:58
grpc_cq_completion_type
grpc_cq_completion_type
Specifies the type of APIs to use to pop events from the completion queue.
Definition: grpc_types.h:727
grpc::CompletionQueue::AsyncNext
NextStatus AsyncNext(void **tag, bool *ok, const T &deadline)
Read from the queue, blocking up to deadline (or the queue's shutdown).
Definition: completion_queue.h:195
grpc::internal::BlockingUnaryCallImpl
Definition: channel_interface.h:67
GRPC_QUEUE_SHUTDOWN
@ GRPC_QUEUE_SHUTDOWN
Shutting down.
Definition: grpc_types.h:523
grpc::ServerContextBase
Base class of ServerContext. Experimental until callback API is final.
Definition: server_context.h:131
core_codegen_interface.h
grpc::experimental::ServerContextBase
::grpc::ServerContextBase ServerContextBase
Definition: server_context.h:108
grpc::CompletionQueue::DoThenAsyncNext
NextStatus DoThenAsyncNext(F &&f, void **tag, bool *ok, const T &deadline)
EXPERIMENTAL First executes F, then reads from the queue, blocking up to deadline (or the queue's shu...
Definition: completion_queue.h:215
gpr_inf_future
GPRAPI gpr_timespec gpr_inf_future(gpr_clock_type type)
The zero time interval.
grpc::internal::ServerReaderWriterBody
Definition: completion_queue.h:61
grpc::CompletionQueue::CompletionQueue
CompletionQueue(const grpc_completion_queue_attributes &attributes)
Private constructor of CompletionQueue only visible to friend classes.
Definition: completion_queue.h:246
grpc::CoreCodegenInterface::gpr_inf_future
virtual gpr_timespec gpr_inf_future(gpr_clock_type type)=0
grpc::Status
Did it work? If it didn't, why?
Definition: status.h:31
GRPC_CQ_DEFAULT_POLLING
@ GRPC_CQ_DEFAULT_POLLING
The completion queue will have an associated pollset and there is no restriction on the type of file ...
Definition: grpc_types.h:712
GRPC_CQ_NON_LISTENING
@ GRPC_CQ_NON_LISTENING
Similar to GRPC_CQ_DEFAULT_POLLING except that the completion queues will not contain any 'listening ...
Definition: grpc_types.h:717
grpc::ServerReader
Synchronous (blocking) server-side API for doing client-streaming RPCs, where the incoming message st...
Definition: completion_queue.h:56
grpc_experimental_completion_queue_functor
EXPERIMENTAL: Specifies an interface class to be used as a tag for callback-based completion queues.
Definition: grpc_types.h:743
grpc_completion_queue_factory_lookup
const GRPCAPI grpc_completion_queue_factory * grpc_completion_queue_factory_lookup(const grpc_completion_queue_attributes *attributes)
Returns the completion queue factory based on the attributes.
grpc::CompletionQueue::NextStatus
NextStatus
Tri-state return for AsyncNext: SHUTDOWN, GOT_EVENT, TIMEOUT.
Definition: completion_queue.h:122
grpc::TimePoint::raw_time
gpr_timespec raw_time()=delete
completion_queue_tag.h
gpr_atm_rel_store
#define gpr_atm_rel_store(p, value)
Definition: atm_gcc_atomic.h:52
gpr_atm_no_barrier_fetch_add
#define gpr_atm_no_barrier_fetch_add(p, delta)
Definition: atm_gcc_atomic.h:57
grpc::ChannelInterface
Codegen interface for grpc::Channel.
Definition: channel_interface.h:71
grpc::CompletionQueue::TIMEOUT
@ TIMEOUT
deadline was reached.
Definition: completion_queue.h:126
grpc::CompletionQueue::cq
grpc_completion_queue * cq()
Returns a raw pointer to the underlying grpc_completion_queue instance.
Definition: completion_queue.h:242
sync.h
grpc_completion_queue_attributes
Definition: grpc_types.h:763
grpc::internal::CompletionQueueTag
An interface allowing implementors to process and filter event tags.
Definition: completion_queue_tag.h:26
grpc::CoreCodegenInterface
Interface between the codegen library and the minimal subset of core features required by the generat...
Definition: core_codegen_interface.h:38
grpc_library.h
gpr_atm
intptr_t gpr_atm
Definition: atm_gcc_atomic.h:30
grpc::internal::ServerStreamingHandler
A wrapper class of an application provided server streaming handler.
Definition: byte_buffer.h:47
grpc::ServerCompletionQueue::IsFrequentlyPolled
bool IsFrequentlyPolled()
Definition: completion_queue.h:428
grpc::internal::MutexLock
Definition: sync.h:69
grpc::internal::ClientStreamingHandler
A wrapper class of an application provided client streaming handler.
Definition: completion_queue.h:71
grpc::CompletionQueue::GOT_EVENT
@ GOT_EVENT
Got a new event; tag will be filled in with its associated value; ok indicating its success.
Definition: completion_queue.h:124
grpc::internal::TemplatedBidiStreamingHandler
A wrapper class of an application provided bidi-streaming handler.
Definition: completion_queue.h:75
GRPC_CQ_CURRENT_VERSION
#define GRPC_CQ_CURRENT_VERSION
Definition: grpc_types.h:761
grpc::CompletionQueue::CompletionQueue
CompletionQueue()
Default constructor.
Definition: completion_queue.h:106
grpc::CompletionQueue::~CompletionQueue
~CompletionQueue() override
Destructor. Destroys the owned wrapped completion queue / instance.
Definition: completion_queue.h:117
grpc::ServerCompletionQueue
A specific type of completion queue used by the processing of notifications by servers.
Definition: completion_queue.h:426
grpc_completion_queue
struct grpc_completion_queue grpc_completion_queue
Completion Queues enable notification of the completion of asynchronous actions.
Definition: grpc_types.h:56
grpc::internal::CompletionQueueTag::FinalizeResult
virtual bool FinalizeResult(void **tag, bool *status)=0
FinalizeResult must be called before informing user code that the operation bound to the underlying c...
grpc::CoreCodegenInterface::grpc_completion_queue_destroy
virtual void grpc_completion_queue_destroy(grpc_completion_queue *cq)=0
grpc::CompletionQueue
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue....
Definition: completion_queue.h:102
grpc::Channel
Channels represent a connection to an endpoint. Created by CreateChannel.
Definition: channel.h:54
grpc::g_core_codegen_interface
CoreCodegenInterface * g_core_codegen_interface
Definition: completion_queue.h:96
GPR_CODEGEN_ASSERT
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:146
grpc::internal::Mutex
Definition: sync.h:47
grpc::internal::RpcMethod
Descriptor of an RPC method.
Definition: rpc_method.h:29
grpc::ServerCompletionQueue::ServerCompletionQueue
ServerCompletionQueue()
Default constructor.
Definition: completion_queue.h:432
grpc::CoreCodegenInterface::grpc_completion_queue_pluck
virtual grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved)=0
grpc::ServerBuilder
A builder class for the creation and startup of grpc::Server instances.
Definition: server_builder.h:90
grpc::ServerInterface
Definition: server_interface.h:65
grpc::internal::UnaryRunHandlerHelper
void UnaryRunHandlerHelper(const ::grpc::internal::MethodHandler::HandlerParameter &, ResponseType *, ::grpc::Status &)
gpr_timespec
Analogous to struct timespec.
Definition: gpr_types.h:47
grpc::CoreCodegenInterface::grpc_completion_queue_shutdown
virtual void grpc_completion_queue_shutdown(grpc_completion_queue *cq)=0
grpc::CompletionQueue::SHUTDOWN
@ SHUTDOWN
The completion queue has been shutdown and fully-drained.
Definition: completion_queue.h:123
GPR_CLOCK_REALTIME
@ GPR_CLOCK_REALTIME
Realtime clock.
Definition: gpr_types.h:36
grpc_event::success
int success
If the grpc_completion_type is GRPC_OP_COMPLETE, this field indicates whether the operation was succe...
Definition: grpc_types.h:541
GRPC_QUEUE_TIMEOUT
@ GRPC_QUEUE_TIMEOUT
No event before timeout.
Definition: grpc_types.h:525
grpc::TimePoint
If you are trying to use CompletionQueue::AsyncNext with a time class that isn't either gpr_timespec ...
Definition: time.h:40
grpc::CompletionQueue::Next
bool Next(void **tag, bool *ok)
Read from the queue, blocking until an event is available or the queue is shutting down.
Definition: completion_queue.h:177