k8s.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. # Copyright 2020 gRPC authors.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import functools
  15. import json
  16. import logging
  17. import subprocess
  18. import time
  19. from typing import Optional, List, Tuple
  20. import retrying
  21. import kubernetes.config
  22. from kubernetes import client
  23. from kubernetes import utils
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Type aliases: short module-level names for kubernetes client classes.
V1Deployment = client.V1Deployment
V1ServiceAccount = client.V1ServiceAccount
V1Pod = client.V1Pod
V1PodList = client.V1PodList
V1Service = client.V1Service
V1Namespace = client.V1Namespace
ApiException = client.ApiException
  33. def simple_resource_get(func):
  34. def wrap_not_found_return_none(*args, **kwargs):
  35. try:
  36. return func(*args, **kwargs)
  37. except client.ApiException as e:
  38. if e.status == 404:
  39. # Ignore 404
  40. return None
  41. raise
  42. return wrap_not_found_return_none
  43. def label_dict_to_selector(labels: dict) -> str:
  44. return ','.join(f'{k}=={v}' for k, v in labels.items())
  45. class KubernetesApiManager:
  46. def __init__(self, context):
  47. self.context = context
  48. self.client = self._cached_api_client_for_context(context)
  49. self.apps = client.AppsV1Api(self.client)
  50. self.core = client.CoreV1Api(self.client)
  51. def close(self):
  52. self.client.close()
  53. @classmethod
  54. @functools.lru_cache(None)
  55. def _cached_api_client_for_context(cls, context: str) -> client.ApiClient:
  56. return kubernetes.config.new_client_from_config(context=context)
