
Merge branch 'master' into poll-cv-disable

Sree Kuchibhotla, 8 years ago
parent
commit 6687cb8297
100 files changed, 1063 additions and 370 deletions
  1. 5 0
      BUILD
  2. 4 0
      CMakeLists.txt
  3. 4 0
      Makefile
  4. 1 0
      build.yaml
  5. 0 1
      composer.json
  6. 11 17
      src/compiler/php_generator.cc
  7. 2 1
      src/compiler/php_generator.h
  8. 12 3
      src/compiler/php_generator_helpers.h
  9. 12 8
      src/compiler/php_plugin.cc
  10. 3 3
      src/core/ext/client_channel/connector.c
  11. 4 3
      src/core/ext/client_channel/connector.h
  12. 7 3
      src/core/ext/client_channel/http_connect_handshaker.c
  13. 2 1
      src/core/ext/client_channel/subchannel.c
  14. 9 5
      src/core/ext/transport/chttp2/client/chttp2_connector.c
  15. 8 4
      src/core/ext/transport/chttp2/server/chttp2_server.c
  16. 1 1
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  17. 9 5
      src/core/lib/channel/handshaker.c
  18. 5 3
      src/core/lib/channel/handshaker.h
  19. 3 2
      src/core/lib/iomgr/endpoint.c
  20. 3 2
      src/core/lib/iomgr/endpoint.h
  21. 11 6
      src/core/lib/iomgr/ev_epoll_linux.c
  22. 11 6
      src/core/lib/iomgr/ev_poll_posix.c
  23. 2 2
      src/core/lib/iomgr/ev_posix.c
  24. 2 2
      src/core/lib/iomgr/ev_posix.h
  25. 2 1
      src/core/lib/iomgr/network_status_tracker.c
  26. 2 1
      src/core/lib/iomgr/tcp_client_posix.c
  27. 3 2
      src/core/lib/iomgr/tcp_posix.c
  28. 4 2
      src/core/lib/iomgr/tcp_server_posix.c
  29. 3 1
      src/core/lib/iomgr/tcp_uv.c
  30. 20 7
      src/core/lib/iomgr/tcp_windows.c
  31. 2 1
      src/core/lib/iomgr/udp_server.c
  32. 3 3
      src/core/lib/security/transport/secure_endpoint.c
  33. 9 4
      src/core/lib/security/transport/security_handshaker.c
  34. 16 0
      src/csharp/Grpc.Core.Tests/ClientServerTest.cs
  35. 10 1
      src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
  36. 4 1
      src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
  37. 1 1
      src/csharp/Grpc.Core/Internal/NativeMethods.cs
  38. 3 2
      src/csharp/ext/grpc_csharp_ext.c
  39. 7 7
      src/php/composer.json
  40. 22 19
      src/php/tests/generated_code/AbstractGeneratedCodeTest.php
  41. 1 1
      src/php/tests/generated_code/GeneratedCodeTest.php
  42. 1 1
      src/php/tests/generated_code/GeneratedCodeWithCallbackTest.php
  43. 44 40
      src/php/tests/interop/interop_client.php
  44. 6 0
      src/proto/grpc/testing/BUILD
  45. 2 2
      src/python/grpcio/grpc/_channel.py
  46. 1 2
      src/python/grpcio/grpc/beta/_server_adaptations.py
  47. 2 3
      src/python/grpcio/grpc/framework/foundation/logging_pool.py
  48. 8 10
      src/python/grpcio_tests/tests/interop/methods.py
  49. 1 2
      src/python/grpcio_tests/tests/unit/_cython/_channel_test.py
  50. 0 1
      templates/composer.json.template
  51. 7 7
      templates/src/php/composer.json.template
  52. 4 2
      test/core/bad_client/bad_client.c
  53. 3 1
      test/core/client_channel/set_initial_connect_string_test.c
  54. 2 1
      test/core/end2end/bad_server_response_test.c
  55. 6 3
      test/core/end2end/fixtures/http_proxy.c
  56. 7 23
      test/core/end2end/fuzzers/api_fuzzer.c
  57. BIN=BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/0242a9f4d4fafc96ee9ed762b610e3c68d6efdec
  58. BIN=BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/0cd9696699bd190463ecef91968624601b64cd8b
  59. BIN=BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/122b6fc72956541812dd653b726b073b77ca33be
  60. BIN=BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/315d27e12f2214a56fb9901dacff14852ff2ac0f
  61. BIN=BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/3f2429e3255ae36fecb57559b57d2b0cb88f5dd1
  62. BIN=BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/41bda7ff09175f821992adf4314a8ec3007ffe55
  63. BIN=BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/49d0085058d7fa81247f51b802c0f4206854b4dc
  64. BIN=BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/614dbc86b17270ef1d5ab705ecbe88c742815ce7
  65. BIN=BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/6cb17148d52be437332b6fd6f2fc8328bfb63fb0
  66. BIN=BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/6ea192b1d4c4577ca7511f8ce5027b31b2e0d75d
  67. BIN=BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/746477e7e8f093f87cb6924ab6476cda9689607d
  68. BIN=BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/7752153d87017b85112a49ea95aa25ca78d24431
  69. BIN=BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/7e75ea44aa7347c2f827beecb27e3bf5b1907b8a
  70. BIN=BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/95e73caecc0ab06beaa9b84125adcb2e6eee2eff
  71. BIN=BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/9e273a94bf3c60f1c7875874c81d0b9309428752
  72. BIN=BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/a65bda38b60ae084a5dcc3b616660aa338feef17
  73. BIN=BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/b39f27387a256019038cddb91f65651c01afb825
  74. BIN=BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/b6f721156f8dc6a353555929e459e61bab8b394a
  75. BIN=BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/bbb2429766a7c4ef9cb7110d567fd48cd6507dc5
  76. BIN=BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/bc330aa616a792ff22a8c7428dcdb4d99accbe4b
  77. BIN=BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/cd4ccfa79f65f31716296e690f3a76007edde2e3
  78. BIN=BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/crash-5d73de981fb75553a7b2606e111716ee9f2af844
  79. BIN=BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/e6b74f64e8bdfdf98177aee58b8729ff2aa7ffb2
  80. BIN=BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/edecc59c5809796f266abd8df4d5ecf6aae304ca
  81. BIN=BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/f1b2889ae7091d6a14332343fe7a2bffd81039a7
  82. 1 1
      test/core/internal_api_canaries/iomgr.c
  83. 8 4
      test/core/iomgr/endpoint_tests.c
  84. 2 1
      test/core/iomgr/ev_epoll_linux_test.c
  85. 2 1
      test/core/iomgr/fd_posix_test.c
  86. 2 1
      test/core/iomgr/tcp_client_posix_test.c
  87. 1 1
      test/core/iomgr/tcp_server_posix_test.c
  88. 4 2
      test/core/security/secure_endpoint_test.c
  89. 2 1
      test/core/security/ssl_server_fuzzer.c
  90. 1 1
      test/core/surface/concurrent_connectivity_test.c
  91. 5 3
      test/core/util/mock_endpoint.c
  92. 7 3
      test/core/util/passthru_endpoint.c
  93. 1 1
      test/core/util/reconnect_server.c
  94. 74 26
      test/cpp/microbenchmarks/bm_fullstack.cc
  95. 145 30
      test/cpp/util/cli_call.cc
  96. 51 0
      test/cpp/util/cli_call.h
  97. 3 3
      test/cpp/util/grpc_cli.cc
  98. 195 58
      test/cpp/util/grpc_tool.cc
  99. 193 0
      test/cpp/util/grpc_tool_test.cc
  100. 29 3
      test/cpp/util/proto_file_parser.cc

+ 5 - 0
BUILD

@@ -1183,6 +1183,7 @@ grpc_cc_library(
         "include/grpc++/impl/codegen/core_codegen_interface.h",
         "include/grpc++/impl/codegen/create_auth_context.h",
         "include/grpc++/impl/codegen/grpc_library.h",
+        "include/grpc++/impl/codegen/metadata_map.h",
         "include/grpc++/impl/codegen/method_handler_impl.h",
         "include/grpc++/impl/codegen/rpc_method.h",
         "include/grpc++/impl/codegen/rpc_service_method.h",
@@ -1191,6 +1192,7 @@ grpc_cc_library(
         "include/grpc++/impl/codegen/server_context.h",
         "include/grpc++/impl/codegen/server_interface.h",
         "include/grpc++/impl/codegen/service_type.h",
+        "include/grpc++/impl/codegen/slice.h",
         "include/grpc++/impl/codegen/status.h",
         "include/grpc++/impl/codegen/status_code_enum.h",
         "include/grpc++/impl/codegen/status_helper.h",
@@ -1233,6 +1235,9 @@ grpc_cc_library(
     public_hdrs = [
         "include/grpc++/impl/codegen/config_protobuf.h",
     ],
+    external_deps = [
+        "protobuf",
+    ],
 )
 
 grpc_cc_library(

+ 4 - 0
CMakeLists.txt

@@ -1615,6 +1615,7 @@ foreach(_hdr
   include/grpc++/impl/codegen/core_codegen_interface.h
   include/grpc++/impl/codegen/create_auth_context.h
   include/grpc++/impl/codegen/grpc_library.h
+  include/grpc++/impl/codegen/metadata_map.h
   include/grpc++/impl/codegen/method_handler_impl.h
   include/grpc++/impl/codegen/rpc_method.h
   include/grpc++/impl/codegen/rpc_service_method.h
@@ -1960,6 +1961,7 @@ foreach(_hdr
   include/grpc++/impl/codegen/core_codegen_interface.h
   include/grpc++/impl/codegen/create_auth_context.h
   include/grpc++/impl/codegen/grpc_library.h
+  include/grpc++/impl/codegen/metadata_map.h
   include/grpc++/impl/codegen/method_handler_impl.h
   include/grpc++/impl/codegen/rpc_method.h
   include/grpc++/impl/codegen/rpc_service_method.h
@@ -2225,6 +2227,7 @@ foreach(_hdr
   include/grpc++/impl/codegen/core_codegen_interface.h
   include/grpc++/impl/codegen/create_auth_context.h
   include/grpc++/impl/codegen/grpc_library.h
+  include/grpc++/impl/codegen/metadata_map.h
   include/grpc++/impl/codegen/method_handler_impl.h
   include/grpc++/impl/codegen/rpc_method.h
   include/grpc++/impl/codegen/rpc_service_method.h
@@ -2383,6 +2386,7 @@ foreach(_hdr
   include/grpc++/impl/codegen/core_codegen_interface.h
   include/grpc++/impl/codegen/create_auth_context.h
   include/grpc++/impl/codegen/grpc_library.h
+  include/grpc++/impl/codegen/metadata_map.h
   include/grpc++/impl/codegen/method_handler_impl.h
   include/grpc++/impl/codegen/rpc_method.h
   include/grpc++/impl/codegen/rpc_service_method.h

+ 4 - 0
Makefile

@@ -3896,6 +3896,7 @@ PUBLIC_HEADERS_CXX += \
     include/grpc++/impl/codegen/core_codegen_interface.h \
     include/grpc++/impl/codegen/create_auth_context.h \
     include/grpc++/impl/codegen/grpc_library.h \
+    include/grpc++/impl/codegen/metadata_map.h \
     include/grpc++/impl/codegen/method_handler_impl.h \
     include/grpc++/impl/codegen/rpc_method.h \
     include/grpc++/impl/codegen/rpc_service_method.h \
@@ -4268,6 +4269,7 @@ PUBLIC_HEADERS_CXX += \
     include/grpc++/impl/codegen/core_codegen_interface.h \
     include/grpc++/impl/codegen/create_auth_context.h \
     include/grpc++/impl/codegen/grpc_library.h \
+    include/grpc++/impl/codegen/metadata_map.h \
     include/grpc++/impl/codegen/method_handler_impl.h \
     include/grpc++/impl/codegen/rpc_method.h \
     include/grpc++/impl/codegen/rpc_service_method.h \
@@ -4626,6 +4628,7 @@ PUBLIC_HEADERS_CXX += \
     include/grpc++/impl/codegen/core_codegen_interface.h \
     include/grpc++/impl/codegen/create_auth_context.h \
     include/grpc++/impl/codegen/grpc_library.h \
+    include/grpc++/impl/codegen/metadata_map.h \
     include/grpc++/impl/codegen/method_handler_impl.h \
     include/grpc++/impl/codegen/rpc_method.h \
     include/grpc++/impl/codegen/rpc_service_method.h \
@@ -4807,6 +4810,7 @@ PUBLIC_HEADERS_CXX += \
     include/grpc++/impl/codegen/core_codegen_interface.h \
     include/grpc++/impl/codegen/create_auth_context.h \
     include/grpc++/impl/codegen/grpc_library.h \
+    include/grpc++/impl/codegen/metadata_map.h \
     include/grpc++/impl/codegen/method_handler_impl.h \
     include/grpc++/impl/codegen/rpc_method.h \
     include/grpc++/impl/codegen/rpc_service_method.h \

+ 1 - 0
build.yaml

@@ -825,6 +825,7 @@ filegroups:
   - include/grpc++/impl/codegen/core_codegen_interface.h
   - include/grpc++/impl/codegen/create_auth_context.h
   - include/grpc++/impl/codegen/grpc_library.h
+  - include/grpc++/impl/codegen/metadata_map.h
   - include/grpc++/impl/codegen/method_handler_impl.h
   - include/grpc++/impl/codegen/rpc_method.h
   - include/grpc++/impl/codegen/rpc_service_method.h

+ 0 - 1
composer.json

@@ -7,7 +7,6 @@
   "license": "BSD-3-Clause",
   "require": {
     "php": ">=5.5.0",
-    "ext-grpc": "*",
     "google/protobuf": "v3.1.0-alpha-1"
   },
   "require-dev": {

+ 11 - 17
src/compiler/php_generator.cc

@@ -134,29 +134,15 @@ void PrintService(const ServiceDescriptor *service, Printer *out) {
   out->Outdent();
   out->Print("}\n\n");
 }
-
-void PrintServices(const FileDescriptor *file, Printer *out) {
-  map<grpc::string, grpc::string> vars;
-  vars["package"] = MessageIdentifierName(file->package());
-  out->Print(vars, "namespace $package$ {\n\n");
-  out->Indent();
-  for (int i = 0; i < file->service_count(); i++) {
-    PrintService(file->service(i), out);
-  }
-  out->Outdent();
-  out->Print("}\n");
-}
 }
 
-grpc::string GenerateFile(const FileDescriptor *file) {
+grpc::string GenerateFile(const FileDescriptor *file,
+                          const ServiceDescriptor *service) {
   grpc::string output;
   {
     StringOutputStream output_stream(&output);
     Printer out(&output_stream, '$');
 
-    if (file->service_count() == 0) {
-      return output;
-    }
     out.Print("<?php\n");
     out.Print("// GENERATED CODE -- DO NOT EDIT!\n\n");
 
@@ -166,7 +152,15 @@ grpc::string GenerateFile(const FileDescriptor *file) {
       out.Print(leading_comments.c_str());
     }
 
-    PrintServices(file, &out);
+    map<grpc::string, grpc::string> vars;
+    vars["package"] = MessageIdentifierName(file->package());
+    out.Print(vars, "namespace $package$ {\n\n");
+    out.Indent();
+
+    PrintService(service, &out);
+
+    out.Outdent();
+    out.Print("}\n");
   }
   return output;
 }

+ 2 - 1
src/compiler/php_generator.h

@@ -38,7 +38,8 @@
 
 namespace grpc_php_generator {
 
-grpc::string GenerateFile(const grpc::protobuf::FileDescriptor *file);
+grpc::string GenerateFile(const grpc::protobuf::FileDescriptor *file,
+                          const grpc::protobuf::ServiceDescriptor *service);
 
 }  // namespace grpc_php_generator
 

+ 12 - 3
src/compiler/php_generator_helpers.h

@@ -41,14 +41,23 @@
 
 namespace grpc_php_generator {
 
-inline grpc::string GetPHPServiceFilename(const grpc::string& filename) {
-  return grpc_generator::StripProto(filename) + "_grpc_pb.php";
+inline grpc::string GetPHPServiceFilename(
+    const grpc::protobuf::FileDescriptor *file,
+    const grpc::protobuf::ServiceDescriptor *service) {
+  std::vector<grpc::string> tokens =
+      grpc_generator::tokenize(file->package(), ".");
+  std::ostringstream oss;
+  for (unsigned int i = 0; i < tokens.size(); i++) {
+    oss << (i == 0 ? "" : "/")
+        << grpc_generator::CapitalizeFirstLetter(tokens[i]);
+  }
+  return oss.str() + "/" + service->name() + "Client.php";
 }
 
 // Get leading or trailing comments in a string. Comment lines start with "// ".
 // Leading detached comments are put in in front of leading comments.
 template <typename DescriptorType>
-inline grpc::string GetPHPComments(const DescriptorType* desc,
+inline grpc::string GetPHPComments(const DescriptorType *desc,
                                    grpc::string prefix) {
   return grpc_generator::GetPrefixedComments(desc, true, prefix);
 }

+ 12 - 8
src/compiler/php_plugin.cc

@@ -51,18 +51,22 @@ class PHPGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator {
                 const grpc::string &parameter,
                 grpc::protobuf::compiler::GeneratorContext *context,
                 grpc::string *error) const {
-    grpc::string code = GenerateFile(file);
-    if (code.size() == 0) {
+    if (file->service_count() == 0) {
       return true;
     }
 
-    // Get output file name
-    grpc::string file_name = GetPHPServiceFilename(file->name());
+    for (int i = 0; i < file->service_count(); i++) {
+      grpc::string code = GenerateFile(file, file->service(i));
+
+      // Get output file name
+      grpc::string file_name = GetPHPServiceFilename(file, file->service(i));
+
+      std::unique_ptr<grpc::protobuf::io::ZeroCopyOutputStream> output(
+          context->Open(file_name));
+      grpc::protobuf::io::CodedOutputStream coded_out(output.get());
+      coded_out.WriteRaw(code.data(), code.size());
+    }
 
-    std::unique_ptr<grpc::protobuf::io::ZeroCopyOutputStream> output(
-        context->Open(file_name));
-    grpc::protobuf::io::CodedOutputStream coded_out(output.get());
-    coded_out.WriteRaw(code.data(), code.size());
     return true;
   }
 };
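
Note on the PHP codegen change above: the plugin now emits one client file per service instead of a single <proto>_grpc_pb.php, and GetPHPServiceFilename() derives the output path from the capitalized package tokens plus the service name. As an illustration (not part of the diff), a proto with package math; and service Math ends up in Math/MathClient.php, which is the Math\MathClient class the updated generated-code tests below construct.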

+ 3 - 3
src/core/ext/client_channel/connector.c

@@ -49,7 +49,7 @@ void grpc_connector_connect(grpc_exec_ctx* exec_ctx, grpc_connector* connector,
   connector->vtable->connect(exec_ctx, connector, in_args, out_args, notify);
 }
 
-void grpc_connector_shutdown(grpc_exec_ctx* exec_ctx,
-                             grpc_connector* connector) {
-  connector->vtable->shutdown(exec_ctx, connector);
+void grpc_connector_shutdown(grpc_exec_ctx* exec_ctx, grpc_connector* connector,
+                             grpc_error* why) {
+  connector->vtable->shutdown(exec_ctx, connector, why);
 }

+ 4 - 3
src/core/ext/client_channel/connector.h

@@ -68,7 +68,8 @@ struct grpc_connector_vtable {
   void (*ref)(grpc_connector *connector);
   void (*unref)(grpc_exec_ctx *exec_ctx, grpc_connector *connector);
   /** Implementation of grpc_connector_shutdown */
-  void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_connector *connector);
+  void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_connector *connector,
+                   grpc_error *why);
   /** Implementation of grpc_connector_connect */
   void (*connect)(grpc_exec_ctx *exec_ctx, grpc_connector *connector,
                   const grpc_connect_in_args *in_args,
@@ -83,7 +84,7 @@ void grpc_connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *connector,
                             grpc_connect_out_args *out_args,
                             grpc_closure *notify);
 /** Cancel any pending connection */
-void grpc_connector_shutdown(grpc_exec_ctx *exec_ctx,
-                             grpc_connector *connector);
+void grpc_connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *connector,
+                             grpc_error *why);
 
 #endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_CONNECTOR_H */
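
The signature change above is the pattern repeated throughout this commit: shutdown entry points (connector, handshaker, endpoint, fd) now take a grpc_error *why describing the reason for the shutdown, and the callee takes ownership of that error. A minimal sketch of the calling convention, assuming the in-tree iomgr headers; shut_down_with_reason is a hypothetical helper, not part of the diff:

#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/error.h"

/* Hypothetical caller illustrating the ownership rule implied by the new
   `grpc_error *why` parameters: the callee owns whatever error it is given. */
static void shut_down_with_reason(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
                                  grpc_error *error) {
  /* Hand the endpoint its own reference; we still hold ours. */
  grpc_endpoint_shutdown(exec_ctx, ep, GRPC_ERROR_REF(error));
  /* Or transfer a freshly created error outright:
     grpc_endpoint_shutdown(exec_ctx, ep, GRPC_ERROR_CREATE("Going away")); */
  GRPC_ERROR_UNREF(error); /* drop our own reference once we are done */
}

This matches the call sites below: subchannel.c creates a new error and hands it over, while the handshakers pass GRPC_ERROR_REF(why) downstream and GRPC_ERROR_UNREF(why) at the end of their own shutdown functions.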

+ 7 - 3
src/core/ext/client_channel/http_connect_handshaker.c

@@ -123,7 +123,8 @@ static void handshake_failed_locked(grpc_exec_ctx* exec_ctx,
     // before destroying them, even if we know that there are no
     // pending read/write callbacks.  This should be fixed, at which
     // point this can be removed.
-    grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint);
+    grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint,
+                           GRPC_ERROR_REF(error));
     // Not shutting down, so the handshake failed.  Clean up before
     // invoking the callback.
     cleanup_args_for_failure_locked(exec_ctx, handshaker);
@@ -251,15 +252,18 @@ static void http_connect_handshaker_destroy(grpc_exec_ctx* exec_ctx,
 }
 
 static void http_connect_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
-                                             grpc_handshaker* handshaker_in) {
+                                             grpc_handshaker* handshaker_in,
+                                             grpc_error* why) {
   http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in;
   gpr_mu_lock(&handshaker->mu);
   if (!handshaker->shutdown) {
     handshaker->shutdown = true;
-    grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint);
+    grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint,
+                           GRPC_ERROR_REF(why));
     cleanup_args_for_failure_locked(exec_ctx, handshaker);
   }
   gpr_mu_unlock(&handshaker->mu);
+  GRPC_ERROR_UNREF(why);
 }
 
 static void http_connect_handshaker_do_handshake(

+ 2 - 1
src/core/ext/client_channel/subchannel.c

@@ -273,7 +273,8 @@ static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
   gpr_mu_lock(&c->mu);
   GPR_ASSERT(!c->disconnected);
   c->disconnected = true;
-  grpc_connector_shutdown(exec_ctx, c->connector);
+  grpc_connector_shutdown(exec_ctx, c->connector,
+                          GRPC_ERROR_CREATE("Subchannel disconnected"));
   con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
   if (con != NULL) {
     GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, con, "connection");

+ 9 - 5
src/core/ext/transport/chttp2/client/chttp2_connector.c

@@ -92,19 +92,21 @@ static void chttp2_connector_unref(grpc_exec_ctx *exec_ctx,
 }
 
 static void chttp2_connector_shutdown(grpc_exec_ctx *exec_ctx,
-                                      grpc_connector *con) {
+                                      grpc_connector *con, grpc_error *why) {
   chttp2_connector *c = (chttp2_connector *)con;
   gpr_mu_lock(&c->mu);
   c->shutdown = true;
   if (c->handshake_mgr != NULL) {
-    grpc_handshake_manager_shutdown(exec_ctx, c->handshake_mgr);
+    grpc_handshake_manager_shutdown(exec_ctx, c->handshake_mgr,
+                                    GRPC_ERROR_REF(why));
   }
   // If handshaking is not yet in progress, shutdown the endpoint.
   // Otherwise, the handshaker will do this for us.
   if (!c->connecting && c->endpoint != NULL) {
-    grpc_endpoint_shutdown(exec_ctx, c->endpoint);
+    grpc_endpoint_shutdown(exec_ctx, c->endpoint, GRPC_ERROR_REF(why));
   }
   gpr_mu_unlock(&c->mu);
+  GRPC_ERROR_UNREF(why);
 }
 
 static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
@@ -121,7 +123,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
       // before destroying them, even if we know that there are no
       // pending read/write callbacks.  This should be fixed, at which
       // point this can be removed.
-      grpc_endpoint_shutdown(exec_ctx, args->endpoint);
+      grpc_endpoint_shutdown(exec_ctx, args->endpoint, GRPC_ERROR_REF(error));
       grpc_endpoint_destroy(exec_ctx, args->endpoint);
       grpc_channel_args_destroy(exec_ctx, args->args);
       grpc_slice_buffer_destroy_internal(exec_ctx, args->read_buffer);
@@ -195,7 +197,9 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
     grpc_closure *notify = c->notify;
     c->notify = NULL;
     grpc_closure_sched(exec_ctx, notify, error);
-    if (c->endpoint != NULL) grpc_endpoint_shutdown(exec_ctx, c->endpoint);
+    if (c->endpoint != NULL) {
+      grpc_endpoint_shutdown(exec_ctx, c->endpoint, GRPC_ERROR_REF(error));
+    }
     gpr_mu_unlock(&c->mu);
     chttp2_connector_unref(exec_ctx, arg);
   } else {

+ 8 - 4
src/core/ext/transport/chttp2/server/chttp2_server.c

@@ -101,16 +101,19 @@ static void pending_handshake_manager_remove_locked(
 }
 
 static void pending_handshake_manager_shutdown_locked(grpc_exec_ctx *exec_ctx,
-                                                      server_state *state) {
+                                                      server_state *state,
+                                                      grpc_error *why) {
   pending_handshake_manager_node *prev_node = NULL;
   for (pending_handshake_manager_node *node = state->pending_handshake_mgrs;
        node != NULL; node = node->next) {
-    grpc_handshake_manager_shutdown(exec_ctx, node->handshake_mgr);
+    grpc_handshake_manager_shutdown(exec_ctx, node->handshake_mgr,
+                                    GRPC_ERROR_REF(why));
     gpr_free(prev_node);
     prev_node = node;
   }
   gpr_free(prev_node);
   state->pending_handshake_mgrs = NULL;
+  GRPC_ERROR_UNREF(why);
 }
 
 static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
@@ -129,7 +132,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
       // before destroying them, even if we know that there are no
       // pending read/write callbacks.  This should be fixed, at which
       // point this can be removed.
-      grpc_endpoint_shutdown(exec_ctx, args->endpoint);
+      grpc_endpoint_shutdown(exec_ctx, args->endpoint, GRPC_ERROR_NONE);
       grpc_endpoint_destroy(exec_ctx, args->endpoint);
       grpc_channel_args_destroy(exec_ctx, args->args);
       grpc_slice_buffer_destroy_internal(exec_ctx, args->read_buffer);
@@ -210,7 +213,8 @@ static void tcp_server_shutdown_complete(grpc_exec_ctx *exec_ctx, void *arg,
   gpr_mu_lock(&state->mu);
   grpc_closure *destroy_done = state->server_destroy_listener_done;
   GPR_ASSERT(state->shutdown);
-  pending_handshake_manager_shutdown_locked(exec_ctx, state);
+  pending_handshake_manager_shutdown_locked(exec_ctx, state,
+                                            GRPC_ERROR_REF(error));
   gpr_mu_unlock(&state->mu);
   // Flush queued work before destroying handshaker factory, since that
   // may do a synchronous unref.

+ 1 - 1
src/core/ext/transport/chttp2/transport/chttp2_transport.c

@@ -417,7 +417,7 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
     t->closed = 1;
     connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN,
                            GRPC_ERROR_REF(error), "close_transport");
-    grpc_endpoint_shutdown(exec_ctx, t->ep);
+    grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error));
 
     /* flush writable stream list to avoid dangling references */
     grpc_chttp2_stream *s;

+ 9 - 5
src/core/lib/channel/handshaker.c

@@ -55,8 +55,8 @@ void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx,
 }
 
 void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
-                              grpc_handshaker* handshaker) {
-  handshaker->vtable->shutdown(exec_ctx, handshaker);
+                              grpc_handshaker* handshaker, grpc_error* why) {
+  handshaker->vtable->shutdown(exec_ctx, handshaker, why);
 }
 
 void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx,
@@ -141,14 +141,17 @@ void grpc_handshake_manager_destroy(grpc_exec_ctx* exec_ctx,
 }
 
 void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx,
-                                     grpc_handshake_manager* mgr) {
+                                     grpc_handshake_manager* mgr,
+                                     grpc_error* why) {
   gpr_mu_lock(&mgr->mu);
   // Shutdown the handshaker that's currently in progress, if any.
   if (!mgr->shutdown && mgr->index > 0) {
     mgr->shutdown = true;
-    grpc_handshaker_shutdown(exec_ctx, mgr->handshakers[mgr->index - 1]);
+    grpc_handshaker_shutdown(exec_ctx, mgr->handshakers[mgr->index - 1],
+                             GRPC_ERROR_REF(why));
   }
   gpr_mu_unlock(&mgr->mu);
+  GRPC_ERROR_UNREF(why);
 }
 
 // Helper function to call either the next handshaker or the
@@ -197,7 +200,8 @@ static void call_next_handshaker(grpc_exec_ctx* exec_ctx, void* arg,
 static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
   grpc_handshake_manager* mgr = arg;
   if (error == GRPC_ERROR_NONE) {  // Timer fired, rather than being cancelled.
-    grpc_handshake_manager_shutdown(exec_ctx, mgr);
+    grpc_handshake_manager_shutdown(exec_ctx, mgr,
+                                    GRPC_ERROR_CREATE("Handshake timed out"));
   }
   grpc_handshake_manager_unref(exec_ctx, mgr);
 }

+ 5 - 3
src/core/lib/channel/handshaker.h

@@ -86,7 +86,8 @@ typedef struct {
 
   /// Shuts down the handshaker (e.g., to clean up when the operation is
   /// aborted in the middle).
-  void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker);
+  void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker,
+                   grpc_error* why);
 
   /// Performs handshaking, modifying \a args as needed (e.g., to
   /// replace \a endpoint with a wrapped endpoint).
@@ -111,7 +112,7 @@ void grpc_handshaker_init(const grpc_handshaker_vtable* vtable,
 void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx,
                              grpc_handshaker* handshaker);
 void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
-                              grpc_handshaker* handshaker);
+                              grpc_handshaker* handshaker, grpc_error* why);
 void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx,
                                   grpc_handshaker* handshaker,
                                   grpc_tcp_server_acceptor* acceptor,
@@ -141,7 +142,8 @@ void grpc_handshake_manager_destroy(grpc_exec_ctx* exec_ctx,
 /// The caller must still call grpc_handshake_manager_destroy() after
 /// calling this function.
 void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx,
-                                     grpc_handshake_manager* mgr);
+                                     grpc_handshake_manager* mgr,
+                                     grpc_error* why);
 
 /// Invokes handshakers in the order they were added.
 /// Takes ownership of \a endpoint, and then passes that ownership to

+ 3 - 2
src/core/lib/iomgr/endpoint.c

@@ -54,8 +54,9 @@ void grpc_endpoint_add_to_pollset_set(grpc_exec_ctx* exec_ctx,
   ep->vtable->add_to_pollset_set(exec_ctx, ep, pollset_set);
 }
 
-void grpc_endpoint_shutdown(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep) {
-  ep->vtable->shutdown(exec_ctx, ep);
+void grpc_endpoint_shutdown(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
+                            grpc_error* why) {
+  ep->vtable->shutdown(exec_ctx, ep, why);
 }
 
 void grpc_endpoint_destroy(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep) {

+ 3 - 2
src/core/lib/iomgr/endpoint.h

@@ -57,7 +57,7 @@ struct grpc_endpoint_vtable {
                          grpc_pollset *pollset);
   void (*add_to_pollset_set)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
                              grpc_pollset_set *pollset);
-  void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
+  void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_error *why);
   void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
   grpc_resource_user *(*get_resource_user)(grpc_endpoint *ep);
   char *(*get_peer)(grpc_endpoint *ep);
@@ -96,7 +96,8 @@ void grpc_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
 
 /* Causes any pending and future read/write callbacks to run immediately with
    success==0 */
-void grpc_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
+void grpc_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
+                            grpc_error *why);
 void grpc_endpoint_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
 
 /* Add an endpoint to a pollset, so that when the pollset is polled, events from

+ 11 - 6
src/core/lib/iomgr/ev_epoll_linux.c

@@ -143,6 +143,7 @@ struct grpc_fd {
   /* Indicates that the fd is shutdown and that any pending read/write closures
      should fail */
   bool shutdown;
+  grpc_error *shutdown_error; /* reason for shutdown: set iff shutdown==true */
 
   /* The fd is either closed or we relinquished control of it. In either cases,
      this indicates that the 'fd' on this structure is no longer valid */
@@ -907,6 +908,7 @@ static void unref_by(grpc_fd *fd, int n) {
     fd->freelist_next = fd_freelist;
     fd_freelist = fd;
     grpc_iomgr_unregister_object(&fd->iomgr_object);
+    if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error);
 
     gpr_mu_unlock(&fd_freelist_mu);
   } else {
@@ -1058,11 +1060,11 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
   GRPC_ERROR_UNREF(error);
 }
 
-static grpc_error *fd_shutdown_error(bool shutdown) {
-  if (!shutdown) {
+static grpc_error *fd_shutdown_error(grpc_fd *fd) {
+  if (!fd->shutdown) {
     return GRPC_ERROR_NONE;
   } else {
-    return GRPC_ERROR_CREATE("FD shutdown");
+    return GRPC_ERROR_CREATE_REFERENCING("FD shutdown", &fd->shutdown_error, 1);
   }
 }
 
@@ -1076,7 +1078,7 @@ static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
   } else if (*st == CLOSURE_READY) {
     /* already ready ==> queue the closure to run immediately */
     *st = CLOSURE_NOT_READY;
-    grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown));
+    grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd));
   } else {
     /* upcallptr was set to a different closure.  This is an error! */
     gpr_log(GPR_ERROR,
@@ -1098,7 +1100,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
     return 0;
   } else {
     /* waiting ==> queue closure */
-    grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown));
+    grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd));
     *st = CLOSURE_NOT_READY;
     return 1;
   }
@@ -1123,17 +1125,20 @@ static bool fd_is_shutdown(grpc_fd *fd) {
 }
 
 /* Might be called multiple times */
-static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
+static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
   gpr_mu_lock(&fd->po.mu);
   /* Do the actual shutdown only once */
   if (!fd->shutdown) {
     fd->shutdown = true;
+    fd->shutdown_error = why;
 
     shutdown(fd->fd, SHUT_RDWR);
     /* Flush any pending read and write closures. Since fd->shutdown is 'true'
        at this point, the closures would be called with 'success = false' */
     set_ready_locked(exec_ctx, fd, &fd->read_closure);
     set_ready_locked(exec_ctx, fd, &fd->write_closure);
+  } else {
+    GRPC_ERROR_UNREF(why);
   }
   gpr_mu_unlock(&fd->po.mu);
 }
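
With the stored shutdown_error, fd_shutdown_error() now wraps the original reason via GRPC_ERROR_CREATE_REFERENCING, so closures that fail after shutdown report why the fd was shut down rather than a bare "FD shutdown"; the stored error is released in unref_by() when the fd is freed. The ev_poll_posix.c change below mirrors this.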

+ 11 - 6
src/core/lib/iomgr/ev_poll_posix.c

@@ -82,6 +82,7 @@ struct grpc_fd {
   int shutdown;
   int closed;
   int released;
+  grpc_error *shutdown_error;
 
   /* The watcher list.
 
@@ -306,6 +307,7 @@ static void unref_by(grpc_fd *fd, int n) {
   if (old == n) {
     gpr_mu_destroy(&fd->mu);
     grpc_iomgr_unregister_object(&fd->iomgr_object);
+    if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error);
     gpr_free(fd);
   } else {
     GPR_ASSERT(old > n);
@@ -444,11 +446,11 @@ static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
 static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
 #endif
 
-static grpc_error *fd_shutdown_error(bool shutdown) {
-  if (!shutdown) {
+static grpc_error *fd_shutdown_error(grpc_fd *fd) {
+  if (!fd->shutdown) {
     return GRPC_ERROR_NONE;
   } else {
-    return GRPC_ERROR_CREATE("FD shutdown");
+    return GRPC_ERROR_CREATE_REFERENCING("FD shutdown", &fd->shutdown_error, 1);
   }
 }
 
@@ -462,7 +464,7 @@ static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
   } else if (*st == CLOSURE_READY) {
     /* already ready ==> queue the closure to run immediately */
     *st = CLOSURE_NOT_READY;
-    grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown));
+    grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd));
     maybe_wake_one_watcher_locked(fd);
   } else {
     /* upcallptr was set to a different closure.  This is an error! */
@@ -485,7 +487,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
     return 0;
   } else {
     /* waiting ==> queue closure */
-    grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown));
+    grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd));
     *st = CLOSURE_NOT_READY;
     return 1;
   }
@@ -496,15 +498,18 @@ static void set_read_notifier_pollset_locked(
   fd->read_notifier_pollset = read_notifier_pollset;
 }
 
-static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
+static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
   gpr_mu_lock(&fd->mu);
   /* only shutdown once */
   if (!fd->shutdown) {
     fd->shutdown = 1;
+    fd->shutdown_error = why;
     /* signal read/write closed to OS so that future operations fail */
     shutdown(fd->fd, SHUT_RDWR);
     set_ready_locked(exec_ctx, fd, &fd->read_closure);
     set_ready_locked(exec_ctx, fd, &fd->write_closure);
+  } else {
+    GRPC_ERROR_UNREF(why);
   }
   gpr_mu_unlock(&fd->mu);
 }

+ 2 - 2
src/core/lib/iomgr/ev_posix.c

@@ -162,8 +162,8 @@ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
   g_event_engine->fd_orphan(exec_ctx, fd, on_done, release_fd, reason);
 }
 
-void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
-  g_event_engine->fd_shutdown(exec_ctx, fd);
+void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
+  g_event_engine->fd_shutdown(exec_ctx, fd, why);
 }
 
 bool grpc_fd_is_shutdown(grpc_fd *fd) {

+ 2 - 2
src/core/lib/iomgr/ev_posix.h

@@ -51,7 +51,7 @@ typedef struct grpc_event_engine_vtable {
   int (*fd_wrapped_fd)(grpc_fd *fd);
   void (*fd_orphan)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
                     int *release_fd, const char *reason);
-  void (*fd_shutdown)(grpc_exec_ctx *exec_ctx, grpc_fd *fd);
+  void (*fd_shutdown)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why);
   void (*fd_notify_on_read)(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                             grpc_closure *closure);
   void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
@@ -140,7 +140,7 @@ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
 bool grpc_fd_is_shutdown(grpc_fd *fd);
 
 /* Cause any current and future callbacks to fail. */
-void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd);
+void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why);
 
 /* Register read interest, causing read_cb to be called once when fd becomes
    readable, on deadline specified by deadline, or on shutdown triggered by

+ 2 - 1
src/core/lib/iomgr/network_status_tracker.c

@@ -117,7 +117,8 @@ void grpc_network_status_shutdown_all_endpoints() {
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
 
   for (endpoint_ll_node *curr = head; curr != NULL; curr = curr->next) {
-    curr->ep->vtable->shutdown(&exec_ctx, curr->ep);
+    curr->ep->vtable->shutdown(&exec_ctx, curr->ep,
+                               GRPC_ERROR_CREATE("Network unavailable"));
   }
   gpr_mu_unlock(&g_endpoint_mutex);
   grpc_exec_ctx_finish(&exec_ctx);

+ 2 - 1
src/core/lib/iomgr/tcp_client_posix.c

@@ -121,7 +121,8 @@ static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
   }
   gpr_mu_lock(&ac->mu);
   if (ac->fd != NULL) {
-    grpc_fd_shutdown(exec_ctx, ac->fd);
+    grpc_fd_shutdown(exec_ctx, ac->fd,
+                     GRPC_ERROR_CREATE("connect() timed out"));
   }
   done = (--ac->refs == 0);
   gpr_mu_unlock(&ac->mu);

+ 3 - 2
src/core/lib/iomgr/tcp_posix.c

@@ -119,9 +119,10 @@ static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
 static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
                              grpc_error *error);
 
-static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
+static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
+                         grpc_error *why) {
   grpc_tcp *tcp = (grpc_tcp *)ep;
-  grpc_fd_shutdown(exec_ctx, tcp->em_fd);
+  grpc_fd_shutdown(exec_ctx, tcp->em_fd, why);
   grpc_resource_user_shutdown(exec_ctx, tcp->resource_user);
 }
 

+ 4 - 2
src/core/lib/iomgr/tcp_server_posix.c

@@ -276,7 +276,8 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
   if (s->active_ports) {
     grpc_tcp_listener *sp;
     for (sp = s->head; sp; sp = sp->next) {
-      grpc_fd_shutdown(exec_ctx, sp->emfd);
+      grpc_fd_shutdown(exec_ctx, sp->emfd,
+                       GRPC_ERROR_CREATE("Server destroyed"));
     }
     gpr_mu_unlock(&s->mu);
   } else {
@@ -773,7 +774,8 @@ void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx,
   if (s->active_ports) {
     grpc_tcp_listener *sp;
     for (sp = s->head; sp; sp = sp->next) {
-      grpc_fd_shutdown(exec_ctx, sp->emfd);
+      grpc_fd_shutdown(exec_ctx, sp->emfd,
+                       GRPC_ERROR_CREATE("Server shutdown"));
     }
   }
   gpr_mu_unlock(&s->mu);

+ 3 - 1
src/core/lib/iomgr/tcp_uv.c

@@ -298,13 +298,15 @@ static void uv_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
 
 static void shutdown_callback(uv_shutdown_t *req, int status) {}
 
-static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
+static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
+                                 grpc_error *why) {
   grpc_tcp *tcp = (grpc_tcp *)ep;
   if (!tcp->shutting_down) {
     tcp->shutting_down = true;
     uv_shutdown_t *req = &tcp->shutdown_req;
     uv_shutdown(req, (uv_stream_t *)tcp->handle, shutdown_callback);
   }
+  GRPC_ERROR_UNREF(why);
 }
 
 static void uv_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {

+ 20 - 7
src/core/lib/iomgr/tcp_windows.c

@@ -116,6 +116,7 @@ typedef struct grpc_tcp {
      to protect ourselves when requesting a shutdown. */
   gpr_mu mu;
   int shutting_down;
+  grpc_error *shutdown_error;
 
   char *peer_string;
 } grpc_tcp;
@@ -125,6 +126,7 @@ static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
   gpr_mu_destroy(&tcp->mu);
   gpr_free(tcp->peer_string);
   grpc_resource_user_unref(exec_ctx, tcp->resource_user);
+  if (tcp->shutting_down) GRPC_ERROR_UNREF(tcp->shutdown_error);
   gpr_free(tcp);
 }
 
@@ -182,7 +184,10 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) {
         grpc_slice_buffer_add(tcp->read_slices, sub);
       } else {
         grpc_slice_unref_internal(exec_ctx, tcp->read_slice);
-        error = GRPC_ERROR_CREATE("End of TCP stream");
+        error = tcp->shutting_down
+                    ? GRPC_ERROR_CREATE_REFERENCING("TCP stream shutting down",
+                                                    &tcp->shutdown_error, 1)
+                    : GRPC_ERROR_CREATE("End of TCP stream");
       }
     }
   }
@@ -203,8 +208,9 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
   WSABUF buffer;
 
   if (tcp->shutting_down) {
-    grpc_closure_sched(exec_ctx, cb,
-                       GRPC_ERROR_CREATE("TCP socket is shutting down"));
+    grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_CREATE_REFERENCING(
+                                         "TCP socket is shutting down",
+                                         &tcp->shutdown_error, 1));
     return;
   }
 
@@ -291,8 +297,9 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
   size_t len;
 
   if (tcp->shutting_down) {
-    grpc_closure_sched(exec_ctx, cb,
-                       GRPC_ERROR_CREATE("TCP socket is shutting down"));
+    grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_CREATE_REFERENCING(
+                                         "TCP socket is shutting down",
+                                         &tcp->shutdown_error, 1));
     return;
   }
 
@@ -373,12 +380,18 @@ static void win_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
    we're not going to protect against these. However the IO Completion Port
    callback will happen from another thread, so we need to protect against
    concurrent access of the data structure in that regard. */
-static void win_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
+static void win_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
+                         grpc_error *why) {
   grpc_tcp *tcp = (grpc_tcp *)ep;
   gpr_mu_lock(&tcp->mu);
   /* At that point, what may happen is that we're already inside the IOCP
      callback. See the comments in on_read and on_write. */
-  tcp->shutting_down = 1;
+  if (!tcp->shutting_down) {
+    tcp->shutting_down = 1;
+    tcp->shutdown_error = why;
+  } else {
+    GRPC_ERROR_UNREF(why);
+  }
   grpc_winsocket_shutdown(tcp->socket);
   gpr_mu_unlock(&tcp->mu);
   grpc_resource_user_shutdown(exec_ctx, tcp->resource_user);

+ 2 - 1
src/core/lib/iomgr/udp_server.c

@@ -203,7 +203,8 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
     for (sp = s->head; sp; sp = sp->next) {
       GPR_ASSERT(sp->orphan_cb);
       sp->orphan_cb(sp->emfd);
-      grpc_fd_shutdown(exec_ctx, sp->emfd);
+      grpc_fd_shutdown(exec_ctx, sp->emfd,
+                       GRPC_ERROR_CREATE("Server destroyed"));
     }
     gpr_mu_unlock(&s->mu);
   } else {

+ 3 - 3
src/core/lib/security/transport/secure_endpoint.c

@@ -341,10 +341,10 @@ static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep,
   GPR_TIMER_END("secure_endpoint.endpoint_write", 0);
 }
 
-static void endpoint_shutdown(grpc_exec_ctx *exec_ctx,
-                              grpc_endpoint *secure_ep) {
+static void endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep,
+                              grpc_error *why) {
   secure_endpoint *ep = (secure_endpoint *)secure_ep;
-  grpc_endpoint_shutdown(exec_ctx, ep->wrapped_ep);
+  grpc_endpoint_shutdown(exec_ctx, ep->wrapped_ep, why);
 }
 
 static void endpoint_destroy(grpc_exec_ctx *exec_ctx,

+ 9 - 4
src/core/lib/security/transport/security_handshaker.c

@@ -130,7 +130,7 @@ static void security_handshake_failed_locked(grpc_exec_ctx *exec_ctx,
     // before destroying them, even if we know that there are no
     // pending read/write callbacks.  This should be fixed, at which
     // point this can be removed.
-    grpc_endpoint_shutdown(exec_ctx, h->args->endpoint);
+    grpc_endpoint_shutdown(exec_ctx, h->args->endpoint, GRPC_ERROR_REF(error));
     // Not shutting down, so the write failed.  Clean up before
     // invoking the callback.
     cleanup_args_for_failure_locked(exec_ctx, h);
@@ -347,15 +347,17 @@ static void security_handshaker_destroy(grpc_exec_ctx *exec_ctx,
 }
 
 static void security_handshaker_shutdown(grpc_exec_ctx *exec_ctx,
-                                         grpc_handshaker *handshaker) {
+                                         grpc_handshaker *handshaker,
+                                         grpc_error *why) {
   security_handshaker *h = (security_handshaker *)handshaker;
   gpr_mu_lock(&h->mu);
   if (!h->shutdown) {
     h->shutdown = true;
-    grpc_endpoint_shutdown(exec_ctx, h->args->endpoint);
+    grpc_endpoint_shutdown(exec_ctx, h->args->endpoint, GRPC_ERROR_REF(why));
     cleanup_args_for_failure_locked(exec_ctx, h);
   }
   gpr_mu_unlock(&h->mu);
+  GRPC_ERROR_UNREF(why);
 }
 
 static void security_handshaker_do_handshake(grpc_exec_ctx *exec_ctx,
@@ -417,7 +419,10 @@ static void fail_handshaker_destroy(grpc_exec_ctx *exec_ctx,
 }
 
 static void fail_handshaker_shutdown(grpc_exec_ctx *exec_ctx,
-                                     grpc_handshaker *handshaker) {}
+                                     grpc_handshaker *handshaker,
+                                     grpc_error *why) {
+  GRPC_ERROR_UNREF(why);
+}
 
 static void fail_handshaker_do_handshake(grpc_exec_ctx *exec_ctx,
                                          grpc_handshaker *handshaker,

+ 16 - 0
src/csharp/Grpc.Core.Tests/ClientServerTest.cs

@@ -335,6 +335,22 @@ namespace Grpc.Core.Tests
             Assert.AreEqual(StatusCode.Unimplemented, ex.Status.StatusCode);
         }
 
+        [Test]
+        public void StatusDetailIsUtf8()
+        {
+            // some japanese and chinese characters
+            var nonAsciiString = "\u30a1\u30a2\u30a3 \u62b5\u6297\u662f\u5f92\u52b3\u7684";
+            helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+            {
+                context.Status = new Status(StatusCode.Unknown, nonAsciiString);
+                return "";
+            });
+
+            var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc"));
+            Assert.AreEqual(StatusCode.Unknown, ex.Status.StatusCode);
+            Assert.AreEqual(nonAsciiString, ex.Status.Detail);
+        }
+
         [Test]
         public void ServerCallContext_PeerInfoPresent()
         {

+ 10 - 1
src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs

@@ -33,6 +33,7 @@
 
 using System;
 using System.Runtime.InteropServices;
+using System.Text;
 using Grpc.Core;
 
 namespace Grpc.Core.Internal
@@ -42,6 +43,7 @@ namespace Grpc.Core.Internal
     /// </summary>
     internal class BatchContextSafeHandle : SafeHandleZeroIsInvalid
     {
+        static readonly Encoding EncodingUTF8 = System.Text.Encoding.UTF8;
         static readonly NativeMethods Native = NativeMethods.Get();
 
         private BatchContextSafeHandle()
@@ -73,7 +75,7 @@ namespace Grpc.Core.Internal
         {
             UIntPtr detailsLength;
             IntPtr detailsPtr = Native.grpcsharp_batch_context_recv_status_on_client_details(this, out detailsLength);
-            string details = Marshal.PtrToStringAnsi(detailsPtr, (int) detailsLength.ToUInt32());
+            string details = PtrToStringUtf8(detailsPtr, (int) detailsLength.ToUInt32());
             var status = new Status(Native.grpcsharp_batch_context_recv_status_on_client_status(this), details);
 
             IntPtr metadataArrayPtr = Native.grpcsharp_batch_context_recv_status_on_client_trailing_metadata(this);
@@ -106,5 +108,12 @@ namespace Grpc.Core.Internal
             Native.grpcsharp_batch_context_destroy(handle);
             return true;
         }
+
+        string PtrToStringUtf8(IntPtr ptr, int len)
+        {
+            var bytes = new byte[len];
+            Marshal.Copy(ptr, bytes, 0, len);
+            return EncodingUTF8.GetString(bytes);
+        }
     }
 }

+ 4 - 1
src/csharp/Grpc.Core/Internal/CallSafeHandle.cs

@@ -32,6 +32,7 @@
 using System;
 using System.Diagnostics;
 using System.Runtime.InteropServices;
+using System.Text;
 using Grpc.Core;
 using Grpc.Core.Utils;
 using Grpc.Core.Profiling;
@@ -44,6 +45,7 @@ namespace Grpc.Core.Internal
     internal class CallSafeHandle : SafeHandleZeroIsInvalid, INativeCall
     {
         public static readonly CallSafeHandle NullInstance = new CallSafeHandle();
+        static readonly Encoding EncodingUTF8 = System.Text.Encoding.UTF8;
         static readonly NativeMethods Native = NativeMethods.Get();
 
         const uint GRPC_WRITE_BUFFER_HINT = 1;
@@ -138,7 +140,8 @@ namespace Grpc.Core.Internal
                 var ctx = BatchContextSafeHandle.Create();
                 var optionalPayloadLength = optionalPayload != null ? new UIntPtr((ulong)optionalPayload.Length) : UIntPtr.Zero;
                 completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
-                Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata,
+                var statusDetailBytes = EncodingUTF8.GetBytes(status.Detail);
+                Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, statusDetailBytes, new UIntPtr((ulong)statusDetailBytes.Length), metadataArray, sendEmptyInitialMetadata,
                     optionalPayload, optionalPayloadLength, writeFlags).CheckOk();
             }
         }

+ 1 - 1
src/csharp/Grpc.Core/Internal/NativeMethods.cs

@@ -336,7 +336,7 @@ namespace Grpc.Core.Internal
             public delegate CallError grpcsharp_call_send_close_from_client_delegate(CallSafeHandle call,
                 BatchContextSafeHandle ctx);
             public delegate CallError grpcsharp_call_send_status_from_server_delegate(CallSafeHandle call,
-                BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
+                BatchContextSafeHandle ctx, StatusCode statusCode, byte[] statusMessage, UIntPtr statusMessageLen, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
                 byte[] optionalSendBuffer, UIntPtr optionalSendBufferLen, WriteFlags writeFlags);
             public delegate CallError grpcsharp_call_recv_message_delegate(CallSafeHandle call,
                 BatchContextSafeHandle ctx);

+ 3 - 2
src/csharp/ext/grpc_csharp_ext.c

@@ -734,14 +734,15 @@ grpcsharp_call_send_close_from_client(grpc_call *call,
 
 GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(
     grpc_call *call, grpcsharp_batch_context *ctx, grpc_status_code status_code,
-    const char *status_details, grpc_metadata_array *trailing_metadata,
+    const char *status_details, size_t status_details_len,
+    grpc_metadata_array *trailing_metadata,
     int32_t send_empty_initial_metadata, const char* optional_send_buffer,
     size_t optional_send_buffer_len, uint32_t write_flags) {
   /* TODO: don't use magic number */
   grpc_op ops[3];
   memset(ops, 0, sizeof(ops));
   size_t nops = 1;
-  grpc_slice status_details_slice = grpc_slice_from_copied_string(status_details);
+  grpc_slice status_details_slice = grpc_slice_from_copied_buffer(status_details, status_details_len);
   ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
   ops[0].data.send_status_from_server.status = status_code;
   ops[0].data.send_status_from_server.status_details = &status_details_slice;
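
Taken together, the C# changes above make status details 8-bit clean end to end: the managed side encodes status.Detail to UTF-8 bytes and passes an explicit length, the native export builds the slice with grpc_slice_from_copied_buffer instead of a NUL-terminated string copy, and on receive the detail bytes are copied out and decoded with Encoding.UTF8 rather than Marshal.PtrToStringAnsi, which is not guaranteed to decode UTF-8 correctly. The new StatusDetailIsUtf8 test exercises the round trip.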

+ 7 - 7
src/php/composer.json

@@ -1,14 +1,10 @@
 {
-  "name": "grpc/grpc",
-  "type": "library",
-  "description": "gRPC library for PHP",
-  "keywords": ["rpc"],
-  "homepage": "http://grpc.io",
+  "name": "grpc/grpc-dev",
+  "description": "gRPC library for PHP - for Developement use only",
   "license": "BSD-3-Clause",
   "version": "1.1.0",
   "require": {
     "php": ">=5.5.0",
-    "ext-grpc": "*",
     "google/protobuf": "v3.1.0-alpha-1"
   },
   "require-dev": {
@@ -16,7 +12,11 @@
   },
   "autoload": {
     "psr-4": {
-      "Grpc\\": "lib/Grpc/"
+      "Grpc\\": "lib/Grpc/",
+      "Grpc\\Testing\\": "tests/interop/Grpc/Testing/",
+      "GPBMetadata\\Src\\Proto\\Grpc\\Testing\\": "tests/interop/GPBMetadata/Src/Proto/Grpc/Testing/",
+      "Math\\": "tests/generated_code/Math/",
+      "GPBMetadata\\": "tests/generated_code/GPBMetadata/"
     }
   }
 }
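
The added PSR-4 entries map the generated test namespaces (Math\, Grpc\Testing\, GPBMetadata\...) onto the directories holding the checked-in generated code, so the tests below can autoload classes such as Math\MathClient without explicit requires.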

+ 22 - 19
src/php/tests/generated_code/AbstractGeneratedCodeTest.php

@@ -32,8 +32,11 @@
  *
  */
 require_once realpath(dirname(__FILE__).'/../../vendor/autoload.php');
-require_once dirname(__FILE__).'/math.pb.php';
-require_once dirname(__FILE__).'/math_grpc_pb.php';
+
+// The following includes are needed when using protobuf 3.1.0
+// and will suppress warnings when using protobuf 3.2.0+
+@include_once dirname(__FILE__).'/math.pb.php';
+@include_once dirname(__FILE__).'/math_grpc_pb.php';
 
 abstract class AbstractGeneratedCodeTest extends PHPUnit_Framework_TestCase
 {
@@ -70,7 +73,7 @@ abstract class AbstractGeneratedCodeTest extends PHPUnit_Framework_TestCase
     public function testClose()
     {
         self::$client->close();
-        $div_arg = new math\DivArgs();
+        $div_arg = new Math\DivArgs();
         $call = self::$client->Div($div_arg);
     }
 
@@ -79,20 +82,20 @@ abstract class AbstractGeneratedCodeTest extends PHPUnit_Framework_TestCase
      */
     public function testInvalidMetadata()
     {
-        $div_arg = new math\DivArgs();
+        $div_arg = new Math\DivArgs();
         $call = self::$client->Div($div_arg, [' ' => 'abc123']);
     }
 
     public function testGetCallMetadata()
     {
-        $div_arg = new math\DivArgs();
+        $div_arg = new Math\DivArgs();
         $call = self::$client->Div($div_arg);
         $this->assertTrue(is_array($call->getMetadata()));
     }
 
     public function testTimeout()
     {
-        $div_arg = new math\DivArgs();
+        $div_arg = new Math\DivArgs();
         $call = self::$client->Div($div_arg, [], ['timeout' => 1]);
         list($response, $status) = $call->wait();
         $this->assertSame(\Grpc\STATUS_DEADLINE_EXCEEDED, $status->code);
@@ -100,7 +103,7 @@ abstract class AbstractGeneratedCodeTest extends PHPUnit_Framework_TestCase
 
     public function testCancel()
     {
-        $div_arg = new math\DivArgs();
+        $div_arg = new Math\DivArgs();
         $call = self::$client->Div($div_arg);
         $call->cancel();
         list($response, $status) = $call->wait();
@@ -109,7 +112,7 @@ abstract class AbstractGeneratedCodeTest extends PHPUnit_Framework_TestCase
 
     public function testCallCredentialsCallback()
     {
-        $div_arg = new math\DivArgs();
+        $div_arg = new Math\DivArgs();
         $call = self::$client->Div($div_arg, array(), array(
             'call_credentials_callback' => function ($context) {
                 return array();
@@ -122,7 +125,7 @@ abstract class AbstractGeneratedCodeTest extends PHPUnit_Framework_TestCase
 
     public function testCallCredentialsCallback2()
     {
-        $div_arg = new math\DivArgs();
+        $div_arg = new Math\DivArgs();
         $call = self::$client->Div($div_arg);
         $call_credentials = Grpc\CallCredentials::createFromPlugin(
             function ($context) {
@@ -143,7 +146,7 @@ abstract class AbstractGeneratedCodeTest extends PHPUnit_Framework_TestCase
         $invalid_client = new DummyInvalidClient('host', [
             'credentials' => Grpc\ChannelCredentials::createInsecure(),
         ]);
-        $div_arg = new math\DivArgs();
+        $div_arg = new Math\DivArgs();
         $invalid_client->InvalidUnaryCall($div_arg);
     }
 
@@ -166,7 +169,7 @@ abstract class AbstractGeneratedCodeTest extends PHPUnit_Framework_TestCase
 
     public function testWriteFlags()
     {
-        $div_arg = new math\DivArgs();
+        $div_arg = new Math\DivArgs();
         $div_arg->setDividend(7);
         $div_arg->setDivisor(4);
         $call = self::$client->Div($div_arg, [],
@@ -180,7 +183,7 @@ abstract class AbstractGeneratedCodeTest extends PHPUnit_Framework_TestCase
 
     public function testWriteFlagsServerStreaming()
     {
-        $fib_arg = new math\FibArgs();
+        $fib_arg = new Math\FibArgs();
         $fib_arg->setLimit(7);
         $call = self::$client->Fib($fib_arg, [],
                                    ['flags' => Grpc\WRITE_NO_COMPRESS]);
@@ -192,7 +195,7 @@ abstract class AbstractGeneratedCodeTest extends PHPUnit_Framework_TestCase
     public function testWriteFlagsClientStreaming()
     {
         $call = self::$client->Sum();
-        $num = new math\Num();
+        $num = new Math\Num();
         $num->setNum(1);
         $call->write($num, ['flags' => Grpc\WRITE_NO_COMPRESS]);
         list($response, $status) = $call->wait();
@@ -202,7 +205,7 @@ abstract class AbstractGeneratedCodeTest extends PHPUnit_Framework_TestCase
     public function testWriteFlagsBidiStreaming()
     {
         $call = self::$client->DivMany();
-        $div_arg = new math\DivArgs();
+        $div_arg = new Math\DivArgs();
         $div_arg->setDividend(7);
         $div_arg->setDivisor(4);
         $call->write($div_arg, ['flags' => Grpc\WRITE_NO_COMPRESS]);
@@ -214,7 +217,7 @@ abstract class AbstractGeneratedCodeTest extends PHPUnit_Framework_TestCase
 
     public function testSimpleRequest()
     {
-        $div_arg = new math\DivArgs();
+        $div_arg = new Math\DivArgs();
         $div_arg->setDividend(7);
         $div_arg->setDivisor(4);
         $call = self::$client->Div($div_arg);
@@ -227,7 +230,7 @@ abstract class AbstractGeneratedCodeTest extends PHPUnit_Framework_TestCase
 
     public function testServerStreaming()
     {
-        $fib_arg = new math\FibArgs();
+        $fib_arg = new Math\FibArgs();
         $fib_arg->setLimit(7);
         $call = self::$client->Fib($fib_arg);
         $this->assertTrue(is_string($call->getPeer()));
@@ -246,7 +249,7 @@ abstract class AbstractGeneratedCodeTest extends PHPUnit_Framework_TestCase
         $call = self::$client->Sum();
         $this->assertTrue(is_string($call->getPeer()));
         for ($i = 0; $i < 7; ++$i) {
-            $num = new math\Num();
+            $num = new Math\Num();
             $num->setNum($i);
             $call->write($num);
         }
@@ -260,7 +263,7 @@ abstract class AbstractGeneratedCodeTest extends PHPUnit_Framework_TestCase
         $call = self::$client->DivMany();
         $this->assertTrue(is_string($call->getPeer()));
         for ($i = 0; $i < 7; ++$i) {
-            $div_arg = new math\DivArgs();
+            $div_arg = new Math\DivArgs();
             $div_arg->setDividend(2 * $i + 1);
             $div_arg->setDivisor(2);
             $call->write($div_arg);
@@ -276,7 +279,7 @@ abstract class AbstractGeneratedCodeTest extends PHPUnit_Framework_TestCase
 
 class DummyInvalidClient extends \Grpc\BaseStub
 {
-    public function InvalidUnaryCall(\math\DivArgs $argument,
+    public function InvalidUnaryCall(\Math\DivArgs $argument,
                                      $metadata = [],
                                      $options = [])
     {

+ 1 - 1
src/php/tests/generated_code/GeneratedCodeTest.php

@@ -37,7 +37,7 @@ class GeneratedCodeTest extends AbstractGeneratedCodeTest
 {
     public function setUp()
     {
-        self::$client = new math\MathClient(
+        self::$client = new Math\MathClient(
             getenv('GRPC_TEST_HOST'), [
                 'credentials' => Grpc\ChannelCredentials::createInsecure(),
             ]);

+ 1 - 1
src/php/tests/generated_code/GeneratedCodeWithCallbackTest.php

@@ -37,7 +37,7 @@ class GeneratedCodeWithCallbackTest extends AbstractGeneratedCodeTest
 {
     public function setUp()
     {
-        self::$client = new math\MathClient(
+        self::$client = new Math\MathClient(
         getenv('GRPC_TEST_HOST'),
         ['credentials' => Grpc\ChannelCredentials::createInsecure(),
          'update_metadata' => function ($a_hash,

+ 44 - 40
src/php/tests/interop/interop_client.php

@@ -32,8 +32,12 @@
  *
  */
 require_once realpath(dirname(__FILE__).'/../../vendor/autoload.php');
-require 'src/proto/grpc/testing/test.pb.php';
-require 'src/proto/grpc/testing/test_grpc_pb.php';
+
+// The following includes are needed when using protobuf 3.1.0
+// and will suppress warnings when using protobuf 3.2.0+
+@include_once 'src/proto/grpc/testing/test.pb.php';
+@include_once 'src/proto/grpc/testing/test_grpc_pb.php';
+
 use Google\Auth\CredentialsLoader;
 use Google\Auth\ApplicationDefaultCredentials;
 use GuzzleHttp\ClientInterface;
@@ -70,7 +74,7 @@ function hardAssertIfStatusOk($status)
 function emptyUnary($stub)
 {
     list($result, $status) =
-        $stub->EmptyCall(new grpc\testing\EmptyMessage())->wait();
+        $stub->EmptyCall(new Grpc\Testing\EmptyMessage())->wait();
     hardAssertIfStatusOk($status);
     hardAssert($result !== null, 'Call completed with a null response');
 }
@@ -98,11 +102,11 @@ function performLargeUnary($stub, $fillUsername = false,
     $request_len = 271828;
     $response_len = 314159;
 
-    $request = new grpc\testing\SimpleRequest();
-    $request->setResponseType(grpc\testing\PayloadType::COMPRESSABLE);
+    $request = new Grpc\Testing\SimpleRequest();
+    $request->setResponseType(Grpc\Testing\PayloadType::COMPRESSABLE);
     $request->setResponseSize($response_len);
-    $payload = new grpc\testing\Payload();
-    $payload->setType(grpc\testing\PayloadType::COMPRESSABLE);
+    $payload = new Grpc\Testing\Payload();
+    $payload->setType(Grpc\Testing\PayloadType::COMPRESSABLE);
     $payload->setBody(str_repeat("\0", $request_len));
     $request->setPayload($payload);
     $request->setFillUsername($fillUsername);
@@ -117,7 +121,7 @@ function performLargeUnary($stub, $fillUsername = false,
     hardAssertIfStatusOk($status);
     hardAssert($result !== null, 'Call returned a null response');
     $payload = $result->getPayload();
-    hardAssert($payload->getType() === grpc\testing\PayloadType::COMPRESSABLE,
+    hardAssert($payload->getType() === Grpc\Testing\PayloadType::COMPRESSABLE,
                'Payload had the wrong type');
     hardAssert(strlen($payload->getBody()) === $response_len,
                'Payload had the wrong length');
@@ -249,8 +253,8 @@ function clientStreaming($stub)
 
     $requests = array_map(
         function ($length) {
-            $request = new grpc\testing\StreamingInputCallRequest();
-            $payload = new grpc\testing\Payload();
+            $request = new Grpc\Testing\StreamingInputCallRequest();
+            $payload = new Grpc\Testing\Payload();
             $payload->setBody(str_repeat("\0", $length));
             $request->setPayload($payload);
 
@@ -276,10 +280,10 @@ function serverStreaming($stub)
 {
     $sizes = [31415, 9, 2653, 58979];
 
-    $request = new grpc\testing\StreamingOutputCallRequest();
-    $request->setResponseType(grpc\testing\PayloadType::COMPRESSABLE);
+    $request = new Grpc\Testing\StreamingOutputCallRequest();
+    $request->setResponseType(Grpc\Testing\PayloadType::COMPRESSABLE);
     foreach ($sizes as $size) {
-        $response_parameters = new grpc\testing\ResponseParameters();
+        $response_parameters = new Grpc\Testing\ResponseParameters();
         $response_parameters->setSize($size);
         $request->getResponseParameters()[] = $response_parameters;
     }
@@ -290,7 +294,7 @@ function serverStreaming($stub)
         hardAssert($i < 4, 'Too many responses');
         $payload = $value->getPayload();
         hardAssert(
-            $payload->getType() === grpc\testing\PayloadType::COMPRESSABLE,
+            $payload->getType() === Grpc\Testing\PayloadType::COMPRESSABLE,
             'Payload '.$i.' had the wrong type');
         hardAssert(strlen($payload->getBody()) === $sizes[$i],
                    'Response '.$i.' had the wrong length');
@@ -311,12 +315,12 @@ function pingPong($stub)
 
     $call = $stub->FullDuplexCall();
     for ($i = 0; $i < 4; ++$i) {
-        $request = new grpc\testing\StreamingOutputCallRequest();
-        $request->setResponseType(grpc\testing\PayloadType::COMPRESSABLE);
-        $response_parameters = new grpc\testing\ResponseParameters();
+        $request = new Grpc\Testing\StreamingOutputCallRequest();
+        $request->setResponseType(Grpc\Testing\PayloadType::COMPRESSABLE);
+        $response_parameters = new Grpc\Testing\ResponseParameters();
         $response_parameters->setSize($response_lengths[$i]);
         $request->getResponseParameters()[] = $response_parameters;
-        $payload = new grpc\testing\Payload();
+        $payload = new Grpc\Testing\Payload();
         $payload->setBody(str_repeat("\0", $request_lengths[$i]));
         $request->setPayload($payload);
 
@@ -326,7 +330,7 @@ function pingPong($stub)
         hardAssert($response !== null, 'Server returned too few responses');
         $payload = $response->getPayload();
         hardAssert(
-            $payload->getType() === grpc\testing\PayloadType::COMPRESSABLE,
+            $payload->getType() === Grpc\Testing\PayloadType::COMPRESSABLE,
             'Payload '.$i.' had the wrong type');
         hardAssert(strlen($payload->getBody()) === $response_lengths[$i],
                    'Payload '.$i.' had the wrong length');
@@ -371,12 +375,12 @@ function cancelAfterBegin($stub)
 function cancelAfterFirstResponse($stub)
 {
     $call = $stub->FullDuplexCall();
-    $request = new grpc\testing\StreamingOutputCallRequest();
-    $request->setResponseType(grpc\testing\PayloadType::COMPRESSABLE);
-    $response_parameters = new grpc\testing\ResponseParameters();
+    $request = new Grpc\Testing\StreamingOutputCallRequest();
+    $request->setResponseType(Grpc\Testing\PayloadType::COMPRESSABLE);
+    $response_parameters = new Grpc\Testing\ResponseParameters();
     $response_parameters->setSize(31415);
     $request->getResponseParameters()[] = $response_parameters;
-    $payload = new grpc\testing\Payload();
+    $payload = new Grpc\Testing\Payload();
     $payload->setBody(str_repeat("\0", 27182));
     $request->setPayload($payload);
 
@@ -391,12 +395,12 @@ function cancelAfterFirstResponse($stub)
 function timeoutOnSleepingServer($stub)
 {
     $call = $stub->FullDuplexCall([], ['timeout' => 1000]);
-    $request = new grpc\testing\StreamingOutputCallRequest();
-    $request->setResponseType(grpc\testing\PayloadType::COMPRESSABLE);
-    $response_parameters = new grpc\testing\ResponseParameters();
+    $request = new Grpc\Testing\StreamingOutputCallRequest();
+    $request->setResponseType(Grpc\Testing\PayloadType::COMPRESSABLE);
+    $response_parameters = new Grpc\Testing\ResponseParameters();
     $response_parameters->setSize(8);
     $request->getResponseParameters()[] = $response_parameters;
-    $payload = new grpc\testing\Payload();
+    $payload = new Grpc\Testing\Payload();
     $payload->setBody(str_repeat("\0", 9));
     $request->setPayload($payload);
 
@@ -416,11 +420,11 @@ function customMetadata($stub)
     $request_len = 271828;
     $response_len = 314159;
 
-    $request = new grpc\testing\SimpleRequest();
-    $request->setResponseType(grpc\testing\PayloadType::COMPRESSABLE);
+    $request = new Grpc\Testing\SimpleRequest();
+    $request->setResponseType(Grpc\Testing\PayloadType::COMPRESSABLE);
     $request->setResponseSize($response_len);
-    $payload = new grpc\testing\Payload();
-    $payload->setType(grpc\testing\PayloadType::COMPRESSABLE);
+    $payload = new Grpc\Testing\Payload();
+    $payload->setType(Grpc\Testing\PayloadType::COMPRESSABLE);
     $payload->setBody(str_repeat("\0", $request_len));
     $request->setPayload($payload);
 
@@ -449,9 +453,9 @@ function customMetadata($stub)
 
     $streaming_call = $stub->FullDuplexCall($metadata);
 
-    $streaming_request = new grpc\testing\StreamingOutputCallRequest();
+    $streaming_request = new Grpc\Testing\StreamingOutputCallRequest();
     $streaming_request->setPayload($payload);
-    $response_parameters = new grpc\testing\ResponseParameters();
+    $response_parameters = new Grpc\Testing\ResponseParameters();
     $response_parameters->setSize($response_len);
     $streaming_request->getResponseParameters()[] = $response_parameters;
     $streaming_call->write($streaming_request);
@@ -477,11 +481,11 @@ function customMetadata($stub)
 
 function statusCodeAndMessage($stub)
 {
-    $echo_status = new grpc\testing\EchoStatus();
+    $echo_status = new Grpc\Testing\EchoStatus();
     $echo_status->setCode(2);
     $echo_status->setMessage('test status message');
 
-    $request = new grpc\testing\SimpleRequest();
+    $request = new Grpc\Testing\SimpleRequest();
     $request->setResponseStatus($echo_status);
 
     $call = $stub->UnaryCall($request);
@@ -496,7 +500,7 @@ function statusCodeAndMessage($stub)
 
     $streaming_call = $stub->FullDuplexCall();
 
-    $streaming_request = new grpc\testing\StreamingOutputCallRequest();
+    $streaming_request = new Grpc\Testing\StreamingOutputCallRequest();
     $streaming_request->setResponseStatus($echo_status);
     $streaming_call->write($streaming_request);
     $streaming_call->writesDone();
@@ -514,7 +518,7 @@ function statusCodeAndMessage($stub)
 # NOTE: the stub input to this function is from UnimplementedService
 function unimplementedService($stub)
 {
-    $call = $stub->UnimplementedCall(new grpc\testing\EmptyMessage());
+    $call = $stub->UnimplementedCall(new Grpc\Testing\EmptyMessage());
     list($result, $status) = $call->wait();
     hardAssert($status->code === Grpc\STATUS_UNIMPLEMENTED,
                'Received unexpected status code');
@@ -523,7 +527,7 @@ function unimplementedService($stub)
 # NOTE: the stub input to this function is from TestService
 function unimplementedMethod($stub)
 {
-    $call = $stub->UnimplementedCall(new grpc\testing\EmptyMessage());
+    $call = $stub->UnimplementedCall(new Grpc\Testing\EmptyMessage());
     list($result, $status) = $call->wait();
     hardAssert($status->code === Grpc\STATUS_UNIMPLEMENTED,
                'Received unexpected status code');
@@ -614,10 +618,10 @@ function _makeStub($args)
     }
 
     if ($test_case === 'unimplemented_service') {
-        $stub = new grpc\testing\UnimplementedServiceClient($server_address,
+        $stub = new Grpc\Testing\UnimplementedServiceClient($server_address,
                                                             $opts);
     } else {
-        $stub = new grpc\testing\TestServiceClient($server_address, $opts);
+        $stub = new Grpc\Testing\TestServiceClient($server_address, $opts);
     }
 
     return $stub;

+ 6 - 0
src/proto/grpc/testing/BUILD

@@ -42,11 +42,13 @@ grpc_proto_library(
     name = "control_proto",
     srcs = ["control.proto"],
     deps = ["payloads_proto", "stats_proto"],
+    has_services = False,
 )
 
 grpc_proto_library(
     name = "echo_messages_proto",
     srcs = ["echo_messages.proto"],
+    has_services = False,
 )
 
 grpc_proto_library(
@@ -58,11 +60,13 @@ grpc_proto_library(
 grpc_proto_library(
     name = "empty_proto",
     srcs = ["empty.proto"],
+    has_services = False,
 )
 
 grpc_proto_library(
     name = "messages_proto",
     srcs = ["messages.proto"],
+    has_services = False,
 )
 
 grpc_proto_library(
@@ -73,6 +77,7 @@ grpc_proto_library(
 grpc_proto_library(
     name = "payloads_proto",
     srcs = ["payloads.proto"],
+    has_services = False,
 )
 
 grpc_proto_library(
@@ -84,6 +89,7 @@ grpc_proto_library(
 grpc_proto_library(
     name = "stats_proto",
     srcs = ["stats.proto"],
+    has_services = False,
 )
 
 grpc_proto_library(

+ 2 - 2
src/python/grpcio/grpc/_channel.py

@@ -842,8 +842,8 @@ def _poll_connectivity(state, channel, initial_try_to_connect):
     connectivity = channel.check_connectivity_state(try_to_connect)
     with state.lock:
         state.connectivity = (
-            _common.
-            CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[connectivity])
+            _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
+                connectivity])
         callbacks = tuple(callback
                           for callback, unused_but_known_to_be_none_connectivity
                           in state.callbacks_and_connectivities)

+ 1 - 2
src/python/grpcio/grpc/beta/_server_adaptations.py

@@ -393,5 +393,4 @@ def server(service_implementations, multi_method_implementation,
     else:
         effective_thread_pool = thread_pool
     return _Server(
-        grpc.server(
-            effective_thread_pool, handlers=(generic_rpc_handler,)))
+        grpc.server(effective_thread_pool, handlers=(generic_rpc_handler,)))

+ 2 - 3
src/python/grpcio/grpc/framework/foundation/logging_pool.py

@@ -64,9 +64,8 @@ class _LoggingPool(object):
         return self._backing_pool.submit(_wrap(fn), *args, **kwargs)
 
     def map(self, func, *iterables, **kwargs):
-        return self._backing_pool.map(_wrap(func),
-                                      *iterables,
-                                      timeout=kwargs.get('timeout', None))
+        return self._backing_pool.map(
+            _wrap(func), *iterables, timeout=kwargs.get('timeout', None))
 
     def shutdown(self, wait=True):
         self._backing_pool.shutdown(wait=wait)

+ 8 - 10
src/python/grpcio_tests/tests/interop/methods.py

@@ -351,8 +351,7 @@ def _status_code_and_message(stub):
         response_type=messages_pb2.COMPRESSABLE,
         response_size=1,
         payload=messages_pb2.Payload(body=b'\x00'),
-        response_status=messages_pb2.EchoStatus(
-            code=code, message=details))
+        response_status=messages_pb2.EchoStatus(code=code, message=details))
     response_future = stub.UnaryCall.future(request)
     _validate_status_code_and_details(response_future, status, details)
 
@@ -363,8 +362,7 @@ def _status_code_and_message(stub):
             response_type=messages_pb2.COMPRESSABLE,
             response_parameters=(messages_pb2.ResponseParameters(size=1),),
             payload=messages_pb2.Payload(body=b'\x00'),
-            response_status=messages_pb2.EchoStatus(
-                code=code, message=details))
+            response_status=messages_pb2.EchoStatus(code=code, message=details))
         pipe.add(request)  # sends the initial request.
     # Dropping out of with block closes the pipe
     _validate_status_code_and_details(response_iterator, status, details)
@@ -428,8 +426,8 @@ def _compute_engine_creds(stub, args):
 
 
 def _oauth2_auth_token(stub, args):
-    json_key_filename = os.environ[oauth2client_client.
-                                   GOOGLE_APPLICATION_CREDENTIALS]
+    json_key_filename = os.environ[
+        oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS]
     wanted_email = json.load(open(json_key_filename, 'rb'))['client_email']
     response = _large_unary_common_behavior(stub, True, True, None)
     if wanted_email != response.username:
@@ -441,8 +439,8 @@ def _oauth2_auth_token(stub, args):
 
 
 def _jwt_token_creds(stub, args):
-    json_key_filename = os.environ[oauth2client_client.
-                                   GOOGLE_APPLICATION_CREDENTIALS]
+    json_key_filename = os.environ[
+        oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS]
     wanted_email = json.load(open(json_key_filename, 'rb'))['client_email']
     response = _large_unary_common_behavior(stub, True, False, None)
     if wanted_email != response.username:
@@ -451,8 +449,8 @@ def _jwt_token_creds(stub, args):
 
 
 def _per_rpc_creds(stub, args):
-    json_key_filename = os.environ[oauth2client_client.
-                                   GOOGLE_APPLICATION_CREDENTIALS]
+    json_key_filename = os.environ[
+        oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS]
     wanted_email = json.load(open(json_key_filename, 'rb'))['client_email']
     credentials = oauth2client_client.GoogleCredentials.get_application_default()
     scoped_credentials = credentials.create_scoped([args.oauth_scope])

+ 1 - 2
src/python/grpcio_tests/tests/unit/_cython/_channel_test.py

@@ -59,8 +59,7 @@ def _create_loop_destroy():
 
 def _in_parallel(behavior, arguments):
     threads = tuple(
-        threading.Thread(
-            target=behavior, args=arguments)
+        threading.Thread(target=behavior, args=arguments)
         for _ in range(test_constants.THREAD_CONCURRENCY))
     for thread in threads:
         thread.start()

+ 0 - 1
templates/composer.json.template

@@ -9,7 +9,6 @@
     "license": "BSD-3-Clause",
     "require": {
       "php": ">=5.5.0",
-      "ext-grpc": "*",
       "google/protobuf": "v3.1.0-alpha-1"
     },
     "require-dev": {

+ 7 - 7
templates/src/php/composer.json.template

@@ -1,16 +1,12 @@
 %YAML 1.2
 --- |
   {
-    "name": "grpc/grpc",
-    "type": "library",
-    "description": "gRPC library for PHP",
-    "keywords": ["rpc"],
-    "homepage": "http://grpc.io",
+    "name": "grpc/grpc-dev",
+    "description": "gRPC library for PHP - for Developement use only",
     "license": "BSD-3-Clause",
     "version": "${settings.php_version.php_composer()}",
     "require": {
       "php": ">=5.5.0",
-      "ext-grpc": "*",
       "google/protobuf": "v3.1.0-alpha-1"
     },
     "require-dev": {
@@ -18,7 +14,11 @@
     },
     "autoload": {
       "psr-4": {
-        "Grpc\\": "lib/Grpc/"
+        "Grpc\\": "lib/Grpc/",
+        "Grpc\\Testing\\": "tests/interop/Grpc/Testing/",
+        "GPBMetadata\\Src\\Proto\\Grpc\\Testing\\": "tests/interop/GPBMetadata/Src/Proto/Grpc/Testing/",
+        "Math\\": "tests/generated_code/Math/",
+        "GPBMetadata\\": "tests/generated_code/GPBMetadata/"
       }
     }
   }

+ 4 - 2
test/core/bad_client/bad_client.c

@@ -163,7 +163,8 @@ void grpc_run_bad_client_test(
       gpr_event_wait(&a.done_write, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)));
 
   if (flags & GRPC_BAD_CLIENT_DISCONNECT) {
-    grpc_endpoint_shutdown(&exec_ctx, sfd.client);
+    grpc_endpoint_shutdown(&exec_ctx, sfd.client,
+                           GRPC_ERROR_CREATE("Forced Disconnect"));
     grpc_endpoint_destroy(&exec_ctx, sfd.client);
     grpc_exec_ctx_finish(&exec_ctx);
     sfd.client = NULL;
@@ -189,7 +190,8 @@ void grpc_run_bad_client_test(
       grpc_slice_buffer_destroy_internal(&exec_ctx, &args.incoming);
     }
     // Shutdown.
-    grpc_endpoint_shutdown(&exec_ctx, sfd.client);
+    grpc_endpoint_shutdown(&exec_ctx, sfd.client,
+                           GRPC_ERROR_CREATE("Test Shutdown"));
     grpc_endpoint_destroy(&exec_ctx, sfd.client);
     grpc_exec_ctx_finish(&exec_ctx);
   }

+ 3 - 1
test/core/client_channel/set_initial_connect_string_test.c

@@ -81,7 +81,9 @@ static void handle_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
           state.incoming_buffer.length, strlen(magic_connect_string));
   if (state.incoming_buffer.length > strlen(magic_connect_string)) {
     gpr_atm_rel_store(&state.done_atm, 1);
-    grpc_endpoint_shutdown(exec_ctx, state.tcp);
+    grpc_endpoint_shutdown(
+        exec_ctx, state.tcp,
+        GRPC_ERROR_CREATE("Incoming buffer longer than magic_connect_string"));
     grpc_endpoint_destroy(exec_ctx, state.tcp);
   } else {
     grpc_endpoint_read(exec_ctx, state.tcp, &state.temp_incoming_buffer,

+ 2 - 1
test/core/end2end/bad_server_response_test.c

@@ -298,7 +298,8 @@ static void run_test(const char *response_payload,
   gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME));
 
   /* clean up */
-  grpc_endpoint_shutdown(&exec_ctx, state.tcp);
+  grpc_endpoint_shutdown(&exec_ctx, state.tcp,
+                         GRPC_ERROR_CREATE("Test Shutdown"));
   grpc_endpoint_destroy(&exec_ctx, state.tcp);
   cleanup_rpc(&exec_ctx);
   grpc_exec_ctx_finish(&exec_ctx);

+ 6 - 3
test/core/end2end/fixtures/http_proxy.c

@@ -133,9 +133,12 @@ static void proxy_connection_failed(grpc_exec_ctx* exec_ctx,
   const char* msg = grpc_error_string(error);
   gpr_log(GPR_INFO, "%s: %s", prefix, msg);
 
-  grpc_endpoint_shutdown(exec_ctx, conn->client_endpoint);
-  if (conn->server_endpoint != NULL)
-    grpc_endpoint_shutdown(exec_ctx, conn->server_endpoint);
+  grpc_endpoint_shutdown(exec_ctx, conn->client_endpoint,
+                         GRPC_ERROR_REF(error));
+  if (conn->server_endpoint != NULL) {
+    grpc_endpoint_shutdown(exec_ctx, conn->server_endpoint,
+                           GRPC_ERROR_REF(error));
+  }
   proxy_connection_unref(exec_ctx, conn);
 }
 

+ 7 - 23
test/core/end2end/fuzzers/api_fuzzer.c

@@ -604,12 +604,12 @@ static call_state *maybe_delete_call_state(call_state *call) {
   grpc_slice_unref(call->recv_status_details);
   grpc_call_details_destroy(&call->call_details);
 
-  for (size_t i = 0; i < call->num_to_free; i++) {
-    gpr_free(call->to_free[i]);
-  }
   for (size_t i = 0; i < call->num_slices_to_unref; i++) {
     grpc_slice_unref(call->slices_to_unref[i]);
   }
+  for (size_t i = 0; i < call->num_to_free; i++) {
+    gpr_free(call->to_free[i]);
+  }
   gpr_free(call->to_free);
   gpr_free(call->slices_to_unref);
 
@@ -627,7 +627,7 @@ static void add_to_free(call_state *call, void *p) {
   call->to_free[call->num_to_free++] = p;
 }
 
-static grpc_slice *add_to_slice_unref(call_state *call, grpc_slice s) {
+static grpc_slice *add_slice_to_unref(call_state *call, grpc_slice s) {
   if (call->num_slices_to_unref == call->cap_slices_to_unref) {
     call->cap_slices_to_unref = GPR_MAX(8, 2 * call->cap_slices_to_unref);
     call->slices_to_unref =
@@ -648,8 +648,8 @@ static void read_metadata(input_stream *inp, size_t *count,
       (*metadata)[i].key = read_string_like_slice(inp);
       (*metadata)[i].value = read_buffer_like_slice(inp);
       (*metadata)[i].flags = read_uint32(inp);
-      add_to_slice_unref(cs, (*metadata)[i].key);
-      add_to_slice_unref(cs, (*metadata)[i].value);
+      add_slice_to_unref(cs, (*metadata)[i].key);
+      add_slice_to_unref(cs, (*metadata)[i].value);
     }
   } else {
     *metadata = gpr_malloc(1);
@@ -1008,7 +1008,7 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
                   g_active_call);
               op->data.send_status_from_server.status = next_byte(&inp);
               op->data.send_status_from_server.status_details =
-                  add_to_slice_unref(g_active_call,
+                  add_slice_to_unref(g_active_call,
                                      read_buffer_like_slice(&inp));
               break;
             case GRPC_OP_RECV_INITIAL_METADATA:
@@ -1056,22 +1056,6 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
           grpc_byte_buffer_destroy(g_active_call->send_message);
           g_active_call->send_message = NULL;
         }
-        for (i = 0; i < num_ops; i++) {
-          op = &ops[i];
-          switch (op->op) {
-            case GRPC_OP_SEND_STATUS_FROM_SERVER:
-              gpr_free((void *)op->data.send_status_from_server.status_details);
-              break;
-            case GRPC_OP_SEND_MESSAGE:
-            case GRPC_OP_SEND_INITIAL_METADATA:
-            case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
-            case GRPC_OP_RECV_INITIAL_METADATA:
-            case GRPC_OP_RECV_MESSAGE:
-            case GRPC_OP_RECV_STATUS_ON_CLIENT:
-            case GRPC_OP_RECV_CLOSE_ON_SERVER:
-              break;
-          }
-        }
         gpr_free(ops);
 
         break;

BIN=BIN
test/core/end2end/fuzzers/api_fuzzer_corpus/0242a9f4d4fafc96ee9ed762b610e3c68d6efdec


BIN=BIN
test/core/end2end/fuzzers/api_fuzzer_corpus/0cd9696699bd190463ecef91968624601b64cd8b


BIN=BIN
test/core/end2end/fuzzers/api_fuzzer_corpus/122b6fc72956541812dd653b726b073b77ca33be


BIN=BIN
test/core/end2end/fuzzers/api_fuzzer_corpus/315d27e12f2214a56fb9901dacff14852ff2ac0f


BIN=BIN
test/core/end2end/fuzzers/api_fuzzer_corpus/3f2429e3255ae36fecb57559b57d2b0cb88f5dd1


BIN=BIN
test/core/end2end/fuzzers/api_fuzzer_corpus/41bda7ff09175f821992adf4314a8ec3007ffe55


BIN=BIN
test/core/end2end/fuzzers/api_fuzzer_corpus/49d0085058d7fa81247f51b802c0f4206854b4dc


BIN=BIN
test/core/end2end/fuzzers/api_fuzzer_corpus/614dbc86b17270ef1d5ab705ecbe88c742815ce7


BIN=BIN
test/core/end2end/fuzzers/api_fuzzer_corpus/6cb17148d52be437332b6fd6f2fc8328bfb63fb0


BIN=BIN
test/core/end2end/fuzzers/api_fuzzer_corpus/6ea192b1d4c4577ca7511f8ce5027b31b2e0d75d


BIN=BIN
test/core/end2end/fuzzers/api_fuzzer_corpus/746477e7e8f093f87cb6924ab6476cda9689607d


BIN=BIN
test/core/end2end/fuzzers/api_fuzzer_corpus/7752153d87017b85112a49ea95aa25ca78d24431


BIN=BIN
test/core/end2end/fuzzers/api_fuzzer_corpus/7e75ea44aa7347c2f827beecb27e3bf5b1907b8a


BIN=BIN
test/core/end2end/fuzzers/api_fuzzer_corpus/95e73caecc0ab06beaa9b84125adcb2e6eee2eff


BIN=BIN
test/core/end2end/fuzzers/api_fuzzer_corpus/9e273a94bf3c60f1c7875874c81d0b9309428752


BIN=BIN
test/core/end2end/fuzzers/api_fuzzer_corpus/a65bda38b60ae084a5dcc3b616660aa338feef17


BIN=BIN
test/core/end2end/fuzzers/api_fuzzer_corpus/b39f27387a256019038cddb91f65651c01afb825


BIN=BIN
test/core/end2end/fuzzers/api_fuzzer_corpus/b6f721156f8dc6a353555929e459e61bab8b394a


BIN=BIN
test/core/end2end/fuzzers/api_fuzzer_corpus/bbb2429766a7c4ef9cb7110d567fd48cd6507dc5


BIN=BIN
test/core/end2end/fuzzers/api_fuzzer_corpus/bc330aa616a792ff22a8c7428dcdb4d99accbe4b


BIN=BIN
test/core/end2end/fuzzers/api_fuzzer_corpus/cd4ccfa79f65f31716296e690f3a76007edde2e3


BIN=BIN
test/core/end2end/fuzzers/api_fuzzer_corpus/crash-5d73de981fb75553a7b2606e111716ee9f2af844


BIN=BIN
test/core/end2end/fuzzers/api_fuzzer_corpus/e6b74f64e8bdfdf98177aee58b8729ff2aa7ffb2


BIN=BIN
test/core/end2end/fuzzers/api_fuzzer_corpus/edecc59c5809796f266abd8df4d5ecf6aae304ca


BIN=BIN
test/core/end2end/fuzzers/api_fuzzer_corpus/f1b2889ae7091d6a14332343fe7a2bffd81039a7


+ 1 - 1
test/core/internal_api_canaries/iomgr.c

@@ -92,7 +92,7 @@ static void test_code(void) {
   grpc_endpoint_read(&exec_ctx, &endpoint, NULL, NULL);
   grpc_endpoint_get_peer(&endpoint);
   grpc_endpoint_write(&exec_ctx, &endpoint, NULL, NULL);
-  grpc_endpoint_shutdown(&exec_ctx, &endpoint);
+  grpc_endpoint_shutdown(&exec_ctx, &endpoint, GRPC_ERROR_CANCELLED);
   grpc_endpoint_destroy(&exec_ctx, &endpoint);
   grpc_endpoint_add_to_pollset(&exec_ctx, &endpoint, NULL);
   grpc_endpoint_add_to_pollset_set(&exec_ctx, &endpoint, NULL);

+ 8 - 4
test/core/iomgr/endpoint_tests.c

@@ -233,9 +233,11 @@ static void read_and_write_test(grpc_endpoint_test_config config,
 
   if (shutdown) {
     gpr_log(GPR_DEBUG, "shutdown read");
-    grpc_endpoint_shutdown(&exec_ctx, state.read_ep);
+    grpc_endpoint_shutdown(&exec_ctx, state.read_ep,
+                           GRPC_ERROR_CREATE("Test Shutdown"));
     gpr_log(GPR_DEBUG, "shutdown write");
-    grpc_endpoint_shutdown(&exec_ctx, state.write_ep);
+    grpc_endpoint_shutdown(&exec_ctx, state.write_ep,
+                           GRPC_ERROR_CREATE("Test Shutdown"));
   }
   grpc_exec_ctx_flush(&exec_ctx);
 
@@ -296,7 +298,8 @@ static void multiple_shutdown_test(grpc_endpoint_test_config config) {
                      grpc_closure_create(inc_on_failure, &fail_count,
                                          grpc_schedule_on_exec_ctx));
   wait_for_fail_count(&exec_ctx, &fail_count, 0);
-  grpc_endpoint_shutdown(&exec_ctx, f.client_ep);
+  grpc_endpoint_shutdown(&exec_ctx, f.client_ep,
+                         GRPC_ERROR_CREATE("Test Shutdown"));
   wait_for_fail_count(&exec_ctx, &fail_count, 1);
   grpc_endpoint_read(&exec_ctx, f.client_ep, &slice_buffer,
                      grpc_closure_create(inc_on_failure, &fail_count,
@@ -307,7 +310,8 @@ static void multiple_shutdown_test(grpc_endpoint_test_config config) {
                       grpc_closure_create(inc_on_failure, &fail_count,
                                           grpc_schedule_on_exec_ctx));
   wait_for_fail_count(&exec_ctx, &fail_count, 3);
-  grpc_endpoint_shutdown(&exec_ctx, f.client_ep);
+  grpc_endpoint_shutdown(&exec_ctx, f.client_ep,
+                         GRPC_ERROR_CREATE("Test Shutdown"));
   wait_for_fail_count(&exec_ctx, &fail_count, 3);
 
   grpc_slice_buffer_destroy_internal(&exec_ctx, &slice_buffer);

+ 2 - 1
test/core/iomgr/ev_epoll_linux_test.c

@@ -89,7 +89,8 @@ static void test_fd_cleanup(grpc_exec_ctx *exec_ctx, test_fd *tfds,
   int i;
 
   for (i = 0; i < num_fds; i++) {
-    grpc_fd_shutdown(exec_ctx, tfds[i].fd);
+    grpc_fd_shutdown(exec_ctx, tfds[i].fd,
+                     GRPC_ERROR_CREATE("test_fd_cleanup"));
     grpc_exec_ctx_flush(exec_ctx);
 
     grpc_fd_orphan(exec_ctx, tfds[i].fd, NULL, &release_fd, "test_fd_cleanup");

+ 2 - 1
test/core/iomgr/fd_posix_test.c

@@ -132,7 +132,8 @@ static void session_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg, /*session */
   grpc_fd_orphan(exec_ctx, se->em_fd, NULL, NULL, "a");
   gpr_free(se);
   /* Start to shutdown listen fd. */
-  grpc_fd_shutdown(exec_ctx, sv->em_fd);
+  grpc_fd_shutdown(exec_ctx, sv->em_fd,
+                   GRPC_ERROR_CREATE("session_shutdown_cb"));
 }
 
 /* Called when data become readable in a session. */

+ 2 - 1
test/core/iomgr/tcp_client_posix_test.c

@@ -72,7 +72,8 @@ static void must_succeed(grpc_exec_ctx *exec_ctx, void *arg,
                          grpc_error *error) {
   GPR_ASSERT(g_connecting != NULL);
   GPR_ASSERT(error == GRPC_ERROR_NONE);
-  grpc_endpoint_shutdown(exec_ctx, g_connecting);
+  grpc_endpoint_shutdown(exec_ctx, g_connecting,
+                         GRPC_ERROR_CREATE("must_succeed called"));
   grpc_endpoint_destroy(exec_ctx, g_connecting);
   g_connecting = NULL;
   finish_connection();

+ 1 - 1
test/core/iomgr/tcp_server_posix_test.c

@@ -121,7 +121,7 @@ static void server_weak_ref_set(server_weak_ref *weak_ref,
 static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
                        grpc_pollset *pollset,
                        grpc_tcp_server_acceptor *acceptor) {
-  grpc_endpoint_shutdown(exec_ctx, tcp);
+  grpc_endpoint_shutdown(exec_ctx, tcp, GRPC_ERROR_CREATE("Connected"));
   grpc_endpoint_destroy(exec_ctx, tcp);
 
   on_connect_result temp_result;

+ 4 - 2
test/core/security/secure_endpoint_test.c

@@ -166,8 +166,10 @@ static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) {
   GPR_ASSERT(incoming.count == 1);
   GPR_ASSERT(grpc_slice_eq(s, incoming.slices[0]));
 
-  grpc_endpoint_shutdown(&exec_ctx, f.client_ep);
-  grpc_endpoint_shutdown(&exec_ctx, f.server_ep);
+  grpc_endpoint_shutdown(&exec_ctx, f.client_ep,
+                         GRPC_ERROR_CREATE("test_leftover end"));
+  grpc_endpoint_shutdown(&exec_ctx, f.server_ep,
+                         GRPC_ERROR_CREATE("test_leftover end"));
   grpc_endpoint_destroy(&exec_ctx, f.client_ep);
   grpc_endpoint_destroy(&exec_ctx, f.server_ep);
   grpc_exec_ctx_finish(&exec_ctx);

+ 2 - 1
test/core/security/ssl_server_fuzzer.c

@@ -121,7 +121,8 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
   // server will wait for more data. Explicitly fail the server by shutting down
   // the endpoint.
   if (!state.done_callback_called) {
-    grpc_endpoint_shutdown(&exec_ctx, mock_endpoint);
+    grpc_endpoint_shutdown(&exec_ctx, mock_endpoint,
+                           GRPC_ERROR_CREATE("Explicit close"));
     grpc_exec_ctx_flush(&exec_ctx);
   }
 

+ 1 - 1
test/core/surface/concurrent_connectivity_test.c

@@ -107,7 +107,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *vargs, grpc_endpoint *tcp,
                        grpc_tcp_server_acceptor *acceptor) {
   gpr_free(acceptor);
   struct server_thread_args *args = (struct server_thread_args *)vargs;
-  grpc_endpoint_shutdown(exec_ctx, tcp);
+  grpc_endpoint_shutdown(exec_ctx, tcp, GRPC_ERROR_CREATE("Connected"));
   grpc_endpoint_destroy(exec_ctx, tcp);
   GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, NULL));
 }

+ 5 - 3
test/core/util/mock_endpoint.c

@@ -78,16 +78,18 @@ static void me_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
 static void me_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
                                   grpc_pollset_set *pollset) {}
 
-static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
+static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
+                        grpc_error *why) {
   grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep;
   gpr_mu_lock(&m->mu);
   if (m->on_read) {
-    grpc_closure_sched(exec_ctx, m->on_read,
-                       GRPC_ERROR_CREATE("Endpoint Shutdown"));
+    grpc_closure_sched(exec_ctx, m->on_read, GRPC_ERROR_CREATE_REFERENCING(
+                                                 "Endpoint Shutdown", &why, 1));
     m->on_read = NULL;
   }
   gpr_mu_unlock(&m->mu);
   grpc_resource_user_shutdown(exec_ctx, m->resource_user);
+  GRPC_ERROR_UNREF(why);
 }
 
 static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {

+ 7 - 3
test/core/util/passthru_endpoint.c

@@ -109,21 +109,25 @@ static void me_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
 static void me_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
                                   grpc_pollset_set *pollset) {}
 
-static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
+static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
+                        grpc_error *why) {
   half *m = (half *)ep;
   gpr_mu_lock(&m->parent->mu);
   m->parent->shutdown = true;
   if (m->on_read) {
-    grpc_closure_sched(exec_ctx, m->on_read, GRPC_ERROR_CREATE("Shutdown"));
+    grpc_closure_sched(exec_ctx, m->on_read,
+                       GRPC_ERROR_CREATE_REFERENCING("Shutdown", &why, 1));
     m->on_read = NULL;
   }
   m = other_half(m);
   if (m->on_read) {
-    grpc_closure_sched(exec_ctx, m->on_read, GRPC_ERROR_CREATE("Shutdown"));
+    grpc_closure_sched(exec_ctx, m->on_read,
+                       GRPC_ERROR_CREATE_REFERENCING("Shutdown", &why, 1));
     m->on_read = NULL;
   }
   gpr_mu_unlock(&m->parent->mu);
   grpc_resource_user_shutdown(exec_ctx, m->resource_user);
+  GRPC_ERROR_UNREF(why);
 }
 
 static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
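
The mock and passthru endpoint hunks above illustrate the new shutdown contract: the caller passes a grpc_error describing why the endpoint is going away, the implementation may wrap that error with GRPC_ERROR_CREATE_REFERENCING when failing pending read closures, and it must GRPC_ERROR_UNREF its own reference before returning. A toy sketch of that reference-counting idiom (not part of the patch; it exercises only the error API, no real endpoint involved):

#include "src/core/lib/iomgr/error.h"

int main() {
  // The "reason" an endpoint would receive as the new shutdown argument.
  grpc_error* why = GRPC_ERROR_CREATE("Forced Disconnect");
  // An implementation wraps it when failing a pending closure; the wrapper
  // takes its own reference on `why`.
  grpc_error* wrapped =
      GRPC_ERROR_CREATE_REFERENCING("Endpoint Shutdown", &why, 1);
  GRPC_ERROR_UNREF(why);      // release the callee's reference to the reason
  GRPC_ERROR_UNREF(wrapped);  // normally consumed by grpc_closure_sched;
                              // released directly in this toy example
  return 0;
}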

+ 1 - 1
test/core/util/reconnect_server.c

@@ -80,7 +80,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
   gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
   timestamp_list *new_tail;
   peer = grpc_endpoint_get_peer(tcp);
-  grpc_endpoint_shutdown(exec_ctx, tcp);
+  grpc_endpoint_shutdown(exec_ctx, tcp, GRPC_ERROR_CREATE("Connected"));
   grpc_endpoint_destroy(exec_ctx, tcp);
   if (peer) {
     last_colon = strrchr(peer, ':');

+ 74 - 26
test/cpp/microbenchmarks/bm_fullstack.cc

@@ -84,6 +84,16 @@ static class InitializeStuff {
  * FIXTURES
  */
 
+static void ApplyCommonServerBuilderConfig(ServerBuilder* b) {
+  b->SetMaxReceiveMessageSize(INT_MAX);
+  b->SetMaxSendMessageSize(INT_MAX);
+}
+
+static void ApplyCommonChannelArguments(ChannelArguments* c) {
+  c->SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, INT_MAX);
+  c->SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, INT_MAX);
+}
+
 class FullstackFixture {
  public:
   FullstackFixture(Service* service, const grpc::string& address) {
@@ -91,8 +101,11 @@ class FullstackFixture {
     b.AddListeningPort(address, InsecureServerCredentials());
     cq_ = b.AddCompletionQueue(true);
     b.RegisterService(service);
+    ApplyCommonServerBuilderConfig(&b);
     server_ = b.BuildAndStart();
-    channel_ = CreateChannel(address, InsecureChannelCredentials());
+    ChannelArguments args;
+    ApplyCommonChannelArguments(&args);
+    channel_ = CreateCustomChannel(address, InsecureChannelCredentials(), args);
   }
 
   virtual ~FullstackFixture() {
@@ -146,6 +159,7 @@ class EndpointPairFixture {
     ServerBuilder b;
     cq_ = b.AddCompletionQueue(true);
     b.RegisterService(service);
+    ApplyCommonServerBuilderConfig(&b);
     server_ = b.BuildAndStart();
 
     grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@@ -174,6 +188,7 @@ class EndpointPairFixture {
     {
       ChannelArguments args;
       args.SetString(GRPC_ARG_DEFAULT_AUTHORITY, "test.authority");
+      ApplyCommonChannelArguments(&args);
 
       grpc_channel_args c_args = args.c_channel_args();
       grpc_transport* transport =
@@ -343,6 +358,12 @@ static void BM_UnaryPingPong(benchmark::State& state) {
   EchoRequest send_request;
   EchoResponse send_response;
   EchoResponse recv_response;
+  if (state.range(0) > 0) {
+    send_request.set_message(std::string(state.range(0), 'a'));
+  }
+  if (state.range(1) > 0) {
+    send_response.set_message(std::string(state.range(1), 'a'));
+  }
   Status recv_status;
   struct ServerEnv {
     ServerContext ctx;
@@ -365,6 +386,7 @@ static void BM_UnaryPingPong(benchmark::State& state) {
   std::unique_ptr<EchoTestService::Stub> stub(
       EchoTestService::NewStub(fixture->channel()));
   while (state.KeepRunning()) {
+    recv_response.Clear();
     ClientContext cli_ctx;
     ClientContextMutator cli_ctx_mut(&cli_ctx);
     std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
@@ -396,55 +418,81 @@ static void BM_UnaryPingPong(benchmark::State& state) {
   fixture.reset();
   server_env[0]->~ServerEnv();
   server_env[1]->~ServerEnv();
+  state.SetBytesProcessed(state.range(0) * state.iterations() +
+                          state.range(1) * state.iterations());
 }
 
 /*******************************************************************************
  * CONFIGURATIONS
  */
 
-BENCHMARK_TEMPLATE(BM_UnaryPingPong, TCP, NoOpMutator, NoOpMutator);
-BENCHMARK_TEMPLATE(BM_UnaryPingPong, UDS, NoOpMutator, NoOpMutator);
-BENCHMARK_TEMPLATE(BM_UnaryPingPong, SockPair, NoOpMutator, NoOpMutator);
-BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, NoOpMutator, NoOpMutator);
+static void SweepSizesArgs(benchmark::internal::Benchmark* b) {
+  b->Args({0, 0});
+  for (int i = 1; i <= 128 * 1024 * 1024; i *= 8) {
+    b->Args({i, 0});
+    b->Args({0, i});
+    b->Args({i, i});
+  }
+}
+
+BENCHMARK_TEMPLATE(BM_UnaryPingPong, TCP, NoOpMutator, NoOpMutator)
+    ->Apply(SweepSizesArgs);
+BENCHMARK_TEMPLATE(BM_UnaryPingPong, UDS, NoOpMutator, NoOpMutator)
+    ->Args({0, 0});
+BENCHMARK_TEMPLATE(BM_UnaryPingPong, SockPair, NoOpMutator, NoOpMutator)
+    ->Args({0, 0});
+BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, NoOpMutator, NoOpMutator)
+    ->Apply(SweepSizesArgs);
 BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2,
-                   Client_AddMetadata<RandomBinaryMetadata<10>, 1>,
-                   NoOpMutator);
+                   Client_AddMetadata<RandomBinaryMetadata<10>, 1>, NoOpMutator)
+    ->Args({0, 0});
 BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2,
-                   Client_AddMetadata<RandomBinaryMetadata<31>, 1>,
-                   NoOpMutator);
+                   Client_AddMetadata<RandomBinaryMetadata<31>, 1>, NoOpMutator)
+    ->Args({0, 0});
 BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2,
                    Client_AddMetadata<RandomBinaryMetadata<100>, 1>,
-                   NoOpMutator);
+                   NoOpMutator)
+    ->Args({0, 0});
 BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2,
-                   Client_AddMetadata<RandomBinaryMetadata<10>, 2>,
-                   NoOpMutator);
+                   Client_AddMetadata<RandomBinaryMetadata<10>, 2>, NoOpMutator)
+    ->Args({0, 0});
 BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2,
-                   Client_AddMetadata<RandomBinaryMetadata<31>, 2>,
-                   NoOpMutator);
+                   Client_AddMetadata<RandomBinaryMetadata<31>, 2>, NoOpMutator)
+    ->Args({0, 0});
 BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2,
                    Client_AddMetadata<RandomBinaryMetadata<100>, 2>,
-                   NoOpMutator);
+                   NoOpMutator)
+    ->Args({0, 0});
 BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, NoOpMutator,
-                   Server_AddInitialMetadata<RandomBinaryMetadata<10>, 1>);
+                   Server_AddInitialMetadata<RandomBinaryMetadata<10>, 1>)
+    ->Args({0, 0});
 BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, NoOpMutator,
-                   Server_AddInitialMetadata<RandomBinaryMetadata<31>, 1>);
+                   Server_AddInitialMetadata<RandomBinaryMetadata<31>, 1>)
+    ->Args({0, 0});
 BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, NoOpMutator,
-                   Server_AddInitialMetadata<RandomBinaryMetadata<100>, 1>);
+                   Server_AddInitialMetadata<RandomBinaryMetadata<100>, 1>)
+    ->Args({0, 0});
 BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2,
-                   Client_AddMetadata<RandomAsciiMetadata<10>, 1>, NoOpMutator);
+                   Client_AddMetadata<RandomAsciiMetadata<10>, 1>, NoOpMutator)
+    ->Args({0, 0});
 BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2,
-                   Client_AddMetadata<RandomAsciiMetadata<31>, 1>, NoOpMutator);
+                   Client_AddMetadata<RandomAsciiMetadata<31>, 1>, NoOpMutator)
+    ->Args({0, 0});
 BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2,
-                   Client_AddMetadata<RandomAsciiMetadata<100>, 1>,
-                   NoOpMutator);
+                   Client_AddMetadata<RandomAsciiMetadata<100>, 1>, NoOpMutator)
+    ->Args({0, 0});
 BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, NoOpMutator,
-                   Server_AddInitialMetadata<RandomAsciiMetadata<10>, 1>);
+                   Server_AddInitialMetadata<RandomAsciiMetadata<10>, 1>)
+    ->Args({0, 0});
 BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, NoOpMutator,
-                   Server_AddInitialMetadata<RandomAsciiMetadata<31>, 1>);
+                   Server_AddInitialMetadata<RandomAsciiMetadata<31>, 1>)
+    ->Args({0, 0});
 BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, NoOpMutator,
-                   Server_AddInitialMetadata<RandomAsciiMetadata<100>, 1>);
+                   Server_AddInitialMetadata<RandomAsciiMetadata<100>, 1>)
+    ->Args({0, 0});
 BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, NoOpMutator,
-                   Server_AddInitialMetadata<RandomAsciiMetadata<10>, 100>);
+                   Server_AddInitialMetadata<RandomAsciiMetadata<10>, 100>)
+    ->Args({0, 0});
 
 }  // namespace testing
 }  // namespace grpc
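
The BM_UnaryPingPong changes above pass request/response payload sizes in as benchmark arguments, sweep them via SweepSizesArgs, and report throughput with SetBytesProcessed. For readers unfamiliar with the Google Benchmark registration mechanics, a minimal, self-contained sketch of the same Args/Apply pattern (BM_CopyPayload and SweepSizes are hypothetical names, not from the patch):

#include <benchmark/benchmark.h>
#include <string>

// Toy benchmark: state.range(0) selects a payload size, and
// SetBytesProcessed lets the framework report bytes/second.
static void BM_CopyPayload(benchmark::State& state) {
  std::string src(static_cast<size_t>(state.range(0)), 'a');
  std::string dst;
  while (state.KeepRunning()) {
    dst = src;
    benchmark::DoNotOptimize(dst);
  }
  state.SetBytesProcessed(state.range(0) * state.iterations());
}

// Register one run per size, mirroring how SweepSizesArgs feeds
// {request, response} size pairs to BM_UnaryPingPong above.
static void SweepSizes(benchmark::internal::Benchmark* b) {
  for (int i = 1; i <= 1 << 20; i *= 8) b->Args({i});
}
BENCHMARK(BM_CopyPayload)->Apply(SweepSizes);

BENCHMARK_MAIN();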

+ 145 - 30
test/cpp/util/cli_call.cc

@@ -37,8 +37,6 @@
 
 #include <grpc++/channel.h>
 #include <grpc++/client_context.h>
-#include <grpc++/completion_queue.h>
-#include <grpc++/generic/generic_stub.h>
 #include <grpc++/support/byte_buffer.h>
 #include <grpc/grpc.h>
 #include <grpc/slice.h>
@@ -56,55 +54,172 @@ Status CliCall::Call(std::shared_ptr<grpc::Channel> channel,
                      const OutgoingMetadataContainer& metadata,
                      IncomingMetadataContainer* server_initial_metadata,
                      IncomingMetadataContainer* server_trailing_metadata) {
-  std::unique_ptr<grpc::GenericStub> stub(new grpc::GenericStub(channel));
-  grpc::ClientContext ctx;
+  CliCall call(channel, method, metadata);
+  call.Write(request);
+  call.WritesDone();
+  if (!call.Read(response, server_initial_metadata)) {
+    fprintf(stderr, "Failed to read response.\n");
+  }
+  return call.Finish(server_trailing_metadata);
+}
+
+CliCall::CliCall(std::shared_ptr<grpc::Channel> channel,
+                 const grpc::string& method,
+                 const OutgoingMetadataContainer& metadata)
+    : stub_(new grpc::GenericStub(channel)) {
+  gpr_mu_init(&write_mu_);
+  gpr_cv_init(&write_cv_);
   if (!metadata.empty()) {
     for (OutgoingMetadataContainer::const_iterator iter = metadata.begin();
          iter != metadata.end(); ++iter) {
-      ctx.AddMetadata(iter->first, iter->second);
+      ctx_.AddMetadata(iter->first, iter->second);
     }
   }
-  grpc::CompletionQueue cq;
-  std::unique_ptr<grpc::GenericClientAsyncReaderWriter> call(
-      stub->Call(&ctx, method, &cq, tag(1)));
+  call_ = stub_->Call(&ctx_, method, &cq_, tag(1));
   void* got_tag;
   bool ok;
-  cq.Next(&got_tag, &ok);
+  cq_.Next(&got_tag, &ok);
   GPR_ASSERT(ok);
+}
+
+CliCall::~CliCall() {
+  gpr_cv_destroy(&write_cv_);
+  gpr_mu_destroy(&write_mu_);
+}
+
+void CliCall::Write(const grpc::string& request) {
+  void* got_tag;
+  bool ok;
 
   grpc_slice s = grpc_slice_from_copied_string(request.c_str());
   grpc::Slice req_slice(s, grpc::Slice::STEAL_REF);
   grpc::ByteBuffer send_buffer(&req_slice, 1);
-  call->Write(send_buffer, tag(2));
-  cq.Next(&got_tag, &ok);
-  GPR_ASSERT(ok);
-  call->WritesDone(tag(3));
-  cq.Next(&got_tag, &ok);
+  call_->Write(send_buffer, tag(2));
+  cq_.Next(&got_tag, &ok);
   GPR_ASSERT(ok);
+}
+
+bool CliCall::Read(grpc::string* response,
+                   IncomingMetadataContainer* server_initial_metadata) {
+  void* got_tag;
+  bool ok;
+
   grpc::ByteBuffer recv_buffer;
-  call->Read(&recv_buffer, tag(4));
-  cq.Next(&got_tag, &ok);
-  if (!ok) {
-    std::cout << "Failed to read response." << std::endl;
+  call_->Read(&recv_buffer, tag(3));
+
+  if (!cq_.Next(&got_tag, &ok) || !ok) {
+    return false;
   }
-  grpc::Status status;
-  call->Finish(&status, tag(5));
-  cq.Next(&got_tag, &ok);
+  std::vector<grpc::Slice> slices;
+  recv_buffer.Dump(&slices);
+
+  response->clear();
+  for (size_t i = 0; i < slices.size(); i++) {
+    response->append(reinterpret_cast<const char*>(slices[i].begin()),
+                     slices[i].size());
+  }
+  if (server_initial_metadata) {
+    *server_initial_metadata = ctx_.GetServerInitialMetadata();
+  }
+  return true;
+}
+
+void CliCall::WritesDone() {
+  void* got_tag;
+  bool ok;
+
+  call_->WritesDone(tag(4));
+  cq_.Next(&got_tag, &ok);
   GPR_ASSERT(ok);
+}
 
-  if (status.ok()) {
-    std::vector<grpc::Slice> slices;
-    (void)recv_buffer.Dump(&slices);
+void CliCall::WriteAndWait(const grpc::string& request) {
+  grpc_slice s = grpc_slice_from_copied_string(request.c_str());
+  grpc::Slice req_slice(s, grpc::Slice::STEAL_REF);
+  grpc::ByteBuffer send_buffer(&req_slice, 1);
+
+  gpr_mu_lock(&write_mu_);
+  call_->Write(send_buffer, tag(2));
+  write_done_ = false;
+  while (!write_done_) {
+    gpr_cv_wait(&write_cv_, &write_mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
+  }
+  gpr_mu_unlock(&write_mu_);
+}
+
+void CliCall::WritesDoneAndWait() {
+  gpr_mu_lock(&write_mu_);
+  call_->WritesDone(tag(4));
+  write_done_ = false;
+  while (!write_done_) {
+    gpr_cv_wait(&write_cv_, &write_mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
+  }
+  gpr_mu_unlock(&write_mu_);
+}
 
-    response->clear();
-    for (size_t i = 0; i < slices.size(); i++) {
-      response->append(reinterpret_cast<const char*>(slices[i].begin()),
-                       slices[i].size());
+bool CliCall::ReadAndMaybeNotifyWrite(
+    grpc::string* response,
+    IncomingMetadataContainer* server_initial_metadata) {
+  void* got_tag;
+  bool ok;
+  grpc::ByteBuffer recv_buffer;
+
+  call_->Read(&recv_buffer, tag(3));
+  bool cq_result = cq_.Next(&got_tag, &ok);
+
+  while (got_tag != tag(3)) {
+    gpr_mu_lock(&write_mu_);
+    write_done_ = true;
+    gpr_cv_signal(&write_cv_);
+    gpr_mu_unlock(&write_mu_);
+
+    cq_result = cq_.Next(&got_tag, &ok);
+    if (got_tag == tag(2)) {
+      GPR_ASSERT(ok);
     }
   }
 
-  *server_initial_metadata = ctx.GetServerInitialMetadata();
-  *server_trailing_metadata = ctx.GetServerTrailingMetadata();
+  if (!cq_result || !ok) {
+    // If the RPC is ended on the server side, we should still wait for the
+    // pending write on the client side to be done.
+    if (!ok) {
+      gpr_mu_lock(&write_mu_);
+      if (!write_done_) {
+        cq_.Next(&got_tag, &ok);
+        GPR_ASSERT(got_tag != tag(2));
+        write_done_ = true;
+        gpr_cv_signal(&write_cv_);
+      }
+      gpr_mu_unlock(&write_mu_);
+    }
+    return false;
+  }
+
+  std::vector<grpc::Slice> slices;
+  recv_buffer.Dump(&slices);
+  response->clear();
+  for (size_t i = 0; i < slices.size(); i++) {
+    response->append(reinterpret_cast<const char*>(slices[i].begin()),
+                     slices[i].size());
+  }
+  if (server_initial_metadata) {
+    *server_initial_metadata = ctx_.GetServerInitialMetadata();
+  }
+  return true;
+}
+
+Status CliCall::Finish(IncomingMetadataContainer* server_trailing_metadata) {
+  void* got_tag;
+  bool ok;
+  grpc::Status status;
+
+  call_->Finish(&status, tag(5));
+  cq_.Next(&got_tag, &ok);
+  GPR_ASSERT(ok);
+  if (server_trailing_metadata) {
+    *server_trailing_metadata = ctx_.GetServerTrailingMetadata();
+  }
+
   return status;
 }
 

+ 51 - 0
test/cpp/util/cli_call.h

@@ -37,23 +37,74 @@
 #include <map>
 
 #include <grpc++/channel.h>
+#include <grpc++/completion_queue.h>
+#include <grpc++/generic/generic_stub.h>
 #include <grpc++/support/status.h>
 #include <grpc++/support/string_ref.h>
 
 namespace grpc {
+
+class ClientContext;
+
 namespace testing {
 
+// CliCall handles the sending and receiving of generic messages given the name
+// of the remote method. This class is only used by GrpcTool. Its thread-safe
+// and thread-unsafe methods should not be used together.
 class CliCall final {
  public:
   typedef std::multimap<grpc::string, grpc::string> OutgoingMetadataContainer;
   typedef std::multimap<grpc::string_ref, grpc::string_ref>
       IncomingMetadataContainer;
+
+  CliCall(std::shared_ptr<grpc::Channel> channel, const grpc::string& method,
+          const OutgoingMetadataContainer& metadata);
+  ~CliCall();
+
+  // Perform a unary generic RPC.
   static Status Call(std::shared_ptr<grpc::Channel> channel,
                      const grpc::string& method, const grpc::string& request,
                      grpc::string* response,
                      const OutgoingMetadataContainer& metadata,
                      IncomingMetadataContainer* server_initial_metadata,
                      IncomingMetadataContainer* server_trailing_metadata);
+
+  // Send a generic request message in a synchronous manner. NOT thread-safe.
+  void Write(const grpc::string& request);
+
+  // Signal that writes are done, in a synchronous manner. NOT thread-safe.
+  void WritesDone();
+
+  // Receive a generic response message in a synchronous manner. NOT thread-safe.
+  bool Read(grpc::string* response,
+            IncomingMetadataContainer* server_initial_metadata);
+
+  // Thread-safe write. Must be used with ReadAndMaybeNotifyWrite. Send out a
+  // generic request message and wait for ReadAndMaybeNotifyWrite to finish it.
+  void WriteAndWait(const grpc::string& request);
+
+  // Thread-safe WritesDone. Must be used with ReadAndMaybeNotifyWrite. Send out
+  // WritesDone for generic request messages and wait for
+  // ReadAndMaybeNotifyWrite to finish it.
+  void WritesDoneAndWait();
+
+  // Thread-safe Read. Blockingly receive a generic response message. Notify
+  // writes if they are finished when this read is waiting for a response.
+  bool ReadAndMaybeNotifyWrite(
+      grpc::string* response,
+      IncomingMetadataContainer* server_initial_metadata);
+
+  // Finish the RPC.
+  Status Finish(IncomingMetadataContainer* server_trailing_metadata);
+
+ private:
+  std::unique_ptr<grpc::GenericStub> stub_;
+  grpc::ClientContext ctx_;
+  std::unique_ptr<grpc::GenericClientAsyncReaderWriter> call_;
+  grpc::CompletionQueue cq_;
+  gpr_mu write_mu_;
+  gpr_cv write_cv_;  // Protected by write_mu_;
+  bool write_done_;  // Protected by write_mu_;
 };
 
 }  // namespace testing
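
The new header splits CliCall into thread-unsafe helpers (Write/WritesDone/Read) and a thread-safe trio (WriteAndWait, WritesDoneAndWait, ReadAndMaybeNotifyWrite) meant for overlapping reads and writes, which grpc_tool.cc below drives from a dedicated reader thread. A minimal usage sketch under those assumptions (the server address, method path, and payload are illustrative, not from the patch):

#include <thread>

#include <grpc++/create_channel.h>
#include <grpc++/security/credentials.h>

#include "test/cpp/util/cli_call.h"

void BidiSketch() {
  auto channel = grpc::CreateChannel("localhost:50051",
                                     grpc::InsecureChannelCredentials());
  grpc::testing::CliCall::OutgoingMetadataContainer metadata;
  grpc::testing::CliCall call(
      channel, "/grpc.testing.TestService/FullDuplexCall", metadata);

  // Reader thread: the thread-safe read also unblocks any WriteAndWait /
  // WritesDoneAndWait pending on the main thread.
  std::thread reader([&call] {
    grpc::string serialized_response;
    grpc::testing::CliCall::IncomingMetadataContainer initial_metadata;
    while (call.ReadAndMaybeNotifyWrite(&serialized_response,
                                        &initial_metadata)) {
      // consume each serialized response here
    }
  });

  call.WriteAndWait("serialized-request-bytes");  // hypothetical payload
  call.WritesDoneAndWait();
  reader.join();

  grpc::testing::CliCall::IncomingMetadataContainer trailing_metadata;
  grpc::Status status = call.Finish(&trailing_metadata);
  (void)status;
}

Per the header comments, the thread-safe and thread-unsafe methods should not be mixed on the same CliCall instance; a simple unary exchange can instead use the static CliCall::Call shown in cli_call.cc above.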

+ 3 - 3
test/cpp/util/grpc_cli.cc

@@ -83,10 +83,10 @@ DEFINE_string(outfile, "", "Output file (default is stdout)");
 static bool SimplePrint(const grpc::string& outfile,
                         const grpc::string& output) {
   if (outfile.empty()) {
-    std::cout << output;
+    std::cout << output << std::endl;
   } else {
-    std::ofstream output_file(outfile, std::ios::trunc | std::ios::binary);
-    output_file << output;
+    std::ofstream output_file(outfile, std::ios::app | std::ios::binary);
+    output_file << output << std::endl;
     output_file.close();
   }
   return true;

+ 195 - 58
test/cpp/util/grpc_tool.cc

@@ -39,6 +39,7 @@
 #include <memory>
 #include <sstream>
 #include <string>
+#include <thread>
 
 #include <gflags/gflags.h>
 #include <grpc++/channel.h>
@@ -159,6 +160,36 @@ void PrintMetadata(const T& m, const grpc::string& message) {
   }
 }
 
+void ReadResponse(CliCall* call, const grpc::string& method_name,
+                  GrpcToolOutputCallback callback, ProtoFileParser* parser,
+                  gpr_mu* parser_mu, bool print_mode) {
+  grpc::string serialized_response_proto;
+  std::multimap<grpc::string_ref, grpc::string_ref> server_initial_metadata;
+
+  for (bool receive_initial_metadata = true; call->ReadAndMaybeNotifyWrite(
+           &serialized_response_proto,
+           receive_initial_metadata ? &server_initial_metadata : nullptr);
+       receive_initial_metadata = false) {
+    fprintf(stderr, "got response.\n");
+    if (!FLAGS_binary_output) {
+      gpr_mu_lock(parser_mu);
+      serialized_response_proto = parser->GetTextFormatFromMethod(
+          method_name, serialized_response_proto, false /* is_request */);
+      if (parser->HasError() && print_mode) {
+        fprintf(stderr, "Failed to parse response.\n");
+      }
+      gpr_mu_unlock(parser_mu);
+    }
+    if (receive_initial_metadata) {
+      PrintMetadata(server_initial_metadata,
+                    "Received initial metadata from server:");
+    }
+    if (!callback(serialized_response_proto) && print_mode) {
+      fprintf(stderr, "Failed to output response.\n");
+    }
+  }
+}
+
 struct Command {
   const char* command;
   std::function<bool(GrpcTool*, int, const char**, const CliCredentials&,
@@ -416,85 +447,191 @@ bool GrpcTool::CallMethod(int argc, const char** argv,
   grpc::string server_address(argv[0]);
   grpc::string method_name(argv[1]);
   grpc::string formatted_method_name;
-  std::unique_ptr<grpc::testing::ProtoFileParser> parser;
+  std::unique_ptr<ProtoFileParser> parser;
   grpc::string serialized_request_proto;
+  bool print_mode = false;
 
-  if (argc == 3) {
-    request_text = argv[2];
-    if (!FLAGS_infile.empty()) {
-      fprintf(stderr, "warning: request given in argv, ignoring --infile\n");
-    }
+  std::shared_ptr<grpc::Channel> channel =
+      FLAGS_remotedb
+          ? grpc::CreateChannel(server_address, cred.GetCredentials())
+          : nullptr;
+
+  parser.reset(new grpc::testing::ProtoFileParser(channel, FLAGS_proto_path,
+                                                  FLAGS_protofiles));
+
+  if (FLAGS_binary_input) {
+    formatted_method_name = method_name;
   } else {
-    std::stringstream input_stream;
+    formatted_method_name = parser->GetFormattedMethodName(method_name);
+  }
+
+  if (parser->HasError()) {
+    return false;
+  }
+
+  if (parser->IsStreaming(method_name, true /* is_request */)) {
+    std::istream* input_stream;
+    std::ifstream input_file;
+
+    if (argc == 3) {
+      request_text = argv[2];
+    }
+
+    std::multimap<grpc::string, grpc::string> client_metadata;
+    ParseMetadataFlag(&client_metadata);
+    PrintMetadata(client_metadata, "Sending client initial metadata:");
+
+    CliCall call(channel, formatted_method_name, client_metadata);
+
     if (FLAGS_infile.empty()) {
       if (isatty(STDIN_FILENO)) {
-        fprintf(stderr, "reading request message from stdin...\n");
+        print_mode = true;
+        fprintf(stderr, "reading streaming request message from stdin...\n");
       }
-      input_stream << std::cin.rdbuf();
+      input_stream = &std::cin;
     } else {
-      std::ifstream input_file(FLAGS_infile, std::ios::in | std::ios::binary);
-      input_stream << input_file.rdbuf();
+      input_file.open(FLAGS_infile, std::ios::in | std::ios::binary);
+      input_stream = &input_file;
+    }
+
+    gpr_mu parser_mu;
+    gpr_mu_init(&parser_mu);
+    std::thread read_thread(ReadResponse, &call, method_name, callback,
+                            parser.get(), &parser_mu, print_mode);
+
+    std::stringstream request_ss;
+    grpc::string line;
+    while (!request_text.empty() ||
+           (!input_stream->eof() && getline(*input_stream, line))) {
+      if (!request_text.empty()) {
+        if (FLAGS_binary_input) {
+          serialized_request_proto = request_text;
+          request_text.clear();
+        } else {
+          gpr_mu_lock(&parser_mu);
+          serialized_request_proto = parser->GetSerializedProtoFromMethod(
+              method_name, request_text, true /* is_request */);
+          request_text.clear();
+          if (parser->HasError()) {
+            if (print_mode) {
+              fprintf(stderr, "Failed to parse request.\n");
+            }
+            gpr_mu_unlock(&parser_mu);
+            continue;
+          }
+          gpr_mu_unlock(&parser_mu);
+        }
+
+        call.WriteAndWait(serialized_request_proto);
+        if (print_mode) {
+          fprintf(stderr, "Request sent.\n");
+        }
+      } else {
+        if (line.length() == 0) {
+          request_text = request_ss.str();
+          request_ss.str(grpc::string());
+          request_ss.clear();
+        } else {
+          request_ss << line << ' ';
+        }
+      }
+    }
+    if (input_file.is_open()) {
       input_file.close();
     }
-    request_text = input_stream.str();
-  }
 
-  std::shared_ptr<grpc::Channel> channel =
-      grpc::CreateChannel(server_address, cred.GetCredentials());
-  if (!FLAGS_binary_input || !FLAGS_binary_output) {
-    parser.reset(
-        new grpc::testing::ProtoFileParser(FLAGS_remotedb ? channel : nullptr,
-                                           FLAGS_proto_path, FLAGS_protofiles));
-    if (parser->HasError()) {
+    call.WritesDoneAndWait();
+    read_thread.join();
+
+    std::multimap<grpc::string_ref, grpc::string_ref> server_trailing_metadata;
+    Status status = call.Finish(&server_trailing_metadata);
+    PrintMetadata(server_trailing_metadata,
+                  "Received trailing metadata from server:");
+
+    if (status.ok()) {
+      fprintf(stderr, "Stream RPC succeeded with OK status\n");
+      return true;
+    } else {
+      fprintf(stderr, "Rpc failed with status code %d, error message: %s\n",
+              status.error_code(), status.error_message().c_str());
       return false;
     }
-  }
 
-  if (FLAGS_binary_input) {
-    serialized_request_proto = request_text;
-    formatted_method_name = method_name;
-  } else {
-    formatted_method_name = parser->GetFormattedMethodName(method_name);
-    serialized_request_proto = parser->GetSerializedProtoFromMethod(
-        method_name, request_text, true /* is_request */);
-    if (parser->HasError()) {
-      return false;
+  } else {  // parser->IsStreaming(method_name, true /* is_request */)
+    if (argc == 3) {
+      request_text = argv[2];
+      if (!FLAGS_infile.empty()) {
+        fprintf(stderr, "warning: request given in argv, ignoring --infile\n");
+      }
+    } else {
+      std::stringstream input_stream;
+      if (FLAGS_infile.empty()) {
+        if (isatty(STDIN_FILENO)) {
+          fprintf(stderr, "reading request message from stdin...\n");
+        }
+        input_stream << std::cin.rdbuf();
+      } else {
+        std::ifstream input_file(FLAGS_infile, std::ios::in | std::ios::binary);
+        input_stream << input_file.rdbuf();
+        input_file.close();
+      }
+      request_text = input_stream.str();
     }
-  }
-  fprintf(stderr, "connecting to %s\n", server_address.c_str());
 
-  grpc::string serialized_response_proto;
-  std::multimap<grpc::string, grpc::string> client_metadata;
-  std::multimap<grpc::string_ref, grpc::string_ref> server_initial_metadata,
-      server_trailing_metadata;
-  ParseMetadataFlag(&client_metadata);
-  PrintMetadata(client_metadata, "Sending client initial metadata:");
-  grpc::Status status = grpc::testing::CliCall::Call(
-      channel, formatted_method_name, serialized_request_proto,
-      &serialized_response_proto, client_metadata, &server_initial_metadata,
-      &server_trailing_metadata);
-  PrintMetadata(server_initial_metadata,
-                "Received initial metadata from server:");
-  PrintMetadata(server_trailing_metadata,
-                "Received trailing metadata from server:");
-  if (status.ok()) {
-    fprintf(stderr, "Rpc succeeded with OK status\n");
-    if (FLAGS_binary_output) {
-      output_ss << serialized_response_proto;
+    if (FLAGS_binary_input) {
+      serialized_request_proto = request_text;
+      // formatted_method_name = method_name;
     } else {
-      grpc::string response_text = parser->GetTextFormatFromMethod(
-          method_name, serialized_response_proto, false /* is_request */);
+      // formatted_method_name = parser->GetFormattedMethodName(method_name);
+      serialized_request_proto = parser->GetSerializedProtoFromMethod(
+          method_name, request_text, true /* is_request */);
       if (parser->HasError()) {
         return false;
       }
-      output_ss << "Response: \n " << response_text << std::endl;
     }
-  } else {
-    fprintf(stderr, "Rpc failed with status code %d, error message: %s\n",
-            status.error_code(), status.error_message().c_str());
+    fprintf(stderr, "connecting to %s\n", server_address.c_str());
+
+    grpc::string serialized_response_proto;
+    std::multimap<grpc::string, grpc::string> client_metadata;
+    std::multimap<grpc::string_ref, grpc::string_ref> server_initial_metadata,
+        server_trailing_metadata;
+    ParseMetadataFlag(&client_metadata);
+    PrintMetadata(client_metadata, "Sending client initial metadata:");
+
+    CliCall call(channel, formatted_method_name, client_metadata);
+    call.Write(serialized_request_proto);
+    call.WritesDone();
+
+    for (bool receive_initial_metadata = true; call.Read(
+             &serialized_response_proto,
+             receive_initial_metadata ? &server_initial_metadata : nullptr);
+         receive_initial_metadata = false) {
+      if (!FLAGS_binary_output) {
+        serialized_response_proto = parser->GetTextFormatFromMethod(
+            method_name, serialized_response_proto, false /* is_request */);
+        if (parser->HasError()) {
+          return false;
+        }
+      }
+      if (receive_initial_metadata) {
+        PrintMetadata(server_initial_metadata,
+                      "Received initial metadata from server:");
+      }
+      if (!callback(serialized_response_proto)) {
+        return false;
+      }
+    }
+    Status status = call.Finish(&server_trailing_metadata);
+    if (status.ok()) {
+      fprintf(stderr, "Rpc succeeded with OK status\n");
+      return true;
+    } else {
+      fprintf(stderr, "Rpc failed with status code %d, error message: %s\n",
+              status.error_code(), status.error_message().c_str());
+      return false;
+    }
   }
-
-  return callback(output_ss.str());
+  GPR_UNREACHABLE_CODE(return false);
 }
 
 bool GrpcTool::ParseMessage(int argc, const char** argv,
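As a side note on the streaming input format used above: requests typed on stdin (or read from --infile) are text-format protos whose lines are joined with spaces, and a blank line flushes the accumulated text as one request. Below is a small, self-contained sketch of that framing; the helper name SplitIntoRequests is illustrative only and does not appear in the change.

#include <iostream>
#include <sstream>
#include <string>
#include <vector>

// Groups blank-line-delimited text into separate request strings, mirroring
// the framing the streaming call path reads from stdin or --infile.
std::vector<std::string> SplitIntoRequests(std::istream& in) {
  std::vector<std::string> requests;
  std::stringstream current;
  std::string line;
  while (std::getline(in, line)) {
    if (line.empty()) {
      if (!current.str().empty()) {
        requests.push_back(current.str());
        current.str(std::string());
      }
    } else {
      current << line << ' ';  // lines of one message are joined with spaces
    }
  }
  if (!current.str().empty()) {
    requests.push_back(current.str());  // flush a trailing, unterminated message
  }
  return requests;
}

int main() {
  std::istringstream input("message: 'Hello1'\n\nmessage: 'Hello2'\n\n");
  for (const std::string& request : SplitIntoRequests(input)) {
    std::cout << request << "\n";
  }
  return 0;
}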

+ 193 - 0
test/cpp/util/grpc_tool_test.cc

@@ -102,6 +102,8 @@ DECLARE_bool(l);
 
 namespace {
 
+const int kNumResponseStreamsMsgs = 3;
+
 class TestCliCredentials final : public grpc::testing::CliCredentials {
  public:
   std::shared_ptr<grpc::ChannelCredentials> GetCredentials() const override {
@@ -137,6 +139,71 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
     response->set_message(request->message());
     return Status::OK;
   }
+
+  Status RequestStream(ServerContext* context,
+                       ServerReader<EchoRequest>* reader,
+                       EchoResponse* response) override {
+    EchoRequest request;
+    response->set_message("");
+    if (!context->client_metadata().empty()) {
+      for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator
+               iter = context->client_metadata().begin();
+           iter != context->client_metadata().end(); ++iter) {
+        context->AddInitialMetadata(ToString(iter->first),
+                                    ToString(iter->second));
+      }
+    }
+    context->AddTrailingMetadata("trailing_key", "trailing_value");
+    while (reader->Read(&request)) {
+      response->mutable_message()->append(request.message());
+    }
+
+    return Status::OK;
+  }
+
+  Status ResponseStream(ServerContext* context, const EchoRequest* request,
+                        ServerWriter<EchoResponse>* writer) override {
+    if (!context->client_metadata().empty()) {
+      for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator
+               iter = context->client_metadata().begin();
+           iter != context->client_metadata().end(); ++iter) {
+        context->AddInitialMetadata(ToString(iter->first),
+                                    ToString(iter->second));
+      }
+    }
+    context->AddTrailingMetadata("trailing_key", "trailing_value");
+
+    EchoResponse response;
+    for (int i = 0; i < kNumResponseStreamsMsgs; i++) {
+      response.set_message(request->message() + grpc::to_string(i));
+      writer->Write(response);
+    }
+
+    return Status::OK;
+  }
+
+  Status BidiStream(
+      ServerContext* context,
+      ServerReaderWriter<EchoResponse, EchoRequest>* stream) override {
+    EchoRequest request;
+    EchoResponse response;
+    if (!context->client_metadata().empty()) {
+      for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator
+               iter = context->client_metadata().begin();
+           iter != context->client_metadata().end(); ++iter) {
+        context->AddInitialMetadata(ToString(iter->first),
+                                    ToString(iter->second));
+      }
+    }
+    context->AddTrailingMetadata("trailing_key", "trailing_value");
+
+    while (stream->Read(&request)) {
+      response.set_message(request.message());
+      stream->Write(response);
+    }
+
+    return Status::OK;
+  }
 };
 
 }  // namespace
@@ -347,6 +414,132 @@ TEST_F(GrpcToolTest, CallCommand) {
   ShutdownServer();
 }
 
+TEST_F(GrpcToolTest, CallCommandRequestStream) {
+  // Test input: grpc_cli call localhost:<port> RequestStream "message:
+  // 'Hello0'"
+  std::stringstream output_stream;
+
+  const grpc::string server_address = SetUpServer();
+  const char* argv[] = {"grpc_cli", "call", server_address.c_str(),
+                        "RequestStream", "message: 'Hello0'"};
+
+  // Mock std::cin input "message: 'Hello1'\n\n message: 'Hello2'\n\n"
+  std::streambuf* orig = std::cin.rdbuf();
+  std::istringstream ss("message: 'Hello1'\n\n message: 'Hello2'\n\n");
+  std::cin.rdbuf(ss.rdbuf());
+
+  EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(),
+                                   std::bind(PrintStream, &output_stream,
+                                             std::placeholders::_1)));
+
+  // Expected output: "message: \"Hello0Hello1Hello2\""
+  EXPECT_TRUE(NULL != strstr(output_stream.str().c_str(),
+                             "message: \"Hello0Hello1Hello2\""));
+  std::cin.rdbuf(orig);
+  ShutdownServer();
+}
+
+TEST_F(GrpcToolTest, CallCommandRequestStreamWithBadRequest) {
+  // Test input: grpc_cli call localhost:<port> RequestStream "message:
+  // 'Hello0'"
+  std::stringstream output_stream;
+
+  const grpc::string server_address = SetUpServer();
+  const char* argv[] = {"grpc_cli", "call", server_address.c_str(),
+                        "RequestStream", "message: 'Hello0'"};
+
+  // Mock std::cin input "bad_field: 'Hello1'\n\n message: 'Hello2'\n\n"
+  std::streambuf* orig = std::cin.rdbuf();
+  std::istringstream ss("bad_field: 'Hello1'\n\n message: 'Hello2'\n\n");
+  std::cin.rdbuf(ss.rdbuf());
+
+  EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(),
+                                   std::bind(PrintStream, &output_stream,
+                                             std::placeholders::_1)));
+
+  // Expected output: "message: \"Hello0Hello2\""
+  EXPECT_TRUE(NULL !=
+              strstr(output_stream.str().c_str(), "message: \"Hello0Hello2\""));
+  std::cin.rdbuf(orig);
+  ShutdownServer();
+}
+
+TEST_F(GrpcToolTest, CallCommandResponseStream) {
+  // Test input: grpc_cli call localhost:<port> ResponseStream "message:
+  // 'Hello'"
+  std::stringstream output_stream;
+
+  const grpc::string server_address = SetUpServer();
+  const char* argv[] = {"grpc_cli", "call", server_address.c_str(),
+                        "ResponseStream", "message: 'Hello'"};
+
+  EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(),
+                                   std::bind(PrintStream, &output_stream,
+                                             std::placeholders::_1)));
+
+  // Expected output: "message: \"Hello{n}\""
+  for (int i = 0; i < kNumResponseStreamsMsgs; i++) {
+    grpc::string expected_response_text =
+        "message: \"Hello" + grpc::to_string(i) + "\"\n";
+    EXPECT_TRUE(NULL != strstr(output_stream.str().c_str(),
+                               expected_response_text.c_str()));
+  }
+
+  ShutdownServer();
+}
+
+TEST_F(GrpcToolTest, CallCommandBidiStream) {
+  // Test input: grpc_cli call localhost:<port> BidiStream "message: 'Hello0'"
+  std::stringstream output_stream;
+
+  const grpc::string server_address = SetUpServer();
+  const char* argv[] = {"grpc_cli", "call", server_address.c_str(),
+                        "BidiStream", "message: 'Hello0'"};
+
+  // Mock std::cin input "message: 'Hello1'\n\n message: 'Hello2'\n\n"
+  std::streambuf* orig = std::cin.rdbuf();
+  std::istringstream ss("message: 'Hello1'\n\n message: 'Hello2'\n\n");
+  std::cin.rdbuf(ss.rdbuf());
+
+  EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(),
+                                   std::bind(PrintStream, &output_stream,
+                                             std::placeholders::_1)));
+
+  // Expected output: "message: \"Hello0\"\nmessage: \"Hello1\"\nmessage:
+  // \"Hello2\"\n\n"
+  EXPECT_TRUE(NULL != strstr(output_stream.str().c_str(),
+                             "message: \"Hello0\"\nmessage: "
+                             "\"Hello1\"\nmessage: \"Hello2\"\n"));
+  std::cin.rdbuf(orig);
+  ShutdownServer();
+}
+
+TEST_F(GrpcToolTest, CallCommandBidiStreamWithBadRequest) {
+  // Test input: grpc_cli call localhost:<port> BidiStream "message: 'Hello0'"
+  std::stringstream output_stream;
+
+  const grpc::string server_address = SetUpServer();
+  const char* argv[] = {"grpc_cli", "call", server_address.c_str(),
+                        "BidiStream", "message: 'Hello0'"};
+
+  // Mock std::cin input "message: 1.0\n\n message: 'Hello2'\n\n"
+  std::streambuf* orig = std::cin.rdbuf();
+  std::istringstream ss("message: 1.0\n\n message: 'Hello2'\n\n");
+  std::cin.rdbuf(ss.rdbuf());
+
+  EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(),
+                                   std::bind(PrintStream, &output_stream,
+                                             std::placeholders::_1)));
+
+  // Expected output: "message: \"Hello0\"\nmessage: \"Hello2\"\n"
+  EXPECT_TRUE(NULL != strstr(output_stream.str().c_str(),
+                             "message: \"Hello0\"\nmessage: \"Hello2\"\n"));
+  std::cin.rdbuf(orig);
+
+  ShutdownServer();
+}
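The new streaming tests above all rely on the same std::cin redirection idiom; here is a stand-alone sketch of it, assuming only standard C++ (the input text is illustrative).

#include <iostream>
#include <sstream>
#include <string>

int main() {
  // Remember the real stdin buffer so it can be restored afterwards.
  std::streambuf* orig = std::cin.rdbuf();

  // Redirect std::cin to read from an in-memory string instead.
  std::istringstream fake_input("message: 'Hello1'\n\n message: 'Hello2'\n\n");
  std::cin.rdbuf(fake_input.rdbuf());

  std::string line;
  while (std::getline(std::cin, line)) {
    std::cout << "read: [" << line << "]\n";
  }

  // Always restore the original buffer, or later reads from std::cin break.
  std::cin.rdbuf(orig);
  return 0;
}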
+
 TEST_F(GrpcToolTest, ParseCommand) {
   // Test input "grpc_cli parse localhost:<port> grpc.testing.EchoResponse
   // ECHO_RESPONSE_MESSAGE"

+ 29 - 3
test/cpp/util/proto_file_parser.cc

@@ -81,8 +81,9 @@ class ErrorPrinter : public protobuf::compiler::MultiFileErrorCollector {
 ProtoFileParser::ProtoFileParser(std::shared_ptr<grpc::Channel> channel,
                                  const grpc::string& proto_path,
                                  const grpc::string& protofiles)
-    : has_error_(false) {
-  std::vector<grpc::string> service_list;
+    : has_error_(false),
+      dynamic_factory_(new protobuf::DynamicMessageFactory()) {
+  std::vector<std::string> service_list;
   if (channel) {
     reflection_db_.reset(new grpc::ProtoReflectionDescriptorDatabase(channel));
     reflection_db_->GetServices(&service_list);
@@ -127,7 +128,6 @@ ProtoFileParser::ProtoFileParser(std::shared_ptr<grpc::Channel> channel,
   }
 
   desc_pool_.reset(new protobuf::DescriptorPool(desc_db_.get()));
-  dynamic_factory_.reset(new protobuf::DynamicMessageFactory(desc_pool_.get()));
 
   for (auto it = service_list.begin(); it != service_list.end(); it++) {
     if (known_services.find(*it) == known_services.end()) {
@@ -144,6 +144,11 @@ ProtoFileParser::~ProtoFileParser() {}
 
 grpc::string ProtoFileParser::GetFullMethodName(const grpc::string& method) {
   has_error_ = false;
+
+  if (known_methods_.find(method) != known_methods_.end()) {
+    return known_methods_[method];
+  }
+
   const protobuf::MethodDescriptor* method_descriptor = nullptr;
   for (auto it = service_desc_list_.begin(); it != service_desc_list_.end();
        it++) {
@@ -169,6 +174,8 @@ grpc::string ProtoFileParser::GetFullMethodName(const grpc::string& method) {
     return "";
   }
 
+  known_methods_[method] = method_descriptor->full_name();
+
   return method_descriptor->full_name();
 }
 
@@ -205,6 +212,25 @@ grpc::string ProtoFileParser::GetMessageTypeFromMethod(
                     : method_desc->output_type()->full_name();
 }
 
+bool ProtoFileParser::IsStreaming(const grpc::string& method, bool is_request) {
+  has_error_ = false;
+
+  grpc::string full_method_name = GetFullMethodName(method);
+  if (has_error_) {
+    return false;
+  }
+
+  const protobuf::MethodDescriptor* method_desc =
+      desc_pool_->FindMethodByName(full_method_name);
+  if (!method_desc) {
+    LogError("Method not found");
+    return false;
+  }
+
+  return is_request ? method_desc->client_streaming()
+                    : method_desc->server_streaming();
+}
+
 grpc::string ProtoFileParser::GetSerializedProtoFromMethod(
     const grpc::string& method, const grpc::string& text_format_proto,
     bool is_request) {

Some files were not shown because too many files changed in this diff