import contextlib
import faulthandler
import io
import multiprocessing
import os
import platform
import signal
import random
import subprocess
import tempfile
import gzip
import json
from typing import *


def dicts_to_jsonl(data_list: list, filename: str, compress: bool = True) -> None:
    """
    Save a list of dicts to a JSON Lines file, one JSON document per line.

    :param data_list: (list) list of dicts to be stored,
    :param filename: (str) path to the output file. If the name does not end
        with .jsonl, the .jsonl suffix is appended.
    :param compress: (bool) if True, ".gz" is appended to the file name and
        the output is written as a gzip archive.
    """
    jsonl_suffix = '.jsonl'
    gzip_suffix = '.gz'

    if not filename.endswith(jsonl_suffix):
        filename += jsonl_suffix

    # Serialised lazily so records are encoded only as they are written.
    lines = (json.dumps(record) + '\n' for record in data_list)

    if compress:
        filename += gzip_suffix
        # Binary mode keeps the '\n' terminator untouched on every platform.
        with gzip.open(filename, 'wb') as compressed:
            for line in lines:
                compressed.write(line.encode('utf-8'))
    else:
        with open(filename, 'w', encoding='utf-8') as out:
            out.writelines(lines)


def check_correctness(
        full_code: str,
        timeout: float = 3.0,
        tmp_dir: str = "tmp",
) -> Dict:
    """
    Execute ``full_code`` in a separate process and report whether it ran to
    completion without raising.

    :param full_code: Python source that is passed to ``exec``.
    :param timeout: time limit in seconds applied to the ``exec`` call; the
        parent waits ``timeout + 1`` seconds for the worker process.
    :param tmp_dir: forwarded to the worker, which does not currently use it.
    :return: dict with "result" ("passed", "timed out" or a "failed: ..."
        message) and "passed" (True only when "result" is "passed").
    """

    def unsafe_execute(tmp_dir):
        with create_tempdir():
            # These system calls are needed when cleaning up tempdir.
            import os
            import shutil

            rmtree = shutil.rmtree
            rmdir = os.rmdir
            chdir = os.chdir

            # Disable functionalities that can make destructive changes to the test.
            reliability_guard()

            try:
                exec_globals = {}
                with swallow_io():
                    with time_limit(timeout):
                        # NOTE: exec on arbitrary code; reliability_guard is
                        # not a security sandbox.
                        exec(full_code, exec_globals)
                result.append("passed")
            except TimeoutException:
                result.append("timed out")
            except AssertionError:
                result.append("failed: AssertionError")
            except BaseException as e:
                result.append(f"failed: {e}")

            # Needed for cleaning up.
            shutil.rmtree = rmtree
            os.rmdir = rmdir
            os.chdir = chdir

    # The manager runs a helper server process; the context manager shuts it
    # down so that it does not outlive this call.
    with multiprocessing.Manager() as manager:
        result = manager.list()

        p = multiprocessing.Process(target=unsafe_execute, args=(tmp_dir,))
        p.start()
        p.join(timeout=timeout + 1)
        if p.is_alive():
            p.kill()
            # Reap the killed worker so it does not linger as a zombie.
            p.join()

        # Copy the outcome out of the proxy before the manager is shut down.
        # An empty list means the worker never reported, i.e. it timed out.
        outcome = result[0] if len(result) else "timed out"

    return {
        "result": outcome,
        "passed": outcome == "passed",
    }


# Copyright (c) OpenAI (https://openai.com)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# ============================================================================


@contextlib.contextmanager
def time_limit(seconds: float):
    """
    Context manager that raises TimeoutException in the body once ``seconds``
    of real time have elapsed.

    Relies on SIGALRM and ITIMER_REAL. The SIGALRM handler that was installed
    before entry is put back on exit.
    """
    def signal_handler(signum, frame):
        raise TimeoutException("Timed out!")

    # Install the handler before arming the timer, so that an alarm firing
    # immediately cannot be delivered with the previous disposition.
    previous_handler = signal.signal(signal.SIGALRM, signal_handler)
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        yield
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)
        # signal.signal returns None when the old handler was not installed
        # from Python; such a handler cannot be re-installed from here.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)


