"""Flask front end that runs an uploaded Python script under MPI.

Clients upload data files plus the text of a script; the script is executed
by ``mpiexec`` inside a scratch folder, and the folder (inputs, outputs and the
combined log) is returned as a downloadable zip archive.

NOTE(security): this service executes caller-supplied code and ``pip install``
commands by design. It has no authentication; do not expose it to untrusted
networks without adding some.
"""
import logging
import os
import shlex
import shutil
import subprocess
import sys
import tempfile
import uuid
from zipfile import ZipFile

import psutil
from flask import Flask, request, jsonify, render_template, send_file
from mpi4py import MPI  # noqa: F401  (kept: part of the module's original import surface)

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize Flask app
app = Flask(__name__)

# Registry of hosts offering CPUs, keyed by host name. The local entry is
# sampled once at import time (cpu_percent blocks for the 1 s interval).
connected_cpus = {
    "localhost": {
        "cpu_count": psutil.cpu_count(logical=False),
        "usage": psutil.cpu_percent(interval=1),
    }
}

# Result archives are stored here so /download/<filename> can find them after
# the per-request scratch directory has been removed.
DOWNLOAD_DIR = os.path.join(tempfile.gettempdir(), "mpi_runner_downloads")

# Source of the helper executed by every MPI rank. It is static: the script and
# folder paths arrive through sys.argv instead of being interpolated into the
# source, so paths containing quotes or backslashes cannot break it.
_MPI_DRIVER_SOURCE = """\
import os
import subprocess
import sys

from mpi4py import MPI


def target_function(script_path, folder_path):
    try:
        completed = subprocess.run(
            [sys.executable, script_path],
            cwd=folder_path,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            errors='replace',
        )
        return completed.stdout
    except Exception as e:
        return str(e)


def run_script(script_path, folder_path):
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

    if size == 1:
        # No workers exist, so the only rank has to run the script itself.
        log_outputs = [target_function(script_path, folder_path)]
    elif rank == 0:
        # Master process: collect one log per worker.
        log_outputs = [comm.recv(source=i, tag=11) for i in range(1, size)]
    else:
        # Worker process: run the script and report back.
        comm.send(target_function(script_path, folder_path), dest=0, tag=11)
        return

    log_path = os.path.join(folder_path, 'mpi_log_output.txt')
    with open(log_path, 'w', encoding='utf-8') as log_file:
        log_file.write('\\n'.join(log_outputs))


if __name__ == "__main__":
    run_script(sys.argv[1], sys.argv[2])
"""


def _resolve_upload_path(base_dir, filename):
    """Return the path for ``filename`` inside ``base_dir``, or None if unsafe.

    Relative sub-paths (``sub/data.csv``) are accepted; absolute paths and
    names that climb out of ``base_dir`` via ``..`` are rejected.
    """
    if not filename:
        return None
    root = os.path.realpath(base_dir)
    candidate = os.path.realpath(os.path.join(root, filename.replace("\\", "/")))
    try:
        inside = os.path.commonpath([root, candidate]) == root
    except ValueError:
        # Raised on Windows when the two paths live on different drives.
        inside = False
    if not inside or candidate == root:
        return None
    return candidate


# Define the target function for MPI
def target_function(script_path, folder_path):
    """Run ``script_path`` with ``folder_path`` as working directory.

    Returns the script's combined stdout/stderr as text. If the process cannot
    be started, the exception message is returned instead (best effort, never
    raises), matching what callers of the original function relied on.
    """
    try:
        completed = subprocess.run(
            # sys.executable keeps the child on the same interpreter (and
            # therefore the same installed packages) as this server.
            [sys.executable, script_path],
            cwd=folder_path,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            errors="replace",
        )
        return completed.stdout
    except Exception as e:  # deliberate best effort: report, don't crash
        return str(e)


