"""Small Gradio front-end that runs user-supplied Python code in a subprocess.

NOTE(security): this module executes arbitrary code and installs arbitrary
packages on the host. It must only be exposed to trusted users or run inside
a sandbox/container.
"""

import concurrent.futures
import importlib.util
import shutil
import subprocess
import sys
import tempfile
import time
import traceback
from pathlib import Path

try:
    import gradio as gr
except ImportError:
    # The UI toolkit is only needed by create_interface(); keeping the import
    # optional lets CodeExecutor be used (and tested) without gradio.
    gr = None

# Default limits, in seconds.
DEFAULT_EXECUTION_TIMEOUT = 10
DEFAULT_INSTALL_TIMEOUT = 10
# Extra time the caller waits on the worker thread beyond the process timeout,
# so the inner timeout (which kills the child) fires before the outer one.
_RESULT_GRACE_SECONDS = 5


class CodeExecutor:
    """Run Python snippets in a child interpreter and capture their output.

    Each instance owns a temporary working directory and a single worker
    thread. Use it as a context manager, or call :meth:`close`, to release
    both deterministically.

    Args:
        timeout: Seconds the user code may run before it is killed.
        install_timeout: Seconds allowed for each ``pip install`` call.
    """

    def __init__(self, timeout=DEFAULT_EXECUTION_TIMEOUT,
                 install_timeout=DEFAULT_INSTALL_TIMEOUT):
        self.timeout = timeout
        self.install_timeout = install_timeout
        self.temp_dir = tempfile.TemporaryDirectory()
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.close()
        return False

    def close(self):
        """Stop the worker pool and delete the temporary directory.

        Safe to call more than once, and safe on a partially constructed
        instance (attributes are looked up defensively).
        """
        pool = getattr(self, "executor", None)
        if pool is not None:
            pool.shutdown(wait=False)
        temp_dir = getattr(self, "temp_dir", None)
        if temp_dir is not None:
            # Let TemporaryDirectory remove itself so its finalizer is
            # detached and no second removal is attempted later.
            temp_dir.cleanup()

    def __del__(self):
        self.close()

    @staticmethod
    def _is_importable(name):
        """Return True when *name* resolves to an importable module."""
        try:
            return importlib.util.find_spec(name) is not None
        except (ImportError, ValueError):
            # Raised for dotted names with a missing parent package or for
            # malformed names; treat both as "not installed".
            return False

    def _install_packages(self, packages):
        """Install every package in *packages* that is not yet importable.

        Args:
            packages: Iterable of package names; surrounding whitespace is
                ignored and empty entries are skipped.

        Raises:
            Exception: If pip exits with an error or exceeds the timeout.
        """
        for raw_name in packages:
            package = raw_name.strip()
            if not package:
                continue
            # NOTE: this compares the *distribution* name with the *import*
            # name; for packages where they differ (e.g. "scikit-learn" vs
            # "sklearn") pip is simply invoked again and is a no-op.
            if self._is_importable(package):
                continue
            try:
                # Use the running interpreter's pip so the package lands in
                # the same environment that executes the user code.
                subprocess.check_call(
                    [sys.executable, "-m", "pip", "install", package],
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                    timeout=self.install_timeout,
                )
            except subprocess.CalledProcessError as exc:
                raise Exception(
                    f"Error installing package {package}: {exc}"
                ) from exc
            except subprocess.TimeoutExpired as exc:
                raise Exception(
                    f"Timed out installing package {package}"
                ) from exc

    def _execute_code(self, code, inputs):
        """Run *code* in a child interpreter, feeding *inputs* on stdin.

        Args:
            code: Python source to execute.
            inputs: List of strings; each is written to the child's stdin
                followed by a newline. May be empty.

        Returns:
            A report string containing the code and either the captured
            stderr (if any was produced) or the inputs and captured stdout.

        Raises:
            Exception: If the child does not finish within ``self.timeout``.
        """
        script_path = Path(self.temp_dir.name) / "temp.py"
        # Python source files are UTF-8 by default, independent of locale.
        script_path.write_text(code, encoding="utf-8")

        stdin_text = "".join(f"{value}\n" for value in inputs)
        try:
            # Passing ``input`` (even an empty string) makes run() close the
            # child's stdin after writing, so code that reads stdin sees EOF
            # instead of blocking. On timeout run() kills and reaps the child.
            completed = subprocess.run(
                [sys.executable, str(script_path)],
                input=stdin_text,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                errors="replace",
                timeout=self.timeout,
            )
        except subprocess.TimeoutExpired as exc:
            raise Exception("Timed out executing code") from exc
        except OSError:
            # Failure to launch the interpreter is reported like any other
            # execution error rather than propagated.
            output_text, error_text = "", traceback.format_exc()
        else:
            output_text, error_text = completed.stdout, completed.stderr

        if error_text:
            return f"User Code:\n{code}\n\nError:\n{error_text}"
        return (
            f"User Code:\n{code}\n\nInputs:\n{', '.join(inputs)}"
            f"\n\nOutput:\n{output_text}"
        )

    def execute(self, code, inputs, packages):
        """Install *packages*, then run *code* with *inputs*.

        Args:
            code: Python source to execute.
            inputs: Iterable of values fed to the program's stdin, one per
                line. ``None`` or an empty iterable means no input.
            packages: Comma-separated package names to install first; may be
                empty or ``None``.

        Returns:
            The report string produced by :meth:`_execute_code`.

        Raises:
            Exception: On installation failure, execution failure or timeout.
        """
        stdin_values = [str(value) for value in inputs] if inputs else []

        # Dependencies must be present *before* the code runs.
        if packages:
            self._install_packages(packages.split(","))

        future = self.executor.submit(self._execute_code, code, stdin_values)
        try:
            return future.result(timeout=self.timeout + _RESULT_GRACE_SECONDS)
        except concurrent.futures.TimeoutError as exc:
            raise Exception("Timed out waiting for code execution") from exc
        except Exception as exc:
            raise Exception(f"Error: {str(exc)}") from exc


def wrapper_execute(code, inputs, packages):
    """Gradio callback: run *code* with comma-separated *inputs*/*packages*.

    Args:
        code: Python source to execute.
        inputs: Comma-separated stdin values; an empty string means none.
        packages: Comma-separated package names to install first.

    Returns:
        The report string from :meth:`CodeExecutor.execute`.
    """
    # "".split(",") would yield [""], i.e. one phantom empty input line.
    stdin_values = inputs.split(",") if inputs else []
    with CodeExecutor() as executor:
        return executor.execute(code, stdin_values, packages)


def create_interface():
    """Build and return the Gradio Blocks UI for the code executor.

    Raises:
        ImportError: If gradio is not installed.
    """
    if gr is None:
        raise ImportError(
            "gradio is required to build the interface; "
            "install it with 'pip install gradio'"
        )

    with gr.Blocks() as demo:
        gr.Markdown("# Code Executor")
        gr.Markdown("Execute Python code with inputs and packages")

        gr.Markdown("### Code")
        code_input = gr.Textbox(label="Code", lines=20)

        gr.Markdown("### Inputs")
        inputs_input = gr.Textbox(label="Inputs (comma-separated)", lines=1)

        gr.Markdown("### Packages")
        packages_input = gr.Textbox(
            label="Packages (comma-separated)", lines=1
        )

        gr.Markdown("### Output")
        output_text = gr.Text(label="Output", lines=20)

        run_button = gr.Button("Run")
        run_button.click(
            wrapper_execute,
            inputs=[code_input, inputs_input, packages_input],
            outputs=output_text,
        )
    return demo


if __name__ == "__main__":
    demo = create_interface()
    demo.launch()