import os
import json
import shutil
import subprocess
from enum import Enum, unique
from typing import Tuple

from easydict import EasyDict
import yaml

from ding.interaction.base import split_http_address
from .default_helper import one_time_warning

DEFAULT_NAMESPACE = 'default'
DEFAULT_POD_NAME = 'dijob-example-coordinator'
DEFAULT_API_VERSION = '/v1alpha1'

DEFAULT_K8S_COLLECTOR_PORT = 22270
DEFAULT_K8S_LEARNER_PORT = 22271
DEFAULT_K8S_AGGREGATOR_SLAVE_PORT = 22272
DEFAULT_K8S_COORDINATOR_PORT = 22273
DEFAULT_K8S_AGGREGATOR_MASTER_PORT = 22273


def get_operator_server_kwargs(cfg: EasyDict) -> dict:
    """
    Overview:
        Build the keyword arguments describing the operator server from the config and the environment.
        Values found in ``cfg`` take precedence over the corresponding environment variables.
    Arguments:
        - cfg (:obj:`EasyDict`): System config. ``system_addr`` and ``api_version`` are read from it if present.
    Returns:
        - result (:obj:`dict`): Containing ``api_version``, ``namespace``, ``name``, ``port``, ``host``.
    Raises:
        - AssertionError: Neither ``cfg.system_addr`` nor ``KUBERNETES_SERVER_URL`` provides a server url.
    """
    namespace = os.environ.get('KUBERNETES_POD_NAMESPACE', DEFAULT_NAMESPACE)
    name = os.environ.get('KUBERNETES_POD_NAME', DEFAULT_POD_NAME)
    url = cfg.get('system_addr', None) or os.environ.get('KUBERNETES_SERVER_URL', None)
    assert url, 'please set environment variable KUBERNETES_SERVER_URL in Kubenetes platform.'
    api_version = cfg.get('api_version', None) \
        or os.environ.get('KUBERNETES_SERVER_API_VERSION', DEFAULT_API_VERSION)

    # First try the plain "host:port" form. Anything that does not fit it (no colon, or a
    # non-numeric second field such as the "//host" part of "http://host:port") is handed
    # over to ``split_http_address``.
    try:
        fields = url.split(":")
        host, port = fields[0], int(fields[1])
    except (IndexError, ValueError):
        host, port, _, _ = split_http_address(url)

    return {
        'api_version': api_version,
        'namespace': namespace,
        'name': name,
        'host': host,
        'port': port,
    }


def exist_operator_server() -> bool:
    """
    Overview:
        Check if the 'KUBERNETES_SERVER_URL' environment variable exists.
    Returns:
        - exist (:obj:`bool`): ``True`` if the variable is set (even to an empty string).
    """
    return 'KUBERNETES_SERVER_URL' in os.environ


def pod_exec_command(kubeconfig: str, name: str, namespace: str, cmd: str) -> Tuple[int, str]:
    """
    Overview:
        Execute command in pod
    Arguments:
        - kubeconfig (:obj:`str`) The path of kubeconfig file
        - name (:obj:`str`) The name of pod
        - namespace (:obj:`str`) The namespace of pod
        - cmd (:obj:`str`) The command line, executed in the pod through ``/bin/sh -c``
    Returns:
        - result (:obj:`Tuple[int, str]`) ``(-1, reason)`` when the pod cannot be read, does not exist or is \
            not in ``Running`` phase; otherwise the ``code`` and ``message`` fields decoded from the \
            command's stdout.
    Raises:
        - SystemExit: The ``kubernetes`` package is not installed.
    """
    try:
        from kubernetes import config
        from kubernetes.client import CoreV1Api
        from kubernetes.client.rest import ApiException
        from kubernetes.stream import stream
    except ModuleNotFoundError:
        one_time_warning("You have not installed kubernetes package! Please try 'pip install DI-engine[k8s]'.")
        # ``exit`` is injected by the ``site`` module and is not always available;
        # raising SystemExit directly terminates with the same status.
        raise SystemExit(-1)

    config.load_kube_config(config_file=kubeconfig)
    core_v1 = CoreV1Api()

    resp = None
    try:
        resp = core_v1.read_namespaced_pod(name=name, namespace=namespace)
    except ApiException as e:
        # A 404 leaves ``resp`` as None and is reported below as a missing pod.
        if e.status != 404:
            return -1, "Unknown error: %s" % e
    if not resp:
        return -1, f"Pod {name} does not exist."
    if resp.status.phase != 'Running':
        return -1, f"Pod {name} is not in Running."

    exec_command = ['/bin/sh', '-c', cmd]
    resp = stream(
        core_v1.connect_get_namespaced_pod_exec,
        name,
        namespace,
        command=exec_command,
        stderr=False,
        stdin=False,
        stdout=True,
        tty=False
    )
    # The stream output is rewritten textually into something ``json.loads`` accepts:
    # single quotes become double quotes, None/False/True become null/0/1, and the quotes
    # around one known regex value are escaped so they survive inside a JSON string.
    resp = resp.replace("\'", "\"") \
        .replace('None', 'null') \
        .replace(': False', ': 0') \
        .replace(': True', ': 1') \
        .replace('"^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$"', '\\"^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$\\"')
    resp = json.loads(resp)
    return resp['code'], resp['message']


@unique
class K8sType(Enum):
    """
    Overview:
        Supported cluster types. Only ``K3s`` makes ``K8sLauncher`` invoke the ``k3d`` tool.
    """
    Local = 1
    K3s = 2