# Endpoint to handle file uploads and script execution
@app.route('/upload', methods=['POST'])
def handle_upload():
    """Store the uploaded files and script, run the script, zip the results.

    Expects multipart form data with one or more ``file`` parts and a
    ``script_content`` field. Responds with the MPI log and a download URL for
    the zipped working folder; 400 on missing or unsafe input, 500 on failure.
    """
    if 'file' not in request.files or 'script_content' not in request.form:
        return jsonify({"status": "error", "message": "File or script content not provided"}), 400

    uploads = request.files.getlist('file')
    script_content = request.form['script_content']

    # Create a temporary directory to store uploaded files
    temp_dir = tempfile.mkdtemp()
    try:
        # Save the uploaded files to the temporary directory
        folder_path = os.path.join(temp_dir, 'uploaded_folder')
        os.makedirs(folder_path, exist_ok=True)
        for file_obj in uploads:
            if not file_obj.filename:
                continue  # empty file field submitted by the browser
            file_path = _resolve_upload_path(folder_path, file_obj.filename)
            if file_path is None:
                return jsonify({"status": "error", "message": f"Invalid file name: {file_obj.filename}"}), 400
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            file_obj.save(file_path)

        # Save the script content to a file
        script_path = os.path.join(folder_path, 'user_script.py')
        with open(script_path, 'w', encoding='utf-8') as script_file:
            script_file.write(script_content)

        # Run the script using MPI
        log_output = run_script_with_mpi(script_path, folder_path)

        # Create a zip file of the entire folder. A unique name keeps
        # concurrent requests from overwriting each other's archives.
        os.makedirs(DOWNLOAD_DIR, exist_ok=True)
        zip_name = f"output_{uuid.uuid4().hex}.zip"
        zip_path = os.path.join(DOWNLOAD_DIR, zip_name)
        with ZipFile(zip_path, 'w') as zipf:
            for root, _, names in os.walk(folder_path):
                for name in names:
                    full_path = os.path.join(root, name)
                    zipf.write(full_path, os.path.relpath(full_path, folder_path))

        return jsonify({"status": "success", "log_output": log_output, "download_url": f"/download/{zip_name}"})
    except Exception as e:
        logger.exception("Error in handle_upload: %s", e)
        return jsonify({"status": "error", "message": str(e)}), 500
    finally:
        # The archive lives in DOWNLOAD_DIR, so the scratch tree can go.
        shutil.rmtree(temp_dir, ignore_errors=True)


@app.route('/download/<filename>')
def download_file(filename):
    """Send a previously generated result archive as an attachment."""
    try:
        # Only bare file names are served; anything path-like is rejected.
        if os.path.basename(filename) != filename or filename in ("", ".", ".."):
            return jsonify({"status": "error", "message": "Invalid file name"}), 400
        file_path = os.path.join(DOWNLOAD_DIR, filename)
        if not os.path.isfile(file_path):
            return jsonify({"status": "error", "message": "File not found"}), 404
        return send_file(file_path, as_attachment=True)
    except Exception as e:
        logger.exception("Error in download_file: %s", e)
        return jsonify({"status": "error", "message": str(e)}), 500


# Endpoint to get connected CPUs information
@app.route('/cpu_info', methods=['GET'])
def get_cpu_info():
    """Report CPU count and usage for every registered host.

    Local usage is sampled live (blocking for one second); usage of other
    hosts is whatever they last reported through ``/update_cpu_usage``.
    """
    try:
        info = []
        for host, data in connected_cpus.items():
            usage = psutil.cpu_percent(interval=1) if host == "localhost" else data['usage']
            info.append(f"{host}: {data['cpu_count']} CPUs, {usage}% usage")
        return jsonify({"status": "success", "cpu_info": "\n".join(info)})
    except Exception as e:
        logger.exception("Error in get_cpu_info: %s", e)
        return jsonify({"status": "error", "message": str(e)}), 500


