Kaynağa Gözat

Merge github.com:grpc/grpc into grpc_slice

Craig Tiller 9 yıl önce
ebeveyn
işleme
b7982319c9

+ 2 - 0
package.json

@@ -34,6 +34,8 @@
   },
   "devDependencies": {
     "async": "^1.5.0",
+    "body-parser": "^1.15.2",
+    "express": "^4.14.0",
     "google-auth-library": "^0.9.2",
     "google-protobuf": "^3.0.0",
     "istanbul": "^0.3.21",

+ 4 - 0
src/core/ext/client_channel/client_channel.c

@@ -1022,6 +1022,10 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
   GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING);
   gpr_mu_destroy(&calld->mu);
   GPR_ASSERT(calld->waiting_ops_count == 0);
+  if (calld->connected_subchannel != NULL) {
+    GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel,
+                                    "picked");
+  }
   gpr_free(calld->waiting_ops);
   gpr_free(and_free_memory);
 }

+ 2 - 1
src/core/ext/client_channel/subchannel.c

@@ -183,9 +183,10 @@ static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg,
   gpr_free(c);
 }
 
-void grpc_connected_subchannel_ref(
+grpc_connected_subchannel *grpc_connected_subchannel_ref(
     grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
   GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON);
+  return c;
 }
 
 void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx,

+ 1 - 1
src/core/ext/client_channel/subchannel.h

