Эх сурвалжийг харах

Merge remote-tracking branch 'upstream/master' into cpp_filter_api_cleanup

Mark D. Roth 8 жил өмнө
parent
commit
f290686a10

+ 3 - 0
src/core/lib/channel/context.h

@@ -47,6 +47,9 @@ typedef enum {
   /// Value is a \a census_context.
   /// Value is a \a census_context.
   GRPC_CONTEXT_TRACING,
   GRPC_CONTEXT_TRACING,
 
 
+  /// Reserved for traffic_class_context.
+  GRPC_CONTEXT_TRAFFIC,
+
   GRPC_CONTEXT_COUNT
   GRPC_CONTEXT_COUNT
 } grpc_context_index;
 } grpc_context_index;
 
 

+ 1 - 1
src/python/grpcio_health_checking/MANIFEST.in

@@ -1,4 +1,4 @@
 include grpc_version.py
 include grpc_version.py
 include health_commands.py
 include health_commands.py
-graft grpc
+graft grpc_health
 global-exclude *.pyc
 global-exclude *.pyc

+ 129 - 11
test/cpp/util/grpc_tool.cc

@@ -86,11 +86,12 @@ class GrpcTool {
   // callback);
   // callback);
   // bool PrintTypeId(int argc, const char** argv, GrpcToolOutputCallback
   // bool PrintTypeId(int argc, const char** argv, GrpcToolOutputCallback
   // callback);
   // callback);
-  // bool ParseMessage(int argc, const char** argv, GrpcToolOutputCallback
-  // callback);
-  // bool ToText(int argc, const char** argv, GrpcToolOutputCallback callback);
-  // bool ToBinary(int argc, const char** argv, GrpcToolOutputCallback
-  // callback);
+  bool ParseMessage(int argc, const char** argv, const CliCredentials& cred,
+                    GrpcToolOutputCallback callback);
+  bool ToText(int argc, const char** argv, const CliCredentials& cred,
+              GrpcToolOutputCallback callback);
+  bool ToBinary(int argc, const char** argv, const CliCredentials& cred,
+                GrpcToolOutputCallback callback);
 
 
   void SetPrintCommandMode(int exit_status) {
   void SetPrintCommandMode(int exit_status) {
     print_command_usage_ = true;
     print_command_usage_ = true;
@@ -173,9 +174,9 @@ const Command ops[] = {
     {"list", BindWith5Args(&GrpcTool::ListServices), 1, 3},
     {"list", BindWith5Args(&GrpcTool::ListServices), 1, 3},
     {"call", BindWith5Args(&GrpcTool::CallMethod), 2, 3},
     {"call", BindWith5Args(&GrpcTool::CallMethod), 2, 3},
     {"type", BindWith5Args(&GrpcTool::PrintType), 2, 2},
     {"type", BindWith5Args(&GrpcTool::PrintType), 2, 2},
-    // {"parse", BindWith5Args(&GrpcTool::ParseMessage), 2, 3},
-    // {"totext", BindWith5Args(&GrpcTool::ToText), 2, 3},
-    // {"tobinary", BindWith5Args(&GrpcTool::ToBinary), 2, 3},
+    {"parse", BindWith5Args(&GrpcTool::ParseMessage), 2, 3},
+    {"totext", BindWith5Args(&GrpcTool::ToText), 2, 3},
+    {"tobinary", BindWith5Args(&GrpcTool::ToBinary), 2, 3},
 };
 };
 
 
 void Usage(const grpc::string& msg) {
 void Usage(const grpc::string& msg) {
@@ -185,9 +186,9 @@ void Usage(const grpc::string& msg) {
       "  grpc_cli ls ...         ; List services\n"
       "  grpc_cli ls ...         ; List services\n"
       "  grpc_cli call ...       ; Call method\n"
       "  grpc_cli call ...       ; Call method\n"
       "  grpc_cli type ...       ; Print type\n"
       "  grpc_cli type ...       ; Print type\n"
-      // "  grpc_cli parse ...      ; Parse message\n"
-      // "  grpc_cli totext ...     ; Convert binary message to text\n"
-      // "  grpc_cli tobinary ...   ; Convert text message to binary\n"
+      "  grpc_cli parse ...      ; Parse message\n"
+      "  grpc_cli totext ...     ; Convert binary message to text\n"
+      "  grpc_cli tobinary ...   ; Convert text message to binary\n"
       "  grpc_cli help ...       ; Print this message, or per-command usage\n"
       "  grpc_cli help ...       ; Print this message, or per-command usage\n"
       "\n",
       "\n",
       msg.c_str());
       msg.c_str());
@@ -496,5 +497,122 @@ bool GrpcTool::CallMethod(int argc, const char** argv,
   return callback(output_ss.str());
   return callback(output_ss.str());
 }
 }
 
 
+bool GrpcTool::ParseMessage(int argc, const char** argv,
+                            const CliCredentials& cred,
+                            GrpcToolOutputCallback callback) {
+  CommandUsage(
+      "Parse message\n"
+      "  grpc_cli parse <address> <type> [<message>]\n"
+      "    <address>                ; host:port\n"
+      "    <type>                   ; Protocol buffer type name\n"
+      "    <message>                ; Text protobuffer (overrides --infile)\n"
+      "    --protofiles             ; Comma separated proto files used as a"
+      " fallback when parsing request/response\n"
+      "    --proto_path             ; The search path of proto files, valid"
+      " only when --protofiles is given\n"
+      "    --infile                 ; Input filename (defaults to stdin)\n"
+      "    --outfile                ; Output filename (defaults to stdout)\n"
+      "    --binary_input           ; Input in binary format\n"
+      "    --binary_output          ; Output in binary format\n" +
+      cred.GetCredentialUsage());
+
+  std::stringstream output_ss;
+  grpc::string message_text;
+  grpc::string server_address(argv[0]);
+  grpc::string type_name(argv[1]);
+  std::unique_ptr<grpc::testing::ProtoFileParser> parser;
+  grpc::string serialized_request_proto;
+
+  if (argc == 3) {
+    message_text = argv[2];
+    if (!FLAGS_infile.empty()) {
+      fprintf(stderr, "warning: message given in argv, ignoring --infile.\n");
+    }
+  } else {
+    std::stringstream input_stream;
+    if (FLAGS_infile.empty()) {
+      if (isatty(STDIN_FILENO)) {
+        fprintf(stderr, "reading request message from stdin...\n");
+      }
+      input_stream << std::cin.rdbuf();
+    } else {
+      std::ifstream input_file(FLAGS_infile, std::ios::in | std::ios::binary);
+      input_stream << input_file.rdbuf();
+      input_file.close();
+    }
+    message_text = input_stream.str();
+  }
+
+  if (!FLAGS_binary_input || !FLAGS_binary_output) {
+    std::shared_ptr<grpc::Channel> channel =
+        grpc::CreateChannel(server_address, cred.GetCredentials());
+    parser.reset(
+        new grpc::testing::ProtoFileParser(FLAGS_remotedb ? channel : nullptr,
+                                           FLAGS_proto_path, FLAGS_protofiles));
+    if (parser->HasError()) {
+      return false;
+    }
+  }
+
+  if (FLAGS_binary_input) {
+    serialized_request_proto = message_text;
+  } else {
+    serialized_request_proto =
+        parser->GetSerializedProtoFromMessageType(type_name, message_text);
+    if (parser->HasError()) {
+      return false;
+    }
+  }
+
+  if (FLAGS_binary_output) {
+    output_ss << serialized_request_proto;
+  } else {
+    grpc::string output_text = parser->GetTextFormatFromMessageType(
+        type_name, serialized_request_proto);
+    if (parser->HasError()) {
+      return false;
+    }
+    output_ss << output_text << std::endl;
+  }
+
+  return callback(output_ss.str());
+}
+
+bool GrpcTool::ToText(int argc, const char** argv, const CliCredentials& cred,
+                      GrpcToolOutputCallback callback) {
+  CommandUsage(
+      "Convert binary message to text\n"
+      "  grpc_cli totext <protofiles> <type>\n"
+      "    <protofiles>             ; Comma separated list of proto files\n"
+      "    <type>                   ; Protocol buffer type name\n"
+      "    --proto_path             ; The search path of proto files\n"
+      "    --infile                 ; Input filename (defaults to stdin)\n"
+      "    --outfile                ; Output filename (defaults to stdout)\n");
+
+  FLAGS_protofiles = argv[0];
+  FLAGS_remotedb = false;
+  FLAGS_binary_input = true;
+  FLAGS_binary_output = false;
+  return ParseMessage(argc, argv, cred, callback);
+}
+
+bool GrpcTool::ToBinary(int argc, const char** argv, const CliCredentials& cred,
+                        GrpcToolOutputCallback callback) {
+  CommandUsage(
+      "Convert text message to binary\n"
+      "  grpc_cli tobinary <protofiles> <type> [<message>]\n"
+      "    <protofiles>             ; Comma separated list of proto files\n"
+      "    <type>                   ; Protocol buffer type name\n"
+      "    --proto_path             ; The search path of proto files\n"
+      "    --infile                 ; Input filename (defaults to stdin)\n"
+      "    --outfile                ; Output filename (defaults to stdout)\n");
+
+  FLAGS_protofiles = argv[0];
+  FLAGS_remotedb = false;
+  FLAGS_binary_input = false;
+  FLAGS_binary_output = true;
+  return ParseMessage(argc, argv, cred, callback);
+}
+
 }  // namespace testing
 }  // namespace testing
 }  // namespace grpc
 }  // namespace grpc

