123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338 |
- # Copyright 2020 gRPC authors.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import functools
- import json
- import logging
- import subprocess
- import time
- from typing import Optional, List, Tuple
- # TODO(sergiitk): replace with tenacity
- import retrying
- import kubernetes.config
- from kubernetes import client
- from kubernetes import utils
- logger = logging.getLogger(__name__)
- # Type aliases
- V1Deployment = client.V1Deployment
- V1ServiceAccount = client.V1ServiceAccount
- V1Pod = client.V1Pod
- V1PodList = client.V1PodList
- V1Service = client.V1Service
- V1Namespace = client.V1Namespace
- ApiException = client.ApiException
- def simple_resource_get(func):
- def wrap_not_found_return_none(*args, **kwargs):
- try:
- return func(*args, **kwargs)
- except client.ApiException as e:
- if e.status == 404:
- # Ignore 404
- return None
- raise
- return wrap_not_found_return_none
- def label_dict_to_selector(labels: dict) -> str:
- return ','.join(f'{k}=={v}' for k, v in labels.items())
- class KubernetesApiManager:
- def __init__(self, context):
- self.context = context
- self.client = self._cached_api_client_for_context(context)
- self.apps = client.AppsV1Api(self.client)
- self.core = client.CoreV1Api(self.client)
- def close(self):
- self.client.close()
- @classmethod
- @functools.lru_cache(None)
- def _cached_api_client_for_context(cls, context: str) -> client.ApiClient:
- return kubernetes.config.new_client_from_config(context=context)
- class PortForwardingError(Exception):
- """Error forwarding port"""
- class KubernetesNamespace:
- NEG_STATUS_META = 'cloud.google.com/neg-status'
- PORT_FORWARD_LOCAL_ADDRESS: str = '127.0.0.1'
- DELETE_GRACE_PERIOD_SEC: int = 5
- def __init__(self, api: KubernetesApiManager, name: str):
- self.name = name
- self.api = api
- def apply_manifest(self, manifest):
- return utils.create_from_dict(self.api.client,
- manifest,
- namespace=self.name)
- @simple_resource_get
- def get_service(self, name) -> V1Service:
- return self.api.core.read_namespaced_service(name, self.name)
- @simple_resource_get
- def get_service_account(self, name) -> V1Service:
- return self.api.core.read_namespaced_service_account(name, self.name)
- def delete_service(self, name,
- grace_period_seconds=DELETE_GRACE_PERIOD_SEC):
- self.api.core.delete_namespaced_service(
- name=name,
- namespace=self.name,
- body=client.V1DeleteOptions(
- propagation_policy='Foreground',
- grace_period_seconds=grace_period_seconds))
- def delete_service_account(self,
- name,
- grace_period_seconds=DELETE_GRACE_PERIOD_SEC):
- self.api.core.delete_namespaced_service_account(
- name=name,
- namespace=self.name,
- body=client.V1DeleteOptions(
- propagation_policy='Foreground',
- grace_period_seconds=grace_period_seconds))
- @simple_resource_get
- def get(self) -> V1Namespace:
- return self.api.core.read_namespace(self.name)
- def delete(self, grace_period_seconds=DELETE_GRACE_PERIOD_SEC):
- self.api.core.delete_namespace(
- name=self.name,
- body=client.V1DeleteOptions(
- propagation_policy='Foreground',
- grace_period_seconds=grace_period_seconds))
- def wait_for_service_deleted(self, name: str, timeout_sec=60, wait_sec=1):
- @retrying.retry(retry_on_result=lambda r: r is not None,
- stop_max_delay=timeout_sec * 1000,
- wait_fixed=wait_sec * 1000)
- def _wait_for_deleted_service_with_retry():
- service = self.get_service(name)
- if service is not None:
- logger.info('Waiting for service %s to be deleted',
- service.metadata.name)
- return service
- _wait_for_deleted_service_with_retry()
- def wait_for_service_account_deleted(self,
- name: str,
- timeout_sec=60,
- wait_sec=1):
- @retrying.retry(retry_on_result=lambda r: r is not None,
- stop_max_delay=timeout_sec * 1000,
- wait_fixed=wait_sec * 1000)
- def _wait_for_deleted_service_account_with_retry():
- service_account = self.get_service_account(name)
- if service_account is not None:
- logger.info('Waiting for service account %s to be deleted',
- service_account.metadata.name)
- return service_account
- _wait_for_deleted_service_account_with_retry()
- def wait_for_namespace_deleted(self, timeout_sec=240, wait_sec=2):
- @retrying.retry(retry_on_result=lambda r: r is not None,
- stop_max_delay=timeout_sec * 1000,
- wait_fixed=wait_sec * 1000)
- def _wait_for_deleted_namespace_with_retry():
- namespace = self.get()
- if namespace is not None:
- logger.info('Waiting for namespace %s to be deleted',
- namespace.metadata.name)
- return namespace
- _wait_for_deleted_namespace_with_retry()
- def wait_for_service_neg(self, name: str, timeout_sec=60, wait_sec=1):
- @retrying.retry(retry_on_result=lambda r: not r,
- stop_max_delay=timeout_sec * 1000,
- wait_fixed=wait_sec * 1000)
- def _wait_for_service_neg():
- service = self.get_service(name)
- if self.NEG_STATUS_META not in service.metadata.annotations:
- logger.info('Waiting for service %s NEG', service.metadata.name)
- return False
- return True
- _wait_for_service_neg()
- def get_service_neg(self, service_name: str,
- service_port: int) -> Tuple[str, List[str]]:
- service = self.get_service(service_name)
- neg_info: dict = json.loads(
- service.metadata.annotations[self.NEG_STATUS_META])
- neg_name: str = neg_info['network_endpoint_groups'][str(service_port)]
- neg_zones: List[str] = neg_info['zones']
- return neg_name, neg_zones
- @simple_resource_get
- def get_deployment(self, name) -> V1Deployment:
- return self.api.apps.read_namespaced_deployment(name, self.name)
- def delete_deployment(self,
- name,
- grace_period_seconds=DELETE_GRACE_PERIOD_SEC):
- self.api.apps.delete_namespaced_deployment(
- name=name,
- namespace=self.name,
- body=client.V1DeleteOptions(
- propagation_policy='Foreground',
- grace_period_seconds=grace_period_seconds))
- def list_deployment_pods(self, deployment: V1Deployment) -> List[V1Pod]:
- # V1LabelSelector.match_expressions not supported at the moment
- return self.list_pods_with_labels(deployment.spec.selector.match_labels)
- def wait_for_deployment_available_replicas(self,
- name,
- count=1,
- timeout_sec=60,
- wait_sec=1):
- @retrying.retry(
- retry_on_result=lambda r: not self._replicas_available(r, count),
- stop_max_delay=timeout_sec * 1000,
- wait_fixed=wait_sec * 1000)
- def _wait_for_deployment_available_replicas():
- deployment = self.get_deployment(name)
- logger.info(
- 'Waiting for deployment %s to have %s available '
- 'replicas, current count %s', deployment.metadata.name, count,
- deployment.status.available_replicas)
- return deployment
- _wait_for_deployment_available_replicas()
- def wait_for_deployment_deleted(self,
- deployment_name: str,
- timeout_sec=60,
- wait_sec=1):
- @retrying.retry(retry_on_result=lambda r: r is not None,
- stop_max_delay=timeout_sec * 1000,
- wait_fixed=wait_sec * 1000)
- def _wait_for_deleted_deployment_with_retry():
- deployment = self.get_deployment(deployment_name)
- if deployment is not None:
- logger.info(
- 'Waiting for deployment %s to be deleted. '
- 'Non-terminated replicas: %s', deployment.metadata.name,
- deployment.status.replicas)
- return deployment
- _wait_for_deleted_deployment_with_retry()
- def list_pods_with_labels(self, labels: dict) -> List[V1Pod]:
- pod_list: V1PodList = self.api.core.list_namespaced_pod(
- self.name, label_selector=label_dict_to_selector(labels))
- return pod_list.items
- def get_pod(self, name) -> client.V1Pod:
- return self.api.core.read_namespaced_pod(name, self.name)
- def wait_for_pod_started(self, pod_name, timeout_sec=60, wait_sec=1):
- @retrying.retry(retry_on_result=lambda r: not self._pod_started(r),
- stop_max_delay=timeout_sec * 1000,
- wait_fixed=wait_sec * 1000)
- def _wait_for_pod_started():
- pod = self.get_pod(pod_name)
- logger.info('Waiting for pod %s to start, current phase: %s',
- pod.metadata.name, pod.status.phase)
- return pod
- _wait_for_pod_started()
- def port_forward_pod(
- self,
- pod: V1Pod,
- remote_port: int,
- local_port: Optional[int] = None,
- local_address: Optional[str] = None,
- ) -> subprocess.Popen:
- """Experimental"""
- local_address = local_address or self.PORT_FORWARD_LOCAL_ADDRESS
- local_port = local_port or remote_port
- cmd = [
- "kubectl", "--context", self.api.context, "--namespace", self.name,
- "port-forward", "--address", local_address,
- f"pod/{pod.metadata.name}", f"{local_port}:{remote_port}"
- ]
- pf = subprocess.Popen(cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
- universal_newlines=True)
- # Wait for stdout line indicating successful start.
- expected = (f"Forwarding from {local_address}:{local_port}"
- f" -> {remote_port}")
- try:
- while True:
- time.sleep(0.05)
- output = pf.stdout.readline().strip()
- if not output:
- return_code = pf.poll()
- if return_code is not None:
- errors = [error for error in pf.stdout.readlines()]
- raise PortForwardingError(
- 'Error forwarding port, kubectl return '
- f'code {return_code}, output {errors}')
- elif output != expected:
- raise PortForwardingError(
- f'Error forwarding port, unexpected output {output}')
- else:
- logger.info(output)
- break
- except Exception:
- self.port_forward_stop(pf)
- raise
- # TODO(sergiitk): return new PortForwarder object
- return pf
- @staticmethod
- def port_forward_stop(pf):
- logger.info('Shutting down port forwarding, pid %s', pf.pid)
- pf.kill()
- stdout, _stderr = pf.communicate(timeout=5)
- logger.info('Port forwarding stopped')
- # TODO(sergiitk): make debug
- logger.info('Port forwarding remaining stdout: %s', stdout)
- @staticmethod
- def _pod_started(pod: V1Pod):
- return pod.status.phase not in ('Pending', 'Unknown')
- @staticmethod
- def _replicas_available(deployment, count):
- return (deployment is not None and
- deployment.status.available_replicas is not None and
- deployment.status.available_replicas >= count)
|