Kpenciler's picture
Upload 53 files
88435ed verified
raw
history blame
2.48 kB
import csv
import glob
import json
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, TypeVar
_T = TypeVar("_T")
def execute_parallel(func: Callable[..., _T], kwargs_list: list[dict[str, Any]], max_workers: int) -> list[_T]:
"""並行処理を行う
Args:
func (Callable): 並行処理したい関数
kwargs_list (list[dict[str, Any]]): 関数の引数(dict型)のリスト
max_workers (int): 並行処理数
Returns:
list[Any]: 関数の戻り値のリスト
"""
response_list: list[Future[_T]] = []
with ThreadPoolExecutor(max_workers=max_workers) as e:
for kwargs in kwargs_list:
response: Future[_T] = e.submit(func, **kwargs)
response_list.append(response)
return [r.result() for r in response_list]
def _load_json_file(file_path: str) -> Any:
# TODO: Docstring追加
with open(file_path, "r", encoding="utf-8") as json_file:
data = json.load(json_file)
return data
def make_log_csv(log_dir: str, csv_file_name: str = "log.csv") -> None:
"""ログデータのcsvを保存
Args:
log_dir (str): ログデータが保存されているディレクトリ
csv_file_name (str, optional): 保存するcsvファイル名. Defaults to "log.csv".
"""
# ディレクトリ内のJSONファイルのリストを取得
# TODO: エラーキャッチ
json_files = sorted([f for f in glob.glob(f"{log_dir}/*.json")], key=lambda x: int(x.split("/")[-1].split(".")[0]))
# すべてのJSONファイルからユニークなキーを取得
columns = []
data_list: list[dict[Any, Any]] = []
keys_set = set()
for json_file in json_files:
data = _load_json_file(json_file)
if isinstance(data, dict):
for key in data.keys():
if key not in keys_set:
keys_set.add(key)
columns.append(key)
data_list.append(data)
# CSVファイルを作成し、ヘッダーを書き込む
with open(csv_file_name, "w", encoding="utf-8", newline="") as csv_file:
writer = csv.writer(csv_file)
writer.writerow(columns)
# JSONファイルからデータを読み取り、CSVファイルに書き込む
for data in data_list:
row = [data.get(key, "") for key in columns]
writer.writerow(row)
print(f"saved csv file: {csv_file_name}")