|
import os
import shutil

import yaml
|
|
|
YAML_PATH = "./cicd/configs" |
|
LOG_FILE = "temp_log" |
|
|
|
|
|
class Dumper(yaml.Dumper):
    """Custom YAML dumper that indents block sequences under their key.

    PyYAML's default dumper emits list items flush with the parent key;
    forcing ``indentless=False`` produces the conventional nested layout.
    """

    def increase_indent(self, flow=False, *args, **kwargs):
        # Always indent sequence entries, regardless of what the caller asked.
        return yaml.Dumper.increase_indent(self, flow=flow, indentless=False)
|
|
|
|
|
def get_yaml_path(uid):
    """Return the path of the per-user YAML config, creating it on first use.

    If the user-specific file does not exist yet, it is seeded from the
    ``config.yaml`` template in the working directory (when present).

    Args:
        uid: Unique identifier used to namespace the config file.

    Returns:
        Path string ``{YAML_PATH}/{uid}_config.yaml``.
    """
    # exist_ok=True avoids the race between the exists() check and makedirs().
    os.makedirs(YAML_PATH, exist_ok=True)
    user_config = f"{YAML_PATH}/{uid}_config.yaml"
    if not os.path.exists(user_config):
        # shutil.copyfile replaces `os.system("cp ...")`: portable (works on
        # Windows) and immune to shell injection through a crafted uid.
        # Guard on the template so a missing config.yaml stays a no-op,
        # matching the old silent-failure behavior of the shelled-out cp.
        if os.path.exists("config.yaml"):
            shutil.copyfile("config.yaml", user_config)
    return user_config
|
|
|
|
|
|
|
|
|
def read_scanners(uid):
    """Return the list stored under ``detectors`` in the user's YAML config.

    Args:
        uid: Unique identifier of the user's config file.

    Returns:
        The configured detector list, or ``[]`` when the key is absent.
    """
    with open(get_yaml_path(uid), "r") as config_file:
        config = yaml.load(config_file, Loader=yaml.FullLoader)
    return config.get("detectors", [])
|
|
|
|
|
|
|
def write_scanners(scanners, uid):
    """Persist the detector list under ``detectors`` in the user's config.

    Args:
        scanners: List of detector names to store.
        uid: Unique identifier of the user's config file.
    """
    # Resolve the path once instead of calling get_yaml_path() twice.
    path = get_yaml_path(uid)
    with open(path, "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    # An empty/blank YAML file loads as None; start from a fresh dict so the
    # scanners are not silently dropped (the old code dumped `null` back).
    if not config:
        config = {}
    config["detectors"] = scanners
    with open(path, "w") as f:
        yaml.dump(config, f, Dumper=Dumper)
|
|
|
|
|
|
|
def read_inference_type(uid):
    """Return the ``inference_type`` value from the user's YAML config.

    Args:
        uid: Unique identifier of the user's config file.

    Returns:
        The configured inference type, or ``""`` when unset.
    """
    inference_type = ""
    with open(get_yaml_path(uid), "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    # Guard against an empty YAML file (loads as None): the old code crashed
    # with AttributeError. Mirrors the guard read_column_mapping already has.
    if config:
        inference_type = config.get("inference_type", "")
    return inference_type
|
|
|
|
|
|
|
def write_inference_type(use_inference, inference_token, uid):
    """Store the inference backend choice in the user's YAML config.

    Args:
        use_inference: True selects the HF Inference API and persists the
            token; False selects a local ``hf_pipeline`` and clears the token.
        inference_token: API token, saved only when ``use_inference`` is True.
        uid: Unique identifier of the user's config file.
    """
    # Resolve the path once instead of calling get_yaml_path() twice.
    path = get_yaml_path(uid)
    with open(path, "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    # An empty YAML file loads as None; the old code raised TypeError on item
    # assignment. Bail out silently, consistent with write_column_mapping.
    if config is None:
        return
    if use_inference:
        config["inference_type"] = "hf_inference_api"
        config["inference_token"] = inference_token
    else:
        config["inference_type"] = "hf_pipeline"
        # Clear any previously stored token when switching to the local pipeline.
        config["inference_token"] = ""
    with open(path, "w") as f:
        yaml.dump(config, f, Dumper=Dumper)
|
|
|
|
|
|
|
def read_column_mapping(uid):
    """Return the stored ``column_mapping`` for *uid*.

    Args:
        uid: Unique identifier of the user's config file.

    Returns:
        The mapping dict, or ``{}`` when the config is empty or has no
        ``column_mapping`` section.
    """
    with open(get_yaml_path(uid), "r") as cfg_file:
        config = yaml.load(cfg_file, Loader=yaml.FullLoader)
    # An empty YAML file loads as None — fall back to an empty mapping.
    return config.get("column_mapping", {}) if config else {}
|
|
|
|
|
|
|
def write_column_mapping(mapping, uid):
    """Write (or remove) the ``column_mapping`` section of the user's config.

    Args:
        mapping: Mapping to store; ``None`` removes the section entirely.
        uid: Unique identifier of the user's config file.
    """
    # Resolve the path once instead of calling get_yaml_path() twice.
    path = get_yaml_path(uid)
    with open(path, "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    # Nothing to update if the YAML file was empty (loads as None).
    if config is None:
        return
    if mapping is None:
        # pop() also covers the key-already-absent case — the old code fell
        # into the else branch there and stored `column_mapping: null`.
        config.pop("column_mapping", None)
    else:
        config["column_mapping"] = mapping
    with open(path, "w") as f:
        yaml.dump(config, f, Dumper=Dumper, sort_keys=False)
|
|
|
|
|
|
|
def convert_column_mapping_to_json(df, label=""):
    """Convert a DataFrame into ``{label: [row-values, ...]}``.

    Args:
        df: DataFrame whose rows become lists of cell values.
        label: Key under which the row lists are collected.

    Returns:
        Dict mapping *label* to a list of per-row value lists.
    """
    rows = [row.tolist() for _, row in df.iterrows()]
    return {label: rows}
|
|
|
|
|
def get_log_file_with_uid(uid):
    """Return the full contents of ``./tmp/{uid}.log``.

    Args:
        uid: Task/user identifier naming the log file.

    Returns:
        The log text, or ``"Log file does not exist"`` when it cannot be read.
    """
    try:
        print(f"Loading {uid}.log")
        # Open for reading: the old mode "a" (append) made file.read() raise
        # io.UnsupportedOperation, so this function ALWAYS returned the
        # fallback message even when the log existed.
        with open(f"./tmp/{uid}.log", "r") as file:
            return file.read()
    except Exception:
        # Broad by design: any read failure degrades to the fallback message.
        return "Log file does not exist"
|
|
|
|
|
def get_logs_file():
    """Return the contents of the shared LOG_FILE.

    Returns:
        The log text, or ``"Log file does not exist"`` when it cannot be read.
    """
    try:
        with open(LOG_FILE, "r") as log_file:
            contents = log_file.read()
    except Exception:
        # Any read failure degrades to the fallback message.
        return "Log file does not exist"
    return contents
|
|
|
|
|
def write_log_to_user_file(task_id, log):
    """Append *log* to the per-task log file ``./tmp/{task_id}.log``.

    Args:
        task_id: Identifier naming the log file.
        log: Text to append.
    """
    log_path = f"./tmp/{task_id}.log"
    with open(log_path, "a") as log_file:
        log_file.write(log)
|
|