soni / soni_translate /text_multiformat_processor.py
r3gm's picture
v0.5.0
b152010
from .logging_setup import logger
from whisperx.utils import get_writer
from .utils import remove_files, run_command, remove_directory_contents
from typing import List
import srt
import re
import os
import copy
import string
import soundfile as sf
from PIL import Image, ImageOps, ImageDraw, ImageFont
punctuation_list = list(
string.punctuation + "¡¿«»„”“”‚‘’「」『』《》()【】〈〉〔〕〖〗〘〙〚〛⸤⸥⸨⸩"
)
symbol_list = punctuation_list + ["", "..", "..."]
def extract_from_srt(file_path):
with open(file_path, "r", encoding="utf-8") as file:
srt_content = file.read()
subtitle_generator = srt.parse(srt_content)
srt_content_list = list(subtitle_generator)
return srt_content_list
def clean_text(text):
# Remove content within square brackets
text = re.sub(r'\[.*?\]', '', text)
# Add pattern to remove content within <comment> tags
text = re.sub(r'<comment>.*?</comment>', '', text)
# Remove HTML tags
text = re.sub(r'<.*?>', '', text)
# Remove "♫" and "♪" content
text = re.sub(r'♫.*?♫', '', text)
text = re.sub(r'♪.*?♪', '', text)
# Replace newline characters with an empty string
text = text.replace("\n", ". ")
# Remove double quotation marks
text = text.replace('"', '')
# Collapse multiple spaces and replace with a single space
text = re.sub(r"\s+", " ", text)
# Normalize spaces around periods
text = re.sub(r"[\s\.]+(?=\s)", ". ", text)
# Check if there are ♫ or ♪ symbols present
if '♫' in text or '♪' in text:
return ""
text = text.strip()
# Valid text
return text if text not in symbol_list else ""
def srt_file_to_segments(file_path, speaker=False):
try:
srt_content_list = extract_from_srt(file_path)
except Exception as error:
logger.error(str(error))
fixed_file = "fixed_sub.srt"
remove_files(fixed_file)
fix_sub = f'ffmpeg -i "{file_path}" "{fixed_file}" -y'
run_command(fix_sub)
srt_content_list = extract_from_srt(fixed_file)
segments = []
for segment in srt_content_list:
text = clean_text(str(segment.content))
if text:
segments.append(
{
"text": text,
"start": float(segment.start.total_seconds()),
"end": float(segment.end.total_seconds()),
}
)
if not segments:
raise Exception("No data found in srt subtitle file")
if speaker:
segments = [{**seg, "speaker": "SPEAKER_00"} for seg in segments]
return {"segments": segments}
# documents
def dehyphenate(lines: List[str], line_no: int) -> List[str]:
next_line = lines[line_no + 1]
word_suffix = next_line.split(" ")[0]
lines[line_no] = lines[line_no][:-1] + word_suffix
lines[line_no + 1] = lines[line_no + 1][len(word_suffix):]
return lines
def remove_hyphens(text: str) -> str:
"""
This fails for:
* Natural dashes: well-known, self-replication, use-cases, non-semantic,
Post-processing, Window-wise, viewpoint-dependent
* Trailing math operands: 2 - 4
* Names: Lopez-Ferreras, VGG-19, CIFAR-100
"""
lines = [line.rstrip() for line in text.split("\n")]
# Find dashes
line_numbers = []
for line_no, line in enumerate(lines[:-1]):
if line.endswith("-"):
line_numbers.append(line_no)
# Replace
for line_no in line_numbers:
lines = dehyphenate(lines, line_no)
return "\n".join(lines)
def pdf_to_txt(pdf_file, start_page, end_page):
from pypdf import PdfReader
with open(pdf_file, "rb") as file:
reader = PdfReader(file)
logger.debug(f"Total pages: {reader.get_num_pages()}")
text = ""
start_page_idx = max((start_page-1), 0)
end_page_inx = min((end_page), (reader.get_num_pages()))
document_pages = reader.pages[start_page_idx:end_page_inx]
logger.info(
f"Selected pages from {start_page_idx} to {end_page_inx}: "
f"{len(document_pages)}"
)
for page in document_pages:
text += remove_hyphens(page.extract_text())
return text
def docx_to_txt(docx_file):
# https://github.com/AlJohri/docx2pdf update
from docx import Document
doc = Document(docx_file)
text = ""
for paragraph in doc.paragraphs:
text += paragraph.text + "\n"
return text
def replace_multiple_elements(text, replacements):
pattern = re.compile("|".join(map(re.escape, replacements.keys())))
replaced_text = pattern.sub(
lambda match: replacements[match.group(0)], text
)
# Remove multiple spaces
replaced_text = re.sub(r"\s+", " ", replaced_text)
return replaced_text
def document_preprocessor(file_path, is_string, start_page, end_page):
if not is_string:
file_ext = os.path.splitext(file_path)[1].lower()
if is_string:
text = file_path
elif file_ext == ".pdf":
text = pdf_to_txt(file_path, start_page, end_page)
elif file_ext == ".docx":
text = docx_to_txt(file_path)
elif file_ext == ".txt":
with open(
file_path, "r", encoding='utf-8', errors='replace'
) as file:
text = file.read()
else:
raise Exception("Unsupported file format")
# Add space to break segments more easily later
replacements = {
"、": "、 ",
"。": "。 ",
# "\n": " ",
}
text = replace_multiple_elements(text, replacements)
# Save text to a .txt file
# file_name = os.path.splitext(os.path.basename(file_path))[0]
txt_file_path = "./text_preprocessor.txt"
with open(
txt_file_path, "w", encoding='utf-8', errors='replace'
) as txt_file:
txt_file.write(text)
return txt_file_path, text
def split_text_into_chunks(text, chunk_size):
words = re.findall(r"\b\w+\b", text)
chunks = []
current_chunk = ""
for word in words:
if (
len(current_chunk) + len(word) + 1 <= chunk_size
): # Adding 1 for the space between words
if current_chunk:
current_chunk += " "
current_chunk += word
else:
chunks.append(current_chunk)
current_chunk = word
if current_chunk:
chunks.append(current_chunk)
return chunks
def determine_chunk_size(file_name):
patterns = {
re.compile(r".*-(Male|Female)$"): 1024, # by character
re.compile(r".* BARK$"): 100, # t 64 256
re.compile(r".* VITS$"): 500,
re.compile(
r".+\.(wav|mp3|ogg|m4a)$"
): 150, # t 250 400 api automatic split
re.compile(r".* VITS-onnx$"): 250, # automatic sentence split
re.compile(r".* OpenAI-TTS$"): 1024 # max charaters 4096
}
for pattern, chunk_size in patterns.items():
if pattern.match(file_name):
return chunk_size
# Default chunk size if the file doesn't match any pattern; max 1800
return 100
def plain_text_to_segments(result_text=None, chunk_size=None):
if not chunk_size:
chunk_size = 100
text_chunks = split_text_into_chunks(result_text, chunk_size)
segments_chunks = []
for num, chunk in enumerate(text_chunks):
chunk_dict = {
"text": chunk,
"start": (1.0 + num),
"end": (2.0 + num),
"speaker": "SPEAKER_00",
}
segments_chunks.append(chunk_dict)
result_diarize = {"segments": segments_chunks}
return result_diarize
def segments_to_plain_text(result_diarize):
complete_text = ""
for seg in result_diarize["segments"]:
complete_text += seg["text"] + " " # issue
# Save text to a .txt file
# file_name = os.path.splitext(os.path.basename(file_path))[0]
txt_file_path = "./text_translation.txt"
with open(
txt_file_path, "w", encoding='utf-8', errors='replace'
) as txt_file:
txt_file.write(complete_text)
return txt_file_path, complete_text
# doc to video
COLORS = {
"black": (0, 0, 0),
"white": (255, 255, 255),
"red": (255, 0, 0),
"green": (0, 255, 0),
"blue": (0, 0, 255),
"yellow": (255, 255, 0),
"light_gray": (200, 200, 200),
"light_blue": (173, 216, 230),
"light_green": (144, 238, 144),
"light_yellow": (255, 255, 224),
"light_pink": (255, 182, 193),
"lavender": (230, 230, 250),
"peach": (255, 218, 185),
"light_cyan": (224, 255, 255),
"light_salmon": (255, 160, 122),
"light_green_yellow": (173, 255, 47),
}
BORDER_COLORS = ["dynamic"] + list(COLORS.keys())
def calculate_average_color(img):
# Resize the image to a small size for faster processing
img_small = img.resize((50, 50))
# Calculate the average color
average_color = img_small.convert("RGB").resize((1, 1)).getpixel((0, 0))
return average_color
def add_border_to_image(
image_path,
target_width,
target_height,
border_color=None
):
img = Image.open(image_path)
# Calculate the width and height for the new image with borders
original_width, original_height = img.size
original_aspect_ratio = original_width / original_height
target_aspect_ratio = target_width / target_height
# Resize the image to fit the target resolution retaining aspect ratio
if original_aspect_ratio > target_aspect_ratio:
# Image is wider, calculate new height
new_height = int(target_width / original_aspect_ratio)
resized_img = img.resize((target_width, new_height))
else:
# Image is taller, calculate new width
new_width = int(target_height * original_aspect_ratio)
resized_img = img.resize((new_width, target_height))
# Calculate padding for borders
padding = (0, 0, 0, 0)
if resized_img.size[0] != target_width or resized_img.size[1] != target_height:
if original_aspect_ratio > target_aspect_ratio:
# Add borders vertically
padding = (0, (target_height - resized_img.size[1]) // 2, 0, (target_height - resized_img.size[1]) // 2)
else:
# Add borders horizontally
padding = ((target_width - resized_img.size[0]) // 2, 0, (target_width - resized_img.size[0]) // 2, 0)
# Add borders with specified color
if not border_color or border_color == "dynamic":
border_color = calculate_average_color(resized_img)
else:
border_color = COLORS.get(border_color, (0, 0, 0))
bordered_img = ImageOps.expand(resized_img, padding, fill=border_color)
bordered_img.save(image_path)
return image_path
def resize_and_position_subimage(
subimage,
max_width,
max_height,
subimage_position,
main_width,
main_height
):
subimage_width, subimage_height = subimage.size
# Resize subimage if it exceeds maximum dimensions
if subimage_width > max_width or subimage_height > max_height:
# Calculate scaling factor
width_scale = max_width / subimage_width
height_scale = max_height / subimage_height
scale = min(width_scale, height_scale)
# Resize subimage
subimage = subimage.resize(
(int(subimage_width * scale), int(subimage_height * scale))
)
# Calculate position to place the subimage
if subimage_position == "top-left":
subimage_x = 0
subimage_y = 0
elif subimage_position == "top-right":
subimage_x = main_width - subimage.width
subimage_y = 0
elif subimage_position == "bottom-left":
subimage_x = 0
subimage_y = main_height - subimage.height
elif subimage_position == "bottom-right":
subimage_x = main_width - subimage.width
subimage_y = main_height - subimage.height
else:
raise ValueError(
"Invalid subimage_position. Choose from 'top-left', 'top-right',"
" 'bottom-left', or 'bottom-right'."
)
return subimage, subimage_x, subimage_y
def create_image_with_text_and_subimages(
text,
subimages,
width,
height,
text_color,
background_color,
output_file
):
# Create an image with the specified resolution and background color
image = Image.new('RGB', (width, height), color=background_color)
# Initialize ImageDraw object
draw = ImageDraw.Draw(image)
# Load a font
font = ImageFont.load_default() # You can specify your font file here
# Calculate text size and position
text_bbox = draw.textbbox((0, 0), text, font=font)
text_width = text_bbox[2] - text_bbox[0]
text_height = text_bbox[3] - text_bbox[1]
text_x = (width - text_width) / 2
text_y = (height - text_height) / 2
# Draw text on the image
draw.text((text_x, text_y), text, fill=text_color, font=font)
# Paste subimages onto the main image
for subimage_path, subimage_position in subimages:
# Open the subimage
subimage = Image.open(subimage_path)
# Convert subimage to RGBA mode if it doesn't have an alpha channel
if subimage.mode != 'RGBA':
subimage = subimage.convert('RGBA')
# Resize and position the subimage
subimage, subimage_x, subimage_y = resize_and_position_subimage(
subimage, width / 4, height / 4, subimage_position, width, height
)
# Paste the subimage onto the main image
image.paste(subimage, (int(subimage_x), int(subimage_y)), subimage)
image.save(output_file)
return output_file
def doc_to_txtximg_pages(
document,
width,
height,
start_page,
end_page,
bcolor
):
from pypdf import PdfReader
images_folder = "pdf_images/"
os.makedirs(images_folder, exist_ok=True)
remove_directory_contents(images_folder)
# First image
text_image = os.path.basename(document)[:-4]
subimages = [("./assets/logo.jpeg", "top-left")]
text_color = (255, 255, 255) if bcolor == "black" else (0, 0, 0) # w|b
background_color = COLORS.get(bcolor, (255, 255, 255)) # dynamic white
first_image = "pdf_images/0000_00_aaa.png"
create_image_with_text_and_subimages(
text_image,
subimages,
width,
height,
text_color,
background_color,
first_image
)
reader = PdfReader(document)
logger.debug(f"Total pages: {reader.get_num_pages()}")
start_page_idx = max((start_page-1), 0)
end_page_inx = min((end_page), (reader.get_num_pages()))
document_pages = reader.pages[start_page_idx:end_page_inx]
logger.info(
f"Selected pages from {start_page_idx} to {end_page_inx}: "
f"{len(document_pages)}"
)
data_doc = {}
for i, page in enumerate(document_pages):
count = 0
images = []
for image_file_object in page.images:
img_name = f"{images_folder}{i:04d}_{count:02d}_{image_file_object.name}"
images.append(img_name)
with open(img_name, "wb") as fp:
fp.write(image_file_object.data)
count += 1
img_name = add_border_to_image(img_name, width, height, bcolor)
data_doc[i] = {
"text": remove_hyphens(page.extract_text()),
"images": images
}
return data_doc
def page_data_to_segments(result_text=None, chunk_size=None):
if not chunk_size:
chunk_size = 100
segments_chunks = []
time_global = 0
for page, result_data in result_text.items():
# result_image = result_data["images"]
result_text = result_data["text"]
text_chunks = split_text_into_chunks(result_text, chunk_size)
if not text_chunks:
text_chunks = [" "]
for chunk in text_chunks:
chunk_dict = {
"text": chunk,
"start": (1.0 + time_global),
"end": (2.0 + time_global),
"speaker": "SPEAKER_00",
"page": page,
}
segments_chunks.append(chunk_dict)
time_global += 1
result_diarize = {"segments": segments_chunks}
return result_diarize
def update_page_data(result_diarize, doc_data):
complete_text = ""
current_page = result_diarize["segments"][0]["page"]
text_page = ""
for seg in result_diarize["segments"]:
text = seg["text"] + " " # issue
complete_text += text
page = seg["page"]
if page == current_page:
text_page += text
else:
doc_data[current_page]["text"] = text_page
# Next
text_page = text
current_page = page
if doc_data[current_page]["text"] != text_page:
doc_data[current_page]["text"] = text_page
return doc_data
def fix_timestamps_docs(result_diarize, audio_files):
current_start = 0.0
for seg, audio in zip(result_diarize["segments"], audio_files):
duration = round(sf.info(audio).duration, 2)
seg["start"] = current_start
current_start += duration
seg["end"] = current_start
return result_diarize
def create_video_from_images(
doc_data,
result_diarize
):
# First image path
first_image = "pdf_images/0000_00_aaa.png"
# Time segments and images
max_pages_idx = len(doc_data) - 1
current_page = result_diarize["segments"][0]["page"]
duration_page = 0.0
last_image = None
for seg in result_diarize["segments"]:
start = seg["start"]
end = seg["end"]
duration_seg = end - start
page = seg["page"]
if page == current_page:
duration_page += duration_seg
else:
images = doc_data[current_page]["images"]
if first_image:
images = [first_image] + images
first_image = None
if not doc_data[min(max_pages_idx, (current_page+1))]["text"].strip():
images = images + doc_data[min(max_pages_idx, (current_page+1))]["images"]
if not images and last_image:
images = [last_image]
# Calculate images duration
time_duration_per_image = round((duration_page / len(images)), 2)
doc_data[current_page]["time_per_image"] = time_duration_per_image
# Next values
doc_data[current_page]["images"] = images
last_image = images[-1]
duration_page = duration_seg
current_page = page
if "time_per_image" not in doc_data[current_page].keys():
images = doc_data[current_page]["images"]
if first_image:
images = [first_image] + images
if not images:
images = [last_image]
time_duration_per_image = round((duration_page / len(images)), 2)
doc_data[current_page]["time_per_image"] = time_duration_per_image
# Timestamped image video.
with open("list.txt", "w") as file:
for i, page in enumerate(doc_data.values()):
duration = page["time_per_image"]
for img in page["images"]:
if i == len(doc_data) - 1 and img == page["images"][-1]: # Check if it's the last item
file.write(f"file {img}\n")
file.write(f"outpoint {duration}")
else:
file.write(f"file {img}\n")
file.write(f"outpoint {duration}\n")
out_video = "video_from_images.mp4"
remove_files(out_video)
cm = f"ffmpeg -y -f concat -i list.txt -c:v libx264 -preset veryfast -crf 18 -pix_fmt yuv420p {out_video}"
cm_alt = f"ffmpeg -f concat -i list.txt -c:v libx264 -r 30 -pix_fmt yuv420p -y {out_video}"
try:
run_command(cm)
except Exception as error:
logger.error(str(error))
remove_files(out_video)
run_command(cm_alt)
return out_video
def merge_video_and_audio(video_doc, final_wav_file):
fixed_audio = "fixed_audio.mp3"
remove_files(fixed_audio)
cm = f"ffmpeg -i {final_wav_file} -c:a libmp3lame {fixed_audio}"
run_command(cm)
vid_out = "video_book.mp4"
remove_files(vid_out)
cm = f"ffmpeg -i {video_doc} -i {fixed_audio} -c:v copy -c:a copy -map 0:v -map 1:a -shortest {vid_out}"
run_command(cm)
return vid_out
# subtitles
def get_subtitle(
language,
segments_data,
extension,
filename=None,
highlight_words=False,
):
if not filename:
filename = "task_subtitle"
is_ass_extension = False
if extension == "ass":
is_ass_extension = True
extension = "srt"
sub_file = filename + "." + extension
support_name = filename + ".mp3"
remove_files(sub_file)
writer = get_writer(extension, output_dir=".")
word_options = {
"highlight_words": highlight_words,
"max_line_count": None,
"max_line_width": None,
}
# Get data subs
subtitle_data = copy.deepcopy(segments_data)
subtitle_data["language"] = (
"ja" if language in ["ja", "zh", "zh-TW"] else language
)
# Clean
if not highlight_words:
subtitle_data.pop("word_segments", None)
for segment in subtitle_data["segments"]:
for key in ["speaker", "chars", "words"]:
segment.pop(key, None)
writer(
subtitle_data,
support_name,
word_options,
)
if is_ass_extension:
temp_name = filename + ".ass"
remove_files(temp_name)
convert_sub = f'ffmpeg -i "{sub_file}" "{temp_name}" -y'
run_command(convert_sub)
sub_file = temp_name
return sub_file
def process_subtitles(
deep_copied_result,
align_language,
result_diarize,
output_format_subtitle,
TRANSLATE_AUDIO_TO,
):
name_ori = "sub_ori."
name_tra = "sub_tra."
remove_files(
[name_ori + output_format_subtitle, name_tra + output_format_subtitle]
)
writer = get_writer(output_format_subtitle, output_dir=".")
word_options = {
"highlight_words": False,
"max_line_count": None,
"max_line_width": None,
}
# original lang
subs_copy_result = copy.deepcopy(deep_copied_result)
subs_copy_result["language"] = (
"zh" if align_language == "zh-TW" else align_language
)
for segment in subs_copy_result["segments"]:
segment.pop("speaker", None)
try:
writer(
subs_copy_result,
name_ori[:-1] + ".mp3",
word_options,
)
except Exception as error:
logger.error(str(error))
if str(error) == "list indices must be integers or slices, not str":
logger.error(
"Related to poor word segmentation"
" in segments after alignment."
)
subs_copy_result["segments"][0].pop("words")
writer(
subs_copy_result,
name_ori[:-1] + ".mp3",
word_options,
)
# translated lang
subs_tra_copy_result = copy.deepcopy(result_diarize)
subs_tra_copy_result["language"] = (
"ja" if TRANSLATE_AUDIO_TO in ["ja", "zh", "zh-TW"] else align_language
)
subs_tra_copy_result.pop("word_segments", None)
for segment in subs_tra_copy_result["segments"]:
for key in ["speaker", "chars", "words"]:
segment.pop(key, None)
writer(
subs_tra_copy_result,
name_tra[:-1] + ".mp3",
word_options,
)
return name_tra + output_format_subtitle
def linguistic_level_segments(
result_base,
linguistic_unit="word", # word or char
):
linguistic_unit = linguistic_unit[:4]
linguistic_unit_key = linguistic_unit + "s"
result = copy.deepcopy(result_base)
if linguistic_unit_key not in result["segments"][0].keys():
raise ValueError("No alignment detected, can't process")
segments_by_unit = []
for segment in result["segments"]:
segment_units = segment[linguistic_unit_key]
# segment_speaker = segment.get("speaker", "SPEAKER_00")
for unit in segment_units:
text = unit[linguistic_unit]
if "start" in unit.keys():
segments_by_unit.append(
{
"start": unit["start"],
"end": unit["end"],
"text": text,
# "speaker": segment_speaker,
}
)
elif not segments_by_unit:
pass
else:
segments_by_unit[-1]["text"] += text
return {"segments": segments_by_unit}
def break_aling_segments(
result: dict,
break_characters: str = "", # ":|,|.|"
):
result_align = copy.deepcopy(result)
break_characters_list = break_characters.split("|")
break_characters_list = [i for i in break_characters_list if i != '']
if not break_characters_list:
logger.info("No valid break characters were specified.")
return result
logger.info(f"Redivide text segments by: {str(break_characters_list)}")
# create new with filters
normal = []
def process_chars(chars, letter_new_start, num, text):
start_key, end_key = "start", "end"
start_value = end_value = None
for char in chars:
if start_key in char:
start_value = char[start_key]
break
for char in reversed(chars):
if end_key in char:
end_value = char[end_key]
break
if not start_value or not end_value:
raise Exception(
f"Unable to obtain a valid timestamp for chars: {str(chars)}"
)
return {
"start": start_value,
"end": end_value,
"text": text,
"words": chars,
}
for i, segment in enumerate(result_align['segments']):
logger.debug(f"- Process segment: {i}, text: {segment['text']}")
# start = segment['start']
letter_new_start = 0
for num, char in enumerate(segment['chars']):
if char["char"] is None:
continue
# if "start" in char:
# start = char["start"]
# if "end" in char:
# end = char["end"]
# Break by character
if char['char'] in break_characters_list:
text = segment['text'][letter_new_start:num+1]
logger.debug(
f"Break in: {char['char']}, position: {num}, text: {text}"
)
chars = segment['chars'][letter_new_start:num+1]
if not text:
logger.debug("No text")
continue
if num == 0 and not text.strip():
logger.debug("blank space in start")
continue
if len(text) == 1:
logger.debug(f"Short char append, num: {num}")
normal[-1]["text"] += text
normal[-1]["words"].append(chars)
continue
# logger.debug(chars)
normal_dict = process_chars(chars, letter_new_start, num, text)
letter_new_start = num+1
normal.append(normal_dict)
# If we reach the end of the segment, add the last part of chars.
if num == len(segment["chars"]) - 1:
text = segment['text'][letter_new_start:num+1]
# If remain text len is not default len text
if num not in [len(text)-1, len(text)] and text:
logger.debug(f'Remaining text: {text}')
if not text:
logger.debug("No remaining text.")
continue
if len(text) == 1:
logger.debug(f"Short char append, num: {num}")
normal[-1]["text"] += text
normal[-1]["words"].append(chars)
continue
chars = segment['chars'][letter_new_start:num+1]
normal_dict = process_chars(chars, letter_new_start, num, text)
letter_new_start = num+1
normal.append(normal_dict)
# Rename char to word
for item in normal:
words_list = item['words']
for word_item in words_list:
if 'char' in word_item:
word_item['word'] = word_item.pop('char')
# Convert to dict default
break_segments = {"segments": normal}
msg_count = (
f"Segment count before: {len(result['segments'])}, "
f"after: {len(break_segments['segments'])}."
)
logger.info(msg_count)
return break_segments