# Endpoint to execute commands
@app.route('/execute_command', methods=['POST'])
def execute_command():
    """Run a ``pip install ...`` command and return its output.

    Only commands whose first two tokens are exactly ``pip install`` are
    accepted. The command runs without a shell, as ``python -m pip`` on this
    server's interpreter, so installed packages are visible to the scripts
    this service launches.

    NOTE(security): installing a package can execute arbitrary code from that
    package; the allow-list limits the command, not what pip downloads.
    """
    try:
        command = request.form.get('command', '')
        try:
            tokens = shlex.split(command)
        except ValueError:
            # e.g. unbalanced quotes
            return jsonify({"status": "error", "message": "Malformed command"}), 400
        if not tokens:
            return jsonify({"status": "error", "message": "No command provided"}), 400

        # Compare whole tokens so that e.g. "pip installfoo" is not accepted.
        if tokens[:2] != ['pip', 'install']:
            return jsonify({"status": "error", "message": "Command not allowed"}), 400

        try:
            completed = subprocess.run(
                [sys.executable, '-m', 'pip', *tokens[1:]],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                errors="replace",
            )
            log_output = completed.stdout
        except OSError as e:
            log_output = str(e)
        return jsonify({"status": "success", "log_output": log_output})
    except Exception as e:
        logger.exception("Error in execute_command: %s", e)
        return jsonify({"status": "error", "message": str(e)}), 500


# Endpoint to donate CPU resources
@app.route('/donate_cpu', methods=['POST'])
def donate_cpu():
    """Register (or replace) a host entry from JSON ``{"host", "cpu_count"}``."""
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'host' not in data or 'cpu_count' not in data:
            return jsonify({"status": "error", "message": "JSON body with 'host' and 'cpu_count' required"}), 400
        host = data['host']
        connected_cpus[host] = {"cpu_count": data['cpu_count'], "usage": 0.0}
        return jsonify({"status": "success", "message": f"CPU resources from {host} donated successfully."})
    except Exception as e:
        logger.exception("Error in donate_cpu: %s", e)
        return jsonify({"status": "error", "message": str(e)}), 500


# Endpoint to update CPU usage
@app.route('/update_cpu_usage', methods=['POST'])
def update_cpu_usage():
    """Record the usage reported by a registered host; 404 if it is unknown."""
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'host' not in data or 'usage' not in data:
            return jsonify({"status": "error", "message": "JSON body with 'host' and 'usage' required"}), 400
        host = data['host']
        if host not in connected_cpus:
            return jsonify({"status": "error", "message": f"Host {host} not found."}), 404
        connected_cpus[host]['usage'] = data['usage']
        return jsonify({"status": "success", "message": f"CPU usage from {host} updated successfully."})
    except Exception as e:
        logger.exception("Error in update_cpu_usage: %s", e)
        return jsonify({"status": "error", "message": str(e)}), 500


# Main interface
@app.route('/')
def index():
    """Serve the web UI."""
    return render_template('index.html')


def run_script_with_mpi(script_path, folder_path):
    """Execute ``script_path`` under ``mpiexec`` and return the combined log.

    One MPI process is started per physical core (at least one). The ranks
    write their joined output to ``mpi_log_output.txt`` in ``folder_path``,
    which is read back and returned.

    Raises:
        RuntimeError: if the run produced no log file (for example because
            ``mpiexec`` itself failed); the message carries mpiexec's stderr.
        OSError: if ``mpiexec`` cannot be started at all.
    """
    # cpu_count(logical=False) may return None when it cannot be determined.
    process_count = psutil.cpu_count(logical=False) or 1

    # The context manager removes the driver directory even when the run fails.
    with tempfile.TemporaryDirectory() as mpi_temp_dir:
        mpi_script_path = os.path.join(mpi_temp_dir, 'mpi_script.py')
        with open(mpi_script_path, 'w', encoding='utf-8') as mpi_script_file:
            mpi_script_file.write(_MPI_DRIVER_SOURCE)

        result = subprocess.run(
            ['mpiexec', '-n', str(process_count), sys.executable, mpi_script_path, script_path, folder_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            errors="replace",
        )

    log_output_path = os.path.join(folder_path, 'mpi_log_output.txt')
    if not os.path.isfile(log_output_path):
        raise RuntimeError(
            f"MPI run produced no log (exit code {result.returncode}): {result.stderr.strip()}"
        )
    if result.returncode != 0:
        logger.warning("mpiexec exited with code %s: %s", result.returncode, result.stderr.strip())

    with open(log_output_path, 'r', encoding='utf-8') as log_file:
        return log_file.read()


if __name__ == "__main__":
    # NOTE(security): binds to all interfaces; see the module docstring.
    app.run(host='0.0.0.0', port=7860, threaded=True)