Spaces:
Running
Running
File size: 4,022 Bytes
8f809e2 3573a39 3a0ee14 8f809e2 3a0ee14 3573a39 3a0ee14 3573a39 3a0ee14 9e4233f 3a0ee14 3573a39 3a0ee14 be473e6 3a0ee14 be473e6 3a0ee14 3573a39 3a0ee14 9e4233f 3a0ee14 3573a39 3a0ee14 be473e6 3a0ee14 3573a39 3a0ee14 3573a39 be473e6 3a0ee14 3573a39 9e4233f 8f809e2 9e4233f 3573a39 9e4233f 8f809e2 9e4233f 3573a39 3a0ee14 9e4233f 8f809e2 3573a39 8f809e2 3573a39 8f809e2 3573a39 8f809e2 3573a39 8f809e2 3573a39 8f809e2 3573a39 8f809e2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
import os
import subprocess
import yaml
# Path of the YAML configuration file rewritten by the write_* helpers below.
YAML_PATH = "./config.yaml"
# Path of the text file used as a job queue: one "<id>@<comma-joined job>" line per job.
PIPE_PATH = "./tmp/pipe"
class Dumper(yaml.Dumper):
    """YAML dumper that always asks the base class for non-indentless output.

    Passed as the ``Dumper=`` argument to ``yaml.dump`` by the writers in
    this module.
    """

    def increase_indent(self, flow=False, *args, **kwargs):
        """Delegate to the base class with ``indentless`` forced to ``False``.

        Any extra positional or keyword arguments are accepted so the
        signature stays call-compatible, but they are deliberately ignored.
        """
        # NOTE(review): presumably this makes list items nest under their
        # parent key instead of lining up with it -- confirm against PyYAML.
        return super().increase_indent(flow, False)
# read scanners from yaml file
# return a list of scanners
def read_scanners(path):
    """Return the value stored under ``detectors`` in the YAML file at *path*.

    Args:
        path: Location of the YAML configuration file.

    Returns:
        The value of the ``detectors`` key, or an empty list when the key is
        missing or the file contains no document.

    Raises:
        OSError: If *path* cannot be opened.
    """
    with open(path, "r") as f:
        # NOTE(review): FullLoader is kept for compatibility; switch to
        # yaml.safe_load if this file can ever come from an untrusted source.
        config = yaml.load(f, Loader=yaml.FullLoader)
    # An empty file parses to None, which has no .get().
    if not config:
        return []
    return config.get("detectors", [])