@contextlib.contextmanager
def swallow_io():
    """
    Redirect stdout and stderr into a throw-away buffer and replace stdin with
    a stream that raises when read from.
    """
    stream = WriteOnlyStringIO()
    with contextlib.redirect_stdout(stream), \
            contextlib.redirect_stderr(stream), \
            redirect_stdin(stream):
        yield


@contextlib.contextmanager
def create_tempdir():
    """
    Create a temporary directory, make it the working directory for the body,
    and yield its path.
    """
    with tempfile.TemporaryDirectory() as dirname, chdir(dirname):
        yield dirname


class TimeoutException(Exception):
    """ Raised by time_limit when the time budget is exhausted """
    pass


class WriteOnlyStringIO(io.StringIO):
    """ StringIO that throws an exception when it's read from """

    def read(self, *args, **kwargs):
        raise IOError

    def readline(self, *args, **kwargs):
        raise IOError

    def readlines(self, *args, **kwargs):
        raise IOError

    def readable(self, *args, **kwargs):
        """ Returns True if the IO object can be read. """
        return False


class redirect_stdin(contextlib._RedirectStream):  # type: ignore
    """ Counterpart of contextlib.redirect_stdout for sys.stdin """
    _stream = 'stdin'


@contextlib.contextmanager
def chdir(root):
    """
    Change the working directory to ``root`` for the duration of the body and
    change back afterwards. ``"."`` is treated as a no-op.
    """
    if root == ".":
        yield
        return
    cwd = os.getcwd()
    os.chdir(root)
    try:
        yield
    finally:
        # Runs whether the body succeeded or raised; exceptions propagate.
        os.chdir(cwd)


def reliability_guard(maximum_memory_bytes: Optional[int] = None):
    """
    This disables various destructive functions and prevents the generated code
    from interfering with the test (e.g. fork bomb, killing other processes,
    removing filesystem files, etc.)

    :param maximum_memory_bytes: if given, used as both the soft and the hard
        limit for the address space, data segment and (except on Darwin)
        stack size of the current process.

    WARNING
    This function is NOT a security sandbox. Untrusted code, including
    model-generated code, should not be blindly executed outside of one. See
    the Codex paper for more information about OpenAI's code sandbox, and
    proceed with caution.
    """

    if maximum_memory_bytes is not None:
        import resource
        limits = (maximum_memory_bytes, maximum_memory_bytes)
        resource.setrlimit(resource.RLIMIT_AS, limits)
        resource.setrlimit(resource.RLIMIT_DATA, limits)
        if platform.uname().system != 'Darwin':
            resource.setrlimit(resource.RLIMIT_STACK, limits)

    faulthandler.disable()

    import builtins
    builtins.exit = None
    builtins.quit = None

    import os
    # Must be set before os.putenv is disabled below: assigning to os.environ
    # goes through putenv.
    os.environ['OMP_NUM_THREADS'] = '1'

    disabled_os_functions = (
        'kill', 'system', 'putenv', 'remove', 'removedirs', 'rmdir',
        'fchdir', 'setuid', 'fork', 'forkpty', 'killpg', 'rename', 'renames',
        'truncate', 'replace', 'unlink', 'fchmod', 'fchown', 'chmod', 'chown',
        'chroot', 'lchflags', 'lchmod', 'lchown', 'getcwd', 'chdir',
    )
    for name in disabled_os_functions:
        setattr(os, name, None)

    import shutil
    for name in ('rmtree', 'move', 'chown'):
        setattr(shutil, name, None)

    import subprocess
    subprocess.Popen = None  # type: ignore

    # Set through the builtins module: __builtins__ is a dict in imported
    # modules but a module in __main__, where item assignment would fail.
    builtins.help = None

    import sys
    # A None entry in sys.modules makes a later import of that name fail.
    for name in ('ipdb', 'joblib', 'resource', 'psutil', 'tkinter'):
        sys.modules[name] = None