class K8sLauncher(object):
    """
    Overview:
        object to manage the K8s cluster
    Interfaces:
        ``__init__``, ``_load``, ``create_cluster``, ``_check_k3d_tools``, ``delete_cluster``, ``preload_images``
    """

    def __init__(self, config_path: str) -> None:
        """
        Overview:
            Initialize the K8sLauncher object.
        Arguments:
            - config_path (:obj:`str`): The path of the config file.
        """
        # Defaults, each of which may be overridden by the config file.
        self.name = None
        self.servers = 1
        self.agents = 0
        self.type = K8sType.Local
        self._images = []

        self._load(config_path)
        self._check_k3d_tools()

    def _load(self, config_path: str) -> None:
        """
        Overview:
            Load the config file. Keys that are missing or falsy keep their current values.
        Arguments:
            - config_path (:obj:`str`): The path of the config file.
        Raises:
            - TypeError: ``servers``/``agents`` is not an int, or ``preload_images`` is not a list.
            - ValueError: ``type`` is neither ``k3s`` nor ``local``.
        """
        with open(config_path, 'r') as f:
            # An empty YAML document is parsed as None; treat it as "no overrides".
            data = yaml.safe_load(f) or {}

        name = data.get('name')
        if name:
            self.name = name

        servers = data.get('servers')
        if servers:
            # Exact type check on purpose: a YAML boolean must not pass as an int.
            if type(servers) is not int:
                raise TypeError(f"servers' type is expected int, actual {type(servers)}")
            self.servers = servers

        agents = data.get('agents')
        if agents:
            if type(agents) is not int:
                raise TypeError(f"agents' type is expected int, actual {type(agents)}")
            self.agents = agents

        cluster_type = data.get('type')
        if cluster_type:
            if cluster_type == 'k3s':
                self.type = K8sType.K3s
            elif cluster_type == 'local':
                self.type = K8sType.Local
            else:
                raise ValueError(f"no type found for {cluster_type}")

        images = data.get('preload_images')
        if images:
            if type(images) is not list:
                raise TypeError(f"preload_images' type is expected list, actual {type(images)}")
            self._images = images

    def _check_k3d_tools(self) -> None:
        """
        Overview:
            Check if the k3d tools exist. Nothing is checked unless the cluster type is ``K3s``.
        Raises:
            - FileNotFoundError: No ``k3d`` executable is found on ``PATH``.
        """
        if self.type != K8sType.K3s:
            return
        # Look the executable up in-process rather than depending on an external ``which`` binary.
        if shutil.which('k3d') is None:
            raise FileNotFoundError(
                "No k3d tools found, please install by executing ./ding/scripts/install-k8s-tools.sh"
            )

    @staticmethod
    def _run_and_collect_stderr(args: list) -> str:
        """
        Overview:
            Run a command, wait for it to finish and return its stripped, utf-8 decoded stderr.
        Arguments:
            - args (:obj:`list`): The command and its arguments.
        Returns:
            - err_str (:obj:`str`): What the command wrote to stderr, without surrounding whitespace.
        """
        proc = subprocess.Popen(args, stderr=subprocess.PIPE)
        _, err = proc.communicate()
        return err.decode('utf-8').strip()

    def create_cluster(self) -> None:
        """
        Overview:
            Create the k8s cluster and preload the configured images. Only acts for the ``K3s`` type.
        Raises:
            - RuntimeError: ``k3d`` wrote to stderr something that contains neither ``WARN`` \
                nor ``already exists``.
        """
        print('Creating k8s cluster...')
        if self.type != K8sType.K3s:
            return
        args = ['k3d', 'cluster', 'create', f'{self.name}', f'--servers={self.servers}', f'--agents={self.agents}']
        err_str = self._run_and_collect_stderr(args)
        if err_str != '' and 'WARN' not in err_str:
            if 'already exists' in err_str:
                print('K8s cluster already exists')
            else:
                raise RuntimeError(f'Failed to create cluster {self.name}: {err_str}')

        # preload images
        self.preload_images(self._images)

    def delete_cluster(self) -> None:
        """
        Overview:
            Delete the k8s cluster. Only acts for the ``K3s`` type.
        Raises:
            - RuntimeError: ``k3d`` wrote to stderr something that contains neither ``WARN`` nor ``NotFound``.
        """
        print('Deleting k8s cluster...')
        if self.type != K8sType.K3s:
            return
        args = ['k3d', 'cluster', 'delete', f'{self.name}']
        err_str = self._run_and_collect_stderr(args)
        if err_str != '' and 'WARN' not in err_str and \
                'NotFound' not in err_str:
            raise RuntimeError(f'Failed to delete cluster {self.name}: {err_str}')

    def preload_images(self, images: list) -> None:
        """
        Overview:
            Preload images. Only acts for the ``K3s`` type and a non-empty image list.
        Arguments:
            - images (:obj:`list`): The images handed to ``k3d image import``.
        Raises:
            - RuntimeError: ``k3d`` wrote to stderr something that does not contain ``WARN``.
        """
        if self.type != K8sType.K3s or not images:
            return
        args = ['k3d', 'image', 'import', f'--cluster={self.name}']
        args += images

        err_str = self._run_and_collect_stderr(args)
        if err_str != '' and 'WARN' not in err_str:
            raise RuntimeError(f'Failed to preload images: {err_str}')