class PortForwardingError(Exception):
    """Error forwarding port.

    Raised by KubernetesNamespace.port_forward_pod when kubectl exits
    before reporting the expected "Forwarding from ..." line, or when it
    prints any other output first.
    """
  59. class KubernetesNamespace:
  60. NEG_STATUS_META = 'cloud.google.com/neg-status'
  61. PORT_FORWARD_LOCAL_ADDRESS: str = '127.0.0.1'
  62. DELETE_GRACE_PERIOD_SEC: int = 5
  63. def __init__(self, api: KubernetesApiManager, name: str):
  64. self.name = name
  65. self.api = api
  66. def apply_manifest(self, manifest):
  67. return utils.create_from_dict(self.api.client,
  68. manifest,
  69. namespace=self.name)
  70. @simple_resource_get
  71. def get_service(self, name) -> V1Service:
  72. return self.api.core.read_namespaced_service(name, self.name)
  73. @simple_resource_get
  74. def get_service_account(self, name) -> V1Service:
  75. return self.api.core.read_namespaced_service_account(name, self.name)
  76. def delete_service(self, name,
  77. grace_period_seconds=DELETE_GRACE_PERIOD_SEC):
  78. self.api.core.delete_namespaced_service(
  79. name=name,
  80. namespace=self.name,
  81. body=client.V1DeleteOptions(
  82. propagation_policy='Foreground',
  83. grace_period_seconds=grace_period_seconds))
  84. def delete_service_account(self,
  85. name,
  86. grace_period_seconds=DELETE_GRACE_PERIOD_SEC):
  87. self.api.core.delete_namespaced_service_account(
  88. name=name,
  89. namespace=self.name,
  90. body=client.V1DeleteOptions(
  91. propagation_policy='Foreground',
  92. grace_period_seconds=grace_period_seconds))
  93. @simple_resource_get
  94. def get(self) -> V1Namespace:
  95. return self.api.core.read_namespace(self.name)
  96. def delete(self, grace_period_seconds=DELETE_GRACE_PERIOD_SEC):
  97. self.api.core.delete_namespace(
  98. name=self.name,
  99. body=client.V1DeleteOptions(
  100. propagation_policy='Foreground',
  101. grace_period_seconds=grace_period_seconds))
  102. def wait_for_service_deleted(self, name: str, timeout_sec=60, wait_sec=1):
  103. @retrying.retry(retry_on_result=lambda r: r is not None,
  104. stop_max_delay=timeout_sec * 1000,
  105. wait_fixed=wait_sec * 1000)
  106. def _wait_for_deleted_service_with_retry():
  107. service = self.get_service(name)
  108. if service is not None:
  109. logger.info('Waiting for service %s to be deleted',
  110. service.metadata.name)
  111. return service
  112. _wait_for_deleted_service_with_retry()
  113. def wait_for_service_account_deleted(self,
  114. name: str,
  115. timeout_sec=60,
  116. wait_sec=1):
  117. @retrying.retry(retry_on_result=lambda r: r is not None,
  118. stop_max_delay=timeout_sec * 1000,
  119. wait_fixed=wait_sec * 1000)
  120. def _wait_for_deleted_service_account_with_retry():
  121. service_account = self.get_service_account(name)
  122. if service_account is not None:
  123. logger.info('Waiting for service account %s to be deleted',
  124. service_account.metadata.name)
  125. return service_account
  126. _wait_for_deleted_service_account_with_retry()
  127. def wait_for_namespace_deleted(self, timeout_sec=240, wait_sec=2):
  128. @retrying.retry(retry_on_result=lambda r: r is not None,
  129. stop_max_delay=timeout_sec * 1000,
  130. wait_fixed=wait_sec * 1000)
  131. def _wait_for_deleted_namespace_with_retry():
  132. namespace = self.get()
  133. if namespace is not None:
  134. logger.info('Waiting for namespace %s to be deleted',
  135. namespace.metadata.name)
  136. return namespace
  137. _wait_for_deleted_namespace_with_retry()
  138. def wait_for_service_neg(self, name: str, timeout_sec=60, wait_sec=1):
  139. @retrying.retry(retry_on_result=lambda r: not r,
  140. stop_max_delay=timeout_sec * 1000,
  141. wait_fixed=wait_sec * 1000)
  142. def _wait_for_service_neg():
  143. service = self.get_service(name)
  144. if self.NEG_STATUS_META not in service.metadata.annotations:
  145. logger.info('Waiting for service %s NEG', service.metadata.name)
  146. return False
  147. return True
  148. _wait_for_service_neg()
  149. def get_service_neg(self, service_name: str,
  150. service_port: int) -> Tuple[str, List[str]]:
  151. service = self.get_service(service_name)
  152. neg_info: dict = json.loads(
  153. service.metadata.annotations[self.NEG_STATUS_META])
  154. neg_name: str = neg_info['network_endpoint_groups'][str(service_port)]
  155. neg_zones: List[str] = neg_info['zones']
  156. return neg_name, neg_zones
  157. @simple_resource_get
  158. def get_deployment(self, name) -> V1Deployment:
  159. return self.api.apps.read_namespaced_deployment(name, self.name)
  160. def delete_deployment(self,
  161. name,
  162. grace_period_seconds=DELETE_GRACE_PERIOD_SEC):
  163. self.api.apps.delete_namespaced_deployment(
  164. name=name,
  165. namespace=self.name,
  166. body=client.V1DeleteOptions(
  167. propagation_policy='Foreground',
  168. grace_period_seconds=grace_period_seconds))
  169. def list_deployment_pods(self, deployment: V1Deployment) -> List[V1Pod]:
  170. # V1LabelSelector.match_expressions not supported at the moment
  171. return self.list_pods_with_labels(deployment.spec.selector.match_labels)
  172. def wait_for_deployment_available_replicas(self,
  173. name,
  174. count=1,
  175. timeout_sec=60,
  176. wait_sec=1):
  177. @retrying.retry(
  178. retry_on_result=lambda r: not self._replicas_available(r, count),
  179. stop_max_delay=timeout_sec * 1000,
  180. wait_fixed=wait_sec * 1000)
  181. def _wait_for_deployment_available_replicas():
  182. deployment = self.get_deployment(name)
  183. logger.info(
  184. 'Waiting for deployment %s to have %s available '
  185. 'replicas, current count %s', deployment.metadata.name, count,
  186. deployment.status.available_replicas)
  187. return deployment
  188. _wait_for_deployment_available_replicas()
  189. def wait_for_deployment_deleted(self,
  190. deployment_name: str,
  191. timeout_sec=60,
  192. wait_sec=1):
  193. @retrying.retry(retry_on_result=lambda r: r is not None,
  194. stop_max_delay=timeout_sec * 1000,
  195. wait_fixed=wait_sec * 1000)
  196. def _wait_for_deleted_deployment_with_retry():
  197. deployment = self.get_deployment(deployment_name)
  198. if deployment is not None:
  199. logger.info(
  200. 'Waiting for deployment %s to be deleted. '
  201. 'Non-terminated replicas: %s', deployment.metadata.name,
  202. deployment.status.replicas)
  203. return deployment
  204. _wait_for_deleted_deployment_with_retry()
  205. def list_pods_with_labels(self, labels: dict) -> List[V1Pod]:
  206. pod_list: V1PodList = self.api.core.list_namespaced_pod(
  207. self.name, label_selector=label_dict_to_selector(labels))
  208. return pod_list.items
  209. def get_pod(self, name) -> client.V1Pod:
  210. return self.api.core.read_namespaced_pod(name, self.name)
  211. def wait_for_pod_started(self, pod_name, timeout_sec=60, wait_sec=1):
  212. @retrying.retry(retry_on_result=lambda r: not self._pod_started(r),
  213. stop_max_delay=timeout_sec * 1000,
  214. wait_fixed=wait_sec * 1000)
  215. def _wait_for_pod_started():
  216. pod = self.get_pod(pod_name)
  217. logger.info('Waiting for pod %s to start, current phase: %s',
  218. pod.metadata.name, pod.status.phase)
  219. return pod
  220. _wait_for_pod_started()
  221. def port_forward_pod(
  222. self,
  223. pod: V1Pod,
  224. remote_port: int,
  225. local_port: Optional[int] = None,
  226. local_address: Optional[str] = None,
  227. ) -> subprocess.Popen:
  228. """Experimental"""
  229. local_address = local_address or self.PORT_FORWARD_LOCAL_ADDRESS
  230. local_port = local_port or remote_port
  231. cmd = [
  232. "kubectl", "--context", self.api.context, "--namespace", self.name,
  233. "port-forward", "--address", local_address,
  234. f"pod/{pod.metadata.name}", f"{local_port}:{remote_port}"
  235. ]
  236. pf = subprocess.Popen(cmd,
  237. stdout=subprocess.PIPE,
  238. stderr=subprocess.STDOUT,
  239. universal_newlines=True)
  240. # Wait for stdout line indicating successful start.
  241. expected = (f"Forwarding from {local_address}:{local_port}"
  242. f" -> {remote_port}")
  243. try:
  244. while True:
  245. time.sleep(0.05)
  246. output = pf.stdout.readline().strip()
  247. if not output:
  248. return_code = pf.poll()
  249. if return_code is not None:
  250. errors = [error for error in pf.stdout.readlines()]
  251. raise PortForwardingError(
  252. 'Error forwarding port, kubectl return '
  253. f'code {return_code}, output {errors}')
  254. elif output != expected:
  255. raise PortForwardingError(
  256. f'Error forwarding port, unexpected output {output}')
  257. else:
  258. logger.info(output)
  259. break
  260. except Exception:
  261. self.port_forward_stop(pf)
  262. raise
  263. # TODO(sergiitk): return new PortForwarder object
  264. return pf
  265. @staticmethod
  266. def port_forward_stop(pf):
  267. logger.info('Shutting down port forwarding, pid %s', pf.pid)
  268. pf.kill()
  269. stdout, _stderr = pf.communicate(timeout=5)
  270. logger.info('Port forwarding stopped')
  271. # TODO(sergiitk): make debug
  272. logger.info('Port forwarding remaining stdout: %s', stdout)
  273. @staticmethod
  274. def _pod_started(pod: V1Pod):
  275. return pod.status.phase not in ('Pending', 'Unknown')
  276. @staticmethod
  277. def _replicas_available(deployment, count):
  278. return (deployment is not None and
  279. deployment.status.available_replicas is not None and
  280. deployment.status.available_replicas >= count)