@@ -97,7 +97,7 @@ grpc_subchannel *grpc_subchannel_weak_ref(
 void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx,
                                 grpc_subchannel *channel
                                     GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
-void grpc_connected_subchannel_ref(
+grpc_connected_subchannel *grpc_connected_subchannel_ref(
     grpc_connected_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
 void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx,
                                      grpc_connected_subchannel *channel

+ 3 - 3
src/core/ext/lb_policy/pick_first/pick_first.c

@@ -209,7 +209,7 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
   /* Check atomically for a selected channel */
   grpc_connected_subchannel *selected = GET_SELECTED(p);
   if (selected != NULL) {
-    *target = selected;
+    *target = GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked");
     return 1;
   }
 
@@ -218,7 +218,7 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
   selected = GET_SELECTED(p);
   if (selected) {
     gpr_mu_unlock(&p->mu);
-    *target = selected;
+    *target = GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked");
     return 1;
   } else {
     if (!p->started_picking) {
@@ -310,7 +310,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
         /* update any calls that were waiting for a pick */
         while ((pp = p->pending_picks)) {
           p->pending_picks = pp->next;
-          *pp->target = selected;
+          *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked");
           grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
           gpr_free(pp);
         }

+ 9 - 4
src/core/ext/lb_policy/round_robin/round_robin.c

@@ -397,7 +397,9 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
   gpr_mu_lock(&p->mu);
   if ((selected = peek_next_connected_locked(p))) {
     /* readily available, report right away */
-    *target = grpc_subchannel_get_connected_subchannel(selected->subchannel);
+    *target = GRPC_CONNECTED_SUBCHANNEL_REF(
+        grpc_subchannel_get_connected_subchannel(selected->subchannel),
+        "picked");
 
     if (user_data != NULL) {
       *user_data = selected->user_data;
@@ -463,8 +465,9 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
         while ((pp = p->pending_picks)) {
           p->pending_picks = pp->next;
 
-          *pp->target =
-              grpc_subchannel_get_connected_subchannel(selected->subchannel);
+          *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(
+              grpc_subchannel_get_connected_subchannel(selected->subchannel),
+              "picked");
           if (pp->user_data != NULL) {
             *pp->user_data = selected->user_data;
           }
@@ -578,7 +581,9 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
   gpr_mu_lock(&p->mu);
   if ((selected = peek_next_connected_locked(p))) {
     gpr_mu_unlock(&p->mu);
-    target = grpc_subchannel_get_connected_subchannel(selected->subchannel);
+    target = GRPC_CONNECTED_SUBCHANNEL_REF(
+        grpc_subchannel_get_connected_subchannel(selected->subchannel),
+        "picked");
     grpc_connected_subchannel_ping(exec_ctx, target, closure);
   } else {
     gpr_mu_unlock(&p->mu);

+ 9 - 4
src/core/ext/transport/chttp2/transport/chttp2_transport.c

@@ -1023,9 +1023,14 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
       }
       if (!s->write_closed) {
         if (t->is_client) {
-          GPR_ASSERT(s->id == 0);
-          grpc_chttp2_list_add_waiting_for_concurrency(t, s);
-          maybe_start_some_streams(exec_ctx, t);
+          if (!t->closed) {
+            GPR_ASSERT(s->id == 0);
+            grpc_chttp2_list_add_waiting_for_concurrency(t, s);
+            maybe_start_some_streams(exec_ctx, t);
+          } else {
+            grpc_chttp2_cancel_stream(exec_ctx, t, s,
+                                      GRPC_ERROR_CREATE("Transport closed"));
+          }
         } else {
           GPR_ASSERT(s->id != 0);
           grpc_chttp2_become_writable(exec_ctx, t, s, true,
@@ -2190,7 +2195,7 @@ static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg,
                                            GRPC_ERROR_INT_HTTP2_ERROR,
                                            GRPC_CHTTP2_ENHANCE_YOUR_CALM));
     if (n > 1) {
-      /* Since we cancel one stream per destructive reclaimation, if
+      /* Since we cancel one stream per destructive reclamation, if
          there are more streams left, we can immediately post a new
          reclaimer in case the resource quota needs to free more
          memory */

+ 6 - 0
src/core/lib/iomgr/ev_epoll_linux.c

@@ -1711,6 +1711,12 @@ retry:
             "pollset_add_fd: Raced creating new polling island. pi_new: %p "
             "(fd: %d, pollset: %p)",
             (void *)pi_new, fd->fd, (void *)pollset);
+
+        /* No need to lock 'pi_new' here since this is a new polling island and
+         * no one has a reference to it yet */
+        polling_island_remove_all_fds_locked(pi_new, true, &error);
+
+        /* Ref and unref so that the polling island gets deleted during unref */
         PI_ADD_REF(pi_new, "dance_of_destruction");
         PI_UNREF(exec_ctx, pi_new, "dance_of_destruction");
         goto retry;

+ 10 - 16
src/core/lib/iomgr/resource_quota.c

@@ -89,6 +89,7 @@ static void rulist_add_head(grpc_resource_user *resource_user,
     resource_user->links[list].prev = (*root)->links[list].prev;
     resource_user->links[list].next->links[list].prev =
         resource_user->links[list].prev->links[list].next = resource_user;
+    *root = resource_user;
   }
 }
 
@@ -105,7 +106,6 @@ static void rulist_add_tail(grpc_resource_user *resource_user,
     resource_user->links[list].prev = *root;
     resource_user->links[list].next->links[list].prev =
         resource_user->links[list].prev->links[list].next = resource_user;
-    *root = resource_user;
   }
 }
 
@@ -114,7 +114,7 @@ static bool rulist_empty(grpc_resource_quota *resource_quota,
   return resource_quota->roots[list] == NULL;
 }
 
-static grpc_resource_user *rulist_pop_tail(grpc_resource_quota *resource_quota,
+static grpc_resource_user *rulist_pop_head(grpc_resource_quota *resource_quota,
                                            grpc_rulist list) {
   grpc_resource_user **root = &resource_quota->roots[list];
   grpc_resource_user *resource_user = *root;
@@ -186,7 +186,7 @@ static void rq_step_sched(grpc_exec_ctx *exec_ctx,
 static bool rq_alloc(grpc_exec_ctx *exec_ctx,
                      grpc_resource_quota *resource_quota) {
   grpc_resource_user *resource_user;
-  while ((resource_user = rulist_pop_tail(resource_quota,
+  while ((resource_user = rulist_pop_head(resource_quota,
                                           GRPC_RULIST_AWAITING_ALLOCATION))) {
     gpr_mu_lock(&resource_user->mu);
     if (resource_user->free_pool < 0 &&
@@ -209,7 +209,7 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx,
       grpc_exec_ctx_enqueue_list(exec_ctx, &resource_user->on_allocated, NULL);
       gpr_mu_unlock(&resource_user->mu);
     } else {
-      rulist_add_tail(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
+      rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
       gpr_mu_unlock(&resource_user->mu);
       return false;
     }
@@ -221,7 +221,7 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx,
 static bool rq_reclaim_from_per_user_free_pool(
     grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota) {
   grpc_resource_user *resource_user;
-  while ((resource_user = rulist_pop_tail(resource_quota,
+  while ((resource_user = rulist_pop_head(resource_quota,
                                           GRPC_RULIST_NON_EMPTY_FREE_POOL))) {
     gpr_mu_lock(&resource_user->mu);
     if (resource_user->free_pool > 0) {
@@ -249,7 +249,7 @@ static bool rq_reclaim(grpc_exec_ctx *exec_ctx,
   if (resource_quota->reclaiming) return true;
   grpc_rulist list = destructive ? GRPC_RULIST_RECLAIMER_DESTRUCTIVE
                                  : GRPC_RULIST_RECLAIMER_BENIGN;
-  grpc_resource_user *resource_user = rulist_pop_tail(resource_quota, list);
+  grpc_resource_user *resource_user = rulist_pop_head(resource_quota, list);
   if (resource_user == NULL) return false;
   if (grpc_resource_quota_trace) {
     gpr_log(GPR_DEBUG, "RQ %s %s: initiate %s reclamation",
@@ -325,7 +325,7 @@ static void ru_allocate(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
                    GRPC_RULIST_AWAITING_ALLOCATION)) {
     rq_step_sched(exec_ctx, resource_user->resource_quota);
   }
-  rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
+  rulist_add_tail(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
 }
 
 static void ru_add_to_free_pool(grpc_exec_ctx *exec_ctx, void *ru,
@@ -337,7 +337,7 @@ static void ru_add_to_free_pool(grpc_exec_ctx *exec_ctx, void *ru,
                    GRPC_RULIST_NON_EMPTY_FREE_POOL)) {
     rq_step_sched(exec_ctx, resource_user->resource_quota);
   }
-  rulist_add_head(resource_user, GRPC_RULIST_NON_EMPTY_FREE_POOL);
+  rulist_add_tail(resource_user, GRPC_RULIST_NON_EMPTY_FREE_POOL);
 }
 
 static void ru_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *ru,
@@ -351,7 +351,7 @@ static void ru_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *ru,
                    GRPC_RULIST_RECLAIMER_BENIGN)) {
     rq_step_sched(exec_ctx, resource_user->resource_quota);
   }
-  rulist_add_head(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
+  rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
 }
 
 static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *ru,
@@ -367,7 +367,7 @@ static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *ru,
                    GRPC_RULIST_RECLAIMER_DESTRUCTIVE)) {
     rq_step_sched(exec_ctx, resource_user->resource_quota);
   }
-  rulist_add_head(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
+  rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
 }
 
 static void ru_destroy(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
@@ -563,9 +563,6 @@ void grpc_resource_user_init(grpc_resource_user *resource_user,
   for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
     resource_user->links[i].next = resource_user->links[i].prev = NULL;
   }
-#ifndef NDEBUG
-  resource_user->asan_canary = gpr_malloc(1);
-#endif
   if (name != NULL) {
     resource_user->name = gpr_strdup(name);
   } else {
@@ -592,9 +589,6 @@ void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx,
 
 void grpc_resource_user_destroy(grpc_exec_ctx *exec_ctx,
                                 grpc_resource_user *resource_user) {
-#ifndef NDEBUG
-  gpr_free(resource_user->asan_canary);
-#endif
   grpc_resource_quota_internal_unref(exec_ctx, resource_user->resource_quota);
   gpr_mu_destroy(&resource_user->mu);
   gpr_free(resource_user->name);

+ 10 - 9
src/core/lib/iomgr/resource_quota.h

@@ -49,7 +49,8 @@
     resource constrained, grpc_resource_user instances are asked (in turn) to
     free up whatever they can so that the system as a whole can make progress.
 
-    There are three kinds of reclamation that take place:
+    There are three kinds of reclamation that take place, in order of increasing
+    invasiveness:
     - an internal reclamation, where cached resource at the resource user level
       is returned to the quota
     - a benign reclamation phase, whereby resources that are in use but are not
@@ -58,9 +59,14 @@
       make progress may be enacted so that at least one part of the system can
       complete.
 
-    These reclamations are tried in priority order, and only one reclamation
-    is outstanding for a quota at any given time (meaning that if a destructive
-    reclamation makes progress, we may follow up with a benign reclamation).
+    Only one reclamation will be outstanding for a given quota at a given time.
+    On each reclamation attempt, the kinds of reclamation are tried in order of
+    increasing invasiveness, stopping at the first one that succeeds. Thus, on a
+    given reclamation attempt, if internal and benign reclamation both fail, it
+    will wind up doing a destructive reclamation. However, the next reclamation
+    attempt may then be able to get what it needs via internal or benign
+    reclamation, due to resources that may have been freed up by the destructive
+    reclamation in the previous attempt.
 
     Future work will be to expose the current resource pressure so that back
     pressure can be applied to avoid reclamation phases starting.
@@ -112,11 +118,6 @@ struct grpc_resource_user {
      lock */
   grpc_closure add_to_free_pool_closure;
 
-#ifndef NDEBUG
-  /* Canary object to detect leaked resource users with ASAN */
-  void *asan_canary;
-#endif
-
   gpr_mu mu;
   /* Total allocated memory outstanding by this resource user in bytes;
      always positive */

+ 3 - 0
src/core/lib/iomgr/tcp_uv.c

@@ -315,6 +315,9 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, char *peer_string) {
     gpr_log(GPR_DEBUG, "Creating TCP endpoint %p", tcp);
   }
 
+  /* Disable Nagle's Algorithm */
+  uv_tcp_nodelay(handle, 1);
+
   memset(tcp, 0, sizeof(grpc_tcp));
   tcp->base.vtable = &vtable;
   tcp->handle = handle;

+ 1 - 0
src/core/lib/surface/server.c

@@ -842,6 +842,7 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd,
       grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
   if (error != GRPC_ERROR_NONE) {
     got_initial_metadata(exec_ctx, elem, error);
+    GRPC_ERROR_UNREF(error);
     return;
   }
   call_data *calld = elem->call_data;

+ 9 - 8
src/csharp/Grpc.Core/Internal/AsyncCall.cs

@@ -52,9 +52,8 @@ namespace Grpc.Core.Internal
         // Completion of a pending unary response if not null.
         TaskCompletionSource<TResponse> unaryResponseTcs;
 
-        // TODO(jtattermusch): this field doesn't need to be initialized for unary response calls.
-        // Indicates that response streaming call has finished.
-        TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>();
+        // Completion of a streaming response call if not null.
+        TaskCompletionSource<object> streamingResponseCallFinishedTcs;
 
         // TODO(jtattermusch): this field could be lazy-initialized (only if someone requests the response headers).
         // Response headers set here once received.
@@ -198,6 +197,7 @@ namespace Grpc.Core.Internal
 
                 byte[] payload = UnsafeSerialize(msg);
 
+                streamingResponseCallFinishedTcs = new TaskCompletionSource<object>();
                 using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
                 {
                     call.StartServerStreaming(HandleFinished, payload, metadataArray, GetWriteFlagsForCall());
@@ -219,6 +219,7 @@ namespace Grpc.Core.Internal
 
                 Initialize(details.Channel.CompletionQueue);
 
+                streamingResponseCallFinishedTcs = new TaskCompletionSource<object>();
                 using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
                 {
                     call.StartDuplexStreaming(HandleFinished, metadataArray);
@@ -276,13 +277,13 @@ namespace Grpc.Core.Internal
         }
 
         /// <summary>
-        /// Get the task that completes once if streaming call finishes with ok status and throws RpcException with given status otherwise.
+        /// Get the task that completes once if streaming response call finishes with ok status and throws RpcException with given status otherwise.
         /// </summary>
-        public Task StreamingCallFinishedTask
+        public Task StreamingResponseCallFinishedTask
         {
             get
             {
-                return streamingCallFinishedTcs.Task;
+                return streamingResponseCallFinishedTcs.Task;
             }
         }
 
@@ -529,11 +530,11 @@ namespace Grpc.Core.Internal
             var status = receivedStatus.Status;
             if (status.StatusCode != StatusCode.OK)
             {
-                streamingCallFinishedTcs.SetException(new RpcException(status));
+                streamingResponseCallFinishedTcs.SetException(new RpcException(status));
                 return;
             }
 
-            streamingCallFinishedTcs.SetResult(null);
+            streamingResponseCallFinishedTcs.SetResult(null);
         }
     }
 }

+ 1 - 1
src/csharp/Grpc.Core/Internal/ClientResponseStream.cs

@@ -73,7 +73,7 @@ namespace Grpc.Core.Internal
 
             if (result == null)
             {
-                await call.StreamingCallFinishedTask.ConfigureAwait(false);
+                await call.StreamingResponseCallFinishedTask.ConfigureAwait(false);
                 return false;
             }
             return true;

+ 291 - 0
src/node/performance/benchmark_client_express.js

@@ -0,0 +1,291 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/**
+ * Benchmark client module
+ * @module
+ */
+
+'use strict';
+
+var fs = require('fs');
+var path = require('path');
+var util = require('util');
+var EventEmitter = require('events');
+var http = require('http');
+var https = require('https');
+
+var async = require('async');
+var _ = require('lodash');
+var PoissonProcess = require('poisson-process');
+var Histogram = require('./histogram');
+
+/**
+ * Convert a time difference, as returned by process.hrtime, to a number of
+ * nanoseconds.
+ * @param {Array.<number>} time_diff The time diff, represented as
+ *     [seconds, nanoseconds]
+ * @return {number} The total number of nanoseconds
+ */
+function timeDiffToNanos(time_diff) {
+  return time_diff[0] * 1e9 + time_diff[1];
+}
+
+function BenchmarkClient(server_targets, channels, histogram_params,
+    security_params) {
+  var options = {
+    method: 'PUT',
+    headers: {
+      'Content-Type': 'application/json'
+    }
+  };
+  var protocol;
+  if (security_params) {
+    var ca_path;
+    protocol = https;
+    this.request = _.bind(https.request, https);
+    if (security_params.use_test_ca) {
+      ca_path = path.join(__dirname, '../test/data/ca.pem');
+      var ca_data = fs.readFileSync(ca_path);
+      options.ca = ca_data;
+    }
+    if (security_params.server_host_override) {
+      var host_override = security_params.server_host_override;
+      options.servername = host_override;
+    }
+  } else {
+    protocol = http;
+  }
+
+  this.request = _.bind(protocol.request, protocol);
+
+  this.client_options = [];
+
+  for (var i = 0; i < channels; i++) {
+    var host_port;
+    host_port = server_targets[i % server_targets.length].split(':')
+    var new_options = _.assign({hostname: host_port[0], port: +host_port[1]}, options);
+    new_options.agent = new protocol.Agent(new_options);
+    this.client_options[i] = new_options;
+  }
+
+  this.histogram = new Histogram(histogram_params.resolution,
+                                 histogram_params.max_possible);
+
+  this.running = false;
+
+  this.pending_calls = 0;
+}
+
+util.inherits(BenchmarkClient, EventEmitter);
+
+function startAllClients(client_options_list, outstanding_rpcs_per_channel,
+                         makeCall, emitter) {
+  _.each(client_options_list, function(client_options) {
+    _.times(outstanding_rpcs_per_channel, function() {
+      makeCall(client_options);
+    });
+  });
+}
+
+BenchmarkClient.prototype.startClosedLoop = function(
+    outstanding_rpcs_per_channel, rpc_type, req_size, resp_size, generic) {
+  var self = this;
+
+  var options = {};
+
+  self.running = true;
+
+  if (rpc_type == 'UNARY') {
+    options.path = '/serviceProto.BenchmarkService.service/unaryCall';
+  } else {
+    self.emit('error', new Error('Unsupported rpc_type: ' + rpc_type));
+  }
+
+  if (generic) {
+    self.emit('error', new Error('Generic client not supported'));
+  }
+
+  self.last_wall_time = process.hrtime();
+
+  var argument = {
+    response_size: resp_size,
+    payload: {
+      body: '0'.repeat(req_size)
+    }
+  };
+
+  function makeCall(client_options) {
+    if (self.running) {
+      self.pending_calls++;
+      var start_time = process.hrtime();
+      var req = self.request(client_options, function(res) {
+        var res_data = '';
+        res.on('data', function(data) {
+          res_data += data;
+        });
+        res.on('end', function() {
+          JSON.parse(res_data);
+          var time_diff = process.hrtime(start_time);
+          self.histogram.add(timeDiffToNanos(time_diff));
+          makeCall(client_options);
+          self.pending_calls--;
+          if ((!self.running) && self.pending_calls == 0) {
+            self.emit('finished');
+          }
+        });
+      });
+      req.write(JSON.stringify(argument));
+      req.end();
+      req.on('error', function(error) {
+        self.emit('error', new Error('Client error: ' + error.message));
+        self.running = false;
+      });
+    }
+  }
+
+  startAllClients(_.map(self.client_options, _.partial(_.assign, options)),
+                  outstanding_rpcs_per_channel, makeCall, self);
+};
+
+BenchmarkClient.prototype.startPoisson = function(
+    outstanding_rpcs_per_channel, rpc_type, req_size, resp_size, offered_load,
+    generic) {
+  var self = this;
+
+  var options = {};
+
+  self.running = true;
+
+  if (rpc_type == 'UNARY') {
+    options.path = '/serviceProto.BenchmarkService.service/unaryCall';
+  } else {
+    self.emit('error', new Error('Unsupported rpc_type: ' + rpc_type));
+  }
+
+  if (generic) {
+    self.emit('error', new Error('Generic client not supported'));
+  }
+
+  self.last_wall_time = process.hrtime();
+
+  var argument = {
+    response_size: resp_size,
+    payload: {
+      body: '0'.repeat(req_size)
+    }
+  };
+
+  function makeCall(client_options, poisson) {
+    if (self.running) {
+      self.pending_calls++;
+      var start_time = process.hrtime();
+      var req = self.request(client_options, function(res) {
+        var res_data = '';
+        res.on('data', function(data) {
+          res_data += data;
+        });
+        res.on('end', function() {
+          JSON.parse(res_data);
+          var time_diff = process.hrtime(start_time);
+          self.histogram.add(timeDiffToNanos(time_diff));
+          self.pending_calls--;
+          if ((!self.running) && self.pending_calls == 0) {
+            self.emit('finished');
+          }
+        });
+      });
+      req.write(JSON.stringify(argument));
+      req.end();
+      req.on('error', function(error) {
+        self.emit('error', new Error('Client error: ' + error.message));
+        self.running = false;
+      });
+    } else {
+      poisson.stop();
+    }
+  }
+
+  var averageIntervalMs = (1 / offered_load) * 1000;
+
+  startAllClients(_.map(self.client_options, _.partial(_.assign, options)),
+                  outstanding_rpcs_per_channel, function(opts){
+                    var p = PoissonProcess.create(averageIntervalMs, function() {
+                      makeCall(opts, p);
+                    });
+                    p.start();
+                  }, self);
+};
+
+/**
+ * Return curent statistics for the client. If reset is set, restart
+ * statistic collection.
+ * @param {boolean} reset Indicates that statistics should be reset
+ * @return {object} Client statistics
+ */
+BenchmarkClient.prototype.mark = function(reset) {
+  var wall_time_diff = process.hrtime(this.last_wall_time);
+  var histogram = this.histogram;
+  if (reset) {
+    this.last_wall_time = process.hrtime();
+    this.histogram = new Histogram(histogram.resolution,
+                                   histogram.max_possible);
+  }
+
+  return {
+    latencies: {
+      bucket: histogram.getContents(),
+      min_seen: histogram.minimum(),
+      max_seen: histogram.maximum(),
+      sum: histogram.getSum(),
+      sum_of_squares: histogram.sumOfSquares(),
+      count: histogram.getCount()
+    },
+    time_elapsed: wall_time_diff[0] + wall_time_diff[1] / 1e9,
+    // Not sure how to measure these values
+    time_user: 0,
+    time_system: 0
+  };
+};
+
+/**
+ * Stop the clients.
+ * @param {function} callback Called when the clients have finished shutting
+ *     down
+ */
+BenchmarkClient.prototype.stop = function(callback) {
+  this.running = false;
+  this.on('finished', callback);
+};
+
+module.exports = BenchmarkClient;

+ 5 - 0
src/node/performance/benchmark_server.js

@@ -40,6 +40,8 @@
 
 var fs = require('fs');
 var path = require('path');
+var EventEmitter = require('events');
+var util = require('util');
 
 var genericService = require('./generic_service');
 
@@ -138,12 +140,15 @@ function BenchmarkServer(host, port, tls, generic, response_size) {
   this.server = server;
 }
 
+util.inherits(BenchmarkServer, EventEmitter);
+
 /**
  * Start the benchmark server.
  */
 BenchmarkServer.prototype.start = function() {
   this.server.start();
   this.last_wall_time = process.hrtime();
+  this.emit('started');
 };
 
 /**

+ 109 - 0
src/node/performance/benchmark_server_express.js

@@ -0,0 +1,109 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/**
+ * Benchmark server module
+ * @module
+ */
+
+'use strict';
+
+var fs = require('fs');
+var path = require('path');
+var http = require('http');
+var https = require('https');
+var EventEmitter = require('events');
+var util = require('util');
+
+var express = require('express');
+var bodyParser = require('body-parser')
+
+function unaryCall(req, res) {
+  var reqObj = req.body;
+  var payload = {body: '0'.repeat(reqObj.response_size)};
+  res.json(payload);
+}
+
+function BenchmarkServer(host, port, tls, generic, response_size) {
+  var app = express();
+  app.use(bodyParser.json())
+  app.put('/serviceProto.BenchmarkService.service/unaryCall', unaryCall);
+  this.input_host = host;
+  this.input_port = port;
+  if (tls) {
+    var credentials = {};
+    var key_path = path.join(__dirname, '../test/data/server1.key');
+    var pem_path = path.join(__dirname, '../test/data/server1.pem');
+
+    var key_data = fs.readFileSync(key_path);
+    var pem_data = fs.readFileSync(pem_path);
+    credentials['key'] = key_data;
+    credentials['cert'] = pem_data;
+    this.server = https.createServer(credentials, app);
+  } else {
+    this.server = http.createServer(app);
+  }
+}
+
+util.inherits(BenchmarkServer, EventEmitter);
+
+BenchmarkServer.prototype.start = function() {
+  var self = this;
+  this.server.listen(this.input_port, this.input_hostname, function() {
+    self.last_wall_time = process.hrtime();
+    self.emit('started');
+  });
+};
+
+BenchmarkServer.prototype.getPort = function() {
+  return this.server.address().port;
+};
+
+BenchmarkServer.prototype.mark = function(reset) {
+  var wall_time_diff = process.hrtime(this.last_wall_time);
+  if (reset) {
+    this.last_wall_time = process.hrtime();
+  }
+  return {
+    time_elapsed: wall_time_diff[0] + wall_time_diff[1] / 1e9,
+    // Not sure how to measure these values
+    time_user: 0,
+    time_system: 0
+  };
+};
+
+BenchmarkServer.prototype.stop = function(callback) {
+  this.server.close(callback);
+};
+
+module.exports = BenchmarkServer;

+ 5 - 5
src/node/performance/worker.js

@@ -34,18 +34,18 @@
 'use strict';
 
 var console = require('console');
-var worker_service_impl = require('./worker_service_impl');
+var WorkerServiceImpl = require('./worker_service_impl');
 
 var grpc = require('../../../');
 var serviceProto = grpc.load({
   root: __dirname + '/../../..',
   file: 'src/proto/grpc/testing/services.proto'}).grpc.testing;
 
-function runServer(port) {
+function runServer(port, benchmark_impl) {
   var server_creds = grpc.ServerCredentials.createInsecure();
   var server = new grpc.Server();
   server.addProtoService(serviceProto.WorkerService.service,
-                         worker_service_impl);
+                         new WorkerServiceImpl(benchmark_impl, server));
   var address = '0.0.0.0:' + port;
   server.bind(address, server_creds);
   server.start();
@@ -57,9 +57,9 @@ if (require.main === module) {
   Error.stackTraceLimit = Infinity;
   var parseArgs = require('minimist');
   var argv = parseArgs(process.argv, {
-    string: ['driver_port']
+    string: ['driver_port', 'benchmark_impl']
   });
-  runServer(argv.driver_port);
+  runServer(argv.driver_port, argv.benchmark_impl);
 }
 
 exports.runServer = runServer;

+ 124 - 104
src/node/performance/worker_service_impl.js

@@ -38,121 +38,141 @@ var console = require('console');
 var BenchmarkClient = require('./benchmark_client');
 var BenchmarkServer = require('./benchmark_server');
 
-exports.quitWorker = function quitWorker(call, callback) {
-  callback(null, {});
-  process.exit(0);
-}
+module.exports = function WorkerServiceImpl(benchmark_impl, server) {
+  var BenchmarkClient;
+  var BenchmarkServer;
+  switch (benchmark_impl) {
+    case 'grpc':
+    BenchmarkClient = require('./benchmark_client');
+    BenchmarkServer = require('./benchmark_server');
+    break;
+    case 'express':
+    BenchmarkClient = require('./benchmark_client_express');
+    BenchmarkServer = require('./benchmark_server_express');
+    break;
+    default:
+    throw new Error('Unrecognized benchmark impl: ' + benchmark_impl);
+  }
 
-exports.runClient = function runClient(call) {
-  var client;
-  call.on('data', function(request) {
-    var stats;
-    switch (request.argtype) {
-      case 'setup':
-      var setup = request.setup;
-      console.log('ClientConfig %j', setup);
-      client = new BenchmarkClient(setup.server_targets,
-                                   setup.client_channels,
-                                   setup.histogram_params,
-                                   setup.security_params);
-      client.on('error', function(error) {
-        call.emit('error', error);
-      });
-      var req_size, resp_size, generic;
-      switch (setup.payload_config.payload) {
-        case 'bytebuf_params':
-        req_size = setup.payload_config.bytebuf_params.req_size;
-        resp_size = setup.payload_config.bytebuf_params.resp_size;
-        generic = true;
+  this.quitWorker = function quitWorker(call, callback) {
+    server.tryShutdown(function() {
+      callback(null, {});
+    });
+  };
+
+  this.runClient = function runClient(call) {
+    var client;
+    call.on('data', function(request) {
+      var stats;
+      switch (request.argtype) {
+        case 'setup':
+        var setup = request.setup;
+        console.log('ClientConfig %j', setup);
+        client = new BenchmarkClient(setup.server_targets,
+                                     setup.client_channels,
+                                     setup.histogram_params,
+                                     setup.security_params);
+        client.on('error', function(error) {
+          call.emit('error', error);
+        });
+        var req_size, resp_size, generic;
+        switch (setup.payload_config.payload) {
+          case 'bytebuf_params':
+          req_size = setup.payload_config.bytebuf_params.req_size;
+          resp_size = setup.payload_config.bytebuf_params.resp_size;
+          generic = true;
+          break;
+          case 'simple_params':
+          req_size = setup.payload_config.simple_params.req_size;
+          resp_size = setup.payload_config.simple_params.resp_size;
+          generic = false;
+          break;
+          default:
+          call.emit('error', new Error('Unsupported PayloadConfig type' +
+              setup.payload_config.payload));
+        }
+        switch (setup.load_params.load) {
+          case 'closed_loop':
+          client.startClosedLoop(setup.outstanding_rpcs_per_channel,
+                                 setup.rpc_type, req_size, resp_size, generic);
+          break;
+          case 'poisson':
+          client.startPoisson(setup.outstanding_rpcs_per_channel,
+                              setup.rpc_type, req_size, resp_size,
+                              setup.load_params.poisson.offered_load, generic);
+          break;
+          default:
+          call.emit('error', new Error('Unsupported LoadParams type' +
+              setup.load_params.load));
+        }
+        stats = client.mark();
+        call.write({
+          stats: stats
+        });
         break;
-        case 'simple_params':
-        req_size = setup.payload_config.simple_params.req_size;
-        resp_size = setup.payload_config.simple_params.resp_size;
-        generic = false;
+        case 'mark':
+        if (client) {
+          stats = client.mark(request.mark.reset);
+          call.write({
+            stats: stats
+          });
+        } else {
+          call.emit('error', new Error('Got Mark before ClientConfig'));
+        }
         break;
         default:
-        call.emit('error', new Error('Unsupported PayloadConfig type' +
-            setup.payload_config.payload));
+        throw new Error('Nonexistent client argtype option: ' + request.argtype);
       }
-      switch (setup.load_params.load) {
-        case 'closed_loop':
-        client.startClosedLoop(setup.outstanding_rpcs_per_channel,
-                               setup.rpc_type, req_size, resp_size, generic);
+    });
+    call.on('end', function() {
+      client.stop(function() {
+        call.end();
+      });
+    });
+  };
+
+  this.runServer = function runServer(call) {
+    var server;
+    call.on('data', function(request) {
+      var stats;
+      switch (request.argtype) {
+        case 'setup':
+        console.log('ServerConfig %j', request.setup);
+        server = new BenchmarkServer('[::]', request.setup.port,
+                                     request.setup.security_params);
+        server.on('started', function() {
+          stats = server.mark();
+          call.write({
+            stats: stats,
+            port: server.getPort()
+          });
+        });
+        server.start();
         break;
-        case 'poisson':
-        client.startPoisson(setup.outstanding_rpcs_per_channel,
-                            setup.rpc_type, req_size, resp_size,
-                            setup.load_params.poisson.offered_load, generic);
+        case 'mark':
+        if (server) {
+          stats = server.mark(request.mark.reset);
+          call.write({
+            stats: stats,
+            port: server.getPort(),
+            cores: 1
+          });
+        } else {
+          call.emit('error', new Error('Got Mark before ServerConfig'));
+        }
         break;
         default:
-        call.emit('error', new Error('Unsupported LoadParams type' +
-            setup.load_params.load));
+        throw new Error('Nonexistent server argtype option');
       }
-      stats = client.mark();
-      call.write({
-        stats: stats
-      });
-      break;
-      case 'mark':
-      if (client) {
-        stats = client.mark(request.mark.reset);
-        call.write({
-          stats: stats
-        });
-      } else {
-        call.emit('error', new Error('Got Mark before ClientConfig'));
-      }
-      break;
-      default:
-      throw new Error('Nonexistent client argtype option: ' + request.argtype);
-    }
-  });
-  call.on('end', function() {
-    client.stop(function() {
-      call.end();
     });
-  });
-};
-
-exports.runServer = function runServer(call) {
-  var server;
-  call.on('data', function(request) {
-    var stats;
-    switch (request.argtype) {
-      case 'setup':
-      console.log('ServerConfig %j', request.setup);
-      server = new BenchmarkServer('[::]', request.setup.port,
-                                   request.setup.security_params);
-      server.start();
-      stats = server.mark();
-      call.write({
-        stats: stats,
-        port: server.getPort()
+    call.on('end', function() {
+      server.stop(function() {
+        call.end();
       });
-      break;
-      case 'mark':
-      if (server) {
-        stats = server.mark(request.mark.reset);
-        call.write({
-          stats: stats,
-          port: server.getPort(),
-          cores: 1
-        });
-      } else {
-        call.emit('error', new Error('Got Mark before ServerConfig'));
-      }
-      break;
-      default:
-      throw new Error('Nonexistent server argtype option');
-    }
-  });
-  call.on('end', function() {
-    server.stop(function() {
-      call.end();
     });
-  });
-};
+  };
 
-exports.coreCount = function coreCount(call, callback) {
-  callback(null, {cores: os.cpus().length});
+  this.coreCount = function coreCount(call, callback) {
+    callback(null, {cores: os.cpus().length});
+  };
 };

+ 2 - 0
templates/package.json.template

@@ -36,6 +36,8 @@
     },
     "devDependencies": {
       "async": "^1.5.0",
+      "body-parser": "^1.15.2",
+      "express": "^4.14.0",
       "google-auth-library": "^0.9.2",
       "google-protobuf": "^3.0.0",
       "istanbul": "^0.3.21",

+ 5 - 5
test/core/end2end/gen_build_yaml.py

@@ -84,8 +84,8 @@ END2END_FIXTURES = {
 
 TestOptions = collections.namedtuple(
     'TestOptions',
-    'needs_fullstack needs_dns proxyable secure traceable cpu_cost exclude_iomgrs large_writes')
-default_test_options = TestOptions(False, False, True, False, True, 1.0, [], False)
+    'needs_fullstack needs_dns proxyable secure traceable cpu_cost exclude_iomgrs large_writes flaky')
+default_test_options = TestOptions(False, False, True, False, True, 1.0, [], False, False)
 connectivity_test_options = default_test_options._replace(needs_fullstack=True)
 
 LOWCPU = 0.1
@@ -108,7 +108,7 @@ END2END_TESTS = {
         proxyable=False, cpu_cost=LOWCPU, exclude_iomgrs=['uv']),
     'default_host': default_test_options._replace(needs_fullstack=True,
                                                   needs_dns=True),
-    'disappearing_server': connectivity_test_options,
+    'disappearing_server': connectivity_test_options._replace(flaky=True),
     'empty_batch': default_test_options,
     'filter_causes_close': default_test_options,
     'filter_call_init_fails': default_test_options,
@@ -263,7 +263,7 @@ def main():
               'ci_platforms': (END2END_FIXTURES[f].platforms
                                if END2END_FIXTURES[f].ci_mac else without(
                                    END2END_FIXTURES[f].platforms, 'mac')),
-              'flaky': False,
+              'flaky': END2END_TESTS[t].flaky,
               'language': 'c',
               'cpu_cost': END2END_TESTS[t].cpu_cost,
           }
@@ -280,7 +280,7 @@ def main():
               'ci_platforms': (END2END_FIXTURES[f].platforms
                                if END2END_FIXTURES[f].ci_mac else without(
                                    END2END_FIXTURES[f].platforms, 'mac')),
-              'flaky': False,
+              'flaky': END2END_TESTS[t].flaky,
               'language': 'c',
               'cpu_cost': END2END_TESTS[t].cpu_cost,
           }

+ 3 - 3
test/core/end2end/tests/resource_quota_server.c

@@ -340,9 +340,9 @@ void resource_quota_server(grpc_end2end_test_config config) {
       "Done. %d total calls: %d cancelled at server, %d cancelled at client.",
       NUM_CALLS, cancelled_calls_on_server, cancelled_calls_on_client);
 
-  /* The server call may be cancelled after it's received it's status, but
-   * before the client does: this means that we should see strictly more
-   * failures on the client than on the server */
+  /* The call may be cancelled after the server has sent its status but before
+   * the client has received it. This means that we should see strictly more
+   * failures on the client than on the server. */
   GPR_ASSERT(cancelled_calls_on_client >= cancelled_calls_on_server);
   /* However, we shouldn't see radically more... 0.9 is a guessed bound on what
    * we'd want that ratio to be... to at least trigger some investigation should

+ 7 - 3
test/core/iomgr/resource_quota_test.c

@@ -553,9 +553,13 @@ static void test_resource_user_stays_allocated_until_memory_released(void) {
 static void
 test_resource_user_stays_allocated_and_reclaimers_unrun_until_memory_released(
     void) {
-  gpr_log(GPR_INFO, "** test_pools_merged_on_resource_user_deletion **");
-  grpc_resource_quota *q =
-      grpc_resource_quota_create("test_pools_merged_on_resource_user_deletion");
+  gpr_log(GPR_INFO,
+          "** "
+          "test_resource_user_stays_allocated_and_reclaimers_unrun_until_"
+          "memory_released **");
+  grpc_resource_quota *q = grpc_resource_quota_create(
+      "test_resource_user_stays_allocated_and_reclaimers_unrun_until_memory_"
+      "released");
   grpc_resource_quota_resize(q, 1024);
   for (int i = 0; i < 10; i++) {
     grpc_resource_user usr;

+ 1 - 1
test/core/iomgr/udp_server_test.c

@@ -185,7 +185,7 @@ static void test_receive(int number_of_clients) {
     /* Create a socket, send a packet to the UDP server. */
     clifd = socket(addr->ss_family, SOCK_DGRAM, 0);
     GPR_ASSERT(clifd >= 0);
-    GPR_ASSERT(connect(clifd, (struct sockaddr *)&addr,
+    GPR_ASSERT(connect(clifd, (struct sockaddr *)addr,
                        (socklen_t)resolved_addr.len) == 0);
     GPR_ASSERT(5 == write(clifd, "hello", 5));
     while (g_number_of_reads == number_of_reads_before &&

+ 2 - 5
test/cpp/qps/client_sync.cc

@@ -130,11 +130,8 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
     grpc::Status s =
         stub->UnaryCall(&context, request_, &responses_[thread_idx]);
     entry->set_value((UsageTimer::Now() - start) * 1e9);
-    if (!s.ok()) {
-      gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(),
-              s.error_message().c_str());
-    }
-    return s.ok();
+    entry->set_status(s.error_code());
+    return true;
   }
 };
 

+ 14 - 0
tools/run_tests/build_python.sh

@@ -39,6 +39,14 @@ cd $(dirname $0)/../..
 
 PLATFORM=`uname -s`
 
+function is_msys() {
+  if [ "${PLATFORM/MSYS}" != "$PLATFORM" ]; then
+    echo true
+  else
+    exit 1
+  fi
+}
+
 function is_mingw() {
   if [ "${PLATFORM/MINGW}" != "$PLATFORM" ]; then
     echo true
@@ -108,6 +116,12 @@ VENV=${2:-$(venv $PYTHON)}
 VENV_RELATIVE_PYTHON=${3:-$(venv_relative_python)}
 TOOLCHAIN=${4:-$(toolchain)}
 
+if [ $(is_msys) ]; then
+  echo "MSYS doesn't directly provide the right compiler(s);"
+  echo "switch to a MinGW shell."
+  exit 1
+fi
+
 ROOT=`pwd`
 export CFLAGS="-I$ROOT/include -std=gnu99 -fno-wrapv $CFLAGS"
 export GRPC_PYTHON_BUILD_WITH_CYTHON=1

+ 69 - 1
tools/run_tests/performance/scenario_config.py

@@ -232,6 +232,15 @@ class CXXLanguage:
           secure=secure,
           categories=smoketest_categories + [SCALABLE])
 
+      yield _ping_pong_scenario(
+          'cpp_protobuf_async_client_sync_server_streaming_qps_unconstrained_%s' % secstr,
+          rpc_type='STREAMING',
+          client_type='ASYNC_CLIENT',
+          server_type='SYNC_SERVER',
+          unconstrained_client='async',
+          secure=secure,
+          categories=smoketest_categories+[SCALABLE])
+
       for rpc_type in ['unary', 'streaming']:
         for synchronicity in ['sync', 'async']:
           yield _ping_pong_scenario(
@@ -361,7 +370,8 @@ class NodeLanguage:
     self.safename = str(self)
 
   def worker_cmdline(self):
-    return ['tools/run_tests/performance/run_worker_node.sh']
+    return ['tools/run_tests/performance/run_worker_node.sh',
+            '--benchmark_impl=grpc']
 
   def worker_port_offset(self):
     return 200
@@ -664,11 +674,69 @@ class GoLanguage:
   def __str__(self):
     return 'go'
 
+class NodeExpressLanguage:
+
+  def __init__(self):
+    pass
+    self.safename = str(self)
+
+  def worker_cmdline(self):
+    return ['tools/run_tests/performance/run_worker_node.sh',
+            '--benchmark_impl=express']
+
+  def worker_port_offset(self):
+    return 700
+
+  def scenarios(self):
+    # TODO(jtattermusch): make this scenario work
+    #yield _ping_pong_scenario(
+    #    'node_generic_async_streaming_ping_pong', rpc_type='STREAMING',
+    #    client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER',
+    #    use_generic_payload=True)
+
+    # TODO(jtattermusch): make this scenario work
+    #yield _ping_pong_scenario(
+    #    'node_protobuf_async_streaming_ping_pong', rpc_type='STREAMING',
+    #    client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER')
+
+    yield _ping_pong_scenario(
+        'node_protobuf_unary_ping_pong', rpc_type='UNARY',
+        client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
+        categories=[SCALABLE, SMOKETEST])
+
+    yield _ping_pong_scenario(
+        'node_protobuf_async_unary_qps_unconstrained', rpc_type='UNARY',
+        client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
+        unconstrained_client='async',
+        categories=[SCALABLE, SMOKETEST])
+
+    # TODO(jtattermusch): make this scenario work
+    #yield _ping_pong_scenario(
+    #    'node_protobuf_async_streaming_qps_unconstrained', rpc_type='STREAMING',
+    #    client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
+    #    unconstrained_client='async')
+
+    # TODO(jtattermusch): make this scenario work
+    #yield _ping_pong_scenario(
+    #    'node_to_cpp_protobuf_async_unary_ping_pong', rpc_type='UNARY',
+    #    client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
+    #    server_language='c++', server_core_limit=1, async_server_threads=1)
+
+    # TODO(jtattermusch): make this scenario work
+    #yield _ping_pong_scenario(
+    #    'node_to_cpp_protobuf_async_streaming_ping_pong', rpc_type='STREAMING',
+    #    client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
+    #    server_language='c++', server_core_limit=1, async_server_threads=1)
+
+  def __str__(self):
+    return 'node_express'
+
 
 LANGUAGES = {
     'c++' : CXXLanguage(),
     'csharp' : CSharpLanguage(),
     'node' : NodeLanguage(),
+    'node_express': NodeExpressLanguage(),
     'ruby' : RubyLanguage(),
     'java' : JavaLanguage(),
     'python' : PythonLanguage(),

+ 11 - 3
tools/run_tests/run_performance_tests.py

@@ -42,6 +42,7 @@ import os
 import performance.scenario_config as scenario_config
 import pipes
 import re
+import report_utils
 import subprocess
 import sys
 import tempfile
@@ -453,6 +454,7 @@ if not scenarios:
 
 total_scenario_failures = 0
 qps_workers_killed = 0
+merged_resultset = {}
 for scenario in scenarios:
   if args.dry_run:
     print(scenario.name)
@@ -460,14 +462,20 @@ for scenario in scenarios:
     try:
       for worker in scenario.workers:
         worker.start()
-      scenario_failures, _ = jobset.run([scenario.jobspec,
-                                 create_quit_jobspec(scenario.workers, remote_host=args.remote_driver_host)],
-                                 newline_on_success=True, maxjobs=1)
+      scenario_failures, resultset = jobset.run([scenario.jobspec,
+                                                create_quit_jobspec(scenario.workers, remote_host=args.remote_driver_host)],
+                                                newline_on_success=True, maxjobs=1)
       total_scenario_failures += scenario_failures
+      merged_resultset = dict(itertools.chain(merged_resultset.iteritems(),
+                                              resultset.iteritems()))
     finally:
       # Consider qps workers that need to be killed as failures
       qps_workers_killed += finish_qps_workers(scenario.workers)
 
+
+report_utils.render_junit_xml_report(merged_resultset, 'report.xml',
+                                     suite_name='benchmarks')
+
 if total_scenario_failures > 0 or qps_workers_killed > 0:
   print ("%s scenarios failed and %s qps worker jobs killed" % (total_scenario_failures, qps_workers_killed))
   sys.exit(1)

+ 48 - 0
tools/run_tests/run_tests.py

@@ -842,6 +842,53 @@ class Sanity(object):
   def __str__(self):
     return 'sanity'
 
+class NodeExpressLanguage(object):
+  """Dummy Node express test target to enable running express performance
+  benchmarks"""
+
+  def __init__(self):
+    self.platform = platform_string()
+
+  def configure(self, config, args):
+    self.config = config
+    self.args = args
+    _check_compiler(self.args.compiler, ['default', 'node0.12',
+                                         'node4', 'node5', 'node6'])
+    if self.args.compiler == 'default':
+      self.node_version = '4'
+    else:
+      # Take off the word "node"
+      self.node_version = self.args.compiler[4:]
+
+  def test_specs(self):
+    return []
+
+  def pre_build_steps(self):
+    if self.platform == 'windows':
+      return [['tools\\run_tests\\pre_build_node.bat']]
+    else:
+      return [['tools/run_tests/pre_build_node.sh', self.node_version]]
+
+  def make_targets(self):
+    return []
+
+  def make_options(self):
+    return []
+
+  def build_steps(self):
+    return []
+
+  def post_tests_steps(self):
+    return []
+
+  def makefile_name(self):
+    return 'Makefile'
+
+  def dockerfile_dir(self):
+    return 'tools/dockerfile/test/node_jessie_%s' % _docker_arch_suffix(self.args.arch)
+
+  def __str__(self):
+    return 'node_express'
 
 # different configurations we can run under
 with open('tools/run_tests/configs.json') as f:
@@ -852,6 +899,7 @@ _LANGUAGES = {
     'c++': CLanguage('cxx', 'c++'),
     'c': CLanguage('c', 'c'),
     'node': NodeLanguage(),
+    'node_express': NodeExpressLanguage(),
     'php': PhpLanguage(),
     'php7': Php7Language(),
     'python': PythonLanguage(),

+ 67 - 25
tools/run_tests/tests.json

@@ -5156,7 +5156,7 @@
     "cpu_cost": 1.0, 
     "exclude_configs": [], 
     "exclude_iomgrs": [], 
-    "flaky": false, 
+    "flaky": true, 
     "language": "c", 
     "name": "h2_census_test", 
     "platforms": [
@@ -6193,7 +6193,7 @@
     "cpu_cost": 1.0, 
     "exclude_configs": [], 
     "exclude_iomgrs": [], 
-    "flaky": false, 
+    "flaky": true, 
     "language": "c", 
     "name": "h2_compress_test", 
     "platforms": [
@@ -7230,7 +7230,7 @@
     "cpu_cost": 1.0, 
     "exclude_configs": [], 
     "exclude_iomgrs": [], 
-    "flaky": false, 
+    "flaky": true, 
     "language": "c", 
     "name": "h2_fake_resolver_test", 
     "platforms": [
@@ -8254,7 +8254,7 @@
     "cpu_cost": 1.0, 
     "exclude_configs": [], 
     "exclude_iomgrs": [], 
-    "flaky": false, 
+    "flaky": true, 
     "language": "c", 
     "name": "h2_fakesec_test", 
     "platforms": [
@@ -10179,7 +10179,7 @@
     "cpu_cost": 1.0, 
     "exclude_configs": [], 
     "exclude_iomgrs": [], 
-    "flaky": false, 
+    "flaky": true, 
     "language": "c", 
     "name": "h2_full_test", 
     "platforms": [
@@ -11165,7 +11165,7 @@
     "exclude_iomgrs": [
       "uv"
     ], 
-    "flaky": false, 
+    "flaky": true, 
     "language": "c", 
     "name": "h2_full+pipe_test", 
     "platforms": [
@@ -12071,7 +12071,7 @@
     "cpu_cost": 1.0, 
     "exclude_configs": [], 
     "exclude_iomgrs": [], 
-    "flaky": false, 
+    "flaky": true, 
     "language": "c", 
     "name": "h2_full+trace_test", 
     "platforms": [
@@ -13073,7 +13073,7 @@
     "exclude_iomgrs": [
       "uv"
     ], 
-    "flaky": false, 
+    "flaky": true, 
     "language": "c", 
     "name": "h2_http_proxy_test", 
     "platforms": [
@@ -14142,7 +14142,7 @@
     "cpu_cost": 1.0, 
     "exclude_configs": [], 
     "exclude_iomgrs": [], 
-    "flaky": false, 
+    "flaky": true, 
     "language": "c", 
     "name": "h2_load_reporting_test", 
     "platforms": [
@@ -15190,7 +15190,7 @@
     "exclude_iomgrs": [
       "uv"
     ], 
-    "flaky": false, 
+    "flaky": true, 
     "language": "c", 
     "name": "h2_oauth2_test", 
     "platforms": [
@@ -16222,7 +16222,7 @@
     "exclude_iomgrs": [
       "uv"
     ], 
-    "flaky": false, 
+    "flaky": true, 
     "language": "c", 
     "name": "h2_proxy_test", 
     "platforms": [
@@ -20033,7 +20033,7 @@
     "cpu_cost": 1.0, 
     "exclude_configs": [], 
     "exclude_iomgrs": [], 
-    "flaky": false, 
+    "flaky": true, 
     "language": "c", 
     "name": "h2_ssl_test", 
     "platforms": [
@@ -21070,7 +21070,7 @@
     "cpu_cost": 1.0, 
     "exclude_configs": [], 
     "exclude_iomgrs": [], 
-    "flaky": false, 
+    "flaky": true, 
     "language": "c", 
     "name": "h2_ssl_cert_test", 
     "platforms": [
@@ -22070,7 +22070,7 @@
     "exclude_iomgrs": [
       "uv"
     ], 
-    "flaky": false, 
+    "flaky": true, 
     "language": "c", 
     "name": "h2_ssl_proxy_test", 
     "platforms": [
@@ -22995,7 +22995,7 @@
     "exclude_iomgrs": [
       "uv"
     ], 
-    "flaky": false, 
+    "flaky": true, 
     "language": "c", 
     "name": "h2_uds_test", 
     "platforms": [
@@ -24008,7 +24008,7 @@
     "cpu_cost": 1.0, 
     "exclude_configs": [], 
     "exclude_iomgrs": [], 
-    "flaky": false, 
+    "flaky": true, 
     "language": "c", 
     "name": "h2_census_nosec_test", 
     "platforms": [
@@ -25022,7 +25022,7 @@
     "cpu_cost": 1.0, 
     "exclude_configs": [], 
     "exclude_iomgrs": [], 
-    "flaky": false, 
+    "flaky": true, 
     "language": "c", 
     "name": "h2_compress_nosec_test", 
     "platforms": [
@@ -26036,7 +26036,7 @@
     "cpu_cost": 1.0, 
     "exclude_configs": [], 
     "exclude_iomgrs": [], 
-    "flaky": false, 
+    "flaky": true, 
     "language": "c", 
     "name": "h2_fake_resolver_nosec_test", 
     "platforms": [
@@ -27947,7 +27947,7 @@
     "cpu_cost": 1.0, 
     "exclude_configs": [], 
     "exclude_iomgrs": [], 
-    "flaky": false, 
+    "flaky": true, 
     "language": "c", 
     "name": "h2_full_nosec_test", 
     "platforms": [
@@ -28914,7 +28914,7 @@
     "exclude_iomgrs": [
       "uv"
     ], 
-    "flaky": false, 
+    "flaky": true, 
     "language": "c", 
     "name": "h2_full+pipe_nosec_test", 
     "platforms": [
@@ -29797,7 +29797,7 @@
     "cpu_cost": 1.0, 
     "exclude_configs": [], 
     "exclude_iomgrs": [], 
-    "flaky": false, 
+    "flaky": true, 
     "language": "c", 
     "name": "h2_full+trace_nosec_test", 
     "platforms": [
@@ -30775,7 +30775,7 @@
     "exclude_iomgrs": [
       "uv"
     ], 
-    "flaky": false, 
+    "flaky": true, 
     "language": "c", 
     "name": "h2_http_proxy_nosec_test", 
     "platforms": [
@@ -31821,7 +31821,7 @@
     "cpu_cost": 1.0, 
     "exclude_configs": [], 
     "exclude_iomgrs": [], 
-    "flaky": false, 
+    "flaky": true, 
     "language": "c", 
     "name": "h2_load_reporting_nosec_test", 
     "platforms": [
@@ -32797,7 +32797,7 @@
     "exclude_iomgrs": [
       "uv"
     ], 
-    "flaky": false, 
+    "flaky": true, 
     "language": "c", 
     "name": "h2_proxy_nosec_test", 
     "platforms": [
@@ -36487,7 +36487,7 @@
     "exclude_iomgrs": [
       "uv"
     ], 
-    "flaky": false, 
+    "flaky": true, 
     "language": "c", 
     "name": "h2_uds_nosec_test", 
     "platforms": [
@@ -37316,6 +37316,27 @@
     "shortname": "json_run_localhost:cpp_protobuf_async_client_sync_server_unary_qps_unconstrained_secure", 
     "timeout_seconds": 180
   }, 
+  {
+    "args": [
+      "--scenarios_json", 
+      "{\"scenarios\": [{\"name\": \"cpp_protobuf_async_client_sync_server_streaming_qps_unconstrained_secure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"SYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}]}"
+    ], 
+    "boringssl": true, 
+    "ci_platforms": [
+      "linux"
+    ], 
+    "cpu_cost": 8, 
+    "defaults": "boringssl", 
+    "exclude_configs": [], 
+    "flaky": false, 
+    "language": "c++", 
+    "name": "json_run_localhost", 
+    "platforms": [
+      "linux"
+    ], 
+    "shortname": "json_run_localhost:cpp_protobuf_async_client_sync_server_streaming_qps_unconstrained_secure", 
+    "timeout_seconds": 180
+  }, 
   {
     "args": [
       "--scenarios_json", 
@@ -37652,6 +37673,27 @@
     "shortname": "json_run_localhost:cpp_protobuf_async_client_sync_server_unary_qps_unconstrained_insecure", 
     "timeout_seconds": 180
   }, 
+  {
+    "args": [
+      "--scenarios_json", 
+      "{\"scenarios\": [{\"name\": \"cpp_protobuf_async_client_sync_server_streaming_qps_unconstrained_insecure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": null, \"server_type\": \"SYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}]}"
+    ], 
+    "boringssl": true, 
+    "ci_platforms": [
+      "linux"
+    ], 
+    "cpu_cost": 8, 
+    "defaults": "boringssl", 
+    "exclude_configs": [], 
+    "flaky": false, 
+    "language": "c++", 
+    "name": "json_run_localhost", 
+    "platforms": [
+      "linux"
+    ], 
+    "shortname": "json_run_localhost:cpp_protobuf_async_client_sync_server_streaming_qps_unconstrained_insecure", 
+    "timeout_seconds": 180
+  }, 
   {
     "args": [
       "--scenarios_json",