# Copyright 2020 gRPC authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import functools import json import logging import subprocess import time from typing import Optional, List, Tuple import retrying import kubernetes.config from kubernetes import client from kubernetes import utils logger = logging.getLogger(__name__) # Type aliases V1Deployment = client.V1Deployment V1ServiceAccount = client.V1ServiceAccount V1Pod = client.V1Pod V1PodList = client.V1PodList V1Service = client.V1Service V1Namespace = client.V1Namespace ApiException = client.ApiException def simple_resource_get(func): def wrap_not_found_return_none(*args, **kwargs): try: return func(*args, **kwargs) except client.ApiException as e: if e.status == 404: # Ignore 404 return None raise return wrap_not_found_return_none def label_dict_to_selector(labels: dict) -> str: return ','.join(f'{k}=={v}' for k, v in labels.items()) class KubernetesApiManager: def __init__(self, context): self.context = context self.client = self._cached_api_client_for_context(context) self.apps = client.AppsV1Api(self.client) self.core = client.CoreV1Api(self.client) def close(self): self.client.close() @classmethod @functools.lru_cache(None) def _cached_api_client_for_context(cls, context: str) -> client.ApiClient: return kubernetes.config.new_client_from_config(context=context) class PortForwardingError(Exception): """Error forwarding port""" class KubernetesNamespace: NEG_STATUS_META = 'cloud.google.com/neg-status' PORT_FORWARD_LOCAL_ADDRESS: str = '127.0.0.1' DELETE_GRACE_PERIOD_SEC: int = 5 def __init__(self, api: KubernetesApiManager, name: str): self.name = name self.api = api def apply_manifest(self, manifest): return utils.create_from_dict(self.api.client, manifest, namespace=self.name) @simple_resource_get def get_service(self, name) -> V1Service: return self.api.core.read_namespaced_service(name, self.name) @simple_resource_get def get_service_account(self, name) -> V1Service: return self.api.core.read_namespaced_service_account(name, self.name) def delete_service(self, name, grace_period_seconds=DELETE_GRACE_PERIOD_SEC): self.api.core.delete_namespaced_service( name=name, namespace=self.name, body=client.V1DeleteOptions( propagation_policy='Foreground', grace_period_seconds=grace_period_seconds)) def delete_service_account(self, name, grace_period_seconds=DELETE_GRACE_PERIOD_SEC): self.api.core.delete_namespaced_service_account( name=name, namespace=self.name, body=client.V1DeleteOptions( propagation_policy='Foreground', grace_period_seconds=grace_period_seconds)) @simple_resource_get def get(self) -> V1Namespace: return self.api.core.read_namespace(self.name) def delete(self, grace_period_seconds=DELETE_GRACE_PERIOD_SEC): self.api.core.delete_namespace( name=self.name, body=client.V1DeleteOptions( propagation_policy='Foreground', grace_period_seconds=grace_period_seconds)) def wait_for_service_deleted(self, name: str, timeout_sec=60, wait_sec=1): @retrying.retry(retry_on_result=lambda r: r is not None, stop_max_delay=timeout_sec * 1000, wait_fixed=wait_sec * 1000) def _wait_for_deleted_service_with_retry(): service = self.get_service(name) if service is not None: logger.info('Waiting for service %s to be deleted', service.metadata.name) return service _wait_for_deleted_service_with_retry() def wait_for_service_account_deleted(self, name: str, timeout_sec=60, wait_sec=1): @retrying.retry(retry_on_result=lambda r: r is not None, stop_max_delay=timeout_sec * 1000, wait_fixed=wait_sec * 1000) def _wait_for_deleted_service_account_with_retry(): service_account = self.get_service_account(name) if service_account is not None: logger.info('Waiting for service account %s to be deleted', service_account.metadata.name) return service_account _wait_for_deleted_service_account_with_retry() def wait_for_namespace_deleted(self, timeout_sec=240, wait_sec=2): @retrying.retry(retry_on_result=lambda r: r is not None, stop_max_delay=timeout_sec * 1000, wait_fixed=wait_sec * 1000) def _wait_for_deleted_namespace_with_retry(): namespace = self.get() if namespace is not None: logger.info('Waiting for namespace %s to be deleted', namespace.metadata.name) return namespace _wait_for_deleted_namespace_with_retry() def wait_for_service_neg(self, name: str, timeout_sec=60, wait_sec=1): @retrying.retry(retry_on_result=lambda r: not r, stop_max_delay=timeout_sec * 1000, wait_fixed=wait_sec * 1000) def _wait_for_service_neg(): service = self.get_service(name) if self.NEG_STATUS_META not in service.metadata.annotations: logger.info('Waiting for service %s NEG', service.metadata.name) return False return True _wait_for_service_neg() def get_service_neg(self, service_name: str, service_port: int) -> Tuple[str, List[str]]: service = self.get_service(service_name) neg_info: dict = json.loads( service.metadata.annotations[self.NEG_STATUS_META]) neg_name: str = neg_info['network_endpoint_groups'][str(service_port)] neg_zones: List[str] = neg_info['zones'] return neg_name, neg_zones @simple_resource_get def get_deployment(self, name) -> V1Deployment: return self.api.apps.read_namespaced_deployment(name, self.name) def delete_deployment(self, name, grace_period_seconds=DELETE_GRACE_PERIOD_SEC): self.api.apps.delete_namespaced_deployment( name=name, namespace=self.name, body=client.V1DeleteOptions( propagation_policy='Foreground', grace_period_seconds=grace_period_seconds)) def list_deployment_pods(self, deployment: V1Deployment) -> List[V1Pod]: # V1LabelSelector.match_expressions not supported at the moment return self.list_pods_with_labels(deployment.spec.selector.match_labels) def wait_for_deployment_available_replicas(self, name, count=1, timeout_sec=60, wait_sec=1): @retrying.retry( retry_on_result=lambda r: not self._replicas_available(r, count), stop_max_delay=timeout_sec * 1000, wait_fixed=wait_sec * 1000) def _wait_for_deployment_available_replicas(): deployment = self.get_deployment(name) logger.info( 'Waiting for deployment %s to have %s available ' 'replicas, current count %s', deployment.metadata.name, count, deployment.status.available_replicas) return deployment _wait_for_deployment_available_replicas() def wait_for_deployment_deleted(self, deployment_name: str, timeout_sec=60, wait_sec=1): @retrying.retry(retry_on_result=lambda r: r is not None, stop_max_delay=timeout_sec * 1000, wait_fixed=wait_sec * 1000) def _wait_for_deleted_deployment_with_retry(): deployment = self.get_deployment(deployment_name) if deployment is not None: logger.info( 'Waiting for deployment %s to be deleted. ' 'Non-terminated replicas: %s', deployment.metadata.name, deployment.status.replicas) return deployment _wait_for_deleted_deployment_with_retry() def list_pods_with_labels(self, labels: dict) -> List[V1Pod]: pod_list: V1PodList = self.api.core.list_namespaced_pod( self.name, label_selector=label_dict_to_selector(labels)) return pod_list.items def get_pod(self, name) -> client.V1Pod: return self.api.core.read_namespaced_pod(name, self.name) def wait_for_pod_started(self, pod_name, timeout_sec=60, wait_sec=1): @retrying.retry(retry_on_result=lambda r: not self._pod_started(r), stop_max_delay=timeout_sec * 1000, wait_fixed=wait_sec * 1000) def _wait_for_pod_started(): pod = self.get_pod(pod_name) logger.info('Waiting for pod %s to start, current phase: %s', pod.metadata.name, pod.status.phase) return pod _wait_for_pod_started() def port_forward_pod( self, pod: V1Pod, remote_port: int, local_port: Optional[int] = None, local_address: Optional[str] = None, ) -> subprocess.Popen: """Experimental""" local_address = local_address or self.PORT_FORWARD_LOCAL_ADDRESS local_port = local_port or remote_port cmd = [ "kubectl", "--context", self.api.context, "--namespace", self.name, "port-forward", "--address", local_address, f"pod/{pod.metadata.name}", f"{local_port}:{remote_port}" ] pf = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True) # Wait for stdout line indicating successful start. expected = (f"Forwarding from {local_address}:{local_port}" f" -> {remote_port}") try: while True: time.sleep(0.05) output = pf.stdout.readline().strip() if not output: return_code = pf.poll() if return_code is not None: errors = [error for error in pf.stdout.readlines()] raise PortForwardingError( 'Error forwarding port, kubectl return ' f'code {return_code}, output {errors}') elif output != expected: raise PortForwardingError( f'Error forwarding port, unexpected output {output}') else: logger.info(output) break except Exception: self.port_forward_stop(pf) raise # TODO(sergiitk): return new PortForwarder object return pf @staticmethod def port_forward_stop(pf): logger.info('Shutting down port forwarding, pid %s', pf.pid) pf.kill() stdout, _stderr = pf.communicate(timeout=5) logger.info('Port forwarding stopped') # TODO(sergiitk): make debug logger.info('Port forwarding remaining stdout: %s', stdout) @staticmethod def _pod_started(pod: V1Pod): return pod.status.phase not in ('Pending', 'Unknown') @staticmethod def _replicas_available(deployment, count): return (deployment is not None and deployment.status.available_replicas is not None and deployment.status.available_replicas >= count)