import os
import sys
import subprocess
import signal
import pickle
from ditk import logging
import time
from threading import Thread
from easydict import EasyDict
import numpy as np
from ding.worker import Coordinator, create_comm_collector, create_comm_learner, LearnerAggregator
from ding.config import read_config_with_system, compile_config_parallel
from ding.utils import set_pkg_seed, DEFAULT_K8S_AGGREGATOR_SLAVE_PORT, pod_exec_command


def dist_prepare_config(
        filename: str,
        seed: int,
        platform: str,
        coordinator_host: str,
        learner_host: str,
        collector_host: str,
        coordinator_port: int,
        learner_port: int,
        collector_port: int,
) -> str:
    set_pkg_seed(seed)
    main_cfg, create_cfg, system_cfg = read_config_with_system(filename)
    config = compile_config_parallel(
        main_cfg,
        create_cfg=create_cfg,
        system_cfg=system_cfg,
        seed=seed,
        platform=platform,
        coordinator_host=coordinator_host,
        learner_host=learner_host,
        collector_host=collector_host,
        coordinator_port=coordinator_port,
        learner_port=learner_port,
        collector_port=collector_port,
    )
    # Pickle-dump the compiled config to disk for later use by the launch entries below.
    real_filename = filename + '.pkl'
    with open(real_filename, 'wb') as f:
        pickle.dump(config, f)
    return real_filename


def dist_launch_coordinator(
        filename: str,
        seed: int,
        coordinator_port: int,
        disable_flask_log: bool,
        enable_total_log: bool = False
) -> None:
    set_pkg_seed(seed)
    # Disable part of the DI-engine log.
    if not enable_total_log:
        coordinator_log = logging.getLogger('coordinator_logger')
        coordinator_log.disabled = True
    if disable_flask_log:
        log = logging.getLogger('werkzeug')
        log.disabled = True
    with open(filename, 'rb') as f:
        config = pickle.load(f)
    # Port precedence: CLI > ENV VARIABLE > CONFIG
    if coordinator_port is not None:
        config.system.coordinator.port = coordinator_port
    elif os.environ.get('COORDINATOR_PORT', None):
        port = os.environ['COORDINATOR_PORT']
        if port.isdigit():
            config.system.coordinator.port = int(port)
    else:  # use config pre-defined value
        assert 'port' in config.system.coordinator and np.isscalar(config.system.coordinator.port)
    coordinator = Coordinator(config)
    coordinator.start()

    # Monitor thread: the coordinator remains running until its ``system_shutdown_flag`` is set to True.
    def shutdown_monitor():
        while True:
            time.sleep(3)
            if coordinator.system_shutdown_flag:
                coordinator.close()
                break

    shutdown_monitor_thread = Thread(target=shutdown_monitor, args=(), daemon=True, name='shutdown_monitor')
    shutdown_monitor_thread.start()
    shutdown_monitor_thread.join()
    print("[DI-engine dist pipeline] Your RL agent has converged; refer to 'log' and 'tensorboard' for details.")
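# Example (a minimal sketch, not part of the module): prepare the pickled config
# once, then launch the coordinator against it. The config path, platform, hosts
# and ports below are hypothetical placeholders.
#
#   cfg_path = dist_prepare_config(
#       'cartpole_dqn_config.py', seed=0, platform='local',
#       coordinator_host='127.0.0.1', learner_host='127.0.0.1', collector_host='127.0.0.1',
#       coordinator_port=13339, learner_port=13333, collector_port=13334,
#   )
#   dist_launch_coordinator(cfg_path, seed=0, coordinator_port=None, disable_flask_log=True)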
def dist_launch_learner(
        filename: str, seed: int, learner_port: int, name: str = None, disable_flask_log: bool = True
) -> None:
    set_pkg_seed(seed)
    if disable_flask_log:
        log = logging.getLogger('werkzeug')
        log.disabled = True
    if name is None:
        name = 'learner'
    with open(filename, 'rb') as f:
        config = pickle.load(f).system[name]
    # Port precedence: CLI > ENV VARIABLE > CONFIG
    if learner_port is not None:
        config.port = learner_port
    elif os.environ.get('LEARNER_PORT', None):
        port = os.environ['LEARNER_PORT']
        if port.isdigit():
            config.port = int(port)
    else:  # use config pre-defined value
        assert 'port' in config and np.isscalar(config.port)
    learner = create_comm_learner(config)
    learner.start()


def dist_launch_collector(
        filename: str, seed: int, collector_port: int, name: str = None, disable_flask_log: bool = True
) -> None:
    set_pkg_seed(seed)
    if disable_flask_log:
        log = logging.getLogger('werkzeug')
        log.disabled = True
    if name is None:
        name = 'collector'
    with open(filename, 'rb') as f:
        config = pickle.load(f).system[name]
    # Port precedence: CLI > ENV VARIABLE > CONFIG
    if collector_port is not None:
        config.port = collector_port
    elif os.environ.get('COLLECTOR_PORT', None):
        port = os.environ['COLLECTOR_PORT']
        if port.isdigit():
            config.port = int(port)
    else:  # use config pre-defined value
        assert 'port' in config and np.isscalar(config.port)
    collector = create_comm_collector(config)
    collector.start()


def dist_launch_learner_aggregator(
        filename: str,
        seed: int,
        aggregator_host: str,
        aggregator_port: int,
        name: str = None,
        disable_flask_log: bool = True
) -> None:
    set_pkg_seed(seed)
    if disable_flask_log:
        log = logging.getLogger('werkzeug')
        log.disabled = True
    if filename is not None:
        if name is None:
            name = 'learner_aggregator'
        with open(filename, 'rb') as f:
            config = pickle.load(f).system[name]
    else:
        # Start without a config file (create a minimal one in place).
        host, port = aggregator_host, DEFAULT_K8S_AGGREGATOR_SLAVE_PORT
        if aggregator_port is not None:
            port = aggregator_port
        elif os.environ.get('AGGREGATOR_PORT', None):
            _port = os.environ['AGGREGATOR_PORT']
            if _port.isdigit():
                port = int(_port)
        config = dict(
            master=dict(host=host, port=port + 1),
            slave=dict(host=host, port=port + 0),
            learner={},
        )
        config = EasyDict(config)
    learner_aggregator = LearnerAggregator(config)
    learner_aggregator.start()
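# Example (sketch): launching a learner and a collector against the same pickled
# config, relying on the CLI > ENV VARIABLE > CONFIG precedence implemented above.
# Passing ``None`` for the port falls through to the ``LEARNER_PORT`` /
# ``COLLECTOR_PORT`` environment variables, then to the pickled config. ``cfg_path``
# is the hypothetical return value of ``dist_prepare_config`` shown earlier.
#
#   dist_launch_learner(cfg_path, seed=0, learner_port=None)      # name defaults to 'learner'
#   dist_launch_collector(cfg_path, seed=0, collector_port=None)  # name defaults to 'collector'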
def dist_launch_spawn_learner(
        filename: str, seed: int, learner_port: int, name: str = None, disable_flask_log: bool = True
) -> None:
    current_env = os.environ.copy()
    local_world_size = int(os.environ.get('LOCAL_WORLD_SIZE', 1))
    processes = []

    for local_rank in range(0, local_world_size):
        dist_rank = int(os.environ.get('START_RANK', 0)) + local_rank
        current_env["RANK"] = str(dist_rank)
        current_env["LOCAL_RANK"] = str(local_rank)

        executable = subprocess.getoutput('which ding')
        assert len(executable) > 0, "cannot find executable \"ding\""

        cmd = [executable, '-m', 'dist', '--module', 'learner']
        if filename is not None:
            cmd += ['-c', f'{filename}']
        if seed is not None:
            cmd += ['-s', f'{seed}']
        if learner_port is not None:
            cmd += ['-lp', f'{learner_port}']
        if name is not None:
            cmd += ['--module-name', f'{name}']
        if disable_flask_log is not None:
            cmd += ['--disable-flask-log', f'{int(disable_flask_log)}']

        sig_names = {2: "SIGINT", 15: "SIGTERM"}
        last_return_code = None

        def sigkill_handler(signum, frame):
            for process in processes:
                print(f"Killing subprocess {process.pid}")
                try:
                    process.kill()
                except Exception:
                    pass
            if last_return_code is not None:
                raise subprocess.CalledProcessError(returncode=last_return_code, cmd=cmd)
            if signum in sig_names:
                print(f"Main process received {sig_names[signum]}, exiting")
            sys.exit(1)

        # Pass SIGINT/SIGTERM to children if the parent is being terminated.
        signal.signal(signal.SIGINT, sigkill_handler)
        signal.signal(signal.SIGTERM, sigkill_handler)

        process = subprocess.Popen(cmd, env=current_env, stdout=None, stderr=None)
        processes.append(process)

    try:
        alive_processes = set(processes)
        while len(alive_processes):
            finished_processes = []
            for process in alive_processes:
                if process.poll() is None:
                    # The process is still running.
                    continue
                else:
                    if process.returncode != 0:
                        last_return_code = process.returncode  # for sigkill_handler
                        sigkill_handler(signal.SIGTERM, None)  # not coming back
                    else:
                        # Exited cleanly.
                        finished_processes.append(process)
            alive_processes = set(alive_processes) - set(finished_processes)
            time.sleep(1)
    finally:
        # close open file descriptors
        pass


def dist_add_replicas(
        replicas_type: str,
        kubeconfig: str,
        replicas: int,
        coordinator_name: str,
        namespace: str,
        cpus: int,
        gpus: int,
        memory: str,
) -> None:
    assert coordinator_name and namespace, "Please provide --coordinator-name and --namespace"
    import json
    data = {
        "namespace": namespace,
        "coordinator": coordinator_name,
    }
    res = {"replicas": replicas}
    if cpus > 0:
        res['cpus'] = cpus
    if gpus > 0:
        res['gpus'] = gpus
    if memory:
        res['memory'] = memory
    if replicas_type == 'collector':
        data['collectors'] = res
    elif replicas_type == 'learner':
        data['learners'] = res
    cmd = 'curl -X POST $KUBERNETES_SERVER_URL/v1alpha1/replicas ' \
          '-H "content-type: application/json" ' \
          f'-d \'{json.dumps(data)}\''
    ret, msg = pod_exec_command(kubeconfig, coordinator_name, namespace, cmd)
    if ret == 0:
        print(f'{replicas_type} added successfully')
    else:
        print(f'Failed to add {replicas_type}, return code: {ret}, message: {msg}')


def dist_delete_replicas(
        replicas_type: str, kubeconfig: str, replicas: int, coordinator_name: str, namespace: str
) -> None:
    assert coordinator_name and namespace, "Please provide --coordinator-name and --namespace"
    import json
    data = {
        "namespace": namespace,
        "coordinator": coordinator_name,
    }
    if replicas_type == 'collector':
        data['collectors'] = {"replicas": replicas}
    elif replicas_type == 'learner':
        data['learners'] = {"replicas": replicas}
    cmd = 'curl -X DELETE $KUBERNETES_SERVER_URL/v1alpha1/replicas ' \
          '-H "content-type: application/json" ' \
          f'-d \'{json.dumps(data)}\''
    ret, msg = pod_exec_command(kubeconfig, coordinator_name, namespace, cmd)
    if ret == 0:
        print(f'{replicas_type} deleted successfully')
    else:
        print(f'Failed to delete {replicas_type}, return code: {ret}, message: {msg}')
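# Example (sketch): the JSON payload that ``dist_add_replicas`` POSTs for a call
# such as ``dist_add_replicas('collector', kubeconfig, 2, 'di-coordinator', 'di-ns',
# cpus=4, gpus=1, memory='8Gi')``; the coordinator, namespace and resource values
# here are hypothetical:
#
#   {
#       "namespace": "di-ns",
#       "coordinator": "di-coordinator",
#       "collectors": {"replicas": 2, "cpus": 4, "gpus": 1, "memory": "8Gi"}
#   }
#
# The request is executed with curl from inside the coordinator pod via
# ``pod_exec_command``, so ``$KUBERNETES_SERVER_URL`` is resolved in that pod's
# environment, not on the machine running this function.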
def dist_restart_replicas(
        replicas_type: str, kubeconfig: str, coordinator_name: str, namespace: str, restart_pod_name: str
) -> None:
    assert coordinator_name and namespace, "Please provide --coordinator-name and --namespace"
    import json
    data = {
        "namespace": namespace,
        "coordinator": coordinator_name,
    }
    assert restart_pod_name, "Please provide the pod to restart with --restart-pod-name"
    if replicas_type == 'collector':
        data['collectors'] = [restart_pod_name]
    elif replicas_type == 'learner':
        data['learners'] = [restart_pod_name]
    cmd = 'curl -X POST $KUBERNETES_SERVER_URL/v1alpha1/replicas/failed ' \
          '-H "content-type: application/json" ' \
          f'-d \'{json.dumps(data)}\''
    ret, msg = pod_exec_command(kubeconfig, coordinator_name, namespace, cmd)
    if ret == 0:
        print(f'{replicas_type} restarted successfully')
    else:
        print(f'Failed to restart {replicas_type}, return code: {ret}, message: {msg}')
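# Example (sketch): marking a crashed collector pod as failed so the operator
# restarts it; the kubeconfig path, coordinator, namespace and pod names are
# hypothetical placeholders.
#
#   dist_restart_replicas(
#       'collector',
#       kubeconfig='~/.kube/config',
#       coordinator_name='di-coordinator',
#       namespace='di-ns',
#       restart_pod_name='di-collector-0',
#   )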