|  | @@ -32,6 +32,8 @@ from oauth2client.client import GoogleCredentials
 | 
	
		
			
				|  |  |  import python_utils.jobset as jobset
 | 
	
		
			
				|  |  |  import python_utils.report_utils as report_utils
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +from src.proto.grpc.health.v1 import health_pb2
 | 
	
		
			
				|  |  | +from src.proto.grpc.health.v1 import health_pb2_grpc
 | 
	
		
			
				|  |  |  from src.proto.grpc.testing import empty_pb2
 | 
	
		
			
				|  |  |  from src.proto.grpc.testing import messages_pb2
 | 
	
		
			
				|  |  |  from src.proto.grpc.testing import test_pb2_grpc
 | 
	
	
		
			
				|  | @@ -352,7 +354,6 @@ def test_backends_restart(gcp, backend_service, instance_group):
 | 
	
		
			
				|  |  |      start_time = time.time()
 | 
	
		
			
				|  |  |      wait_until_all_rpcs_go_to_given_backends(instance_names,
 | 
	
		
			
				|  |  |                                               _WAIT_FOR_STATS_SEC)
 | 
	
		
			
				|  |  | -    stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
 | 
	
		
			
				|  |  |      try:
 | 
	
		
			
				|  |  |          resize_instance_group(gcp, instance_group, 0)
 | 
	
		
			
				|  |  |          wait_until_all_rpcs_go_to_given_backends_or_fail([],
 | 
	
	
		
			
				|  | @@ -363,15 +364,6 @@ def test_backends_restart(gcp, backend_service, instance_group):
 | 
	
		
			
				|  |  |      new_instance_names = get_instance_names(gcp, instance_group)
 | 
	
		
			
				|  |  |      wait_until_all_rpcs_go_to_given_backends(new_instance_names,
 | 
	
		
			
				|  |  |                                               _WAIT_FOR_BACKEND_SEC)
 | 
	
		
			
				|  |  | -    new_stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
 | 
	
		
			
				|  |  | -    original_distribution = list(stats.rpcs_by_peer.values())
 | 
	
		
			
				|  |  | -    original_distribution.sort()
 | 
	
		
			
				|  |  | -    new_distribution = list(new_stats.rpcs_by_peer.values())
 | 
	
		
			
				|  |  | -    new_distribution.sort()
 | 
	
		
			
				|  |  | -    threshold = 3
 | 
	
		
			
				|  |  | -    for i in range(len(original_distribution)):
 | 
	
		
			
				|  |  | -        if abs(original_distribution[i] - new_distribution[i]) > threshold:
 | 
	
		
			
				|  |  | -            raise Exception('Distributions do not match: ', stats, new_stats)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  def test_change_backend_service(gcp, original_backend_service, instance_group,
 | 
	
	
		
			
				|  | @@ -517,7 +509,9 @@ def test_round_robin(gcp, backend_service, instance_group):
 | 
	
		
			
				|  |  |      # may result in briefly receiving an empty EDS update, resulting in failed
 | 
	
		
			
				|  |  |      # RPCs. Retry distribution validation if this occurs; long-term fix is
 | 
	
		
			
				|  |  |      # creating new backend resources for each individual test case.
 | 
	
		
			
				|  |  | -    max_attempts = 10
 | 
	
		
			
				|  |  | +    # Each attempt takes 10 seconds. Config propagation can take several
 | 
	
		
			
				|  |  | +    # minutes.
 | 
	
		
			
				|  |  | +    max_attempts = 40
 | 
	
		
			
				|  |  |      for i in range(max_attempts):
 | 
	
		
			
				|  |  |          stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
 | 
	
		
			
				|  |  |          requests_received = [stats.rpcs_by_peer[x] for x in stats.rpcs_by_peer]
 | 
	
	
		
			
				|  | @@ -728,14 +722,30 @@ def test_traffic_splitting(gcp, original_backend_service, instance_group,
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  def set_serving_status(instances, service_port, serving):
 | 
	
		
			
				|  |  | +    logger.info('setting %s serving status to %s', instances, serving)
 | 
	
		
			
				|  |  |      for instance in instances:
 | 
	
		
			
				|  |  |          with grpc.insecure_channel('%s:%d' %
 | 
	
		
			
				|  |  |                                     (instance, service_port)) as channel:
 | 
	
		
			
				|  |  | +            logger.info('setting %s serving status to %s', instance, serving)
 | 
	
		
			
				|  |  |              stub = test_pb2_grpc.XdsUpdateHealthServiceStub(channel)
 | 
	
		
			
				|  |  | -            if serving:
 | 
	
		
			
				|  |  | -                stub.SetServing(empty_pb2.Empty())
 | 
	
		
			
				|  |  | -            else:
 | 
	
		
			
				|  |  | -                stub.SetNotServing(empty_pb2.Empty())
 | 
	
		
			
				|  |  | +            health_stub = health_pb2_grpc.HealthStub(channel)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            retry_count = 5
 | 
	
		
			
				|  |  | +            for i in range(5):
 | 
	
		
			
				|  |  | +                if serving:
 | 
	
		
			
				|  |  | +                    stub.SetServing(empty_pb2.Empty())
 | 
	
		
			
				|  |  | +                else:
 | 
	
		
			
				|  |  | +                    stub.SetNotServing(empty_pb2.Empty())
 | 
	
		
			
				|  |  | +                serving_status = health_stub.Check(
 | 
	
		
			
				|  |  | +                    health_pb2.HealthCheckRequest())
 | 
	
		
			
				|  |  | +                logger.info('got instance service status %s', serving_status)
 | 
	
		
			
				|  |  | +                want_status = health_pb2.HealthCheckResponse.SERVING if serving else health_pb2.HealthCheckResponse.NOT_SERVING
 | 
	
		
			
				|  |  | +                if serving_status.status == want_status:
 | 
	
		
			
				|  |  | +                    break
 | 
	
		
			
				|  |  | +                if i == retry_count - 1:
 | 
	
		
			
				|  |  | +                    raise Exception(
 | 
	
		
			
				|  |  | +                        'failed to set instance service status after %d retries'
 | 
	
		
			
				|  |  | +                        % retry_count)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  def is_primary_instance_group(gcp, instance_group):
 |