litagin's picture
init
2916d61
import json
import sys
import threading
import traceback
from pathlib import Path
from typing import Dict, List, Optional
from uuid import UUID, uuid4
import numpy as np
import pyopenjtalk
from fastapi import HTTPException
from .word_model import UserDictWord, WordTypes
# from ..utility.mutex_utility import mutex_wrapper
# from ..utility.path_utility import engine_root, get_save_dir
from .part_of_speech_data import MAX_PRIORITY, MIN_PRIORITY, part_of_speech_data
from common.constants import USER_DICT_DIR
# root_dir = engine_root()
# save_dir = get_save_dir()
root_dir = Path(USER_DICT_DIR)
save_dir = Path(USER_DICT_DIR)
if not save_dir.is_dir():
save_dir.mkdir(parents=True)
default_dict_path = root_dir / "default.csv" # VOICEVOXデフォルト辞書ファイルのパス
user_dict_path = save_dir / "user_dict.json" # ユーザー辞書ファイルのパス
compiled_dict_path = save_dir / "user.dic" # コンパイル済み辞書ファイルのパス
# # 同時書き込みの制御
# mutex_user_dict = threading.Lock()
# mutex_openjtalk_dict = threading.Lock()
# @mutex_wrapper(mutex_user_dict)
def _write_to_json(user_dict: Dict[str, UserDictWord], user_dict_path: Path) -> None:
"""
ユーザー辞書ファイルへのユーザー辞書データ書き込み
Parameters
----------
user_dict : Dict[str, UserDictWord]
ユーザー辞書データ
user_dict_path : Path
ユーザー辞書ファイルのパス
"""
converted_user_dict = {}
for word_uuid, word in user_dict.items():
word_dict = word.dict()
word_dict["cost"] = _priority2cost(
word_dict["context_id"], word_dict["priority"]
)
del word_dict["priority"]
converted_user_dict[word_uuid] = word_dict
# 予めjsonに変換できることを確かめる
user_dict_json = json.dumps(converted_user_dict, ensure_ascii=False)
# ユーザー辞書ファイルへの書き込み
user_dict_path.write_text(user_dict_json, encoding="utf-8")
# @mutex_wrapper(mutex_openjtalk_dict)
def update_dict(
default_dict_path: Path = default_dict_path,
user_dict_path: Path = user_dict_path,
compiled_dict_path: Path = compiled_dict_path,
) -> None:
"""
辞書の更新
Parameters
----------
default_dict_path : Path
デフォルト辞書ファイルのパス
user_dict_path : Path
ユーザー辞書ファイルのパス
compiled_dict_path : Path
コンパイル済み辞書ファイルのパス
"""
random_string = uuid4()
tmp_csv_path = compiled_dict_path.with_suffix(
f".dict_csv-{random_string}.tmp"
) # csv形式辞書データの一時保存ファイル
tmp_compiled_path = compiled_dict_path.with_suffix(
f".dict_compiled-{random_string}.tmp"
) # コンパイル済み辞書データの一時保存ファイル
try:
# 辞書.csvを作成
csv_text = ""
# デフォルト辞書データの追加
if not default_dict_path.is_file():
print("Warning: Cannot find default dictionary.", file=sys.stderr)
return
default_dict = default_dict_path.read_text(encoding="utf-8")
if default_dict == default_dict.rstrip():
default_dict += "\n"
csv_text += default_dict
# ユーザー辞書データの追加
user_dict = read_dict(user_dict_path=user_dict_path)
for word_uuid in user_dict:
word = user_dict[word_uuid]
csv_text += (
"{surface},{context_id},{context_id},{cost},{part_of_speech},"
+ "{part_of_speech_detail_1},{part_of_speech_detail_2},"
+ "{part_of_speech_detail_3},{inflectional_type},"
+ "{inflectional_form},{stem},{yomi},{pronunciation},"
+ "{accent_type}/{mora_count},{accent_associative_rule}\n"
).format(
surface=word.surface,
context_id=word.context_id,
cost=_priority2cost(word.context_id, word.priority),
part_of_speech=word.part_of_speech,
part_of_speech_detail_1=word.part_of_speech_detail_1,
part_of_speech_detail_2=word.part_of_speech_detail_2,
part_of_speech_detail_3=word.part_of_speech_detail_3,
inflectional_type=word.inflectional_type,
inflectional_form=word.inflectional_form,
stem=word.stem,
yomi=word.yomi,
pronunciation=word.pronunciation,
accent_type=word.accent_type,
mora_count=word.mora_count,
accent_associative_rule=word.accent_associative_rule,
)
# 辞書データを辞書.csv へ一時保存
tmp_csv_path.write_text(csv_text, encoding="utf-8")
# 辞書.csvをOpenJTalk用にコンパイル
# pyopenjtalk.create_user_dict(str(tmp_csv_path), str(tmp_compiled_path))
pyopenjtalk.mecab_dict_index(str(tmp_csv_path), str(tmp_compiled_path))
if not tmp_compiled_path.is_file():
raise RuntimeError("辞書のコンパイル時にエラーが発生しました。")
# コンパイル済み辞書の置き換え・読み込み
pyopenjtalk.unset_user_dict()
tmp_compiled_path.replace(compiled_dict_path)
if compiled_dict_path.is_file():
# pyopenjtalk.set_user_dict(str(compiled_dict_path.resolve(strict=True)))
pyopenjtalk.update_global_jtalk_with_user_dict(str(compiled_dict_path))
except Exception as e:
print("Error: Failed to update dictionary.", file=sys.stderr)
traceback.print_exc(file=sys.stderr)
raise e
finally:
# 後処理
if tmp_csv_path.exists():
tmp_csv_path.unlink()
if tmp_compiled_path.exists():
tmp_compiled_path.unlink()
# @mutex_wrapper(mutex_user_dict)
def read_dict(user_dict_path: Path = user_dict_path) -> Dict[str, UserDictWord]:
"""
ユーザー辞書の読み出し
Parameters
----------
user_dict_path : Path
ユーザー辞書ファイルのパス
Returns
-------
result : Dict[str, UserDictWord]
ユーザー辞書
"""
# 指定ユーザー辞書が存在しない場合、空辞書を返す
if not user_dict_path.is_file():
return {}
with user_dict_path.open(encoding="utf-8") as f:
result: Dict[str, UserDictWord] = {}
for word_uuid, word in json.load(f).items():
# cost2priorityで変換を行う際にcontext_idが必要となるが、
# 0.12以前の辞書は、context_idがハードコーディングされていたためにユーザー辞書内に保管されていない
# ハードコーディングされていたcontext_idは固有名詞を意味するものなので、固有名詞のcontext_idを補完する
if word.get("context_id") is None:
word["context_id"] = part_of_speech_data[
WordTypes.PROPER_NOUN
].context_id
word["priority"] = _cost2priority(word["context_id"], word["cost"])
del word["cost"]
result[str(UUID(word_uuid))] = UserDictWord(**word)
return result
def _create_word(
surface: str,
pronunciation: str,
accent_type: int,
word_type: Optional[WordTypes] = None,
priority: Optional[int] = None,
) -> UserDictWord:
"""
単語オブジェクトの生成
Parameters
----------
surface : str
単語情報
pronunciation : str
単語情報
accent_type : int
単語情報
word_type : Optional[WordTypes]
品詞
priority : Optional[int]
優先度
Returns
-------
: UserDictWord
単語オブジェクト
"""
if word_type is None:
word_type = WordTypes.PROPER_NOUN
if word_type not in part_of_speech_data.keys():
raise HTTPException(status_code=422, detail="不明な品詞です")
if priority is None:
priority = 5
if not MIN_PRIORITY <= priority <= MAX_PRIORITY:
raise HTTPException(status_code=422, detail="優先度の値が無効です")
pos_detail = part_of_speech_data[word_type]
return UserDictWord(
surface=surface,
context_id=pos_detail.context_id,
priority=priority,
part_of_speech=pos_detail.part_of_speech,
part_of_speech_detail_1=pos_detail.part_of_speech_detail_1,
part_of_speech_detail_2=pos_detail.part_of_speech_detail_2,
part_of_speech_detail_3=pos_detail.part_of_speech_detail_3,
inflectional_type="*",
inflectional_form="*",
stem="*",
yomi=pronunciation,
pronunciation=pronunciation,
accent_type=accent_type,
accent_associative_rule="*",
)
def apply_word(
surface: str,
pronunciation: str,
accent_type: int,
word_type: Optional[WordTypes] = None,
priority: Optional[int] = None,
user_dict_path: Path = user_dict_path,
compiled_dict_path: Path = compiled_dict_path,
) -> str:
"""
新規単語の追加
Parameters
----------
surface : str
単語情報
pronunciation : str
単語情報
accent_type : int
単語情報
word_type : Optional[WordTypes]
品詞
priority : Optional[int]
優先度
user_dict_path : Path
ユーザー辞書ファイルのパス
compiled_dict_path : Path
コンパイル済み辞書ファイルのパス
Returns
-------
word_uuid : UserDictWord
追加された単語に発行されたUUID
"""
# 新規単語の追加による辞書データの更新
word = _create_word(
surface=surface,
pronunciation=pronunciation,
accent_type=accent_type,
word_type=word_type,
priority=priority,
)
user_dict = read_dict(user_dict_path=user_dict_path)
word_uuid = str(uuid4())
user_dict[word_uuid] = word
# 更新された辞書データの保存と適用
_write_to_json(user_dict, user_dict_path)
update_dict(user_dict_path=user_dict_path, compiled_dict_path=compiled_dict_path)
return word_uuid
def rewrite_word(
word_uuid: str,
surface: str,
pronunciation: str,
accent_type: int,
word_type: Optional[WordTypes] = None,
priority: Optional[int] = None,
user_dict_path: Path = user_dict_path,
compiled_dict_path: Path = compiled_dict_path,
) -> None:
"""
既存単語の上書き更新
Parameters
----------
word_uuid : str
単語UUID
surface : str
単語情報
pronunciation : str
単語情報
accent_type : int
単語情報
word_type : Optional[WordTypes]
品詞
priority : Optional[int]
優先度
user_dict_path : Path
ユーザー辞書ファイルのパス
compiled_dict_path : Path
コンパイル済み辞書ファイルのパス
"""
word = _create_word(
surface=surface,
pronunciation=pronunciation,
accent_type=accent_type,
word_type=word_type,
priority=priority,
)
# 既存単語の上書きによる辞書データの更新
user_dict = read_dict(user_dict_path=user_dict_path)
if word_uuid not in user_dict:
raise HTTPException(
status_code=422, detail="UUIDに該当するワードが見つかりませんでした"
)
user_dict[word_uuid] = word
# 更新された辞書データの保存と適用
_write_to_json(user_dict, user_dict_path)
update_dict(user_dict_path=user_dict_path, compiled_dict_path=compiled_dict_path)
def delete_word(
word_uuid: str,
user_dict_path: Path = user_dict_path,
compiled_dict_path: Path = compiled_dict_path,
) -> None:
"""
単語の削除
Parameters
----------
word_uuid : str
単語UUID
user_dict_path : Path
ユーザー辞書ファイルのパス
compiled_dict_path : Path
コンパイル済み辞書ファイルのパス
"""
# 既存単語の削除による辞書データの更新
user_dict = read_dict(user_dict_path=user_dict_path)
if word_uuid not in user_dict:
raise HTTPException(
status_code=422, detail="IDに該当するワードが見つかりませんでした"
)
del user_dict[word_uuid]
# 更新された辞書データの保存と適用
_write_to_json(user_dict, user_dict_path)
update_dict(user_dict_path=user_dict_path, compiled_dict_path=compiled_dict_path)
def import_user_dict(
dict_data: Dict[str, UserDictWord],
override: bool = False,
user_dict_path: Path = user_dict_path,
default_dict_path: Path = default_dict_path,
compiled_dict_path: Path = compiled_dict_path,
) -> None:
"""
ユーザー辞書のインポート
Parameters
----------
dict_data : Dict[str, UserDictWord]
インポートするユーザー辞書のデータ
override : bool
重複したエントリがあった場合、上書きするかどうか
user_dict_path : Path
ユーザー辞書ファイルのパス
default_dict_path : Path
デフォルト辞書ファイルのパス
compiled_dict_path : Path
コンパイル済み辞書ファイルのパス
"""
# インポートする辞書データのバリデーション
for word_uuid, word in dict_data.items():
UUID(word_uuid)
assert isinstance(word, UserDictWord)
for pos_detail in part_of_speech_data.values():
if word.context_id == pos_detail.context_id:
assert word.part_of_speech == pos_detail.part_of_speech
assert (
word.part_of_speech_detail_1 == pos_detail.part_of_speech_detail_1
)
assert (
word.part_of_speech_detail_2 == pos_detail.part_of_speech_detail_2
)
assert (
word.part_of_speech_detail_3 == pos_detail.part_of_speech_detail_3
)
assert (
word.accent_associative_rule in pos_detail.accent_associative_rules
)
break
else:
raise ValueError("対応していない品詞です")
# 既存辞書の読み出し
old_dict = read_dict(user_dict_path=user_dict_path)
# 辞書データの更新
# 重複エントリの上書き
if override:
new_dict = {**old_dict, **dict_data}
# 重複エントリの保持
else:
new_dict = {**dict_data, **old_dict}
# 更新された辞書データの保存と適用
_write_to_json(user_dict=new_dict, user_dict_path=user_dict_path)
update_dict(
default_dict_path=default_dict_path,
user_dict_path=user_dict_path,
compiled_dict_path=compiled_dict_path,
)
def _search_cost_candidates(context_id: int) -> List[int]:
for value in part_of_speech_data.values():
if value.context_id == context_id:
return value.cost_candidates
raise HTTPException(status_code=422, detail="品詞IDが不正です")
def _cost2priority(context_id: int, cost: int) -> int:
assert -32768 <= cost <= 32767
cost_candidates = _search_cost_candidates(context_id)
# cost_candidatesの中にある値で最も近い値を元にpriorityを返す
# 参考: https://qiita.com/Krypf/items/2eada91c37161d17621d
# この関数とpriority2cost関数によって、辞書ファイルのcostを操作しても最も近いpriorityのcostに上書きされる
return MAX_PRIORITY - np.argmin(np.abs(np.array(cost_candidates) - cost)).item()
def _priority2cost(context_id: int, priority: int) -> int:
assert MIN_PRIORITY <= priority <= MAX_PRIORITY
cost_candidates = _search_cost_candidates(context_id)
return cost_candidates[MAX_PRIORITY - priority]