|  | @@ -1,6 +1,6 @@
 | 
	
		
			
				|  |  |  /*
 | 
	
		
			
				|  |  |   *
 | 
	
		
			
				|  |  | - * Copyright 2015, Google Inc.
 | 
	
		
			
				|  |  | + * Copyright 2015-2016, Google Inc.
 | 
	
		
			
				|  |  |   * All rights reserved.
 | 
	
		
			
				|  |  |   *
 | 
	
		
			
				|  |  |   * Redistribution and use in source and binary forms, with or without
 | 
	
	
		
			
				|  | @@ -57,7 +57,7 @@ namespace testing {
 | 
	
		
			
				|  |  |  namespace {
 | 
	
		
			
				|  |  |  // The same value is defined by the Java client.
 | 
	
		
			
				|  |  |  const std::vector<int> request_stream_sizes = {27182, 8, 1828, 45904};
 | 
	
		
			
				|  |  | -const std::vector<int> response_stream_sizes = {31415, 59, 2653, 58979};
 | 
	
		
			
				|  |  | +const std::vector<int> response_stream_sizes = {31415, 9, 2653, 58979};
 | 
	
		
			
				|  |  |  const int kNumResponseMessages = 2000;
 | 
	
		
			
				|  |  |  const int kResponseMessageSize = 1030;
 | 
	
		
			
				|  |  |  const int kReceiveDelayMilliSeconds = 20;
 | 
	
	
		
			
				|  | @@ -67,28 +67,23 @@ const int kLargeResponseSize = 314159;
 | 
	
		
			
				|  |  |  void NoopChecks(const InteropClientContextInspector& inspector,
 | 
	
		
			
				|  |  |                  const SimpleRequest* request, const SimpleResponse* response) {}
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -void CompressionChecks(const InteropClientContextInspector& inspector,
 | 
	
		
			
				|  |  | -                       const SimpleRequest* request,
 | 
	
		
			
				|  |  | -                       const SimpleResponse* response) {
 | 
	
		
			
				|  |  | +void UnaryCompressionChecks(const InteropClientContextInspector& inspector,
 | 
	
		
			
				|  |  | +                            const SimpleRequest* request,
 | 
	
		
			
				|  |  | +                            const SimpleResponse* response) {
 | 
	
		
			
				|  |  |    const grpc_compression_algorithm received_compression =
 | 
	
		
			
				|  |  |        inspector.GetCallCompressionAlgorithm();
 | 
	
		
			
				|  |  | -  if (request->request_compressed_response() &&
 | 
	
		
			
				|  |  | -      received_compression == GRPC_COMPRESS_NONE) {
 | 
	
		
			
				|  |  | -    if (request->request_compressed_response() &&
 | 
	
		
			
				|  |  | -        received_compression == GRPC_COMPRESS_NONE) {
 | 
	
		
			
				|  |  | +  if (request->response_compressed().value()) {
 | 
	
		
			
				|  |  | +    if (received_compression == GRPC_COMPRESS_NONE) {
 | 
	
		
			
				|  |  |        // Requested some compression, got NONE. This is an error.
 | 
	
		
			
				|  |  |        gpr_log(GPR_ERROR,
 | 
	
		
			
				|  |  |                "Failure: Requested compression but got uncompressed response "
 | 
	
		
			
				|  |  |                "from server.");
 | 
	
		
			
				|  |  |        abort();
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  if (!request->request_compressed_response()) {
 | 
	
		
			
				|  |  | -    GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS));
 | 
	
		
			
				|  |  | -  } else if (request->response_type() == PayloadType::COMPRESSABLE) {
 | 
	
		
			
				|  |  | -    // requested compression and compressable response => results should always
 | 
	
		
			
				|  |  | -    // be compressed.
 | 
	
		
			
				|  |  |      GPR_ASSERT(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS);
 | 
	
		
			
				|  |  | +  } else {
 | 
	
		
			
				|  |  | +    // Didn't request compression -> make sure the response is uncompressed
 | 
	
		
			
				|  |  | +    GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS));
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  }  // namespace
 | 
	
	
		
			
				|  | @@ -190,11 +185,16 @@ bool InteropClient::PerformLargeUnary(SimpleRequest* request,
 | 
	
		
			
				|  |  |                                        CheckerFn custom_checks_fn) {
 | 
	
		
			
				|  |  |    ClientContext context;
 | 
	
		
			
				|  |  |    InteropClientContextInspector inspector(context);
 | 
	
		
			
				|  |  | -  // If the request doesn't already specify the response type, default to
 | 
	
		
			
				|  |  | -  // COMPRESSABLE.
 | 
	
		
			
				|  |  |    request->set_response_size(kLargeResponseSize);
 | 
	
		
			
				|  |  |    grpc::string payload(kLargeRequestSize, '\0');
 | 
	
		
			
				|  |  |    request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
 | 
	
		
			
				|  |  | +  if (request->has_expect_compressed()) {
 | 
	
		
			
				|  |  | +    if (request->expect_compressed().value()) {
 | 
	
		
			
				|  |  | +      context.set_compression_algorithm(GRPC_COMPRESS_GZIP);
 | 
	
		
			
				|  |  | +    } else {
 | 
	
		
			
				|  |  | +      context.set_compression_algorithm(GRPC_COMPRESS_NONE);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    Status s = serviceStub_.Get()->UnaryCall(&context, *request, response);
 | 
	
		
			
				|  |  |    if (!AssertStatusOk(s)) {
 | 
	
	
		
			
				|  | @@ -204,27 +204,8 @@ bool InteropClient::PerformLargeUnary(SimpleRequest* request,
 | 
	
		
			
				|  |  |    custom_checks_fn(inspector, request, response);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    // Payload related checks.
 | 
	
		
			
				|  |  | -  GPR_ASSERT(response->payload().type() == request->response_type());
 | 
	
		
			
				|  |  | -  switch (response->payload().type()) {
 | 
	
		
			
				|  |  | -    case PayloadType::COMPRESSABLE:
 | 
	
		
			
				|  |  | -      GPR_ASSERT(response->payload().body() ==
 | 
	
		
			
				|  |  | -                 grpc::string(kLargeResponseSize, '\0'));
 | 
	
		
			
				|  |  | -      break;
 | 
	
		
			
				|  |  | -    case PayloadType::UNCOMPRESSABLE: {
 | 
	
		
			
				|  |  | -      // We don't really check anything: We can't assert that the payload is
 | 
	
		
			
				|  |  | -      // uncompressed because it's the server's prerogative to decide on that,
 | 
	
		
			
				|  |  | -      // and different implementations decide differently (ie, Java always
 | 
	
		
			
				|  |  | -      // compresses when requested to do so, whereas C core throws away the
 | 
	
		
			
				|  |  | -      // compressed payload if the output is larger than the input).
 | 
	
		
			
				|  |  | -      // In addition, we don't compare the actual random bytes received because
 | 
	
		
			
				|  |  | -      // asserting that data is sent/received properly isn't the purpose of this
 | 
	
		
			
				|  |  | -      // test. Moreover, different implementations are also free to use
 | 
	
		
			
				|  |  | -      // different sets of random bytes.
 | 
	
		
			
				|  |  | -    } break;
 | 
	
		
			
				|  |  | -    default:
 | 
	
		
			
				|  |  | -      GPR_ASSERT(false);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | +  GPR_ASSERT(response->payload().body() ==
 | 
	
		
			
				|  |  | +             grpc::string(kLargeResponseSize, '\0'));
 | 
	
		
			
				|  |  |    return true;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -237,7 +218,6 @@ bool InteropClient::DoComputeEngineCreds(
 | 
	
		
			
				|  |  |    SimpleResponse response;
 | 
	
		
			
				|  |  |    request.set_fill_username(true);
 | 
	
		
			
				|  |  |    request.set_fill_oauth_scope(true);
 | 
	
		
			
				|  |  | -  request.set_response_type(PayloadType::COMPRESSABLE);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    if (!PerformLargeUnary(&request, &response)) {
 | 
	
		
			
				|  |  |      return false;
 | 
	
	
		
			
				|  | @@ -311,7 +291,6 @@ bool InteropClient::DoJwtTokenCreds(const grpc::string& username) {
 | 
	
		
			
				|  |  |    SimpleRequest request;
 | 
	
		
			
				|  |  |    SimpleResponse response;
 | 
	
		
			
				|  |  |    request.set_fill_username(true);
 | 
	
		
			
				|  |  | -  request.set_response_type(PayloadType::COMPRESSABLE);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    if (!PerformLargeUnary(&request, &response)) {
 | 
	
		
			
				|  |  |      return false;
 | 
	
	
		
			
				|  | @@ -327,7 +306,6 @@ bool InteropClient::DoLargeUnary() {
 | 
	
		
			
				|  |  |    gpr_log(GPR_DEBUG, "Sending a large unary rpc...");
 | 
	
		
			
				|  |  |    SimpleRequest request;
 | 
	
		
			
				|  |  |    SimpleResponse response;
 | 
	
		
			
				|  |  | -  request.set_response_type(PayloadType::COMPRESSABLE);
 | 
	
		
			
				|  |  |    if (!PerformLargeUnary(&request, &response)) {
 | 
	
		
			
				|  |  |      return false;
 | 
	
		
			
				|  |  |    }
 | 
	
	
		
			
				|  | @@ -335,32 +313,73 @@ bool InteropClient::DoLargeUnary() {
 | 
	
		
			
				|  |  |    return true;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -bool InteropClient::DoLargeCompressedUnary() {
 | 
	
		
			
				|  |  | -  const bool request_compression[] = {false, true};
 | 
	
		
			
				|  |  | -  const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE};
 | 
	
		
			
				|  |  | -  for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) {
 | 
	
		
			
				|  |  | -    for (size_t j = 0; j < GPR_ARRAY_SIZE(request_compression); j++) {
 | 
	
		
			
				|  |  | -      char* log_suffix;
 | 
	
		
			
				|  |  | -      gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)",
 | 
	
		
			
				|  |  | -                   request_compression[j] ? "true" : "false",
 | 
	
		
			
				|  |  | -                   PayloadType_Name(payload_types[i]).c_str());
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -      gpr_log(GPR_DEBUG, "Sending a large compressed unary rpc %s.",
 | 
	
		
			
				|  |  | -              log_suffix);
 | 
	
		
			
				|  |  | -      SimpleRequest request;
 | 
	
		
			
				|  |  | -      SimpleResponse response;
 | 
	
		
			
				|  |  | -      request.set_response_type(payload_types[i]);
 | 
	
		
			
				|  |  | -      request.set_request_compressed_response(request_compression[j]);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -      if (!PerformLargeUnary(&request, &response, CompressionChecks)) {
 | 
	
		
			
				|  |  | -        gpr_log(GPR_ERROR, "Large compressed unary failed %s", log_suffix);
 | 
	
		
			
				|  |  | -        gpr_free(log_suffix);
 | 
	
		
			
				|  |  | -        return false;
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -      gpr_log(GPR_DEBUG, "Large compressed unary done %s.", log_suffix);
 | 
	
		
			
				|  |  | +bool InteropClient::DoClientCompressedUnary() {
 | 
	
		
			
				|  |  | +  // Probing for compression-checks support.
 | 
	
		
			
				|  |  | +  ClientContext probe_context;
 | 
	
		
			
				|  |  | +  SimpleRequest probe_req;
 | 
	
		
			
				|  |  | +  SimpleResponse probe_res;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  probe_context.set_compression_algorithm(GRPC_COMPRESS_NONE);
 | 
	
		
			
				|  |  | +  probe_req.mutable_expect_compressed()->set_value(true);  // lies!
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  probe_req.set_response_size(kLargeResponseSize);
 | 
	
		
			
				|  |  | +  probe_req.mutable_payload()->set_body(grpc::string(kLargeRequestSize, '\0'));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  gpr_log(GPR_DEBUG, "Sending probe for compressed unary request.");
 | 
	
		
			
				|  |  | +  const Status s =
 | 
	
		
			
				|  |  | +      serviceStub_.Get()->UnaryCall(&probe_context, probe_req, &probe_res);
 | 
	
		
			
				|  |  | +  if (s.error_code() != grpc::StatusCode::INVALID_ARGUMENT) {
 | 
	
		
			
				|  |  | +    // The server isn't able to evaluate incoming compression, making the rest
 | 
	
		
			
				|  |  | +    // of this test moot.
 | 
	
		
			
				|  |  | +    gpr_log(GPR_DEBUG, "Compressed unary request probe failed");
 | 
	
		
			
				|  |  | +    return false;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  gpr_log(GPR_DEBUG, "Compressed unary request probe succeeded. Proceeding.");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  const std::vector<bool> compressions = {true, false};
 | 
	
		
			
				|  |  | +  for (size_t i = 0; i < compressions.size(); i++) {
 | 
	
		
			
				|  |  | +    char* log_suffix;
 | 
	
		
			
				|  |  | +    gpr_asprintf(&log_suffix, "(compression=%s)",
 | 
	
		
			
				|  |  | +                 compressions[i] ? "true" : "false");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    gpr_log(GPR_DEBUG, "Sending compressed unary request %s.", log_suffix);
 | 
	
		
			
				|  |  | +    SimpleRequest request;
 | 
	
		
			
				|  |  | +    SimpleResponse response;
 | 
	
		
			
				|  |  | +    request.mutable_expect_compressed()->set_value(compressions[i]);
 | 
	
		
			
				|  |  | +    if (!PerformLargeUnary(&request, &response, UnaryCompressionChecks)) {
 | 
	
		
			
				|  |  | +      gpr_log(GPR_ERROR, "Compressed unary request failed %s", log_suffix);
 | 
	
		
			
				|  |  | +      gpr_free(log_suffix);
 | 
	
		
			
				|  |  | +      return false;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    gpr_log(GPR_DEBUG, "Compressed unary request failed %s", log_suffix);
 | 
	
		
			
				|  |  | +    gpr_free(log_suffix);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  return true;
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +bool InteropClient::DoServerCompressedUnary() {
 | 
	
		
			
				|  |  | +  const std::vector<bool> compressions = {true, false};
 | 
	
		
			
				|  |  | +  for (size_t i = 0; i < compressions.size(); i++) {
 | 
	
		
			
				|  |  | +    char* log_suffix;
 | 
	
		
			
				|  |  | +    gpr_asprintf(&log_suffix, "(compression=%s)",
 | 
	
		
			
				|  |  | +                 compressions[i] ? "true" : "false");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    gpr_log(GPR_DEBUG, "Sending unary request for compressed response %s.",
 | 
	
		
			
				|  |  | +            log_suffix);
 | 
	
		
			
				|  |  | +    SimpleRequest request;
 | 
	
		
			
				|  |  | +    SimpleResponse response;
 | 
	
		
			
				|  |  | +    request.mutable_response_compressed()->set_value(compressions[i]);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    if (!PerformLargeUnary(&request, &response, UnaryCompressionChecks)) {
 | 
	
		
			
				|  |  | +      gpr_log(GPR_ERROR, "Request for compressed unary failed %s", log_suffix);
 | 
	
		
			
				|  |  |        gpr_free(log_suffix);
 | 
	
		
			
				|  |  | +      return false;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    gpr_log(GPR_DEBUG, "Request for compressed unary failed %s", log_suffix);
 | 
	
		
			
				|  |  | +    gpr_free(log_suffix);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    return true;
 | 
	
	
		
			
				|  | @@ -387,7 +406,7 @@ bool InteropClient::DoRequestStreaming() {
 | 
	
		
			
				|  |  |        serviceStub_.Get()->StreamingInputCall(&context, &response));
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    int aggregated_payload_size = 0;
 | 
	
		
			
				|  |  | -  for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
 | 
	
		
			
				|  |  | +  for (size_t i = 0; i < request_stream_sizes.size(); ++i) {
 | 
	
		
			
				|  |  |      Payload* payload = request.mutable_payload();
 | 
	
		
			
				|  |  |      payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
 | 
	
		
			
				|  |  |      if (!stream->Write(request)) {
 | 
	
	
		
			
				|  | @@ -396,7 +415,7 @@ bool InteropClient::DoRequestStreaming() {
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |      aggregated_payload_size += request_stream_sizes[i];
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  stream->WritesDone();
 | 
	
		
			
				|  |  | +  GPR_ASSERT(stream->WritesDone());
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    Status s = stream->Finish();
 | 
	
		
			
				|  |  |    if (!AssertStatusOk(s)) {
 | 
	
	
		
			
				|  | @@ -446,92 +465,129 @@ bool InteropClient::DoResponseStreaming() {
 | 
	
		
			
				|  |  |    return true;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -bool InteropClient::DoResponseCompressedStreaming() {
 | 
	
		
			
				|  |  | -  const bool request_compression[] = {false, true};
 | 
	
		
			
				|  |  | -  const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE};
 | 
	
		
			
				|  |  | -  for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) {
 | 
	
		
			
				|  |  | -    for (size_t j = 0; j < GPR_ARRAY_SIZE(request_compression); j++) {
 | 
	
		
			
				|  |  | -      ClientContext context;
 | 
	
		
			
				|  |  | -      InteropClientContextInspector inspector(context);
 | 
	
		
			
				|  |  | -      StreamingOutputCallRequest request;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -      char* log_suffix;
 | 
	
		
			
				|  |  | -      gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)",
 | 
	
		
			
				|  |  | -                   request_compression[j] ? "true" : "false",
 | 
	
		
			
				|  |  | -                   PayloadType_Name(payload_types[i]).c_str());
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -      gpr_log(GPR_DEBUG, "Receiving response streaming rpc %s.", log_suffix);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -      request.set_response_type(payload_types[i]);
 | 
	
		
			
				|  |  | -      request.set_request_compressed_response(request_compression[j]);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -      for (size_t k = 0; k < response_stream_sizes.size(); ++k) {
 | 
	
		
			
				|  |  | -        ResponseParameters* response_parameter =
 | 
	
		
			
				|  |  | -            request.add_response_parameters();
 | 
	
		
			
				|  |  | -        response_parameter->set_size(response_stream_sizes[k]);
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -      StreamingOutputCallResponse response;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -      std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
 | 
	
		
			
				|  |  | -          serviceStub_.Get()->StreamingOutputCall(&context, request));
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -      size_t k = 0;
 | 
	
		
			
				|  |  | -      while (stream->Read(&response)) {
 | 
	
		
			
				|  |  | -        // Payload related checks.
 | 
	
		
			
				|  |  | -        GPR_ASSERT(response.payload().type() == request.response_type());
 | 
	
		
			
				|  |  | -        switch (response.payload().type()) {
 | 
	
		
			
				|  |  | -          case PayloadType::COMPRESSABLE:
 | 
	
		
			
				|  |  | -            GPR_ASSERT(response.payload().body() ==
 | 
	
		
			
				|  |  | -                       grpc::string(response_stream_sizes[k], '\0'));
 | 
	
		
			
				|  |  | -            break;
 | 
	
		
			
				|  |  | -          case PayloadType::UNCOMPRESSABLE:
 | 
	
		
			
				|  |  | -            break;
 | 
	
		
			
				|  |  | -          default:
 | 
	
		
			
				|  |  | -            GPR_ASSERT(false);
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -        // Compression related checks.
 | 
	
		
			
				|  |  | -        if (request.request_compressed_response()) {
 | 
	
		
			
				|  |  | -          GPR_ASSERT(inspector.GetCallCompressionAlgorithm() >
 | 
	
		
			
				|  |  | -                     GRPC_COMPRESS_NONE);
 | 
	
		
			
				|  |  | -          if (request.response_type() == PayloadType::COMPRESSABLE) {
 | 
	
		
			
				|  |  | -            // requested compression and compressable response => results should
 | 
	
		
			
				|  |  | -            // always be compressed.
 | 
	
		
			
				|  |  | -            GPR_ASSERT(inspector.GetMessageFlags() &
 | 
	
		
			
				|  |  | -                       GRPC_WRITE_INTERNAL_COMPRESS);
 | 
	
		
			
				|  |  | -          }
 | 
	
		
			
				|  |  | -        } else {
 | 
	
		
			
				|  |  | -          // requested *no* compression.
 | 
	
		
			
				|  |  | -          GPR_ASSERT(
 | 
	
		
			
				|  |  | -              !(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS));
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -        ++k;
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -      gpr_log(GPR_DEBUG, "Response streaming done %s.", log_suffix);
 | 
	
		
			
				|  |  | -      gpr_free(log_suffix);
 | 
	
		
			
				|  |  | +bool InteropClient::DoClientCompressedStreaming() {
 | 
	
		
			
				|  |  | +  // Probing for compression-checks support.
 | 
	
		
			
				|  |  | +  ClientContext probe_context;
 | 
	
		
			
				|  |  | +  StreamingInputCallRequest probe_req;
 | 
	
		
			
				|  |  | +  StreamingInputCallResponse probe_res;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  probe_context.set_compression_algorithm(GRPC_COMPRESS_NONE);
 | 
	
		
			
				|  |  | +  probe_req.mutable_expect_compressed()->set_value(true);  // lies!
 | 
	
		
			
				|  |  | +  probe_req.mutable_payload()->set_body(grpc::string(27182, '\0'));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  gpr_log(GPR_DEBUG, "Sending probe for compressed streaming request.");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  std::unique_ptr<ClientWriter<StreamingInputCallRequest>> probe_stream(
 | 
	
		
			
				|  |  | +      serviceStub_.Get()->StreamingInputCall(&probe_context, &probe_res));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  if (!probe_stream->Write(probe_req)) {
 | 
	
		
			
				|  |  | +    gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__);
 | 
	
		
			
				|  |  | +    return TransientFailureOrAbort();
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  Status s = probe_stream->Finish();
 | 
	
		
			
				|  |  | +  if (s.error_code() != grpc::StatusCode::INVALID_ARGUMENT) {
 | 
	
		
			
				|  |  | +    // The server isn't able to evaluate incoming compression, making the rest
 | 
	
		
			
				|  |  | +    // of this test moot.
 | 
	
		
			
				|  |  | +    gpr_log(GPR_DEBUG, "Compressed streaming request probe failed");
 | 
	
		
			
				|  |  | +    return false;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  gpr_log(GPR_DEBUG,
 | 
	
		
			
				|  |  | +          "Compressed streaming request probe succeeded. Proceeding.");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  ClientContext context;
 | 
	
		
			
				|  |  | +  StreamingInputCallRequest request;
 | 
	
		
			
				|  |  | +  StreamingInputCallResponse response;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  context.set_compression_algorithm(GRPC_COMPRESS_GZIP);
 | 
	
		
			
				|  |  | +  std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
 | 
	
		
			
				|  |  | +      serviceStub_.Get()->StreamingInputCall(&context, &response));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  request.mutable_payload()->set_body(grpc::string(27182, '\0'));
 | 
	
		
			
				|  |  | +  request.mutable_expect_compressed()->set_value(true);
 | 
	
		
			
				|  |  | +  gpr_log(GPR_DEBUG, "Sending streaming request with compression enabled");
 | 
	
		
			
				|  |  | +  if (!stream->Write(request)) {
 | 
	
		
			
				|  |  | +    gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__);
 | 
	
		
			
				|  |  | +    return TransientFailureOrAbort();
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  WriteOptions wopts;
 | 
	
		
			
				|  |  | +  wopts.set_no_compression();
 | 
	
		
			
				|  |  | +  request.mutable_payload()->set_body(grpc::string(45904, '\0'));
 | 
	
		
			
				|  |  | +  request.mutable_expect_compressed()->set_value(false);
 | 
	
		
			
				|  |  | +  gpr_log(GPR_DEBUG, "Sending streaming request with compression disabled");
 | 
	
		
			
				|  |  | +  if (!stream->Write(request, wopts)) {
 | 
	
		
			
				|  |  | +    gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__);
 | 
	
		
			
				|  |  | +    return TransientFailureOrAbort();
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  GPR_ASSERT(stream->WritesDone());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  s = stream->Finish();
 | 
	
		
			
				|  |  | +  if (!AssertStatusOk(s)) {
 | 
	
		
			
				|  |  | +    return false;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  return true;
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -      if (k < response_stream_sizes.size()) {
 | 
	
		
			
				|  |  | -        // stream->Read() failed before reading all the expected messages. This
 | 
	
		
			
				|  |  | -        // is most likely due to a connection failure.
 | 
	
		
			
				|  |  | -        gpr_log(GPR_ERROR,
 | 
	
		
			
				|  |  | -                "DoResponseCompressedStreaming(): Responses read (k=%" PRIuPTR
 | 
	
		
			
				|  |  | -                ") is "
 | 
	
		
			
				|  |  | -                "less than the expected messages (i.e "
 | 
	
		
			
				|  |  | -                "response_stream_sizes.size() (%" PRIuPTR ")). (i=%" PRIuPTR
 | 
	
		
			
				|  |  | -                ", j=%" PRIuPTR ")",
 | 
	
		
			
				|  |  | -                k, response_stream_sizes.size(), i, j);
 | 
	
		
			
				|  |  | -        return TransientFailureOrAbort();
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -      Status s = stream->Finish();
 | 
	
		
			
				|  |  | -      if (!AssertStatusOk(s)) {
 | 
	
		
			
				|  |  | -        return false;
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | +bool InteropClient::DoServerCompressedStreaming() {
 | 
	
		
			
				|  |  | +  const std::vector<bool> compressions = {true, false};
 | 
	
		
			
				|  |  | +  const std::vector<int> sizes = {31415, 92653};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  ClientContext context;
 | 
	
		
			
				|  |  | +  InteropClientContextInspector inspector(context);
 | 
	
		
			
				|  |  | +  StreamingOutputCallRequest request;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  GPR_ASSERT(compressions.size() == sizes.size());
 | 
	
		
			
				|  |  | +  for (size_t i = 0; i < sizes.size(); i++) {
 | 
	
		
			
				|  |  | +    char* log_suffix;
 | 
	
		
			
				|  |  | +    gpr_asprintf(&log_suffix, "(compression=%s; size=%d)",
 | 
	
		
			
				|  |  | +                 compressions[i] ? "true" : "false", sizes[i]);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    gpr_log(GPR_DEBUG, "Sending request streaming rpc %s.", log_suffix);
 | 
	
		
			
				|  |  | +    gpr_free(log_suffix);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    ResponseParameters* const response_parameter =
 | 
	
		
			
				|  |  | +        request.add_response_parameters();
 | 
	
		
			
				|  |  | +    response_parameter->mutable_compressed()->set_value(compressions[i]);
 | 
	
		
			
				|  |  | +    response_parameter->set_size(sizes[i]);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
 | 
	
		
			
				|  |  | +      serviceStub_.Get()->StreamingOutputCall(&context, request));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  size_t k = 0;
 | 
	
		
			
				|  |  | +  StreamingOutputCallResponse response;
 | 
	
		
			
				|  |  | +  while (stream->Read(&response)) {
 | 
	
		
			
				|  |  | +    // Payload size checks.
 | 
	
		
			
				|  |  | +    GPR_ASSERT(response.payload().body() ==
 | 
	
		
			
				|  |  | +               grpc::string(request.response_parameters(k).size(), '\0'));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    // Compression checks.
 | 
	
		
			
				|  |  | +    GPR_ASSERT(request.response_parameters(k).has_compressed());
 | 
	
		
			
				|  |  | +    if (request.response_parameters(k).compressed().value()) {
 | 
	
		
			
				|  |  | +      GPR_ASSERT(inspector.GetCallCompressionAlgorithm() > GRPC_COMPRESS_NONE);
 | 
	
		
			
				|  |  | +      GPR_ASSERT(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS);
 | 
	
		
			
				|  |  | +    } else {
 | 
	
		
			
				|  |  | +      // requested *no* compression.
 | 
	
		
			
				|  |  | +      GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS));
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | +    ++k;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  if (k < sizes.size()) {
 | 
	
		
			
				|  |  | +    // stream->Read() failed before reading all the expected messages. This
 | 
	
		
			
				|  |  | +    // is most likely due to a connection failure.
 | 
	
		
			
				|  |  | +    gpr_log(GPR_ERROR, "%s(): Responses read (k=%" PRIuPTR
 | 
	
		
			
				|  |  | +                       ") is "
 | 
	
		
			
				|  |  | +                       "less than the expected messages (i.e "
 | 
	
		
			
				|  |  | +                       "response_stream_sizes.size() (%" PRIuPTR ")).",
 | 
	
		
			
				|  |  | +            __func__, k, response_stream_sizes.size());
 | 
	
		
			
				|  |  | +    return TransientFailureOrAbort();
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +  Status s = stream->Finish();
 | 
	
		
			
				|  |  | +  if (!AssertStatusOk(s)) {
 | 
	
		
			
				|  |  | +    return false;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  |    return true;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -632,7 +688,6 @@ bool InteropClient::DoPingPong() {
 | 
	
		
			
				|  |  |        stream(serviceStub_.Get()->FullDuplexCall(&context));
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    StreamingOutputCallRequest request;
 | 
	
		
			
				|  |  | -  request.set_response_type(PayloadType::COMPRESSABLE);
 | 
	
		
			
				|  |  |    ResponseParameters* response_parameter = request.add_response_parameters();
 | 
	
		
			
				|  |  |    Payload* payload = request.mutable_payload();
 | 
	
		
			
				|  |  |    StreamingOutputCallResponse response;
 | 
	
	
		
			
				|  | @@ -699,7 +754,6 @@ bool InteropClient::DoCancelAfterFirstResponse() {
 | 
	
		
			
				|  |  |        stream(serviceStub_.Get()->FullDuplexCall(&context));
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    StreamingOutputCallRequest request;
 | 
	
		
			
				|  |  | -  request.set_response_type(PayloadType::COMPRESSABLE);
 | 
	
		
			
				|  |  |    ResponseParameters* response_parameter = request.add_response_parameters();
 | 
	
		
			
				|  |  |    response_parameter->set_size(31415);
 | 
	
		
			
				|  |  |    request.mutable_payload()->set_body(grpc::string(27182, '\0'));
 | 
	
	
		
			
				|  | @@ -839,7 +893,6 @@ bool InteropClient::DoCustomMetadata() {
 | 
	
		
			
				|  |  |          stream(serviceStub_.Get()->FullDuplexCall(&context));
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      StreamingOutputCallRequest request;
 | 
	
		
			
				|  |  | -    request.set_response_type(PayloadType::COMPRESSABLE);
 | 
	
		
			
				|  |  |      ResponseParameters* response_parameter = request.add_response_parameters();
 | 
	
		
			
				|  |  |      response_parameter->set_size(kLargeResponseSize);
 | 
	
		
			
				|  |  |      grpc::string payload(kLargeRequestSize, '\0');
 |