import subprocess import time from ding.utils import K8sLauncher from .default_helper import one_time_warning class OrchestratorLauncher(object): """ Overview: Object to manage di-orchestrator in existing k8s cluster Interfaces: ``__init__``, ``create_orchestrator``, ``delete_orchestrator`` """ def __init__( self, version: str, name: str = 'di-orchestrator', cluster: K8sLauncher = None, registry: str = 'diorchestrator', cert_manager_version: str = 'v1.3.1', cert_manager_registry: str = 'quay.io/jetstack' ) -> None: """ Overview: Initialize the OrchestratorLauncher object. Arguments: - version (:obj:`str`): The version of di-orchestrator. - name (:obj:`str`): The name of di-orchestrator. - cluster (:obj:`K8sLauncher`): The k8s cluster to deploy di-orchestrator. - registry (:obj:`str`): The docker registry to pull images. - cert_manager_version (:obj:`str`): The version of cert-manager. - cert_manager_registry (:obj:`str`): The docker registry to pull cert-manager images. """ self.name = name self.version = version self.cluster = cluster self.registry = registry self.cert_manager_version = cert_manager_version self.cert_manager_registry = cert_manager_registry self._namespace = 'di-system' self._webhook = 'di-webhook' self._cert_manager_namespace = 'cert-manager' self._cert_manager_webhook = 'cert-manager-webhook' self.installer = 'https://raw.githubusercontent.com/opendilab/' + \ f'DI-orchestrator/{self.version}/config/di-manager.yaml' self.cert_manager = 'https://github.com/jetstack/' + \ f'cert-manager/releases/download/{self.cert_manager_version}/cert-manager.yaml' self._images = [ f'{self.registry}/di-operator:{self.version}', f'{self.registry}/di-webhook:{self.version}', f'{self.registry}/di-server:{self.version}', f'{self.cert_manager_registry}/cert-manager-cainjector:{self.cert_manager_version}', f'{self.cert_manager_registry}/cert-manager-controller:{self.cert_manager_version}', f'{self.cert_manager_registry}/cert-manager-webhook:{self.cert_manager_version}', ] self._check_kubectl_tools() def _check_kubectl_tools(self) -> None: """ Overview: Check if kubectl tools is installed. """ args = ['which', 'kubectl'] proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, _ = proc.communicate() if out.decode('utf-8') == '': raise FileNotFoundError( "No kubectl tools found, please install by executing ./ding/scripts/install-k8s-tools.sh" ) def create_orchestrator(self) -> None: """ Overview: Create di-orchestrator in k8s cluster. """ print('Creating orchestrator...') if self.cluster is not None: self.cluster.preload_images(self._images) # create and wait for cert-manager to be available create_components_from_config(self.cert_manager) wait_to_be_ready(self._cert_manager_namespace, self._cert_manager_webhook) # create and wait for di-orchestrator to be available create_components_from_config(self.installer) wait_to_be_ready(self._namespace, self._webhook) def delete_orchestrator(self) -> None: """ Overview: Delete di-orchestrator in k8s cluster. """ print('Deleting orchestrator...') for item in [self.cert_manager, self.installer]: args = ['kubectl', 'delete', '-f', f'{item}'] proc = subprocess.Popen(args, stderr=subprocess.PIPE) _, err = proc.communicate() err_str = err.decode('utf-8').strip() if err_str != '' and 'WARN' not in err_str and \ 'NotFound' not in err_str: raise RuntimeError(f'Failed to delete di-orchestrator: {err_str}') def create_components_from_config(config: str) -> None: """ Overview: Create components from config file. Arguments: - config (:obj:`str`): The config file. """ args = ['kubectl', 'create', '-f', f'{config}'] proc = subprocess.Popen(args, stderr=subprocess.PIPE) _, err = proc.communicate() err_str = err.decode('utf-8').strip() if err_str != '' and 'WARN' not in err_str: if 'already exists' in err_str: print(f'Components already exists: {config}') else: raise RuntimeError(f'Failed to launch components: {err_str}') def wait_to_be_ready(namespace: str, component: str, timeout: int = 120) -> None: """ Overview: Wait for the component to be ready. Arguments: - namespace (:obj:`str`): The namespace of the component. - component (:obj:`str`): The name of the component. - timeout (:obj:`int`): The timeout of waiting. """ try: from kubernetes import config, client, watch except ModuleNotFoundError: one_time_warning("You have not installed kubernetes package! Please try 'pip install DI-engine[k8s]'.") exit(-1) config.load_kube_config() appv1 = client.AppsV1Api() w = watch.Watch() for event in w.stream(appv1.list_namespaced_deployment, namespace, timeout_seconds=timeout): # print("Event: %s %s %s" % (event['type'], event['object'].kind, event['object'].metadata.name)) if event['object'].metadata.name.startswith(component) and \ event['object'].status.ready_replicas is not None and \ event['object'].status.ready_replicas >= 1: print(f'component {component} is ready for serving') w.stop()