# convert a list of scanners to yaml file
def write_scanners(scanners):
    """Store *scanners* under the ``detectors`` key of the config file.

    The file at ``YAML_PATH`` is loaded, updated and then rewritten in full.
    Nothing is written when the file contains no document.

    Args:
        scanners: Value to save under ``detectors``.

    Raises:
        OSError: If the config file cannot be opened.
    """
    print(scanners)
    with open(YAML_PATH, "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    if not config:
        return
    config["detectors"] = scanners
    # Re-open in write mode so the new document replaces the old one; dumping
    # through the handle that was just read would append at end-of-file and
    # leave a duplicated document behind.
    with open(YAML_PATH, "w") as f:
        yaml.dump(config, f, Dumper=Dumper)
# read inference_type from yaml file
def read_inference_type(path):
    """Return the ``inference_type`` value from the YAML file at *path*.

    Args:
        path: Location of the YAML configuration file.

    Returns:
        The value of the ``inference_type`` key, or an empty string when the
        key is missing or the file contains no document.

    Raises:
        OSError: If *path* cannot be opened.
    """
    with open(path, "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    # An empty file parses to None, which has no .get().
    if not config:
        return ""
    return config.get("inference_type", "")
# write inference_type to yaml file
def write_inference_type(use_inference):
    """Record the inference backend in the config file.

    Sets ``inference_type`` to ``"hf_inference_api"`` when *use_inference* is
    truthy and to ``"hf_pipeline"`` otherwise, then rewrites ``YAML_PATH``.
    Nothing is written when the file contains no document.

    Args:
        use_inference: Truthy to select the inference API, falsy for the
            local pipeline.

    Raises:
        OSError: If the config file cannot be opened.
    """
    with open(YAML_PATH, "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    # An empty file parses to None and cannot be assigned into.
    if config is None:
        return
    config["inference_type"] = "hf_inference_api" if use_inference else "hf_pipeline"
    # Re-open in write mode so the new document replaces the old one; dumping
    # through the handle that was just read would append at end-of-file.
    with open(YAML_PATH, "w") as f:
        yaml.dump(config, f, Dumper=Dumper)
# read column mapping from yaml file
def read_column_mapping(path):
    """Return the ``column_mapping`` entry of the YAML file at *path*.

    Yields an empty dict when the file holds no (or an empty) document or
    when the key is absent.
    """
    with open(path, "r") as stream:
        loaded = yaml.load(stream, Loader=yaml.FullLoader)
    if not loaded:
        return {}
    return loaded.get("column_mapping", dict())
# write column mapping to yaml file
def write_column_mapping(mapping):
    """Save or clear the ``column_mapping`` entry of the config file.

    Args:
        mapping: Value to store under ``column_mapping``. ``None`` removes
            the key instead (and is a no-op for the key if it is absent).

    Nothing is written when the config file contains no document.

    Raises:
        OSError: If the config file cannot be opened.
    """
    with open(YAML_PATH, "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    if config is None:
        return
    if mapping is None:
        # pop() with a default also covers the "key not present" case, which
        # would otherwise fall through and store an explicit null.
        config.pop("column_mapping", None)
    else:
        config["column_mapping"] = mapping
    with open(YAML_PATH, "w") as f:
        # save column_mapping to column_mapping in yaml
        yaml.dump(config, f, Dumper=Dumper)
# convert column mapping dataframe to json
def convert_column_mapping_to_json(df, label=""):
    """Wrap the rows of *df* in a single-key dict.

    Each row yielded by ``df.iterrows()`` is turned into a plain list, and
    the resulting list of rows is stored under *label*.
    """
    rows_as_lists = [record.tolist() for _, record in df.iterrows()]
    return {label: rows_as_lists}
def get_logs_file(uid):
    """Return the contents of the log file for *uid*.

    Args:
        uid: Identifier used to build the path ``./tmp/<uid>_log``.

    Returns:
        The full text of the log file, or the string
        ``"Log file does not exist"`` when it cannot be read.
    """
    try:
        # The context manager guarantees the handle is closed after reading.
        with open(f"./tmp/{uid}_log", "r") as file:
            return file.read()
    except (OSError, UnicodeError):
        # Best effort: any failure to open or decode is reported with the
        # same placeholder text.
        return "Log file does not exist"
def write_log_to_user_file(id, log):
    """Append *log* to the log file ``./tmp/<id>_log``.

    Args:
        id: Identifier used to build the log file name.
        log: Text to append; no newline is added.

    Raises:
        OSError: If the log file cannot be opened for appending.
    """
    # Create the directory on demand so logging works even before any job
    # has been queued.
    os.makedirs("./tmp", exist_ok=True)
    with open(f"./tmp/{id}_log", "a") as f:
        f.write(log)
def save_job_to_pipe(id, job, lock):
    """Append a job to the queue file at ``PIPE_PATH``.

    The job is stored as one line of the form ``<id>@<a>,<b>,...`` where the
    comma-separated parts are the ``str()`` of each element of *job*.

    Args:
        id: Identifier written before the ``@`` separator.
        job: Iterable of command parts.
        lock: Context manager held while the queue file is appended to.

    Raises:
        OSError: If the directory or queue file cannot be created or opened.
    """
    # exist_ok avoids the check-then-create race when two callers arrive
    # before the directory exists.
    os.makedirs("./tmp", exist_ok=True)
    serialized = ",".join(str(part) for part in job)
    print(serialized)
    with lock:
        with open(PIPE_PATH, "a") as f:
            f.write(f"{id}@{serialized}\n")
def pop_job_from_pipe():
    """Remove the first queued job from ``PIPE_PATH`` and launch it.

    The queue file is rewritten without the popped line. The job line is
    split at its first ``@`` into an id and a comma-separated command, which
    is started with ``subprocess.Popen`` in the ``cicd`` directory next to
    this file. The child's stdout and stderr are appended to
    ``./tmp/<id>_log``. The function does not wait for the child to finish.

    Returns:
        None. Returns early when the queue file is missing or empty.

    Raises:
        ValueError: If the job line contains no ``@`` separator.
        OSError: If the queue or log file cannot be opened, or the command
            cannot be started.
    """
    if not os.path.exists(PIPE_PATH):
        return

    # NOTE(review): this read-modify-write is not guarded by the lock that
    # save_job_to_pipe takes; a concurrent append could be lost -- confirm
    # how callers serialize access.
    with open(PIPE_PATH, "r") as f:
        # Skip blank lines so a stray empty line can never block the queue.
        pending = [line.rstrip("\n") for line in f if line.strip()]

    job = pending[0].strip() if pending else ""
    remaining = pending[1:]
    print(job, remaining, ">>>>")

    with open(PIPE_PATH, "w") as f:
        # Each entry gets exactly one terminator; joining lines that already
        # end in "\n" with "\n" would insert blank lines between jobs.
        f.writelines(f"{line}\n" for line in remaining)

    if not job:
        return

    # Split only at the first "@" so the command itself may contain "@".
    job_info = job.split("@", 1)
    if len(job_info) != 2:
        raise ValueError("Invalid job info: ", job_info)

    uid, raw_command = job_info
    write_log_to_user_file(uid, f"Running job {job_info}")
    command = raw_command.split(",")
    write_log_to_user_file(uid, f"Running command {command}")

    # The child gets its own copy of the descriptor, so the parent's handle
    # can be closed as soon as the process has been spawned.
    with open(f"./tmp/{uid}_log", "a") as log_file:
        subprocess.Popen(
            command,
            cwd=os.path.join(os.path.dirname(os.path.realpath(__file__)), "cicd"),
            stdout=log_file,
            stderr=log_file,
        )
|