Procházet zdrojové kódy

Merge github.com:grpc/grpc into epexinf

Craig Tiller před 8 roky
rodič
revize
3f05498663

+ 9 - 0
src/core/lib/debug/stats_data.cc

@@ -112,6 +112,9 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = {
     "executor_push_retries",
     "server_requested_calls",
     "server_slowpath_requests_queued",
+    "cq_ev_queue_trylock_failures",
+    "cq_ev_queue_trylock_successes",
+    "cq_ev_queue_transient_pop_failures",
 };
 const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = {
     "Number of client side calls created by this process",
@@ -222,6 +225,12 @@ const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = {
     "How many calls were requested (not necessarily received) by the server",
     "How many times was the server slow path taken (indicates too few "
     "outstanding requests)",
+    "Number of lock (trylock) acquisition failures on completion queue event "
+    "queue. High value here indicates high contention on completion queues",
+    "Number of lock (trylock) acquisition successes on completion queue event "
+    "queue.",
+    "Number of times NULL was popped out of completion queue's event queue "
+    "even though the event queue was not empty",
 };
 const char *grpc_stats_histogram_name[GRPC_STATS_HISTOGRAM_COUNT] = {
     "call_initial_size",

+ 12 - 0
src/core/lib/debug/stats_data.h

@@ -118,6 +118,9 @@ typedef enum {
   GRPC_STATS_COUNTER_EXECUTOR_PUSH_RETRIES,
   GRPC_STATS_COUNTER_SERVER_REQUESTED_CALLS,
   GRPC_STATS_COUNTER_SERVER_SLOWPATH_REQUESTS_QUEUED,
+  GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRYLOCK_FAILURES,
+  GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRYLOCK_SUCCESSES,
+  GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES,
   GRPC_STATS_COUNTER_COUNT
 } grpc_stats_counters;
 extern const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT];
@@ -425,6 +428,15 @@ typedef enum {
 #define GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED(exec_ctx) \
   GRPC_STATS_INC_COUNTER((exec_ctx),                             \
                          GRPC_STATS_COUNTER_SERVER_SLOWPATH_REQUESTS_QUEUED)
+#define GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES(exec_ctx) \
+  GRPC_STATS_INC_COUNTER((exec_ctx),                          \
+                         GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRYLOCK_FAILURES)
+#define GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(exec_ctx) \
+  GRPC_STATS_INC_COUNTER((exec_ctx),                           \
+                         GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRYLOCK_SUCCESSES)
+#define GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES(exec_ctx) \
+  GRPC_STATS_INC_COUNTER(                                           \
+      (exec_ctx), GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES)
 #define GRPC_STATS_INC_CALL_INITIAL_SIZE(exec_ctx, value) \
   grpc_stats_inc_call_initial_size((exec_ctx), (int)(value))
 void grpc_stats_inc_call_initial_size(grpc_exec_ctx *exec_ctx, int x);

+ 10 - 0
src/core/lib/debug/stats_data.yaml

@@ -272,4 +272,14 @@
 - counter: server_slowpath_requests_queued
   doc: How many times was the server slow path taken (indicates too few
        outstanding requests)
+# cq
+- counter: cq_ev_queue_trylock_failures
+  doc: Number of lock (trylock) acquisition failures on completion queue event
+       queue. High value here indicates high contention on completion queues
+- counter: cq_ev_queue_trylock_successes
+  doc: Number of lock (trylock) acquisition successes on completion queue event
+       queue.
+- counter: cq_ev_queue_transient_pop_failures
+  doc: Number of times NULL was popped out of completion queue's event queue
+       even though the event queue was not empty
 

+ 4 - 1
src/core/lib/debug/stats_data_bq_schema.sql

@@ -86,4 +86,7 @@ executor_wakeup_initiated_per_iteration:FLOAT,
 executor_queue_drained_per_iteration:FLOAT,
 executor_push_retries_per_iteration:FLOAT,
 server_requested_calls_per_iteration:FLOAT,
-server_slowpath_requests_queued_per_iteration:FLOAT
+server_slowpath_requests_queued_per_iteration:FLOAT,
+cq_ev_queue_trylock_failures_per_iteration:FLOAT,
+cq_ev_queue_trylock_successes_per_iteration:FLOAT,
+cq_ev_queue_transient_pop_failures_per_iteration:FLOAT

+ 14 - 1
src/core/lib/surface/completion_queue.cc

@@ -362,11 +362,24 @@ static bool cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) {
 
 static grpc_cq_completion *cq_event_queue_pop(grpc_cq_event_queue *q) {
   grpc_cq_completion *c = NULL;
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+
   if (gpr_spinlock_trylock(&q->queue_lock)) {
-    c = (grpc_cq_completion *)gpr_mpscq_pop(&q->queue);
+    GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(&exec_ctx);
+
+    bool is_empty = false;
+    c = (grpc_cq_completion *)gpr_mpscq_pop_and_check_end(&q->queue, &is_empty);
     gpr_spinlock_unlock(&q->queue_lock);
+
+    if (c == NULL && !is_empty) {
+      GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES(&exec_ctx);
+    }
+  } else {
+    GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES(&exec_ctx);
   }
 
+  grpc_exec_ctx_finish(&exec_ctx);
+
   if (c) {
     gpr_atm_no_barrier_fetch_add(&q->num_queue_items, -1);
   }

+ 15 - 1
src/csharp/Grpc.Core/AsyncClientStreamingCall.cs

@@ -36,7 +36,21 @@ namespace Grpc.Core
         readonly Func<Metadata> getTrailersFunc;
         readonly Action disposeAction;
 
-        internal AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> responseAsync, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
+        /// <summary>
+        /// Creates a new AsyncClientStreamingCall object with the specified properties.
+        /// </summary>
+        /// <param name="requestStream">Stream of request values.</param>
+        /// <param name="responseAsync">The response of the asynchronous call.</param>
+        /// <param name="responseHeadersAsync">Response headers of the asynchronous call.</param>
+        /// <param name="getStatusFunc">Delegate returning the status of the call.</param>
+        /// <param name="getTrailersFunc">Delegate returning the trailing metadata of the call.</param>
+        /// <param name="disposeAction">Delegate to invoke when Dispose is called on the call object.</param>
+        public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream,
+                                        Task<TResponse> responseAsync,
+                                        Task<Metadata> responseHeadersAsync,
+                                        Func<Status> getStatusFunc,
+                                        Func<Metadata> getTrailersFunc,
+                                        Action disposeAction)
         {
             this.requestStream = requestStream;
             this.responseAsync = responseAsync;

+ 15 - 1
src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs

@@ -35,7 +35,21 @@ namespace Grpc.Core
         readonly Func<Metadata> getTrailersFunc;
         readonly Action disposeAction;
 
-        internal AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
+        /// <summary>
+        /// Creates a new AsyncDuplexStreamingCall object with the specified properties.
+        /// </summary>
+        /// <param name="requestStream">Stream of request values.</param>
+        /// <param name="responseStream">Stream of response values.</param>
+        /// <param name="responseHeadersAsync">Response headers of the asynchronous call.</param>
+        /// <param name="getStatusFunc">Delegate returning the status of the call.</param>
+        /// <param name="getTrailersFunc">Delegate returning the trailing metadata of the call.</param>
+        /// <param name="disposeAction">Delegate to invoke when Dispose is called on the call object.</param>
+        public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream,
+                                        IAsyncStreamReader<TResponse> responseStream,
+                                        Task<Metadata> responseHeadersAsync,
+                                        Func<Status> getStatusFunc,
+                                        Func<Metadata> getTrailersFunc,
+                                        Action disposeAction)
         {
             this.requestStream = requestStream;
             this.responseStream = responseStream;

+ 13 - 1
src/csharp/Grpc.Core/AsyncServerStreamingCall.cs

@@ -33,7 +33,19 @@ namespace Grpc.Core
         readonly Func<Metadata> getTrailersFunc;
         readonly Action disposeAction;
 
-        internal AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
+        /// <summary>
+        /// Creates a new AsyncDuplexStreamingCall object with the specified properties.
+        /// </summary>
+        /// <param name="responseStream">Stream of response values.</param>
+        /// <param name="responseHeadersAsync">Response headers of the asynchronous call.</param>
+        /// <param name="getStatusFunc">Delegate returning the status of the call.</param>
+        /// <param name="getTrailersFunc">Delegate returning the trailing metadata of the call.</param>
+        /// <param name="disposeAction">Delegate to invoke when Dispose is called on the call object.</param>
+        public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream,
+                                        Task<Metadata> responseHeadersAsync,
+                                        Func<Status> getStatusFunc,
+                                        Func<Metadata> getTrailersFunc,
+                                        Action disposeAction)
         {
             this.responseStream = responseStream;
             this.responseHeadersAsync = responseHeadersAsync;

+ 14 - 1
src/csharp/Grpc.Core/AsyncUnaryCall.cs

@@ -34,7 +34,20 @@ namespace Grpc.Core
         readonly Func<Metadata> getTrailersFunc;
         readonly Action disposeAction;
 
-        internal AsyncUnaryCall(Task<TResponse> responseAsync, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
+
+        /// <summary>
+        /// Creates a new AsyncUnaryCall object with the specified properties.
+        /// </summary>
+        /// <param name="responseAsync">The response of the asynchronous call.</param>
+        /// <param name="responseHeadersAsync">Response headers of the asynchronous call.</param>
+        /// <param name="getStatusFunc">Delegate returning the status of the call.</param>
+        /// <param name="getTrailersFunc">Delegate returning the trailing metadata of the call.</param>
+        /// <param name="disposeAction">Delegate to invoke when Dispose is called on the call object.</param>
+        public AsyncUnaryCall(Task<TResponse> responseAsync,
+                              Task<Metadata> responseHeadersAsync,
+                              Func<Status> getStatusFunc,
+                              Func<Metadata> getTrailersFunc,
+                              Action disposeAction)
         {
             this.responseAsync = responseAsync;
             this.responseHeadersAsync = responseHeadersAsync;

+ 3 - 0
tools/run_tests/performance/massage_qps_stats.py

@@ -109,6 +109,9 @@ def massage_qps_stats(scenario_result):
     stats["core_executor_push_retries"] = massage_qps_stats_helpers.counter(core_stats, "executor_push_retries")
     stats["core_server_requested_calls"] = massage_qps_stats_helpers.counter(core_stats, "server_requested_calls")
     stats["core_server_slowpath_requests_queued"] = massage_qps_stats_helpers.counter(core_stats, "server_slowpath_requests_queued")
+    stats["core_cq_ev_queue_trylock_failures"] = massage_qps_stats_helpers.counter(core_stats, "cq_ev_queue_trylock_failures")
+    stats["core_cq_ev_queue_trylock_successes"] = massage_qps_stats_helpers.counter(core_stats, "cq_ev_queue_trylock_successes")
+    stats["core_cq_ev_queue_transient_pop_failures"] = massage_qps_stats_helpers.counter(core_stats, "cq_ev_queue_transient_pop_failures")
     h = massage_qps_stats_helpers.histogram(core_stats, "call_initial_size")
     stats["core_call_initial_size"] = ",".join("%f" % x for x in h.buckets)
     stats["core_call_initial_size_bkts"] = ",".join("%f" % x for x in h.boundaries)

+ 30 - 0
tools/run_tests/performance/scenario_result_schema.json

@@ -555,6 +555,21 @@
         "name": "core_server_slowpath_requests_queued", 
         "type": "INTEGER"
       }, 
+      {
+        "mode": "NULLABLE", 
+        "name": "core_cq_ev_queue_trylock_failures", 
+        "type": "INTEGER"
+      }, 
+      {
+        "mode": "NULLABLE", 
+        "name": "core_cq_ev_queue_trylock_successes", 
+        "type": "INTEGER"
+      }, 
+      {
+        "mode": "NULLABLE", 
+        "name": "core_cq_ev_queue_transient_pop_failures", 
+        "type": "INTEGER"
+      }, 
       {
         "mode": "NULLABLE", 
         "name": "core_call_initial_size", 
@@ -1352,6 +1367,21 @@
         "name": "core_server_slowpath_requests_queued", 
         "type": "INTEGER"
       }, 
+      {
+        "mode": "NULLABLE", 
+        "name": "core_cq_ev_queue_trylock_failures", 
+        "type": "INTEGER"
+      }, 
+      {
+        "mode": "NULLABLE", 
+        "name": "core_cq_ev_queue_trylock_successes", 
+        "type": "INTEGER"
+      }, 
+      {
+        "mode": "NULLABLE", 
+        "name": "core_cq_ev_queue_transient_pop_failures", 
+        "type": "INTEGER"
+      }, 
       {
         "mode": "NULLABLE", 
         "name": "core_call_initial_size",