+ 50 - 0
test/cpp/util/grpc_tool_test.cc

@@ -86,9 +86,18 @@ using grpc::testing::EchoResponse;
   "  rpc Echo(grpc.testing.EchoRequest) returns (grpc.testing.EchoResponse) " \
   "  rpc Echo(grpc.testing.EchoRequest) returns (grpc.testing.EchoResponse) " \
   "{}\n"
   "{}\n"
 
 
+#define ECHO_RESPONSE_MESSAGE \
+  "message: \"echo\"\n"       \
+  "param {\n"                 \
+  "  host: \"localhost\"\n"   \
+  "  peer: \"peer\"\n"        \
+  "}\n\n"
+
 namespace grpc {
 namespace grpc {
 namespace testing {
 namespace testing {
 
 
+DECLARE_bool(binary_input);
+DECLARE_bool(binary_output);
 DECLARE_bool(l);
 DECLARE_bool(l);
 
 
 namespace {
 namespace {
@@ -338,6 +347,47 @@ TEST_F(GrpcToolTest, CallCommand) {
   ShutdownServer();
   ShutdownServer();
 }
 }
 
 
+TEST_F(GrpcToolTest, ParseCommand) {
+  // Test input "grpc_cli parse localhost:<port> grpc.testing.EchoResponse
+  // ECHO_RESPONSE_MESSAGE"
+  std::stringstream output_stream;
+  std::stringstream binary_output_stream;
+
+  const grpc::string server_address = SetUpServer();
+  const char* argv[] = {"grpc_cli", "parse", server_address.c_str(),
+                        "grpc.testing.EchoResponse", ECHO_RESPONSE_MESSAGE};
+
+  FLAGS_binary_input = false;
+  FLAGS_binary_output = false;
+  EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(),
+                                   std::bind(PrintStream, &output_stream,
+                                             std::placeholders::_1)));
+  // Expected output: ECHO_RESPONSE_MESSAGE
+  EXPECT_TRUE(0 == strcmp(output_stream.str().c_str(), ECHO_RESPONSE_MESSAGE));
+
+  // Parse text message to binary message and then parse it back to text message
+  output_stream.str(grpc::string());
+  output_stream.clear();
+  FLAGS_binary_output = true;
+  EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(),
+                                   std::bind(PrintStream, &output_stream,
+                                             std::placeholders::_1)));
+  grpc::string binary_data = output_stream.str();
+  output_stream.str(grpc::string());
+  output_stream.clear();
+  argv[4] = binary_data.c_str();
+  FLAGS_binary_input = true;
+  FLAGS_binary_output = false;
+  EXPECT_TRUE(0 == GrpcToolMainLib(5, argv, TestCliCredentials(),
+                                   std::bind(PrintStream, &output_stream,
+                                             std::placeholders::_1)));
+
+  // Expected output: ECHO_RESPONSE_MESSAGE
+  EXPECT_TRUE(0 == strcmp(output_stream.str().c_str(), ECHO_RESPONSE_MESSAGE));
+
+  ShutdownServer();
+}
+
 TEST_F(GrpcToolTest, TooFewArguments) {
 TEST_F(GrpcToolTest, TooFewArguments) {
   // Test input "grpc_cli call Echo"
   // Test input "grpc_cli call Echo"
   std::stringstream output_stream;
   std::stringstream output_stream;

+ 176 - 0
test/http2_test/http2_base_server.py

@@ -0,0 +1,176 @@
+import logging
+import messages_pb2
+import struct
+
+import h2
+import h2.connection
+import twisted
+import twisted.internet
+import twisted.internet.protocol
+
+_READ_CHUNK_SIZE = 16384
+_GRPC_HEADER_SIZE = 5
+
+class H2ProtocolBaseServer(twisted.internet.protocol.Protocol):
+  def __init__(self):
+    self._conn = h2.connection.H2Connection(client_side=False)
+    self._recv_buffer = {}
+    self._handlers = {}
+    self._handlers['ConnectionMade'] = self.on_connection_made_default
+    self._handlers['DataReceived'] = self.on_data_received_default
+    self._handlers['WindowUpdated'] = self.on_window_update_default
+    self._handlers['RequestReceived'] = self.on_request_received_default
+    self._handlers['SendDone'] = self.on_send_done_default
+    self._handlers['ConnectionLost'] = self.on_connection_lost
+    self._handlers['PingAcknowledged'] = self.on_ping_acknowledged_default
+    self._stream_status = {}
+    self._send_remaining = {}
+    self._outstanding_pings = 0
+
+  def set_handlers(self, handlers):
+    self._handlers = handlers
+
+  def connectionMade(self):
+    self._handlers['ConnectionMade']()
+
+  def connectionLost(self, reason):
+    self._handlers['ConnectionLost'](reason)
+
+  def on_connection_made_default(self):
+    logging.info('Connection Made')
+    self._conn.initiate_connection()
+    self.transport.setTcpNoDelay(True)
+    self.transport.write(self._conn.data_to_send())
+
+  def on_connection_lost(self, reason):
+    logging.info('Disconnected %s' % reason)
+    twisted.internet.reactor.callFromThread(twisted.internet.reactor.stop)
+
+  def dataReceived(self, data):
+    try:
+      events = self._conn.receive_data(data)
+    except h2.exceptions.ProtocolError:
+      # this try/except block catches exceptions due to race between sending
+      # GOAWAY and processing a response in flight.
+      return
+    if self._conn.data_to_send:
+      self.transport.write(self._conn.data_to_send())
+    for event in events:
+      if isinstance(event, h2.events.RequestReceived) and self._handlers.has_key('RequestReceived'):
+        logging.info('RequestReceived Event for stream: %d' % event.stream_id)
+        self._handlers['RequestReceived'](event)
+      elif isinstance(event, h2.events.DataReceived) and self._handlers.has_key('DataReceived'):
+        logging.info('DataReceived Event for stream: %d' % event.stream_id)
+        self._handlers['DataReceived'](event)
+      elif isinstance(event, h2.events.WindowUpdated) and self._handlers.has_key('WindowUpdated'):
+        logging.info('WindowUpdated Event for stream: %d' % event.stream_id)
+        self._handlers['WindowUpdated'](event)
+      elif isinstance(event, h2.events.PingAcknowledged) and self._handlers.has_key('PingAcknowledged'):
+        logging.info('PingAcknowledged Event')
+        self._handlers['PingAcknowledged'](event)
+    self.transport.write(self._conn.data_to_send())
+
+  def on_ping_acknowledged_default(self, event):
+    logging.info('ping acknowledged')
+    self._outstanding_pings -= 1
+
+  def on_data_received_default(self, event):
+    self._conn.acknowledge_received_data(len(event.data), event.stream_id)
+    self._recv_buffer[event.stream_id] += event.data
+
+  def on_request_received_default(self, event):
+    self._recv_buffer[event.stream_id] = ''
+    self._stream_id = event.stream_id
+    self._stream_status[event.stream_id] = True
+    self._conn.send_headers(
+      stream_id=event.stream_id,
+      headers=[
+          (':status', '200'),
+          ('content-type', 'application/grpc'),
+          ('grpc-encoding', 'identity'),
+          ('grpc-accept-encoding', 'identity,deflate,gzip'),
+      ],
+    )
+    self.transport.write(self._conn.data_to_send())
+
+  def on_window_update_default(self, event):
+    # send pending data, if any
+    self.default_send(event.stream_id)
+
+  def send_reset_stream(self):
+    self._conn.reset_stream(self._stream_id)
+    self.transport.write(self._conn.data_to_send())
+
+  def setup_send(self, data_to_send, stream_id):
+    logging.info('Setting up data to send for stream_id: %d' % stream_id)
+    self._send_remaining[stream_id] = len(data_to_send)
+    self._send_offset = 0
+    self._data_to_send = data_to_send
+    self.default_send(stream_id)
+
+  def default_send(self, stream_id):
+    if not self._send_remaining.has_key(stream_id):
+      # not setup to send data yet
+      return
+
+    while self._send_remaining[stream_id] > 0:
+      lfcw = self._conn.local_flow_control_window(stream_id)
+      if lfcw == 0:
+        break
+      chunk_size = min(lfcw, _READ_CHUNK_SIZE)
+      bytes_to_send = min(chunk_size, self._send_remaining[stream_id])
+      logging.info('flow_control_window = %d. sending [%d:%d] stream_id %d' %
+                    (lfcw, self._send_offset, self._send_offset + bytes_to_send,
+                    stream_id))
+      data = self._data_to_send[self._send_offset : self._send_offset + bytes_to_send]
+      try:
+        self._conn.send_data(stream_id, data, False)
+      except h2.exceptions.ProtocolError:
+        logging.info('Stream %d is closed' % stream_id)
+        break
+      self._send_remaining[stream_id] -= bytes_to_send
+      self._send_offset += bytes_to_send
+      if self._send_remaining[stream_id] == 0:
+        self._handlers['SendDone'](stream_id)
+
+  def default_ping(self):
+    logging.info('sending ping')
+    self._outstanding_pings += 1
+    self._conn.ping(b'\x00'*8)
+    self.transport.write(self._conn.data_to_send())
+
+  def on_send_done_default(self, stream_id):
+    if self._stream_status[stream_id]:
+      self._stream_status[stream_id] = False
+      self.default_send_trailer(stream_id)
+    else:
+      logging.error('Stream %d is already closed' % stream_id)
+
+  def default_send_trailer(self, stream_id):
+    logging.info('Sending trailer for stream id %d' % stream_id)
+    self._conn.send_headers(stream_id,
+      headers=[ ('grpc-status', '0') ],
+      end_stream=True
+    )
+    self.transport.write(self._conn.data_to_send())
+
+  @staticmethod
+  def default_response_data(response_size):
+    sresp = messages_pb2.SimpleResponse()
+    sresp.payload.body = b'\x00'*response_size
+    serialized_resp_proto = sresp.SerializeToString()
+    response_data = b'\x00' + struct.pack('i', len(serialized_resp_proto))[::-1] + serialized_resp_proto
+    return response_data
+
+  def parse_received_data(self, stream_id):
+    """ returns a grpc framed string of bytes containing response proto of the size
+    asked in request """
+    recv_buffer = self._recv_buffer[stream_id]
+    grpc_msg_size = struct.unpack('i',recv_buffer[1:5][::-1])[0]
+    if len(recv_buffer) != _GRPC_HEADER_SIZE + grpc_msg_size:
+      return None
+    req_proto_str = recv_buffer[5:5+grpc_msg_size]
+    sr = messages_pb2.SimpleRequest()
+    sr.ParseFromString(req_proto_str)
+    logging.info('Parsed request for stream %d: response_size=%s' % (stream_id, sr.response_size))
+    return sr

+ 59 - 0
test/http2_test/http2_test_server.py

@@ -0,0 +1,59 @@
+"""
+  HTTP2 Test Server. Highly experimental work in progress.
+"""
+import argparse
+import logging
+import twisted
+import twisted.internet
+import twisted.internet.endpoints
+import twisted.internet.reactor
+
+import http2_base_server
+import test_goaway
+import test_max_streams
+import test_ping
+import test_rst_after_data
+import test_rst_after_header
+import test_rst_during_data
+
+_TEST_CASE_MAPPING = {
+  'rst_after_header': test_rst_after_header.TestcaseRstStreamAfterHeader,
+  'rst_after_data': test_rst_after_data.TestcaseRstStreamAfterData,
+  'rst_during_data': test_rst_during_data.TestcaseRstStreamDuringData,
+  'goaway': test_goaway.TestcaseGoaway,
+  'ping': test_ping.TestcasePing,
+  'max_streams': test_max_streams.TestcaseSettingsMaxStreams,
+}
+
+class H2Factory(twisted.internet.protocol.Factory):
+  def __init__(self, testcase):
+    logging.info('Creating H2Factory for new connection.')
+    self._num_streams = 0
+    self._testcase = testcase
+
+  def buildProtocol(self, addr):
+    self._num_streams += 1
+    logging.info('New Connection: %d' % self._num_streams)
+    if not _TEST_CASE_MAPPING.has_key(self._testcase):
+      logging.error('Unknown test case: %s' % self._testcase)
+      assert(0)
+    else:
+      t = _TEST_CASE_MAPPING[self._testcase]
+
+    if self._testcase == 'goaway':
+      return t(self._num_streams).get_base_server()
+    else:
+      return t().get_base_server()
+
+if __name__ == "__main__":
+  logging.basicConfig(format = "%(levelname) -10s %(asctime)s %(module)s:%(lineno)s | %(message)s", level=logging.INFO)
+  parser = argparse.ArgumentParser()
+  parser.add_argument("test")
+  parser.add_argument("port")
+  args = parser.parse_args()
+  if args.test not in _TEST_CASE_MAPPING.keys():
+    logging.error('unknown test: %s' % args.test)
+  else:
+    endpoint = twisted.internet.endpoints.TCP4ServerEndpoint(twisted.internet.reactor, int(args.port), backlog=128)
+    endpoint.listen(H2Factory(args.test))
+    twisted.internet.reactor.run()

Файлын зөрүү хэтэрхий том тул дарагдсан байна
+ 22 - 0
test/http2_test/messages_pb2.py


+ 48 - 0
test/http2_test/test_goaway.py

@@ -0,0 +1,48 @@
+import logging
+import time
+
+import http2_base_server
+
+class TestcaseGoaway(object):
+  """ 
+    This test does the following:
+      Process incoming request normally, i.e. send headers, data and trailers.
+      Then send a GOAWAY frame with the stream id of the processed request.
+      It checks that the next request is made on a different TCP connection.
+  """
+  def __init__(self, iteration):
+    self._base_server = http2_base_server.H2ProtocolBaseServer()
+    self._base_server._handlers['RequestReceived'] = self.on_request_received
+    self._base_server._handlers['DataReceived'] = self.on_data_received
+    self._base_server._handlers['SendDone'] = self.on_send_done
+    self._base_server._handlers['ConnectionLost'] = self.on_connection_lost
+    self._ready_to_send = False
+    self._iteration = iteration
+
+  def get_base_server(self):
+    return self._base_server
+
+  def on_connection_lost(self, reason):
+    logging.info('Disconnect received. Count %d' % self._iteration)
+    # _iteration == 2 => Two different connections have been used.
+    if self._iteration == 2:
+      self._base_server.on_connection_lost(reason)
+
+  def on_send_done(self, stream_id):
+    self._base_server.on_send_done_default(stream_id)
+    logging.info('Sending GOAWAY for stream %d:' % stream_id)
+    self._base_server._conn.close_connection(error_code=0, additional_data=None, last_stream_id=stream_id)
+    self._base_server._stream_status[stream_id] = False
+
+  def on_request_received(self, event):
+    self._ready_to_send = False
+    self._base_server.on_request_received_default(event)
+
+  def on_data_received(self, event):
+    self._base_server.on_data_received_default(event)
+    sr = self._base_server.parse_received_data(event.stream_id)
+    if sr:
+      logging.info('Creating response size = %s' % sr.response_size)
+      response_data = self._base_server.default_response_data(sr.response_size)
+      self._ready_to_send = True
+      self._base_server.setup_send(response_data, event.stream_id)

+ 34 - 0
test/http2_test/test_max_streams.py

@@ -0,0 +1,34 @@
+import hyperframe.frame
+import logging
+
+import http2_base_server
+
+class TestcaseSettingsMaxStreams(object):
+  """
+    This test sets MAX_CONCURRENT_STREAMS to 1 and asserts that at any point
+    only 1 stream is active.
+  """
+  def __init__(self):
+    self._base_server = http2_base_server.H2ProtocolBaseServer()
+    self._base_server._handlers['DataReceived'] = self.on_data_received
+    self._base_server._handlers['ConnectionMade'] = self.on_connection_made
+
+  def get_base_server(self):
+    return self._base_server
+
+  def on_connection_made(self):
+    logging.info('Connection Made')
+    self._base_server._conn.initiate_connection()
+    self._base_server._conn.update_settings(
+                  {hyperframe.frame.SettingsFrame.MAX_CONCURRENT_STREAMS: 1})
+    self._base_server.transport.setTcpNoDelay(True)
+    self._base_server.transport.write(self._base_server._conn.data_to_send())
+
+  def on_data_received(self, event):
+    self._base_server.on_data_received_default(event)
+    sr = self._base_server.parse_received_data(event.stream_id)
+    if sr:
+      logging.info('Creating response of size = %s' % sr.response_size)
+      response_data = self._base_server.default_response_data(sr.response_size)
+      self._base_server.setup_send(response_data, event.stream_id)
+    # TODO (makdharma): Add assertion to check number of live streams

+ 38 - 0
test/http2_test/test_ping.py

@@ -0,0 +1,38 @@
+import logging
+
+import http2_base_server
+
+class TestcasePing(object):
+  """
+    This test injects PING frames before and after header and data. Keeps count
+    of outstanding ping response and asserts when the count is non-zero at the
+    end of the test.
+  """
+  def __init__(self):
+    self._base_server = http2_base_server.H2ProtocolBaseServer()
+    self._base_server._handlers['RequestReceived'] = self.on_request_received
+    self._base_server._handlers['DataReceived'] = self.on_data_received
+    self._base_server._handlers['ConnectionLost'] = self.on_connection_lost
+
+  def get_base_server(self):
+    return self._base_server
+
+  def on_request_received(self, event):
+    self._base_server.default_ping()
+    self._base_server.on_request_received_default(event)
+    self._base_server.default_ping()
+
+  def on_data_received(self, event):
+    self._base_server.on_data_received_default(event)
+    sr = self._base_server.parse_received_data(event.stream_id)
+    if sr:
+      logging.info('Creating response size = %s' % sr.response_size)
+      response_data = self._base_server.default_response_data(sr.response_size)
+      self._base_server.default_ping()
+      self._base_server.setup_send(response_data, event.stream_id)
+      self._base_server.default_ping()
+
+  def on_connection_lost(self, reason):
+    logging.info('Disconnect received. Ping Count %d' % self._base_server._outstanding_pings)
+    assert(self._base_server._outstanding_pings == 0)
+    self._base_server.on_connection_lost(reason)

+ 28 - 0
test/http2_test/test_rst_after_data.py

@@ -0,0 +1,28 @@
+import http2_base_server
+
+class TestcaseRstStreamAfterData(object):
+  """
+    In response to an incoming request, this test sends headers, followed by
+    data, followed by a reset stream frame. Client asserts that the RPC failed.
+    Client needs to deliver the complete message to the application layer.
+  """
+  def __init__(self):
+    self._base_server = http2_base_server.H2ProtocolBaseServer()
+    self._base_server._handlers['DataReceived'] = self.on_data_received
+    self._base_server._handlers['SendDone'] = self.on_send_done
+
+  def get_base_server(self):
+    return self._base_server
+
+  def on_data_received(self, event):
+    self._base_server.on_data_received_default(event)
+    sr = self._base_server.parse_received_data(event.stream_id)
+    if sr:
+      response_data = self._base_server.default_response_data(sr.response_size)
+      self._ready_to_send = True
+      self._base_server.setup_send(response_data, event.stream_id)
+      # send reset stream
+
+  def on_send_done(self, stream_id):
+    self._base_server.send_reset_stream()
+    self._base_server._stream_status[stream_id] = False

+ 19 - 0
test/http2_test/test_rst_after_header.py

@@ -0,0 +1,19 @@
+import http2_base_server
+
+class TestcaseRstStreamAfterHeader(object):
+  """
+    In response to an incoming request, this test sends headers, followed by
+    a reset stream frame. Client asserts that the RPC failed.
+  """
+  def __init__(self):
+    self._base_server = http2_base_server.H2ProtocolBaseServer()
+    self._base_server._handlers['RequestReceived'] = self.on_request_received
+
+  def get_base_server(self):
+    return self._base_server
+
+  def on_request_received(self, event):
+    # send initial headers
+    self._base_server.on_request_received_default(event)
+    # send reset stream
+    self._base_server.send_reset_stream()

+ 29 - 0
test/http2_test/test_rst_during_data.py

@@ -0,0 +1,29 @@
+import http2_base_server
+
+class TestcaseRstStreamDuringData(object):
+  """
+    In response to an incoming request, this test sends headers, followed by
+    some data, followed by a reset stream frame. Client asserts that the RPC
+    failed and does not deliver the message to the application.
+  """
+  def __init__(self):
+    self._base_server = http2_base_server.H2ProtocolBaseServer()
+    self._base_server._handlers['DataReceived'] = self.on_data_received
+    self._base_server._handlers['SendDone'] = self.on_send_done
+
+  def get_base_server(self):
+    return self._base_server
+
+  def on_data_received(self, event):
+    self._base_server.on_data_received_default(event)
+    sr = self._base_server.parse_received_data(event.stream_id)
+    if sr:
+      response_data = self._base_server.default_response_data(sr.response_size)
+      self._ready_to_send = True
+      response_len = len(response_data)
+      truncated_response_data = response_data[0:response_len/2]
+      self._base_server.setup_send(truncated_response_data, event.stream_id)
+
+  def on_send_done(self, stream_id):
+    self._base_server.send_reset_stream()
+    self._base_server._stream_status[stream_id] = False

+ 1 - 1
tools/distrib/python/grpcio_tools/MANIFEST.in

@@ -2,6 +2,6 @@ include grpc_version.py
 include protoc_deps.py
 include protoc_deps.py
 include protoc_lib_deps.py
 include protoc_lib_deps.py
 include README.rst
 include README.rst
-graft grpc
+graft grpc_tools
 graft grpc_root
 graft grpc_root
 graft third_party
 graft third_party

+ 1 - 1
tools/distrib/python/grpcio_tools/grpc_tools/command.py

@@ -49,7 +49,7 @@ def build_package_protos(package_root):
 
 
   for proto_file in proto_files:
   for proto_file in proto_files:
     command = [
     command = [
-        'grpc.tools.protoc',
+        'grpc_tools.protoc',
         '--proto_path={}'.format(inclusion_root),
         '--proto_path={}'.format(inclusion_root),
         '--proto_path={}'.format(well_known_protos_include),
         '--proto_path={}'.format(well_known_protos_include),
         '--python_out={}'.format(inclusion_root),
         '--python_out={}'.format(inclusion_root),

+ 1 - 1
tools/distrib/python/grpcio_tools/grpc_tools/protoc.py

@@ -45,5 +45,5 @@ def main(command_arguments):
   return _protoc_compiler.run_main(command_arguments)
   return _protoc_compiler.run_main(command_arguments)
 
 
 if __name__ == '__main__':
 if __name__ == '__main__':
-  proto_include = pkg_resources.resource_filename('grpc.tools', '_proto')
+  proto_include = pkg_resources.resource_filename('grpc_tools', '_proto')
   sys.exit(main(sys.argv + ['-I{}'.format(proto_include)]))
   sys.exit(main(sys.argv + ['-I{}'.format(proto_include)]))

+ 6 - 0
tools/dockerfile/grpc_artifact_protoc/Dockerfile

@@ -59,5 +59,11 @@ RUN yum install -y devtoolset-1.1 \
                    devtoolset-1.1-libstdc++-devel \
                    devtoolset-1.1-libstdc++-devel \
                    devtoolset-1.1-libstdc++-devel.i686 || true
                    devtoolset-1.1-libstdc++-devel.i686 || true
 
 
+# Update Git to version >1.7 to allow cloning submodules with --reference arg.
+RUN yum remove -y git
+RUN yum install -y epel-release
+RUN yum install -y https://centos6.iuscommunity.org/ius-release.rpm
+RUN yum install -y git2u
+
 # Start in devtoolset environment that uses GCC 4.7
 # Start in devtoolset environment that uses GCC 4.7
 CMD ["scl", "enable", "devtoolset-1.1", "bash"]
 CMD ["scl", "enable", "devtoolset-1.1", "bash"]

Энэ ялгаанд хэт олон файл өөрчлөгдсөн тул зарим файлыг харуулаагүй болно