#!/usr/bin/env python
# Copyright 2020 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Run xDS integration tests on GCP using Traffic Director."""
 
import argparse
import googleapiclient.discovery
import grpc
import json
import logging
import os
import random
import shlex
import socket
import subprocess
import sys
import tempfile
import time

from oauth2client.client import GoogleCredentials

import python_utils.jobset as jobset
import python_utils.report_utils as report_utils
from src.proto.grpc.health.v1 import health_pb2
from src.proto.grpc.health.v1 import health_pb2_grpc
from src.proto.grpc.testing import empty_pb2
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import test_pb2_grpc
 
logger = logging.getLogger()
console_handler = logging.StreamHandler()
formatter = logging.Formatter(fmt='%(asctime)s: %(levelname)-8s %(message)s')
console_handler.setFormatter(formatter)
logger.handlers = []
logger.addHandler(console_handler)
logger.setLevel(logging.WARNING)
 
_TEST_CASES = [
    'backends_restart',
    'change_backend_service',
    'gentle_failover',
    'ping_pong',
    'remove_instance_group',
    'round_robin',
    'secondary_locality_gets_no_requests_on_partial_primary_failure',
    'secondary_locality_gets_requests_on_primary_failure',
    'traffic_splitting',
]
 
# Test cases that are valid but not yet supported in all languages, so they
# can only be run manually and aren't enabled automatically.
#
# TODO: Move them into _TEST_CASES when support is ready in all languages.
_ADDITIONAL_TEST_CASES = [
    'path_matching',
    'header_matching',
    'circuit_breaking',
]
 
def parse_test_cases(arg):
    if arg == '':
        return []
    arg_split = arg.split(',')
    test_cases = set()
    all_test_cases = _TEST_CASES + _ADDITIONAL_TEST_CASES
    # Use a distinct loop variable so the error message below can report the
    # full, original argument.
    for test_case_arg in arg_split:
        if test_case_arg == 'all':
            test_cases = test_cases.union(_TEST_CASES)
        else:
            test_cases = test_cases.union([test_case_arg])
    if not all(test_case in all_test_cases for test_case in test_cases):
        raise Exception('Failed to parse test cases %s' % arg)
    # Preserve order.
    return [x for x in all_test_cases if x in test_cases]
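
# Illustrative examples of parse_test_cases (not executed; names refer to the
# lists above). Selection order follows the canonical list order, not the
# order given on the command line:
#
#   parse_test_cases('round_robin,ping_pong')  # -> ['ping_pong', 'round_robin']
#   parse_test_cases('all,circuit_breaking')   # -> _TEST_CASES + ['circuit_breaking']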
 
def parse_port_range(port_arg):
    try:
        port = int(port_arg)
        return range(port, port + 1)
    except ValueError:
        port_min, port_max = port_arg.split(':')
        return range(int(port_min), int(port_max) + 1)
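
# Illustrative examples (not executed): a single port yields a one-element
# range, while 'min:max' is treated as inclusive on both ends:
#
#   parse_port_range('8080')       # -> range(8080, 8081)
#   parse_port_range('8080:8110')  # -> range(8080, 8111)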
 
argp = argparse.ArgumentParser(description='Run xDS interop tests on GCP')
argp.add_argument('--project_id', help='GCP project id')
argp.add_argument(
    '--gcp_suffix',
    default='',
    help='Optional suffix for all generated GCP resource names. Useful to '
    'ensure distinct names across test runs.')
argp.add_argument(
    '--test_case',
    default='ping_pong',
    type=parse_test_cases,
    help='Comma-separated list of test cases to run. Available tests: %s '
    '(or \'all\' to run every test). '
    'Alternative tests not included in \'all\': %s' %
    (','.join(_TEST_CASES), ','.join(_ADDITIONAL_TEST_CASES)))
argp.add_argument(
    '--bootstrap_file',
    default='',
    help='File to reference via GRPC_XDS_BOOTSTRAP. Disables built-in '
    'bootstrap generation')
argp.add_argument(
    '--xds_v3_support',
    default=False,
    action='store_true',
    help='Support xDS v3 via GRPC_XDS_EXPERIMENTAL_V3_SUPPORT. '
    'If a pre-created bootstrap file is provided via the --bootstrap_file '
    'parameter, it should include xds_v3 in its server_features field.')
argp.add_argument(
    '--client_cmd',
    default=None,
    help='Command to launch xDS test client. {server_uri}, {stats_port} and '
    '{qps} references will be replaced using str.format(). GRPC_XDS_BOOTSTRAP '
    'will be set for the command')
argp.add_argument(
    '--client_hosts',
    default=None,
    help='Comma-separated list of hosts running client processes. If set, '
    '--client_cmd is ignored and client processes are assumed to be running on '
    'the specified hosts.')
argp.add_argument('--zone', default='us-central1-a')
argp.add_argument('--secondary_zone',
                  default='us-west1-b',
                  help='Zone to use for secondary TD locality tests')
argp.add_argument('--qps', default=100, type=int, help='Client QPS')
 
argp.add_argument(
    '--wait_for_backend_sec',
    default=1200,
    type=int,
    help='Time limit for waiting for created backend services to report '
    'healthy when launching or updating GCP resources')
 
argp.add_argument(
    '--use_existing_gcp_resources',
    default=False,
    action='store_true',
    help=
    'If set, find and use already created GCP resources instead of creating new'
    ' ones.')
argp.add_argument(
    '--keep_gcp_resources',
    default=False,
    action='store_true',
    help=
    'Leave GCP VMs and configuration running after test. Default behavior is '
    'to delete when tests complete.')
argp.add_argument(
    '--compute_discovery_document',
    default=None,
    type=str,
    help=
    'If provided, uses this file instead of retrieving via the GCP discovery '
    'API')
argp.add_argument(
    '--alpha_compute_discovery_document',
    default=None,
    type=str,
    help='If provided, uses this file instead of retrieving via the alpha GCP '
    'discovery API')
argp.add_argument('--network',
                  default='global/networks/default',
                  help='GCP network to use')
argp.add_argument('--service_port_range',
                  default='8080:8110',
                  type=parse_port_range,
                  help='Listening port for created gRPC backends. Specified as '
                  'either a single int or as a range in the format min:max, in '
                  'which case an available port p will be chosen s.t. min <= p '
                  '<= max')
argp.add_argument(
    '--stats_port',
    default=8079,
    type=int,
    help='Local port for the client process to expose the LB stats service')
argp.add_argument('--xds_server',
                  default='trafficdirector.googleapis.com:443',
                  help='xDS server')
argp.add_argument('--source_image',
                  default='projects/debian-cloud/global/images/family/debian-9',
                  help='Source image for VMs created during the test')
argp.add_argument('--path_to_server_binary',
                  default=None,
                  type=str,
                  help='If set, the server binary must already be pre-built on '
                  'the specified source image')
argp.add_argument('--machine_type',
                  default='e2-standard-2',
                  help='Machine type for VMs created during the test')
argp.add_argument(
    '--instance_group_size',
    default=2,
    type=int,
    help='Number of VMs to create per instance group. Certain test cases (e.g., '
    'round_robin) may not give meaningful results if this is set to a value '
    'less than 2.')
argp.add_argument('--verbose',
                  help='verbose log output',
                  default=False,
                  action='store_true')
# TODO(ericgribkoff) Remove this param once the sponge-formatted log files are
# visible in all test environments.
argp.add_argument('--log_client_output',
                  help='Log captured client output',
                  default=False,
                  action='store_true')
 
# TODO(ericgribkoff) Remove this flag once all test environments are verified to
# have access to the alpha compute APIs.
argp.add_argument('--only_stable_gcp_apis',
                  help='Do not use alpha compute APIs. Some tests may be '
                  'incompatible with this option (gRPC health checks are '
                  'currently alpha and required for simulating server failure).',
                  default=False,
                  action='store_true')
 
args = argp.parse_args()

if args.verbose:
    logger.setLevel(logging.DEBUG)

CLIENT_HOSTS = []
if args.client_hosts:
    CLIENT_HOSTS = args.client_hosts.split(',')

_DEFAULT_SERVICE_PORT = 80
_WAIT_FOR_BACKEND_SEC = args.wait_for_backend_sec
_WAIT_FOR_OPERATION_SEC = 1200
_INSTANCE_GROUP_SIZE = args.instance_group_size
_NUM_TEST_RPCS = 10 * args.qps
_WAIT_FOR_STATS_SEC = 360
_WAIT_FOR_VALID_CONFIG_SEC = 60
_WAIT_FOR_URL_MAP_PATCH_SEC = 300
_CONNECTION_TIMEOUT_SEC = 60
_GCP_API_RETRIES = 5
 
_BOOTSTRAP_TEMPLATE = """
{{
  "node": {{
    "id": "{node_id}",
    "metadata": {{
      "TRAFFICDIRECTOR_NETWORK_NAME": "%s"
    }},
    "locality": {{
      "zone": "%s"
    }}
  }},
  "xds_servers": [{{
    "server_uri": "%s",
    "channel_creds": [
      {{
        "type": "google_default",
        "config": {{}}
      }}
    ],
    "server_features": {server_features}
  }}]
}}""" % (args.network.split('/')[-1], args.zone, args.xds_server)
 
# TODO(ericgribkoff) Add change_backend_service to this list once TD no longer
# sends an update with no localities when adding the MIG to the backend
# service, which can race with the URL map patch.
_TESTS_TO_FAIL_ON_RPC_FAILURE = ['ping_pong', 'round_robin']
# Tests that run UnaryCall and EmptyCall.
_TESTS_TO_RUN_MULTIPLE_RPCS = ['path_matching', 'header_matching']
# Tests that make UnaryCall with test metadata.
_TESTS_TO_SEND_METADATA = ['header_matching']
_TEST_METADATA_KEY = 'xds_md'
_TEST_METADATA_VALUE_UNARY = 'unary_yranu'
_TEST_METADATA_VALUE_EMPTY = 'empty_ytpme'
_PATH_MATCHER_NAME = 'path-matcher'
_BASE_TEMPLATE_NAME = 'test-template'
_BASE_INSTANCE_GROUP_NAME = 'test-ig'
_BASE_HEALTH_CHECK_NAME = 'test-hc'
_BASE_FIREWALL_RULE_NAME = 'test-fw-rule'
_BASE_BACKEND_SERVICE_NAME = 'test-backend-service'
_BASE_URL_MAP_NAME = 'test-map'
_BASE_SERVICE_HOST = 'grpc-test'
_BASE_TARGET_PROXY_NAME = 'test-target-proxy'
_BASE_FORWARDING_RULE_NAME = 'test-forwarding-rule'
_TEST_LOG_BASE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                  '../../reports')
_SPONGE_LOG_NAME = 'sponge_log.log'
_SPONGE_XML_NAME = 'sponge_log.xml'
 
def get_client_stats(num_rpcs, timeout_sec):
    if CLIENT_HOSTS:
        hosts = CLIENT_HOSTS
    else:
        hosts = ['localhost']
    for host in hosts:
        with grpc.insecure_channel('%s:%d' %
                                   (host, args.stats_port)) as channel:
            stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel)
            request = messages_pb2.LoadBalancerStatsRequest()
            request.num_rpcs = num_rpcs
            request.timeout_sec = timeout_sec
            rpc_timeout = timeout_sec + _CONNECTION_TIMEOUT_SEC
            logger.debug('Invoking GetClientStats RPC to %s:%d:', host,
                         args.stats_port)
            response = stub.GetClientStats(request,
                                           wait_for_ready=True,
                                           timeout=rpc_timeout)
            logger.debug('Invoked GetClientStats RPC to %s: %s', host, response)
            # Stats are read from the first reachable host only.
            return response
 
def get_client_accumulated_stats():
    if CLIENT_HOSTS:
        hosts = CLIENT_HOSTS
    else:
        hosts = ['localhost']
    for host in hosts:
        with grpc.insecure_channel('%s:%d' %
                                   (host, args.stats_port)) as channel:
            stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel)
            request = messages_pb2.LoadBalancerAccumulatedStatsRequest()
            logger.debug('Invoking GetClientAccumulatedStats RPC to %s:%d:',
                         host, args.stats_port)
            response = stub.GetClientAccumulatedStats(
                request, wait_for_ready=True, timeout=_CONNECTION_TIMEOUT_SEC)
            logger.debug('Invoked GetClientAccumulatedStats RPC to %s: %s',
                         host, response)
            return response
 
def configure_client(rpc_types, metadata):
    if CLIENT_HOSTS:
        hosts = CLIENT_HOSTS
    else:
        hosts = ['localhost']
    for host in hosts:
        with grpc.insecure_channel('%s:%d' %
                                   (host, args.stats_port)) as channel:
            stub = test_pb2_grpc.XdsUpdateClientConfigureServiceStub(channel)
            request = messages_pb2.ClientConfigureRequest()
            request.types.extend(rpc_types)
            for rpc_type, md_key, md_value in metadata:
                md = request.metadata.add()
                md.type = rpc_type
                md.key = md_key
                md.value = md_value
            logger.debug(
                'Invoking XdsUpdateClientConfigureService RPC to %s:%d: %s',
                host, args.stats_port, request)
            stub.Configure(request,
                           wait_for_ready=True,
                           timeout=_CONNECTION_TIMEOUT_SEC)
            logger.debug('Invoked XdsUpdateClientConfigureService RPC to %s',
                         host)
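
# Illustrative usage (not executed here; the enum paths are assumed from the
# interop messages.proto ClientConfigureRequest.RpcType definition): send both
# RPC types, attaching the test header only to UnaryCall:
#
#   configure_client(
#       [messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL,
#        messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL],
#       [(messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
#         _TEST_METADATA_KEY, _TEST_METADATA_VALUE_UNARY)])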
 
class RpcDistributionError(Exception):
    pass
 
def _verify_rpcs_to_given_backends(backends, timeout_sec, num_rpcs,
                                   allow_failures):
    start_time = time.time()
    error_msg = None
    logger.debug('Waiting for %d sec until backends %s receive load' %
                 (timeout_sec, backends))
    while time.time() - start_time <= timeout_sec:
        error_msg = None
        stats = get_client_stats(num_rpcs, timeout_sec)
        rpcs_by_peer = stats.rpcs_by_peer
        for backend in backends:
            if backend not in rpcs_by_peer:
                error_msg = 'Backend %s did not receive load' % backend
                break
        if not error_msg and len(rpcs_by_peer) > len(backends):
            error_msg = 'Unexpected backend received load: %s' % rpcs_by_peer
        if not allow_failures and stats.num_failures > 0:
            error_msg = '%d RPCs failed' % stats.num_failures
        if not error_msg:
            return
    raise RpcDistributionError(error_msg)
 
def wait_until_all_rpcs_go_to_given_backends_or_fail(backends,
                                                     timeout_sec,
                                                     num_rpcs=_NUM_TEST_RPCS):
    _verify_rpcs_to_given_backends(backends,
                                   timeout_sec,
                                   num_rpcs,
                                   allow_failures=True)


def wait_until_all_rpcs_go_to_given_backends(backends,
                                             timeout_sec,
                                             num_rpcs=_NUM_TEST_RPCS):
    _verify_rpcs_to_given_backends(backends,
                                   timeout_sec,
                                   num_rpcs,
                                   allow_failures=False)
 
def wait_until_rpcs_in_flight(rpc_type, timeout_sec, num_rpcs, threshold):
    """Blocks until the test client stably maintains the given number of
    outstanding RPCs.

    Args:
      rpc_type: A string indicating the RPC method to check for. Either
        'UnaryCall' or 'EmptyCall'.
      timeout_sec: Maximum number of seconds to wait until the desired state
        is reached.
      num_rpcs: Expected number of RPCs to be in-flight.
      threshold: Number within [0,100], the tolerable percentage by which
        the actual number of RPCs in-flight can differ from the expected number.
    """
    if threshold < 0 or threshold > 100:
        raise ValueError('Value error: Threshold should be between 0 and 100')
    threshold_fraction = threshold / 100.0
    start_time = time.time()
    error_msg = None
    logger.debug(
        'Waiting for %d sec until %d %s RPCs (with %d%% tolerance) in-flight' %
        (timeout_sec, num_rpcs, rpc_type, threshold))
    while time.time() - start_time <= timeout_sec:
        error_msg = _check_rpcs_in_flight(rpc_type, num_rpcs, threshold,
                                          threshold_fraction)
        if error_msg:
            logger.debug('Progress: %s', error_msg)
            time.sleep(2)
        else:
            break
    # Ensure the number of outstanding RPCs is stable.
    if not error_msg:
        time.sleep(5)
        error_msg = _check_rpcs_in_flight(rpc_type, num_rpcs, threshold,
                                          threshold_fraction)
    if error_msg:
        raise Exception("Wrong number of %s RPCs in-flight: %s" %
                        (rpc_type, error_msg))
 
def _check_rpcs_in_flight(rpc_type, num_rpcs, threshold, threshold_fraction):
    error_msg = None
    stats = get_client_accumulated_stats()
    rpcs_started = stats.num_rpcs_started_by_method[rpc_type]
    rpcs_succeeded = stats.num_rpcs_succeeded_by_method[rpc_type]
    rpcs_failed = stats.num_rpcs_failed_by_method[rpc_type]
    rpcs_in_flight = rpcs_started - rpcs_succeeded - rpcs_failed
    if rpcs_in_flight < (num_rpcs * (1 - threshold_fraction)):
        error_msg = ('actual(%d) < expected(%d - %d%%)' %
                     (rpcs_in_flight, num_rpcs, threshold))
    elif rpcs_in_flight > (num_rpcs * (1 + threshold_fraction)):
        error_msg = ('actual(%d) > expected(%d + %d%%)' %
                     (rpcs_in_flight, num_rpcs, threshold))
    return error_msg
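
# Worked example (illustrative): with num_rpcs=100 and threshold=10, any
# in-flight count in [90, 110] passes (error_msg stays None); 89 yields
# 'actual(89) < expected(100 - 10%)' and 111 the corresponding '>' message.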
 
def compare_distributions(actual_distribution, expected_distribution,
                          threshold):
    """Compares whether two distributions are similar.

    Args:
      actual_distribution: A list of floats, contains the actual distribution.
      expected_distribution: A list of floats, contains the expected distribution.
      threshold: Number within [0,100], the threshold percentage by which the
        actual distribution can differ from the expected distribution.

    Returns:
      The similarity between the distributions as a boolean. Returns true if the
      actual distribution lies within the threshold of the expected
      distribution, false otherwise.

    Raises:
      ValueError: if threshold is not within [0,100].
      Exception: containing detailed error messages.
    """
    if len(expected_distribution) != len(actual_distribution):
        raise Exception(
            'Error: expected and actual distributions have different size (%d vs %d)'
            % (len(expected_distribution), len(actual_distribution)))
    if threshold < 0 or threshold > 100:
        raise ValueError('Value error: Threshold should be between 0 and 100')
    threshold_fraction = threshold / 100.0
    for expected, actual in zip(expected_distribution, actual_distribution):
        if actual < (expected * (1 - threshold_fraction)):
            raise Exception("actual(%f) < expected(%f-%d%%)" %
                            (actual, expected, threshold))
        if actual > (expected * (1 + threshold_fraction)):
            raise Exception("actual(%f) > expected(%f+%d%%)" %
                            (actual, expected, threshold))
    return True
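
# Worked example (illustrative): compare_distributions([48.0, 52.0],
# [50.0, 50.0], 5) returns True, since with a 5% threshold each actual value
# must fall within [47.5, 52.5]; [45.0, 55.0] would raise instead.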
 
def compare_expected_instances(stats, expected_instances):
    """Compares if stats have expected instances for each type of RPC.

    Args:
      stats: LoadBalancerStatsResponse reported by interop client.
      expected_instances: a dict with key as the RPC type (string), value as
        the expected backend instances (list of strings).

    Returns:
      Returns true if the instances are expected. False if not.
    """
    for rpc_type, expected_peers in expected_instances.items():
        rpcs_by_peer_for_type = stats.rpcs_by_method[rpc_type]
        rpcs_by_peer = rpcs_by_peer_for_type.rpcs_by_peer if rpcs_by_peer_for_type else None
        logger.debug('rpc: %s, by_peer: %s', rpc_type, rpcs_by_peer)
        if not rpcs_by_peer:
            # No RPCs of this type have been recorded yet.
            return False
        peers = list(rpcs_by_peer.keys())
        if set(peers) != set(expected_peers):
            logger.info('unexpected peers for %s, got %s, want %s', rpc_type,
                        peers, expected_peers)
            return False
    return True
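
# Illustrative usage (instance names are placeholders): given stats where all
# 'UnaryCall' RPCs went to vm-1 and vm-2,
#   compare_expected_instances(stats, {'UnaryCall': ['vm-1', 'vm-2']})
# returns True; any missing or extra peer makes it return False.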
 
def test_backends_restart(gcp, backend_service, instance_group):
    logger.info('Running test_backends_restart')
    instance_names = get_instance_names(gcp, instance_group)
    num_instances = len(instance_names)
    start_time = time.time()
    wait_until_all_rpcs_go_to_given_backends(instance_names,
                                             _WAIT_FOR_STATS_SEC)
    try:
        resize_instance_group(gcp, instance_group, 0)
        wait_until_all_rpcs_go_to_given_backends_or_fail([],
                                                         _WAIT_FOR_BACKEND_SEC)
    finally:
        resize_instance_group(gcp, instance_group, num_instances)
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    new_instance_names = get_instance_names(gcp, instance_group)
    wait_until_all_rpcs_go_to_given_backends(new_instance_names,
                                             _WAIT_FOR_BACKEND_SEC)
 
def test_change_backend_service(gcp, original_backend_service, instance_group,
                                alternate_backend_service,
                                same_zone_instance_group):
    logger.info('Running test_change_backend_service')
    original_backend_instances = get_instance_names(gcp, instance_group)
    alternate_backend_instances = get_instance_names(gcp,
                                                     same_zone_instance_group)
    patch_backend_service(gcp, alternate_backend_service,
                          [same_zone_instance_group])
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)
    wait_for_healthy_backends(gcp, alternate_backend_service,
                              same_zone_instance_group)
    wait_until_all_rpcs_go_to_given_backends(original_backend_instances,
                                             _WAIT_FOR_STATS_SEC)
    try:
        patch_url_map_backend_service(gcp, alternate_backend_service)
        wait_until_all_rpcs_go_to_given_backends(alternate_backend_instances,
                                                 _WAIT_FOR_URL_MAP_PATCH_SEC)
    finally:
        patch_url_map_backend_service(gcp, original_backend_service)
        patch_backend_service(gcp, alternate_backend_service, [])
 
def test_gentle_failover(gcp,
                         backend_service,
                         primary_instance_group,
                         secondary_instance_group,
                         swapped_primary_and_secondary=False):
    logger.info('Running test_gentle_failover')
    num_primary_instances = len(get_instance_names(gcp, primary_instance_group))
    min_instances_for_gentle_failover = 3  # Need >50% failure to start failover
    try:
        if num_primary_instances < min_instances_for_gentle_failover:
            resize_instance_group(gcp, primary_instance_group,
                                  min_instances_for_gentle_failover)
        patch_backend_service(
            gcp, backend_service,
            [primary_instance_group, secondary_instance_group])
        primary_instance_names = get_instance_names(gcp, primary_instance_group)
        secondary_instance_names = get_instance_names(gcp,
                                                      secondary_instance_group)
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  secondary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
                                                 _WAIT_FOR_STATS_SEC)
        instances_to_stop = primary_instance_names[:-1]
        remaining_instances = primary_instance_names[-1:]
        try:
            set_serving_status(instances_to_stop,
                               gcp.service_port,
                               serving=False)
            wait_until_all_rpcs_go_to_given_backends(
                remaining_instances + secondary_instance_names,
                _WAIT_FOR_BACKEND_SEC)
        finally:
            set_serving_status(primary_instance_names,
                               gcp.service_port,
                               serving=True)
    except RpcDistributionError as e:
        if not swapped_primary_and_secondary and is_primary_instance_group(
                gcp, secondary_instance_group):
            # Swap expectation of primary and secondary instance groups.
            test_gentle_failover(gcp,
                                 backend_service,
                                 secondary_instance_group,
                                 primary_instance_group,
                                 swapped_primary_and_secondary=True)
        else:
            raise e
    finally:
        patch_backend_service(gcp, backend_service, [primary_instance_group])
        resize_instance_group(gcp, primary_instance_group,
                              num_primary_instances)
        instance_names = get_instance_names(gcp, primary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(instance_names,
                                                 _WAIT_FOR_BACKEND_SEC)
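
# Worked example (illustrative): with the minimum of three primary instances,
# the test above stops two of them (~67% unhealthy), exceeding the >50%
# failure needed to start failover, so traffic is expected on the one
# remaining primary instance plus the secondary locality.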
 
def test_ping_pong(gcp, backend_service, instance_group):
    logger.info('Running test_ping_pong')
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    instance_names = get_instance_names(gcp, instance_group)
    wait_until_all_rpcs_go_to_given_backends(instance_names,
                                             _WAIT_FOR_STATS_SEC)
 
def test_remove_instance_group(gcp, backend_service, instance_group,
                               same_zone_instance_group):
    logger.info('Running test_remove_instance_group')
    try:
        patch_backend_service(gcp,
                              backend_service,
                              [instance_group, same_zone_instance_group],
                              balancing_mode='RATE')
        wait_for_healthy_backends(gcp, backend_service, instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  same_zone_instance_group)
        instance_names = get_instance_names(gcp, instance_group)
        same_zone_instance_names = get_instance_names(gcp,
                                                      same_zone_instance_group)
        try:
            wait_until_all_rpcs_go_to_given_backends(
                instance_names + same_zone_instance_names,
                _WAIT_FOR_OPERATION_SEC)
            remaining_instance_group = same_zone_instance_group
            remaining_instance_names = same_zone_instance_names
        except RpcDistributionError as e:
            # If connected to TD in a different zone, we may route traffic to
            # only one instance group. Determine which group that is to continue
            # with the remainder of the test case.
            try:
                wait_until_all_rpcs_go_to_given_backends(
                    instance_names, _WAIT_FOR_STATS_SEC)
                remaining_instance_group = same_zone_instance_group
                remaining_instance_names = same_zone_instance_names
            except RpcDistributionError as e:
                wait_until_all_rpcs_go_to_given_backends(
                    same_zone_instance_names, _WAIT_FOR_STATS_SEC)
                remaining_instance_group = instance_group
                remaining_instance_names = instance_names
        patch_backend_service(gcp,
                              backend_service, [remaining_instance_group],
                              balancing_mode='RATE')
        wait_until_all_rpcs_go_to_given_backends(remaining_instance_names,
                                                 _WAIT_FOR_BACKEND_SEC)
    finally:
        patch_backend_service(gcp, backend_service, [instance_group])
        wait_until_all_rpcs_go_to_given_backends(instance_names,
                                                 _WAIT_FOR_BACKEND_SEC)
 
def test_round_robin(gcp, backend_service, instance_group):
    logger.info('Running test_round_robin')
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    instance_names = get_instance_names(gcp, instance_group)
    threshold = 1
    wait_until_all_rpcs_go_to_given_backends(instance_names,
                                             _WAIT_FOR_STATS_SEC)
    # TODO(ericgribkoff) Delayed config propagation from earlier tests
    # may result in briefly receiving an empty EDS update, resulting in failed
    # RPCs. Retry distribution validation if this occurs; long-term fix is
    # creating new backend resources for each individual test case.
    # Each attempt takes 10 seconds. Config propagation can take several
    # minutes.
    max_attempts = 40
    for i in range(max_attempts):
        stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
        requests_received = [stats.rpcs_by_peer[x] for x in stats.rpcs_by_peer]
        total_requests_received = sum(requests_received)
        if total_requests_received != _NUM_TEST_RPCS:
            logger.info('Unexpected RPC failures, retrying: %s', stats)
            continue
        expected_requests = total_requests_received / len(instance_names)
        for instance in instance_names:
            if abs(stats.rpcs_by_peer[instance] -
                   expected_requests) > threshold:
                raise Exception(
                    'RPC peer distribution differs from expected by more than %d '
                    'for instance %s (%s)' % (threshold, instance, stats))
        return
    raise Exception('RPC failures persisted through %d retries' % max_attempts)
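
# Worked example (illustrative): with the default --qps=100, _NUM_TEST_RPCS is
# 1000; across e.g. two instances each backend is expected to receive 500
# RPCs, and the check above tolerates a per-backend deviation of at most 1.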
 
def test_secondary_locality_gets_no_requests_on_partial_primary_failure(
        gcp,
        backend_service,
        primary_instance_group,
        secondary_instance_group,
        swapped_primary_and_secondary=False):
    logger.info(
        'Running secondary_locality_gets_no_requests_on_partial_primary_failure'
    )
    try:
        patch_backend_service(
            gcp, backend_service,
            [primary_instance_group, secondary_instance_group])
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  secondary_instance_group)
        primary_instance_names = get_instance_names(gcp, primary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
                                                 _WAIT_FOR_STATS_SEC)
        instances_to_stop = primary_instance_names[:1]
        remaining_instances = primary_instance_names[1:]
        try:
            set_serving_status(instances_to_stop,
                               gcp.service_port,
                               serving=False)
            wait_until_all_rpcs_go_to_given_backends(remaining_instances,
                                                     _WAIT_FOR_BACKEND_SEC)
        finally:
            set_serving_status(primary_instance_names,
                               gcp.service_port,
                               serving=True)
    except RpcDistributionError as e:
        if not swapped_primary_and_secondary and is_primary_instance_group(
                gcp, secondary_instance_group):
            # Swap expectation of primary and secondary instance groups.
            test_secondary_locality_gets_no_requests_on_partial_primary_failure(
                gcp,
                backend_service,
                secondary_instance_group,
                primary_instance_group,
                swapped_primary_and_secondary=True)
        else:
            raise e
    finally:
        patch_backend_service(gcp, backend_service, [primary_instance_group])
 
def test_secondary_locality_gets_requests_on_primary_failure(
        gcp,
        backend_service,
        primary_instance_group,
        secondary_instance_group,
        swapped_primary_and_secondary=False):
    logger.info('Running secondary_locality_gets_requests_on_primary_failure')
    try:
        patch_backend_service(
            gcp, backend_service,
            [primary_instance_group, secondary_instance_group])
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  secondary_instance_group)
        primary_instance_names = get_instance_names(gcp, primary_instance_group)
        secondary_instance_names = get_instance_names(gcp,
                                                      secondary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
                                                 _WAIT_FOR_STATS_SEC)
        try:
            set_serving_status(primary_instance_names,
                               gcp.service_port,
                               serving=False)
            wait_until_all_rpcs_go_to_given_backends(secondary_instance_names,
                                                     _WAIT_FOR_BACKEND_SEC)
        finally:
            set_serving_status(primary_instance_names,
                               gcp.service_port,
                               serving=True)
    except RpcDistributionError as e:
        if not swapped_primary_and_secondary and is_primary_instance_group(
                gcp, secondary_instance_group):
            # Swap expectation of primary and secondary instance groups.
            test_secondary_locality_gets_requests_on_primary_failure(
                gcp,
                backend_service,
                secondary_instance_group,
                primary_instance_group,
                swapped_primary_and_secondary=True)
        else:
            raise e
    finally:
        patch_backend_service(gcp, backend_service, [primary_instance_group])
 
def prepare_services_for_urlmap_tests(gcp, original_backend_service,
                                      instance_group, alternate_backend_service,
                                      same_zone_instance_group):
    """Prepares the services for tests that modify URL maps.

    Returns:
      Returns original and alternate backend names as lists of strings.
    """
    logger.info('waiting for original backends to become healthy')
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)

    patch_backend_service(gcp, alternate_backend_service,
                          [same_zone_instance_group])
    logger.info('waiting for alternate to become healthy')
    wait_for_healthy_backends(gcp, alternate_backend_service,
                              same_zone_instance_group)

    original_backend_instances = get_instance_names(gcp, instance_group)
    logger.info('original backends instances: %s', original_backend_instances)

    alternate_backend_instances = get_instance_names(gcp,
                                                     same_zone_instance_group)
    logger.info('alternate backends instances: %s', alternate_backend_instances)

    # Start with all traffic going to original_backend_service.
    logger.info('waiting for traffic to all go to original backends')
    wait_until_all_rpcs_go_to_given_backends(original_backend_instances,
                                             _WAIT_FOR_STATS_SEC)
    return original_backend_instances, alternate_backend_instances
 
def test_traffic_splitting(gcp, original_backend_service, instance_group,
                           alternate_backend_service, same_zone_instance_group):
    # This test starts with all traffic going to original_backend_service. It
    # then updates the URL map to set the default action to split traffic
    # between original and alternate. It waits for all backends in both
    # services to receive traffic, then verifies that the weights match
    # expectations.
    logger.info('Running test_traffic_splitting')
    original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
        gcp, original_backend_service, instance_group,
        alternate_backend_service, same_zone_instance_group)
    try:
        # Patch urlmap, change route action to traffic splitting between
        # original and alternate.
        logger.info('patching url map with traffic splitting')
        original_service_percentage, alternate_service_percentage = 20, 80
        patch_url_map_backend_service(
            gcp,
            services_with_weights={
                original_backend_service: original_service_percentage,
                alternate_backend_service: alternate_service_percentage,
            })
        # Split percentage between instances: [20,80] -> [10,10,40,40].
        expected_instance_percentage = [
            original_service_percentage * 1.0 / len(original_backend_instances)
        ] * len(original_backend_instances) + [
            alternate_service_percentage * 1.0 /
            len(alternate_backend_instances)
        ] * len(alternate_backend_instances)
        # Wait for traffic to go to both services.
        logger.info(
            'waiting for traffic to go to all backends (including alternate)')
        wait_until_all_rpcs_go_to_given_backends(
            original_backend_instances + alternate_backend_instances,
            _WAIT_FOR_STATS_SEC)
        # Verify that weights between two services are expected.
        retry_count = 10
        # Each attempt takes about 10 seconds; 10 retries amount to roughly a
        # 100-second timeout.
        for i in range(retry_count):
            stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
            got_instance_count = [
                stats.rpcs_by_peer[instance]
                for instance in original_backend_instances
            ] + [
                stats.rpcs_by_peer[instance]
                for instance in alternate_backend_instances
            ]
            total_count = sum(got_instance_count)
            got_instance_percentage = [
                x * 100.0 / total_count for x in got_instance_count
            ]
            try:
                compare_distributions(got_instance_percentage,
                                      expected_instance_percentage, 5)
            except Exception as e:
                logger.info('attempt %d', i)
                logger.info('got percentage: %s', got_instance_percentage)
                logger.info('expected percentage: %s',
                            expected_instance_percentage)
                logger.info(e)
                if i == retry_count - 1:
                    raise Exception(
                        'RPC distribution (%s) differs from expected (%s)' %
                        (got_instance_percentage, expected_instance_percentage))
            else:
                logger.info("success")
                break
    finally:
        patch_url_map_backend_service(gcp, original_backend_service)
        patch_backend_service(gcp, alternate_backend_service, [])
 
def test_path_matching(gcp, original_backend_service, instance_group,
                       alternate_backend_service, same_zone_instance_group):
    # This test starts with all traffic (UnaryCall and EmptyCall) going to
    # original_backend_service.
    #
    # It then updates the URL map to add routes so that UnaryCall and EmptyCall
    # go to different backends. It waits for all backends in both services to
    # receive traffic, then verifies that traffic goes to the expected
    # backends.
    logger.info('Running test_path_matching')
    original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
        gcp, original_backend_service, instance_group,
        alternate_backend_service, same_zone_instance_group)
    try:
        # A list of tuples (route_rules, expected_instances).
        test_cases = [
            (
                [{
                    'priority': 0,
                    # FullPath EmptyCall -> alternate_backend_service.
                    'matchRules': [{
                        'fullPathMatch': '/grpc.testing.TestService/EmptyCall'
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Prefix UnaryCall -> alternate_backend_service.
                    'matchRules': [{
                        'prefixMatch': '/grpc.testing.TestService/Unary'
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "UnaryCall": alternate_backend_instances,
                    "EmptyCall": original_backend_instances
                }),
            (
                # This test case is similar to the one above (but with route
                # services swapped). This test has two routes (full_path and
                # the default) to match EmptyCall, and both routes set
                # alternative_backend_service as the action. This forces the
                # client to handle duplicate Clusters in the RDS response.
                [
                    {
                        'priority': 0,
                        # Prefix UnaryCall -> original_backend_service.
                        'matchRules': [{
                            'prefixMatch': '/grpc.testing.TestService/Unary'
                        }],
                        'service': original_backend_service.url
                    },
                    {
                        'priority': 1,
                        # FullPath EmptyCall -> alternate_backend_service.
                        'matchRules': [{
                            'fullPathMatch':
                                '/grpc.testing.TestService/EmptyCall'
                        }],
                        'service': alternate_backend_service.url
                    }
                ],
                {
                    "UnaryCall": original_backend_instances,
                    "EmptyCall": alternate_backend_instances
                })
        ]
        for (route_rules, expected_instances) in test_cases:
            logger.info('patching url map with %s', route_rules)
            patch_url_map_backend_service(gcp,
                                          original_backend_service,
                                          route_rules=route_rules)
            # Wait for traffic to go to both services.
            logger.info(
                'waiting for traffic to go to all backends (including alternate)'
            )
            wait_until_all_rpcs_go_to_given_backends(
                original_backend_instances + alternate_backend_instances,
                _WAIT_FOR_STATS_SEC)
            retry_count = 20
            # Each attempt takes about 10 seconds; 20 retries amount to roughly
            # a 200-second timeout.
            for i in range(retry_count):
                stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
                if not stats.rpcs_by_method:
                    raise ValueError(
                        'stats.rpcs_by_method is None, the interop client stats service does not support this test case'
                    )
                logger.info('attempt %d', i)
                if compare_expected_instances(stats, expected_instances):
                    logger.info("success")
                    break
                elif i == retry_count - 1:
                    # Fail the test case if the expected distribution is never
                    # reached (previously the loop could fall through silently).
                    raise Exception(
                        'timeout waiting for RPCs to go to expected instances %s'
                        % expected_instances)
    finally:
        patch_url_map_backend_service(gcp, original_backend_service)
        patch_backend_service(gcp, alternate_backend_service, [])
 
- def test_header_matching(gcp, original_backend_service, instance_group,
 
-                          alternate_backend_service, same_zone_instance_group):
 
-     # This test start with all traffic (UnaryCall and EmptyCall) going to
 
-     # original_backend_service.
 
-     #
 
-     # Then it updates URL-map to add routes, to make RPCs with test headers to
 
-     # go to different backends. It waits for all backends in both services to
 
-     # receive traffic, then verifies that traffic goes to the expected
 
-     # backends.
 
-     logger.info('Running test_header_matching')
 
-     original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
 
-         gcp, original_backend_service, instance_group,
 
-         alternate_backend_service, same_zone_instance_group)
 
-     try:
 
-         # A list of tuples (route_rules, expected_instances).
 
-         test_cases = [
 
-             (
 
-                 [{
 
-                     'priority': 0,
 
-                     # Header ExactMatch -> alternate_backend_service.
 
-                     # EmptyCall is sent with the metadata.
 
-                     'matchRules': [{
 
-                         'prefixMatch':
 
-                             '/',
 
-                         'headerMatches': [{
 
-                             'headerName': _TEST_METADATA_KEY,
 
-                             'exactMatch': _TEST_METADATA_VALUE_EMPTY
 
-                         }]
 
-                     }],
 
-                     'service': alternate_backend_service.url
 
-                 }],
 
-                 {
 
-                     "EmptyCall": alternate_backend_instances,
 
-                     "UnaryCall": original_backend_instances
 
-                 }),
 
-             (
 
-                 [{
 
-                     'priority': 0,
 
-                     # Header PrefixMatch -> alternate_backend_service.
 
-                     # UnaryCall is sent with the metadata.
 
-                     'matchRules': [{
 
-                         'prefixMatch':
 
-                             '/',
 
-                         'headerMatches': [{
 
-                             'headerName': _TEST_METADATA_KEY,
 
-                             'prefixMatch': _TEST_METADATA_VALUE_UNARY[:2]
 
-                         }]
 
-                     }],
 
-                     'service': alternate_backend_service.url
 
-                 }],
 
-                 {
 
-                     "EmptyCall": original_backend_instances,
 
-                     "UnaryCall": alternate_backend_instances
 
-                 }),
 
-             (
 
-                 [{
 
-                     'priority': 0,
 
-                     # Header SuffixMatch -> alternate_backend_service.
 
-                     # EmptyCall is sent with the metadata.
 
-                     'matchRules': [{
 
-                         'prefixMatch':
 
-                             '/',
 
-                         'headerMatches': [{
 
-                             'headerName': _TEST_METADATA_KEY,
 
-                             'suffixMatch': _TEST_METADATA_VALUE_EMPTY[-2:]
 
-                         }]
 
-                     }],
 
-                     'service': alternate_backend_service.url
 
-                 }],
 
-                 {
 
-                     "EmptyCall": alternate_backend_instances,
 
-                     "UnaryCall": original_backend_instances
 
-                 }),
 
-             (
 
-                 [{
 
-                     'priority': 0,
 
-                     # Header invert ExactMatch -> alternate_backend_service.
 
-                     # EmptyCall is sent with the metadata, so it is routed to original.
 
-                     'matchRules': [{
 
-                         'prefixMatch':
 
-                             '/',
 
-                         'headerMatches': [{
 
-                             'headerName': _TEST_METADATA_KEY,
 
-                             'exactMatch': _TEST_METADATA_VALUE_EMPTY,
 
-                             'invertMatch': True
 
-                         }]
 
-                     }],
 
-                     'service': alternate_backend_service.url
 
-                 }],
 
-                 {
 
-                     "EmptyCall": original_backend_instances,
 
-                     "UnaryCall": alternate_backend_instances
 
-                 }),
 
-         ]
 
-         for (route_rules, expected_instances) in test_cases:
 
-             logger.info('patching url map with %s -> alternative',
 
-                         route_rules[0]['matchRules'])
 
-             patch_url_map_backend_service(gcp,
 
-                                           original_backend_service,
 
-                                           route_rules=route_rules)
 
-             # Wait for traffic to go to both services.
 
-             logger.info(
 
-                 'waiting for traffic to go to all backends (including alternate)'
 
-             )
 
-             wait_until_all_rpcs_go_to_given_backends(
 
-                 original_backend_instances + alternate_backend_instances,
 
-                 _WAIT_FOR_STATS_SEC)
 
-             retry_count = 20
 
-             # Each attempt takes about 10 seconds, 20 retries is equivalent to 200
 
-             # seconds timeout.
 
-             for i in range(retry_count):
 
-                 stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
 
-                 if not stats.rpcs_by_method:
 
-                     raise ValueError(
 
-                         'stats.rpcs_by_method is None, the interop client stats service does not support this test case'
 
-                     )
 
-                 logger.info('attempt %d', i)
 
-                 if compare_expected_instances(stats, expected_instances):
 
-                     logger.info("success")
 
-                     break
 
-     finally:
 
-         patch_url_map_backend_service(gcp, original_backend_service)
 
-         patch_backend_service(gcp, alternate_backend_service, [])
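
- # The per-test-case polling loop above is repeated across the URL-map tests;
- # as a minimal illustrative sketch (not wired into any test), it could be
- # factored into a helper built only from names already used above:
- def _poll_until_expected_instances(expected_instances, retry_count=20):
-     # Each attempt takes about 10 seconds, mirroring the inline loops.
-     for i in range(retry_count):
-         stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
-         if not stats.rpcs_by_method:
-             raise ValueError('stats.rpcs_by_method is None')
-         logger.info('attempt %d', i)
-         if compare_expected_instances(stats, expected_instances):
-             return True
-     return False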
 
- def test_circuit_breaking(gcp, original_backend_service, instance_group,
 
-                           same_zone_instance_group):
 
-     '''
 
-     Since the backend service circuit_breakers configuration cannot be unset,

-     which causes trouble for restoring the validate_for_proxyless flag in the

-     target proxy/global forwarding rule, this test uses dedicated backend

-     services. The url_map and backend services undergo the following state

-     changes:
 
-     Before test:
 
-        original_backend_service -> [instance_group]
 
-        extra_backend_service -> []
 
-        more_extra_backend_service -> []
 
-        url_map -> [original_backend_service]
 
-     In test:
 
-        extra_backend_service (with circuit_breakers) -> [instance_group]
 
-        more_extra_backend_service (with circuit_breakers) -> [same_zone_instance_group]
 
-        url_map -> [extra_backend_service, more_extra_backend_service]
 
-     After test:
 
-        original_backend_service -> [instance_group]
 
-        extra_backend_service (with circuit_breakers) -> []
 
-        more_extra_backend_service (with circuit_breakers) -> []
 
-        url_map -> [original_backend_service]
 
-     '''
 
-     logger.info('Running test_circuit_breaking')
 
-     additional_backend_services = []
 
-     try:
 
-         # TODO(chengyuanzhang): Dedicated backend services created for circuit
 
-         # breaking test. Once the issue for unsetting backend service circuit
 
-         # breakers is resolved or configuring backend service circuit breakers is
 
-         # enabled for config validation, these dedicated backend services can be
 
-         # eliminated.
 
-         extra_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-extra' + gcp_suffix
 
-         more_extra_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-more-extra' + gcp_suffix
 
-         extra_backend_service = add_backend_service(gcp,
 
-                                                     extra_backend_service_name)
 
-         additional_backend_services.append(extra_backend_service)
 
-         more_extra_backend_service = add_backend_service(
 
-             gcp, more_extra_backend_service_name)
 
-         additional_backend_services.append(more_extra_backend_service)
 
-         # The config validation for proxyless doesn't allow setting
 
-         # circuit_breakers. Disable validate_for_proxyless
 
-         # for this test. This can be removed when validation
 
-         # accepts circuit_breakers.
 
-         logger.info('disabling validate_for_proxyless in target proxy')
 
-         set_validate_for_proxyless(gcp, False)
 
-         extra_backend_service_max_requests = 500
 
-         more_extra_backend_service_max_requests = 1000
 
-         patch_backend_service(gcp,
 
-                               extra_backend_service, [instance_group],
 
-                               circuit_breakers={
 
-                                   'maxRequests':
 
-                                       extra_backend_service_max_requests
 
-                               })
 
-         logger.info('Waiting for extra backends to become healthy')
 
-         wait_for_healthy_backends(gcp, extra_backend_service, instance_group)
 
-         patch_backend_service(gcp,
 
-                               more_extra_backend_service,
 
-                               [same_zone_instance_group],
 
-                               circuit_breakers={
 
-                                   'maxRequests':
 
-                                       more_extra_backend_service_max_requests
 
-                               })
 
-         logger.info('Waiting for more extra backends to become healthy')
 
-         wait_for_healthy_backends(gcp, more_extra_backend_service,
 
-                                   same_zone_instance_group)
 
-         extra_backend_instances = get_instance_names(gcp, instance_group)
 
-         more_extra_backend_instances = get_instance_names(
 
-             gcp, same_zone_instance_group)
 
-         route_rules = [
 
-             {
 
-                 'priority': 0,
 
-                 # UnaryCall -> extra_backend_service
 
-                 'matchRules': [{
 
-                     'fullPathMatch': '/grpc.testing.TestService/UnaryCall'
 
-                 }],
 
-                 'service': extra_backend_service.url
 
-             },
 
-             {
 
-                 'priority': 1,
 
-                 # EmptyCall -> more_extra_backend_service
 
-                 'matchRules': [{
 
-                     'fullPathMatch': '/grpc.testing.TestService/EmptyCall'
 
-                 }],
 
-                 'service': more_extra_backend_service.url
 
-             },
 
-         ]
 
-         # Make client send UNARY_CALL and EMPTY_CALL.
 
-         configure_client([
 
-             messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
 
-             messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL
 
-         ], [])
 
-         logger.info('Patching url map with %s', route_rules)
 
-         patch_url_map_backend_service(gcp,
 
-                                       extra_backend_service,
 
-                                       route_rules=route_rules)
 
-         logger.info('Waiting for traffic to go to all backends')
 
-         wait_until_all_rpcs_go_to_given_backends(
 
-             extra_backend_instances + more_extra_backend_instances,
 
-             _WAIT_FOR_STATS_SEC)
 
-         # Make all calls keep-open.
 
-         configure_client([
 
-             messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
 
-             messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL
 
-         ], [(messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
 
-              'rpc-behavior', 'keep-open'),
 
-             (messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL,
 
-              'rpc-behavior', 'keep-open')])
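
-         # With the 'rpc-behavior: keep-open' metadata, the test servers hold
-         # these RPCs open, so the number of in-flight RPCs climbs until the
-         # circuit breaker's maxRequests ceiling is reached; the waits below
-         # assert that each call type plateaus at its configured ceiling.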
 
-         wait_until_rpcs_in_flight(
 
-             'UNARY_CALL', (_WAIT_FOR_BACKEND_SEC +
 
-                            int(extra_backend_service_max_requests / args.qps)),
 
-             extra_backend_service_max_requests, 1)
 
-         logger.info('UNARY_CALL reached stable state (%d)',
 
-                     extra_backend_service_max_requests)
 
-         wait_until_rpcs_in_flight(
 
-             'EMPTY_CALL',
 
-             (_WAIT_FOR_BACKEND_SEC +
 
-              int(more_extra_backend_service_max_requests / args.qps)),
 
-             more_extra_backend_service_max_requests, 1)
 
-         logger.info('EMPTY_CALL reached stable state (%d)',
 
-                     more_extra_backend_service_max_requests)
 
-         # Increment circuit breakers max_requests threshold.
 
-         extra_backend_service_max_requests = 800
 
-         patch_backend_service(gcp,
 
-                               extra_backend_service, [instance_group],
 
-                               circuit_breakers={
 
-                                   'maxRequests':
 
-                                       extra_backend_service_max_requests
 
-                               })
 
-         wait_until_rpcs_in_flight(
 
-             'UNARY_CALL', (_WAIT_FOR_BACKEND_SEC +
 
-                            int(extra_backend_service_max_requests / args.qps)),
 
-             extra_backend_service_max_requests, 1)
 
-         logger.info('UNARY_CALL reached stable state after increase (%d)',
 
-                     extra_backend_service_max_requests)
 
-         logger.info('success')
 
-         # Avoid new RPCs being outstanding (some test clients create threads
 
-         # for sending RPCs) after restoring backend services.
 
-         configure_client(
 
-             [messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL], [])
 
-     finally:
 
-         patch_url_map_backend_service(gcp, original_backend_service)
 
-         patch_backend_service(gcp, original_backend_service, [instance_group])
 
-         for backend_service in additional_backend_services:
 
-             delete_backend_service(gcp, backend_service)
 
-         set_validate_for_proxyless(gcp, True)
 
- def set_validate_for_proxyless(gcp, validate_for_proxyless):
 
-     if not gcp.alpha_compute:
 
-         logger.debug(
 
-             'Not setting validateForProxyless because alpha is not enabled')
 
-         return
 
-     # This function deletes global_forwarding_rule and target_proxy, then
 
-     # recreates target_proxy with the requested validateForProxyless value.

-     # This is necessary because patching target_grpc_proxy isn't supported.
 
-     delete_global_forwarding_rule(gcp)
 
-     delete_target_proxy(gcp)
 
-     create_target_proxy(gcp, gcp.target_proxy.name, validate_for_proxyless)
 
-     create_global_forwarding_rule(gcp, gcp.global_forwarding_rule.name,
 
-                                   [gcp.service_port])
 
- def get_serving_status(instance, service_port):
 
-     with grpc.insecure_channel('%s:%d' % (instance, service_port)) as channel:
 
-         health_stub = health_pb2_grpc.HealthStub(channel)
 
-         return health_stub.Check(health_pb2.HealthCheckRequest())
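
- # Usage sketch (hypothetical helper, not called elsewhere): poll an instance
- # with get_serving_status until it reports SERVING, relying on the module's
- # existing health_pb2 import.
- def _wait_until_serving(instance, service_port, timeout_sec=60):
-     start_time = time.time()
-     while time.time() - start_time <= timeout_sec:
-         status = get_serving_status(instance, service_port)
-         if status.status == health_pb2.HealthCheckResponse.SERVING:
-             return
-         time.sleep(1)
-     raise Exception('%s did not report SERVING within %d seconds' %
-                     (instance, timeout_sec))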
 
- def set_serving_status(instances, service_port, serving):
 
-     logger.info('setting %s serving status to %s', instances, serving)
 
-     for instance in instances:
 
-         with grpc.insecure_channel('%s:%d' %
 
-                                    (instance, service_port)) as channel:
 
-             logger.info('setting %s serving status to %s', instance, serving)
 
-             stub = test_pb2_grpc.XdsUpdateHealthServiceStub(channel)
 
-             retry_count = 5
 
-             for i in range(retry_count):
 
-                 if serving:
 
-                     stub.SetServing(empty_pb2.Empty())
 
-                 else:
 
-                     stub.SetNotServing(empty_pb2.Empty())
 
-                 serving_status = get_serving_status(instance, service_port)
 
-                 logger.info('got instance service status %s', serving_status)
 
-                 want_status = (health_pb2.HealthCheckResponse.SERVING
-                                if serving else
-                                health_pb2.HealthCheckResponse.NOT_SERVING)
 
-                 if serving_status.status == want_status:
 
-                     break
 
-                 if i == retry_count - 1:
 
-                     raise Exception(
 
-                         'failed to set instance service status after %d retries'
 
-                         % retry_count)
 
- def is_primary_instance_group(gcp, instance_group):
 
-     # Clients may connect to a TD instance in a different region than the
 
-     # client, in which case primary/secondary assignments may not be based on
 
-     # the client's actual locality.
 
-     instance_names = get_instance_names(gcp, instance_group)
 
-     stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
 
-     return all(peer in instance_names for peer in stats.rpcs_by_peer.keys())
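
- # Usage sketch (hypothetical call site): tests that depend on locality
- # ordering can bail out early when the client is not actually being served
- # by the expected primary group:
- #   if not is_primary_instance_group(gcp, instance_group):
- #       logger.info('skipping check: client served from another locality')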
 
- def get_startup_script(path_to_server_binary, service_port):
 
-     if path_to_server_binary:
 
-         return "nohup %s --port=%d 1>/dev/null &" % (path_to_server_binary,
 
-                                                      service_port)
 
-     else:
 
-         return """#!/bin/bash
 
- sudo apt update
 
- sudo apt install -y git default-jdk
 
- mkdir java_server
 
- pushd java_server
 
- git clone https://github.com/grpc/grpc-java.git
 
- pushd grpc-java
 
- pushd interop-testing
 
- ../gradlew installDist -x test -PskipCodegen=true -PskipAndroid=true
 
- nohup build/install/grpc-interop-testing/bin/xds-test-server \
 
-     --port=%d 1>/dev/null &""" % service_port
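
- # For example, with path_to_server_binary='/opt/grpc/server' and
- # service_port=8080 (both hypothetical values), the first branch renders to:
- #   nohup /opt/grpc/server --port=8080 1>/dev/null &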
 
- def create_instance_template(gcp, name, network, source_image, machine_type,
 
-                              startup_script):
 
-     config = {
 
-         'name': name,
 
-         'properties': {
 
-             'tags': {
 
-                 'items': ['allow-health-checks']
 
-             },
 
-             'machineType': machine_type,
 
-             'serviceAccounts': [{
 
-                 'email': 'default',
 
-                 'scopes': ['https://www.googleapis.com/auth/cloud-platform',]
 
-             }],
 
-             'networkInterfaces': [{
 
-                 'accessConfigs': [{
 
-                     'type': 'ONE_TO_ONE_NAT'
 
-                 }],
 
-                 'network': network
 
-             }],
 
-             'disks': [{
 
-                 'boot': True,
 
-                 'initializeParams': {
 
-                     'sourceImage': source_image
 
-                 }
 
-             }],
 
-             'metadata': {
 
-                 'items': [{
 
-                     'key': 'startup-script',
 
-                     'value': startup_script
 
-                 }]
 
-             }
 
-         }
 
-     }
 
-     logger.debug('Sending GCP request with body=%s', config)
 
-     result = gcp.compute.instanceTemplates().insert(
 
-         project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
 
-     wait_for_global_operation(gcp, result['name'])
 
-     gcp.instance_template = GcpResource(config['name'], result['targetLink'])
 
- def add_instance_group(gcp, zone, name, size):
 
-     config = {
 
-         'name': name,
 
-         'instanceTemplate': gcp.instance_template.url,
 
-         'targetSize': size,
 
-         'namedPorts': [{
 
-             'name': 'grpc',
 
-             'port': gcp.service_port
 
-         }]
 
-     }
 
-     logger.debug('Sending GCP request with body=%s', config)
 
-     result = gcp.compute.instanceGroupManagers().insert(
 
-         project=gcp.project, zone=zone,
 
-         body=config).execute(num_retries=_GCP_API_RETRIES)
 
-     wait_for_zone_operation(gcp, zone, result['name'])
 
-     result = gcp.compute.instanceGroupManagers().get(
 
-         project=gcp.project, zone=zone,
 
-         instanceGroupManager=config['name']).execute(
 
-             num_retries=_GCP_API_RETRIES)
 
-     instance_group = InstanceGroup(config['name'], result['instanceGroup'],
 
-                                    zone)
 
-     gcp.instance_groups.append(instance_group)
 
-     wait_for_instance_group_to_reach_expected_size(gcp, instance_group, size,
 
-                                                    _WAIT_FOR_OPERATION_SEC)
 
-     return instance_group
 
- def create_health_check(gcp, name):
 
-     if gcp.alpha_compute:
 
-         config = {
 
-             'name': name,
 
-             'type': 'GRPC',
 
-             'grpcHealthCheck': {
 
-                 'portSpecification': 'USE_SERVING_PORT'
 
-             }
 
-         }
 
-         compute_to_use = gcp.alpha_compute
 
-     else:
 
-         config = {
 
-             'name': name,
 
-             'type': 'TCP',
 
-             'tcpHealthCheck': {
 
-                 'portName': 'grpc'
 
-             }
 
-         }
 
-         compute_to_use = gcp.compute
 
-     logger.debug('Sending GCP request with body=%s', config)
 
-     result = compute_to_use.healthChecks().insert(
 
-         project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
 
-     wait_for_global_operation(gcp, result['name'])
 
-     gcp.health_check = GcpResource(config['name'], result['targetLink'])
 
- def create_health_check_firewall_rule(gcp, name):
 
-     config = {
 
-         'name': name,
 
-         'direction': 'INGRESS',
 
-         'allowed': [{
 
-             'IPProtocol': 'tcp'
 
-         }],
 
-         'sourceRanges': ['35.191.0.0/16', '130.211.0.0/22'],
 
-         'targetTags': ['allow-health-checks'],
 
-     }
 
-     logger.debug('Sending GCP request with body=%s', config)
 
-     result = gcp.compute.firewalls().insert(
 
-         project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
 
-     wait_for_global_operation(gcp, result['name'])
 
-     gcp.health_check_firewall_rule = GcpResource(config['name'],
 
-                                                  result['targetLink'])
 
- def add_backend_service(gcp, name):
 
-     if gcp.alpha_compute:
 
-         protocol = 'GRPC'
 
-         compute_to_use = gcp.alpha_compute
 
-     else:
 
-         protocol = 'HTTP2'
 
-         compute_to_use = gcp.compute
 
-     config = {
 
-         'name': name,
 
-         'loadBalancingScheme': 'INTERNAL_SELF_MANAGED',
 
-         'healthChecks': [gcp.health_check.url],
 
-         'portName': 'grpc',
 
-         'protocol': protocol
 
-     }
 
-     logger.debug('Sending GCP request with body=%s', config)
 
-     result = compute_to_use.backendServices().insert(
 
-         project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
 
-     wait_for_global_operation(gcp, result['name'])
 
-     backend_service = GcpResource(config['name'], result['targetLink'])
 
-     gcp.backend_services.append(backend_service)
 
-     return backend_service
 
- def create_url_map(gcp, name, backend_service, host_name):
 
-     config = {
 
-         'name': name,
 
-         'defaultService': backend_service.url,
 
-         'pathMatchers': [{
 
-             'name': _PATH_MATCHER_NAME,
 
-             'defaultService': backend_service.url,
 
-         }],
 
-         'hostRules': [{
 
-             'hosts': [host_name],
 
-             'pathMatcher': _PATH_MATCHER_NAME
 
-         }]
 
-     }
 
-     logger.debug('Sending GCP request with body=%s', config)
 
-     result = gcp.compute.urlMaps().insert(
 
-         project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
 
-     wait_for_global_operation(gcp, result['name'])
 
-     gcp.url_map = GcpResource(config['name'], result['targetLink'])
 
- def patch_url_map_host_rule_with_port(gcp, name, backend_service, host_name):
 
-     config = {
 
-         'hostRules': [{
 
-             'hosts': ['%s:%d' % (host_name, gcp.service_port)],
 
-             'pathMatcher': _PATH_MATCHER_NAME
 
-         }]
 
-     }
 
-     logger.debug('Sending GCP request with body=%s', config)
 
-     result = gcp.compute.urlMaps().patch(
 
-         project=gcp.project, urlMap=name,
 
-         body=config).execute(num_retries=_GCP_API_RETRIES)
 
-     wait_for_global_operation(gcp, result['name'])
 
- def create_target_proxy(gcp, name, validate_for_proxyless=True):
 
-     if gcp.alpha_compute:
 
-         config = {
 
-             'name': name,
 
-             'url_map': gcp.url_map.url,
 
-             'validate_for_proxyless': validate_for_proxyless
 
-         }
 
-         logger.debug('Sending GCP request with body=%s', config)
 
-         result = gcp.alpha_compute.targetGrpcProxies().insert(
 
-             project=gcp.project,
 
-             body=config).execute(num_retries=_GCP_API_RETRIES)
 
-     else:
 
-         config = {
 
-             'name': name,
 
-             'url_map': gcp.url_map.url,
 
-         }
 
-         logger.debug('Sending GCP request with body=%s', config)
 
-         result = gcp.compute.targetHttpProxies().insert(
 
-             project=gcp.project,
 
-             body=config).execute(num_retries=_GCP_API_RETRIES)
 
-     wait_for_global_operation(gcp, result['name'])
 
-     gcp.target_proxy = GcpResource(config['name'], result['targetLink'])
 
- def create_global_forwarding_rule(gcp, name, potential_ports):
 
-     if gcp.alpha_compute:
 
-         compute_to_use = gcp.alpha_compute
 
-     else:
 
-         compute_to_use = gcp.compute
 
-     for port in potential_ports:
 
-         try:
 
-             config = {
 
-                 'name': name,
 
-                 'loadBalancingScheme': 'INTERNAL_SELF_MANAGED',
 
-                 'portRange': str(port),
 
-                 'IPAddress': '0.0.0.0',
 
-                 'network': args.network,
 
-                 'target': gcp.target_proxy.url,
 
-             }
 
-             logger.debug('Sending GCP request with body=%s', config)
 
-             result = compute_to_use.globalForwardingRules().insert(
 
-                 project=gcp.project,
 
-                 body=config).execute(num_retries=_GCP_API_RETRIES)
 
-             wait_for_global_operation(gcp, result['name'])
 
-             gcp.global_forwarding_rule = GcpResource(config['name'],
 
-                                                      result['targetLink'])
 
-             gcp.service_port = port
 
-             return
 
-         except googleapiclient.errors.HttpError as http_error:
 
-             logger.warning(
 
-                 'Got error %s when attempting to create forwarding rule to '
 
-                 '0.0.0.0:%d. Retrying with another port.', http_error, port)
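
- # Usage sketch (ports hypothetical): callers pass a list of candidate ports
- # and then check gcp.service_port, as the setup code at the bottom of this
- # script does:
- #   create_global_forwarding_rule(gcp, 'my-forwarding-rule', [8080, 8081])
- #   if not gcp.service_port:
- #       raise Exception('Failed to find a valid ip:port')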
 
- def get_health_check(gcp, health_check_name):
 
-     result = gcp.compute.healthChecks().get(
 
-         project=gcp.project, healthCheck=health_check_name).execute()
 
-     gcp.health_check = GcpResource(health_check_name, result['selfLink'])
 
- def get_health_check_firewall_rule(gcp, firewall_name):
 
-     result = gcp.compute.firewalls().get(project=gcp.project,
 
-                                          firewall=firewall_name).execute()
 
-     gcp.health_check_firewall_rule = GcpResource(firewall_name,
 
-                                                  result['selfLink'])
 
- def get_backend_service(gcp, backend_service_name):
 
-     result = gcp.compute.backendServices().get(
 
-         project=gcp.project, backendService=backend_service_name).execute()
 
-     backend_service = GcpResource(backend_service_name, result['selfLink'])
 
-     gcp.backend_services.append(backend_service)
 
-     return backend_service
 
- def get_url_map(gcp, url_map_name):
 
-     result = gcp.compute.urlMaps().get(project=gcp.project,
 
-                                        urlMap=url_map_name).execute()
 
-     gcp.url_map = GcpResource(url_map_name, result['selfLink'])
 
- def get_target_proxy(gcp, target_proxy_name):
 
-     if gcp.alpha_compute:
 
-         result = gcp.alpha_compute.targetGrpcProxies().get(
 
-             project=gcp.project, targetGrpcProxy=target_proxy_name).execute()
 
-     else:
 
-         result = gcp.compute.targetHttpProxies().get(
 
-             project=gcp.project, targetHttpProxy=target_proxy_name).execute()
 
-     gcp.target_proxy = GcpResource(target_proxy_name, result['selfLink'])
 
- def get_global_forwarding_rule(gcp, forwarding_rule_name):
 
-     result = gcp.compute.globalForwardingRules().get(
 
-         project=gcp.project, forwardingRule=forwarding_rule_name).execute()
 
-     gcp.global_forwarding_rule = GcpResource(forwarding_rule_name,
 
-                                              result['selfLink'])
 
- def get_instance_template(gcp, template_name):
 
-     result = gcp.compute.instanceTemplates().get(
 
-         project=gcp.project, instanceTemplate=template_name).execute()
 
-     gcp.instance_template = GcpResource(template_name, result['selfLink'])
 
- def get_instance_group(gcp, zone, instance_group_name):
 
-     result = gcp.compute.instanceGroups().get(
 
-         project=gcp.project, zone=zone,
 
-         instanceGroup=instance_group_name).execute()
 
-     gcp.service_port = result['namedPorts'][0]['port']
 
-     instance_group = InstanceGroup(instance_group_name, result['selfLink'],
 
-                                    zone)
 
-     gcp.instance_groups.append(instance_group)
 
-     return instance_group
 
- def delete_global_forwarding_rule(gcp):
 
-     try:
 
-         result = gcp.compute.globalForwardingRules().delete(
 
-             project=gcp.project,
 
-             forwardingRule=gcp.global_forwarding_rule.name).execute(
 
-                 num_retries=_GCP_API_RETRIES)
 
-         wait_for_global_operation(gcp, result['name'])
 
-     except googleapiclient.errors.HttpError as http_error:
 
-         logger.info('Delete failed: %s', http_error)
 
- def delete_target_proxy(gcp):
 
-     try:
 
-         if gcp.alpha_compute:
 
-             result = gcp.alpha_compute.targetGrpcProxies().delete(
 
-                 project=gcp.project,
 
-                 targetGrpcProxy=gcp.target_proxy.name).execute(
 
-                     num_retries=_GCP_API_RETRIES)
 
-         else:
 
-             result = gcp.compute.targetHttpProxies().delete(
 
-                 project=gcp.project,
 
-                 targetHttpProxy=gcp.target_proxy.name).execute(
 
-                     num_retries=_GCP_API_RETRIES)
 
-         wait_for_global_operation(gcp, result['name'])
 
-     except googleapiclient.errors.HttpError as http_error:
 
-         logger.info('Delete failed: %s', http_error)
 
- def delete_url_map(gcp):
 
-     try:
 
-         result = gcp.compute.urlMaps().delete(
 
-             project=gcp.project,
 
-             urlMap=gcp.url_map.name).execute(num_retries=_GCP_API_RETRIES)
 
-         wait_for_global_operation(gcp, result['name'])
 
-     except googleapiclient.errors.HttpError as http_error:
 
-         logger.info('Delete failed: %s', http_error)
 
- def delete_backend_service(gcp, backend_service):
 
-     try:
 
-         result = gcp.compute.backendServices().delete(
 
-             project=gcp.project, backendService=backend_service.name).execute(
 
-                 num_retries=_GCP_API_RETRIES)
 
-         wait_for_global_operation(gcp, result['name'])
 
-     except googleapiclient.errors.HttpError as http_error:
 
-         logger.info('Delete failed: %s', http_error)
 
- def delete_backend_services(gcp):
 
-     for backend_service in gcp.backend_services:
 
-         delete_backend_service(gcp, backend_service)
 
- def delete_firewall(gcp):
 
-     try:
 
-         result = gcp.compute.firewalls().delete(
 
-             project=gcp.project,
 
-             firewall=gcp.health_check_firewall_rule.name).execute(
 
-                 num_retries=_GCP_API_RETRIES)
 
-         wait_for_global_operation(gcp, result['name'])
 
-     except googleapiclient.errors.HttpError as http_error:
 
-         logger.info('Delete failed: %s', http_error)
 
- def delete_health_check(gcp):
 
-     try:
 
-         result = gcp.compute.healthChecks().delete(
 
-             project=gcp.project, healthCheck=gcp.health_check.name).execute(
 
-                 num_retries=_GCP_API_RETRIES)
 
-         wait_for_global_operation(gcp, result['name'])
 
-     except googleapiclient.errors.HttpError as http_error:
 
-         logger.info('Delete failed: %s', http_error)
 
- def delete_instance_groups(gcp):
 
-     for instance_group in gcp.instance_groups:
 
-         try:
 
-             result = gcp.compute.instanceGroupManagers().delete(
 
-                 project=gcp.project,
 
-                 zone=instance_group.zone,
 
-                 instanceGroupManager=instance_group.name).execute(
 
-                     num_retries=_GCP_API_RETRIES)
 
-             wait_for_zone_operation(gcp,
 
-                                     instance_group.zone,
 
-                                     result['name'],
 
-                                     timeout_sec=_WAIT_FOR_BACKEND_SEC)
 
-         except googleapiclient.errors.HttpError as http_error:
 
-             logger.info('Delete failed: %s', http_error)
 
- def delete_instance_template(gcp):
 
-     try:
 
-         result = gcp.compute.instanceTemplates().delete(
 
-             project=gcp.project,
 
-             instanceTemplate=gcp.instance_template.name).execute(
 
-                 num_retries=_GCP_API_RETRIES)
 
-         wait_for_global_operation(gcp, result['name'])
 
-     except googleapiclient.errors.HttpError as http_error:
 
-         logger.info('Delete failed: %s', http_error)
 
- def patch_backend_service(gcp,
 
-                           backend_service,
 
-                           instance_groups,
 
-                           balancing_mode='UTILIZATION',
 
-                           circuit_breakers=None):
 
-     if gcp.alpha_compute:
 
-         compute_to_use = gcp.alpha_compute
 
-     else:
 
-         compute_to_use = gcp.compute
 
-     config = {
 
-         'backends': [{
 
-             'group': instance_group.url,
 
-             'balancingMode': balancing_mode,
 
-             'maxRate': 1 if balancing_mode == 'RATE' else None
 
-         } for instance_group in instance_groups],
 
-         'circuitBreakers': circuit_breakers,
 
-     }
 
-     logger.debug('Sending GCP request with body=%s', config)
 
-     result = compute_to_use.backendServices().patch(
 
-         project=gcp.project, backendService=backend_service.name,
 
-         body=config).execute(num_retries=_GCP_API_RETRIES)
 
-     wait_for_global_operation(gcp,
 
-                               result['name'],
 
-                               timeout_sec=_WAIT_FOR_BACKEND_SEC)
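
- # For example, with balancing_mode='RATE' and a single instance group (group
- # URL hypothetical), the request body built above looks like:
- #   {'backends': [{'group': '.../instanceGroups/ig-1',
- #                  'balancingMode': 'RATE',
- #                  'maxRate': 1}],
- #    'circuitBreakers': None}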
 
- def resize_instance_group(gcp,
 
-                           instance_group,
 
-                           new_size,
 
-                           timeout_sec=_WAIT_FOR_OPERATION_SEC):
 
-     result = gcp.compute.instanceGroupManagers().resize(
 
-         project=gcp.project,
 
-         zone=instance_group.zone,
 
-         instanceGroupManager=instance_group.name,
 
-         size=new_size).execute(num_retries=_GCP_API_RETRIES)
 
-     wait_for_zone_operation(gcp,
 
-                             instance_group.zone,
 
-                             result['name'],
 
-                             timeout_sec=360)
 
-     wait_for_instance_group_to_reach_expected_size(gcp, instance_group,
 
-                                                    new_size, timeout_sec)
 
- def patch_url_map_backend_service(gcp,
 
-                                   backend_service=None,
 
-                                   services_with_weights=None,
 
-                                   route_rules=None):
 
-     '''Change the url_map's backend service.

-     Only one of backend_service and services_with_weights may be non-None.

-     '''
 
-     if backend_service and services_with_weights:
 
-         raise ValueError(
 
-             'both backend_service and services_with_weights are not None.')
 
-     default_service = backend_service.url if backend_service else None
 
-     default_route_action = {
 
-         'weightedBackendServices': [{
 
-             'backendService': service.url,
 
-             'weight': w,
 
-         } for service, w in services_with_weights.items()]
 
-     } if services_with_weights else None
 
-     config = {
 
-         'pathMatchers': [{
 
-             'name': _PATH_MATCHER_NAME,
 
-             'defaultService': default_service,
 
-             'defaultRouteAction': default_route_action,
 
-             'routeRules': route_rules,
 
-         }]
 
-     }
 
-     logger.debug('Sending GCP request with body=%s', config)
 
-     result = gcp.compute.urlMaps().patch(
 
-         project=gcp.project, urlMap=gcp.url_map.name,
 
-         body=config).execute(num_retries=_GCP_API_RETRIES)
 
-     wait_for_global_operation(gcp, result['name'])
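
- # Usage sketch (weights hypothetical): splitting traffic 80/20 between two
- # backend services via defaultRouteAction.weightedBackendServices:
- #   patch_url_map_backend_service(
- #       gcp,
- #       services_with_weights={
- #           original_backend_service: 80,
- #           alternate_backend_service: 20,
- #       })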
 
- def wait_for_instance_group_to_reach_expected_size(gcp, instance_group,
 
-                                                    expected_size, timeout_sec):
 
-     start_time = time.time()
 
-     while True:
 
-         current_size = len(get_instance_names(gcp, instance_group))
 
-         if current_size == expected_size:
 
-             break
 
-         if time.time() - start_time > timeout_sec:
 
-             raise Exception(
 
-                 'Instance group had expected size %d but actual size %d' %
 
-                 (expected_size, current_size))
 
-         time.sleep(2)
 
- def wait_for_global_operation(gcp,
 
-                               operation,
 
-                               timeout_sec=_WAIT_FOR_OPERATION_SEC):
 
-     start_time = time.time()
 
-     while time.time() - start_time <= timeout_sec:
 
-         result = gcp.compute.globalOperations().get(
 
-             project=gcp.project,
 
-             operation=operation).execute(num_retries=_GCP_API_RETRIES)
 
-         if result['status'] == 'DONE':
 
-             if 'error' in result:
 
-                 raise Exception(result['error'])
 
-             return
 
-         time.sleep(2)
 
-     raise Exception('Operation %s did not complete within %d seconds' %
 
-                     (operation, timeout_sec))
 
- def wait_for_zone_operation(gcp,
 
-                             zone,
 
-                             operation,
 
-                             timeout_sec=_WAIT_FOR_OPERATION_SEC):
 
-     start_time = time.time()
 
-     while time.time() - start_time <= timeout_sec:
 
-         result = gcp.compute.zoneOperations().get(
 
-             project=gcp.project, zone=zone,
 
-             operation=operation).execute(num_retries=_GCP_API_RETRIES)
 
-         if result['status'] == 'DONE':
 
-             if 'error' in result:
 
-                 raise Exception(result['error'])
 
-             return
 
-         time.sleep(2)
 
-     raise Exception('Operation %s did not complete within %d seconds' %
 
-                     (operation, timeout_sec))
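
- # wait_for_global_operation and wait_for_zone_operation share their polling
- # loop; a minimal sketch of a common helper (illustrative only, nothing
- # below uses it):
- def _wait_for_operation_done(poll_request_fn, operation,
-                              timeout_sec=_WAIT_FOR_OPERATION_SEC):
-     # poll_request_fn is assumed to return a fresh googleapiclient request
-     # fetching the current status of the given operation.
-     start_time = time.time()
-     while time.time() - start_time <= timeout_sec:
-         result = poll_request_fn().execute(num_retries=_GCP_API_RETRIES)
-         if result['status'] == 'DONE':
-             if 'error' in result:
-                 raise Exception(result['error'])
-             return
-         time.sleep(2)
-     raise Exception('Operation %s did not complete within %d seconds' %
-                     (operation, timeout_sec))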
 
- def wait_for_healthy_backends(gcp,
 
-                               backend_service,
 
-                               instance_group,
 
-                               timeout_sec=_WAIT_FOR_BACKEND_SEC):
 
-     start_time = time.time()
 
-     config = {'group': instance_group.url}
 
-     instance_names = get_instance_names(gcp, instance_group)
 
-     expected_size = len(instance_names)
 
-     while time.time() - start_time <= timeout_sec:
 
-         for instance_name in instance_names:
 
-             try:
 
-                 status = get_serving_status(instance_name, gcp.service_port)
 
-                 logger.info('serving status response from %s: %s',
 
-                             instance_name, status)
 
-             except grpc.RpcError as rpc_error:
 
-                 logger.info('checking serving status of %s failed: %s',
 
-                             instance_name, rpc_error)
 
-         result = gcp.compute.backendServices().getHealth(
 
-             project=gcp.project,
 
-             backendService=backend_service.name,
 
-             body=config).execute(num_retries=_GCP_API_RETRIES)
 
-         if 'healthStatus' in result:
 
-             logger.info('received GCP healthStatus: %s', result['healthStatus'])
 
-             healthy = True
 
-             for instance in result['healthStatus']:
 
-                 if instance['healthState'] != 'HEALTHY':
 
-                     healthy = False
 
-                     break
 
-             if healthy and expected_size == len(result['healthStatus']):
 
-                 return
 
-         else:
 
-             logger.info('no healthStatus received from GCP')
 
-         time.sleep(5)
 
-     raise Exception('Not all backends became healthy within %d seconds: %s' %
 
-                     (timeout_sec, result))
 
- def get_instance_names(gcp, instance_group):
 
-     instance_names = []
 
-     result = gcp.compute.instanceGroups().listInstances(
 
-         project=gcp.project,
 
-         zone=instance_group.zone,
 
-         instanceGroup=instance_group.name,
 
-         body={
 
-             'instanceState': 'ALL'
 
-         }).execute(num_retries=_GCP_API_RETRIES)
 
-     if 'items' not in result:
 
-         return []
 
-     for item in result['items']:
 
-         # listInstances() returns the full URL of the instance, which ends with
 
-         # the instance name. compute.instances().get() requires using the
 
-         # instance name (not the full URL) to look up instance details, so we
 
-         # just extract the name manually.
 
-         instance_name = item['instance'].split('/')[-1]
 
-         instance_names.append(instance_name)
 
-     logger.info('retrieved instance names: %s', instance_names)
 
-     return instance_names
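
- # For example (hypothetical URL), an 'instance' item of the form
- #   https://www.googleapis.com/compute/v1/projects/p/zones/z/instances/vm-1
- # yields the instance name 'vm-1' via split('/')[-1].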
 
- def clean_up(gcp):
 
-     if gcp.global_forwarding_rule:
 
-         delete_global_forwarding_rule(gcp)
 
-     if gcp.target_proxy:
 
-         delete_target_proxy(gcp)
 
-     if gcp.url_map:
 
-         delete_url_map(gcp)
 
-     delete_backend_services(gcp)
 
-     if gcp.health_check_firewall_rule:
 
-         delete_firewall(gcp)
 
-     if gcp.health_check:
 
-         delete_health_check(gcp)
 
-     delete_instance_groups(gcp)
 
-     if gcp.instance_template:
 
-         delete_instance_template(gcp)
 
- class InstanceGroup(object):
 
-     def __init__(self, name, url, zone):
 
-         self.name = name
 
-         self.url = url
 
-         self.zone = zone
 
- class GcpResource(object):
 
-     def __init__(self, name, url):
 
-         self.name = name
 
-         self.url = url
 
- class GcpState(object):
 
-     def __init__(self, compute, alpha_compute, project):
 
-         self.compute = compute
 
-         self.alpha_compute = alpha_compute
 
-         self.project = project
 
-         self.health_check = None
 
-         self.health_check_firewall_rule = None
 
-         self.backend_services = []
 
-         self.url_map = None
 
-         self.target_proxy = None
 
-         self.global_forwarding_rule = None
 
-         self.service_port = None
 
-         self.instance_template = None
 
-         self.instance_groups = []
 
- alpha_compute = None
 
- if args.compute_discovery_document:
 
-     with open(args.compute_discovery_document, 'r') as discovery_doc:
 
-         compute = googleapiclient.discovery.build_from_document(
 
-             discovery_doc.read())
 
-     if not args.only_stable_gcp_apis and args.alpha_compute_discovery_document:
 
-         with open(args.alpha_compute_discovery_document, 'r') as discovery_doc:
 
-             alpha_compute = googleapiclient.discovery.build_from_document(
 
-                 discovery_doc.read())
 
- else:
 
-     compute = googleapiclient.discovery.build('compute', 'v1')
 
-     if not args.only_stable_gcp_apis:
 
-         alpha_compute = googleapiclient.discovery.build('compute', 'alpha')
 
- try:
 
-     gcp = GcpState(compute, alpha_compute, args.project_id)
 
-     gcp_suffix = args.gcp_suffix
 
-     health_check_name = _BASE_HEALTH_CHECK_NAME + gcp_suffix
 
-     if not args.use_existing_gcp_resources:
 
-         if args.keep_gcp_resources:
 
-             # Auto-generating a unique suffix in case of conflict should not be
 
-             # combined with --keep_gcp_resources, as the suffix actually used
 
-             # for GCP resources will not match the provided --gcp_suffix value.
 
-             num_attempts = 1
 
-         else:
 
-             num_attempts = 5
 
-         for i in range(num_attempts):
 
-             try:
 
-                 logger.info('Using GCP suffix %s', gcp_suffix)
 
-                 create_health_check(gcp, health_check_name)
 
-                 break
 
-             except googleapiclient.errors.HttpError as http_error:
 
-                 gcp_suffix = '%s-%04d' % (gcp_suffix, random.randint(0, 9999))
 
-                 health_check_name = _BASE_HEALTH_CHECK_NAME + gcp_suffix
 
-                 logger.exception('HttpError when creating health check')
 
-         if gcp.health_check is None:
 
-             raise Exception('Failed to create health check after %d '
 
-                             'attempts' % num_attempts)
 
-     firewall_name = _BASE_FIREWALL_RULE_NAME + gcp_suffix
 
-     backend_service_name = _BASE_BACKEND_SERVICE_NAME + gcp_suffix
 
-     alternate_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-alternate' + gcp_suffix
 
-     url_map_name = _BASE_URL_MAP_NAME + gcp_suffix
 
-     service_host_name = _BASE_SERVICE_HOST + gcp_suffix
 
-     target_proxy_name = _BASE_TARGET_PROXY_NAME + gcp_suffix
 
-     forwarding_rule_name = _BASE_FORWARDING_RULE_NAME + gcp_suffix
 
-     template_name = _BASE_TEMPLATE_NAME + gcp_suffix
 
-     instance_group_name = _BASE_INSTANCE_GROUP_NAME + gcp_suffix
 
-     same_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-same-zone' + gcp_suffix
 
-     secondary_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-secondary-zone' + gcp_suffix
 
-     if args.use_existing_gcp_resources:
 
-         logger.info('Reusing existing GCP resources')
 
-         get_health_check(gcp, health_check_name)
 
-         try:
 
-             get_health_check_firewall_rule(gcp, firewall_name)
 
-         except googleapiclient.errors.HttpError as http_error:
 
-             # Firewall rule may be auto-deleted periodically depending on GCP
 
-             # project settings.
 
-             logger.exception('Failed to find firewall rule, recreating')
 
-             create_health_check_firewall_rule(gcp, firewall_name)
 
-         backend_service = get_backend_service(gcp, backend_service_name)
 
-         alternate_backend_service = get_backend_service(
 
-             gcp, alternate_backend_service_name)
 
-         get_url_map(gcp, url_map_name)
 
-         get_target_proxy(gcp, target_proxy_name)
 
-         get_global_forwarding_rule(gcp, forwarding_rule_name)
 
-         get_instance_template(gcp, template_name)
 
-         instance_group = get_instance_group(gcp, args.zone, instance_group_name)
 
-         same_zone_instance_group = get_instance_group(
 
-             gcp, args.zone, same_zone_instance_group_name)
 
-         secondary_zone_instance_group = get_instance_group(
 
-             gcp, args.secondary_zone, secondary_zone_instance_group_name)
 
-     else:
 
-         create_health_check_firewall_rule(gcp, firewall_name)
 
-         backend_service = add_backend_service(gcp, backend_service_name)
 
-         alternate_backend_service = add_backend_service(
 
-             gcp, alternate_backend_service_name)
 
-         create_url_map(gcp, url_map_name, backend_service, service_host_name)
 
-         create_target_proxy(gcp, target_proxy_name)
 
-         potential_service_ports = list(args.service_port_range)
 
-         random.shuffle(potential_service_ports)
 
-         create_global_forwarding_rule(gcp, forwarding_rule_name,
 
-                                       potential_service_ports)
 
-         if not gcp.service_port:
 
-             raise Exception(
 
-                 'Failed to find a valid ip:port for the forwarding rule')
 
-         if gcp.service_port != _DEFAULT_SERVICE_PORT:
 
-             patch_url_map_host_rule_with_port(gcp, url_map_name,
 
-                                               backend_service,
 
-                                               service_host_name)
 
-         startup_script = get_startup_script(args.path_to_server_binary,
 
-                                             gcp.service_port)
 
-         create_instance_template(gcp, template_name, args.network,
 
-                                  args.source_image, args.machine_type,
 
-                                  startup_script)
 
-         instance_group = add_instance_group(gcp, args.zone, instance_group_name,
 
-                                             _INSTANCE_GROUP_SIZE)
 
-         patch_backend_service(gcp, backend_service, [instance_group])
 
-         same_zone_instance_group = add_instance_group(
 
-             gcp, args.zone, same_zone_instance_group_name, _INSTANCE_GROUP_SIZE)
 
-         secondary_zone_instance_group = add_instance_group(
 
-             gcp, args.secondary_zone, secondary_zone_instance_group_name,
 
-             _INSTANCE_GROUP_SIZE)
 
-     wait_for_healthy_backends(gcp, backend_service, instance_group)
 
-     if args.test_case:
 
-         client_env = dict(os.environ)
 
-         bootstrap_server_features = []
 
-         if gcp.service_port == _DEFAULT_SERVICE_PORT:
 
-             server_uri = service_host_name
 
-         else:
 
-             server_uri = service_host_name + ':' + str(gcp.service_port)
 
-         if args.xds_v3_support:
 
-             client_env['GRPC_XDS_EXPERIMENTAL_V3_SUPPORT'] = 'true'
 
-             bootstrap_server_features.append('xds_v3')
 
-         if args.bootstrap_file:
 
-             bootstrap_path = os.path.abspath(args.bootstrap_file)
 
-         else:
 
-             with tempfile.NamedTemporaryFile(delete=False) as bootstrap_file:
 
-                 bootstrap_file.write(
 
-                     _BOOTSTRAP_TEMPLATE.format(
 
-                         node_id=socket.gethostname(),
 
-                         server_features=json.dumps(
 
-                             bootstrap_server_features)).encode('utf-8'))
 
-                 bootstrap_path = bootstrap_file.name
 
-         client_env['GRPC_XDS_BOOTSTRAP'] = bootstrap_path
 
-         client_env['GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING'] = 'true'
 
-         test_results = {}
 
-         failed_tests = []
 
-         for test_case in args.test_case:
 
-             result = jobset.JobResult()
 
-             log_dir = os.path.join(_TEST_LOG_BASE_DIR, test_case)
 
-             if not os.path.exists(log_dir):
 
-                 os.makedirs(log_dir)
 
-             test_log_filename = os.path.join(log_dir, _SPONGE_LOG_NAME)
 
-             test_log_file = open(test_log_filename, 'w+')
 
-             client_process = None
 
-             if test_case in _TESTS_TO_RUN_MULTIPLE_RPCS:
 
-                 rpcs_to_send = '--rpc="UnaryCall,EmptyCall"'
 
-             else:
 
-                 rpcs_to_send = '--rpc="UnaryCall"'
 
-             if test_case in _TESTS_TO_SEND_METADATA:
 
-                 metadata_to_send = '--metadata="EmptyCall:{keyE}:{valueE},UnaryCall:{keyU}:{valueU}"'.format(
 
-                     keyE=_TEST_METADATA_KEY,
 
-                     valueE=_TEST_METADATA_VALUE_EMPTY,
 
-                     keyU=_TEST_METADATA_KEY,
 
-                     valueU=_TEST_METADATA_VALUE_UNARY)
 
-             else:
 
-                 # Setting the arg explicitly to empty with '--metadata=""'
 
-                 # makes the C# client fail
 
-                 # (see https://github.com/commandlineparser/commandline/issues/412),
 
-                 # so instead we just rely on clients using the default when
 
-                 # metadata arg is not specified.
 
-                 metadata_to_send = ''
 
-             # TODO(ericgribkoff) Temporarily disable fail_on_failed_rpc checks
 
-             # in the client. This means we will ignore intermittent RPC
 
-             # failures (but this framework still checks that the final result
 
-             # is as expected).
 
-             #
 
-             # Reason for disabling this is, the resources are shared by
 
-             # multiple tests, and a change in previous test could be delayed
 
-             # until the second test starts. The second test may see
 
-             # intermittent failures because of that.
 
-             #
 
-             # A fix is to not share resources between tests (though that does
 
-             # mean the tests will be significantly slower due to creating new
 
-             # resources).
 
-             fail_on_failed_rpc = ''
 
-             try:
 
-                 if not CLIENT_HOSTS:
 
-                     client_cmd_formatted = args.client_cmd.format(
 
-                         server_uri=server_uri,
 
-                         stats_port=args.stats_port,
 
-                         qps=args.qps,
 
-                         fail_on_failed_rpc=fail_on_failed_rpc,
 
-                         rpcs_to_send=rpcs_to_send,
 
-                         metadata_to_send=metadata_to_send)
 
-                     logger.debug('running client: %s', client_cmd_formatted)
 
-                     client_cmd = shlex.split(client_cmd_formatted)
 
-                     client_process = subprocess.Popen(client_cmd,
 
-                                                       env=client_env,
 
-                                                       stderr=subprocess.STDOUT,
 
-                                                       stdout=test_log_file)
 
-                 if test_case == 'backends_restart':
 
-                     test_backends_restart(gcp, backend_service, instance_group)
 
-                 elif test_case == 'change_backend_service':
 
-                     test_change_backend_service(gcp, backend_service,
 
-                                                 instance_group,
 
-                                                 alternate_backend_service,
 
-                                                 same_zone_instance_group)
 
-                 elif test_case == 'gentle_failover':
 
-                     test_gentle_failover(gcp, backend_service, instance_group,
 
-                                          secondary_zone_instance_group)
 
-                 elif test_case == 'ping_pong':
 
-                     test_ping_pong(gcp, backend_service, instance_group)
 
-                 elif test_case == 'remove_instance_group':
 
-                     test_remove_instance_group(gcp, backend_service,
 
-                                                instance_group,
 
-                                                same_zone_instance_group)
 
-                 elif test_case == 'round_robin':
 
-                     test_round_robin(gcp, backend_service, instance_group)
 
-                 elif test_case == 'secondary_locality_gets_no_requests_on_partial_primary_failure':
 
-                     test_secondary_locality_gets_no_requests_on_partial_primary_failure(
 
-                         gcp, backend_service, instance_group,
 
-                         secondary_zone_instance_group)
 
-                 elif test_case == 'secondary_locality_gets_requests_on_primary_failure':
 
-                     test_secondary_locality_gets_requests_on_primary_failure(
 
-                         gcp, backend_service, instance_group,
 
-                         secondary_zone_instance_group)
 
-                 elif test_case == 'traffic_splitting':
 
-                     test_traffic_splitting(gcp, backend_service, instance_group,
 
-                                            alternate_backend_service,
 
-                                            same_zone_instance_group)
 
-                 elif test_case == 'path_matching':
 
-                     test_path_matching(gcp, backend_service, instance_group,
 
-                                        alternate_backend_service,
 
-                                        same_zone_instance_group)
 
-                 elif test_case == 'header_matching':
 
-                     test_header_matching(gcp, backend_service, instance_group,
 
-                                          alternate_backend_service,
 
-                                          same_zone_instance_group)
 
-                 elif test_case == 'circuit_breaking':
 
-                     test_circuit_breaking(gcp, backend_service, instance_group,
 
-                                           same_zone_instance_group)
 
-                 else:
 
-                     logger.error('Unknown test case: %s', test_case)
 
-                     sys.exit(1)
 
-                 if client_process and client_process.poll() is not None:
 
-                     raise Exception(
 
-                         'Client process exited prematurely with exit code %d' %
 
-                         client_process.returncode)
 
-                 result.state = 'PASSED'
 
-                 result.returncode = 0
 
-             except Exception as e:
 
-                 logger.exception('Test case %s failed', test_case)
 
-                 failed_tests.append(test_case)
 
-                 result.state = 'FAILED'
 
-                 result.message = str(e)
 
-             finally:
 
-                 if client_process:
 
-                     if client_process.returncode:
 
-                         logger.info('Client exited with code %d',
 
-                                     client_process.returncode)
 
-                     else:
 
-                         client_process.terminate()
 
-                 test_log_file.close()
 
-                 # Workaround for Python 3, as report_utils will invoke decode() on
 
-                 # result.message, which has a default value of ''.
 
-                 result.message = result.message.encode('UTF-8')
 
-                 xds_version = 'v3' if args.xds_v3_support else 'v2'
 
-                 test_results[test_case + '_' + xds_version] = [result]
 
-                 if args.log_client_output:
 
-                     logger.info('Client output:')
 
-                     with open(test_log_filename, 'r') as client_output:
 
-                         logger.info(client_output.read())
 
-         if not os.path.exists(_TEST_LOG_BASE_DIR):
 
-             os.makedirs(_TEST_LOG_BASE_DIR)
 
-         report_utils.render_junit_xml_report(test_results,
 
-                                              os.path.join(
 
-                                                  _TEST_LOG_BASE_DIR,
 
-                                                  _SPONGE_XML_NAME),
 
-                                              suite_name='xds_tests',
 
-                                              multi_target=True)
 
-         if failed_tests:
 
-             logger.error('Test case(s) %s failed', failed_tests)
 
-             sys.exit(1)
 
- finally:
 
-     if not args.keep_gcp_resources:
 
-         logger.info('Cleaning up GCP resources. This may take some time.')
 
-         clean_up(gcp)
 
 