|
import os |
|
import re |
|
import gradio as gr |
|
import requests |
|
import nest_asyncio |
|
import sys |
|
import boto3 |
|
|
|
from pathlib import Path |
|
from bs4 import BeautifulSoup |
|
from llama_index.core import ( |
|
Settings, |
|
) |
|
|
|
from llama_index.retrievers.bm25 import BM25Retriever |
|
from llama_index.core.retrievers import QueryFusionRetriever |
|
|
|
|
|
from dotenv import load_dotenv |
|
|
|
# Load variables from a .env file into the process environment; the AWS
# credentials read below come from there (or from the real environment).
load_dotenv()

# Stored on the shared Settings object and read back as ``similarity_top_k``
# when the fusion retriever is built in initialize_components().
Settings.similarity_top_k = 20

# S3 bucket / key prefix holding the persisted index, and the local folder
# it is downloaded into.
BUCKET_NAME = "legal-position"
PREFIX_RETRIEVER = "Save_Index/"
LOCAL_DIR = Path("Save_Index_Local")

# Shared S3 client used by the download helpers below.
s3_client = boto3.client(
    "s3",
    region_name="eu-north-1",
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
)

# Create the download target up front; a no-op when it already exists.
LOCAL_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
def download_s3_file(bucket_name, s3_key, local_path):
    """Download a single S3 object and report it on stdout.

    Args:
        bucket_name: Name of the S3 bucket to read from.
        s3_key: Key of the object inside the bucket.
        local_path: Destination on disk; converted with ``str()`` before
            being handed to the S3 client.
    """
    destination = str(local_path)
    s3_client.download_file(bucket_name, s3_key, destination)
    print(f"Завантажено: {s3_key} -> {local_path}")
|
|
|
|
|
def download_s3_folder(bucket_name, prefix, local_dir):
    """Download every object stored under ``prefix`` into ``local_dir``.

    The key layout below ``prefix`` is reproduced below ``local_dir``;
    intermediate directories are created as needed. Keys ending in ``/``
    (folder placeholders) are skipped.

    Args:
        bucket_name: Name of the S3 bucket to read from.
        prefix: Key prefix that selects the objects to download.
        local_dir: ``pathlib.Path`` of the local destination directory.
    """
    # A single list_objects_v2 call returns at most 1000 keys, so walk all
    # pages instead of silently truncating larger folders.
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        # "Contents" is absent from a page that matched no objects.
        for obj in page.get("Contents", []):
            s3_key = obj["Key"]
            if s3_key.endswith("/"):
                continue

            local_file_path = local_dir / Path(s3_key).relative_to(prefix)
            local_file_path.parent.mkdir(parents=True, exist_ok=True)

            # Reuse the single-file helper so the download and its log line
            # live in one place.
            download_s3_file(bucket_name, s3_key, local_file_path)
|
|
|
|
|
# Fetch the persisted index from S3 at import time; initialize_components()
# later looks for it in the same local folder.
download_s3_folder(BUCKET_NAME, PREFIX_RETRIEVER, LOCAL_DIR)

# Patch asyncio through nest_asyncio before any retriever call is made.
nest_asyncio.apply()

# NOTE(review): this State is created outside any gr.Blocks context, and
# create_gradio_interface() builds its own local ``state_nodes`` — confirm
# whether this module-level one is still needed.
state_nodes = gr.State()
|
|
|
|
|
def parse_doc_ids(doc_ids):
    """Normalise a ``doc_id`` metadata value into a list of id strings.

    Accepted inputs:
        * ``None`` -> ``[]``
        * a list/tuple of ids (strings or numbers)
        * a string such as ``"[123, 456]"`` or ``"123,456"``
        * a single integer id

    Surrounding brackets and all whitespace are removed from every id, and
    empty ids are dropped so that callers never build a link with no id.

    Args:
        doc_ids: Raw metadata value.

    Returns:
        List of cleaned id strings; empty for unsupported or empty input.
    """
    if doc_ids is None:
        return []

    if isinstance(doc_ids, (list, tuple)):
        raw_tokens = [str(item) for item in doc_ids if item is not None]
    elif isinstance(doc_ids, (str, int)):
        # int is accepted for parity with parse_lp_ids.
        raw_tokens = str(doc_ids).split(",")
    else:
        return []

    # Remove whitespace first so brackets hidden behind spaces
    # (e.g. " [1, 2] ") are still stripped.
    cleaned_tokens = ("".join(token.split()).strip("[]") for token in raw_tokens)
    return [token for token in cleaned_tokens if token]
|
|
|
def get_links_html(doc_ids):
    """Build Markdown links to court decisions for the given ids.

    Args:
        doc_ids: Raw ``doc_id`` value; normalised with ``parse_doc_ids``.

    Returns:
        Comma-separated Markdown links, or ``""`` when there are no ids.
    """
    decision_ids = parse_doc_ids(doc_ids)
    # Joining an empty sequence yields "", which covers the "no ids" case.
    return ", ".join(
        f"[Рішення ВС: {decision_id}](https://reyestr.court.gov.ua/Review/{decision_id})"
        for decision_id in decision_ids
    )
|
|
|
def parse_lp_ids(lp_ids):
    """Normalise an ``lp_id`` metadata value into a list with at most one id.

    Only strings and integers are accepted; anything else (including
    ``None``) yields an empty list. Leading/trailing square brackets are
    stripped and all spaces are removed.

    Args:
        lp_ids: Raw metadata value.

    Returns:
        ``[cleaned_id]`` when a non-empty id remains, otherwise ``[]``.
    """
    if not isinstance(lp_ids, (str, int)):
        return []

    normalized = str(lp_ids).strip("[]").replace(" ", "")
    return [normalized] if normalized else []
|
|
|
def get_links_html_lp(lp_ids):
    """Build Markdown links to legal positions for the given ids.

    Args:
        lp_ids: Raw ``lp_id`` value; normalised with ``parse_lp_ids``.

    Returns:
        Comma-separated Markdown links, or ``""`` when there are no ids.
    """
    position_ids = parse_lp_ids(lp_ids)
    # Joining an empty sequence yields "", which covers the "no ids" case.
    return ", ".join(
        f"[Правова позиція ВС: {position_id}](https://lpd.court.gov.ua/home/search/{position_id})"
        for position_id in position_ids
    )
|
|
|
|
|
def initialize_components():
    """Load the persisted BM25 index and publish the global retriever.

    Verifies that the local index directory and the expected entries exist,
    loads the BM25 retriever from disk and wraps it in a
    ``QueryFusionRetriever`` stored in the module-level ``retriever_bm25``.

    Returns:
        ``True`` on success; ``False`` when anything fails (the error is
        printed to stderr).
    """
    global retriever_bm25

    try:
        index_dir = Path("Save_Index_Local")
        if not index_dir.exists():
            raise FileNotFoundError(f"Directory not found: {index_dir}")

        # Collect every missing entry so the error names all of them.
        absent = []
        for required_name in ("docstore_es_filter.json", "bm25_retriever_es"):
            if not (index_dir / required_name).exists():
                absent.append(required_name)
        if absent:
            raise FileNotFoundError(f"Missing required files: {', '.join(absent)}")

        base_retriever = BM25Retriever.from_persist_dir(str(index_dir / "bm25_retriever_es"))
        retriever_bm25 = QueryFusionRetriever(
            [base_retriever],
            similarity_top_k=Settings.similarity_top_k,
            num_queries=1,
            use_async=True,
        )
    except Exception as e:
        # Startup boundary: report the failure and let the caller decide.
        print(f"Error initializing components: {str(e)}", file=sys.stderr)
        return False

    return True
|
|
|
|
|
def extract_court_decision_text(url, timeout=30):
    """Fetch a court decision page and return the text of its paragraphs.

    The text of every ``<p>`` element is collected, except paragraphs that
    contain one of the known registry notices, and joined with newlines.

    Args:
        url: Address of the decision page.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely.

    Returns:
        The extracted text, stripped of leading/trailing whitespace.

    Raises:
        requests.RequestException: On connection problems, timeouts, or a
            4xx/5xx response (so an error page is never treated as the
            decision text).
    """
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    soup = BeautifulSoup(response.content, 'html.parser')

    # Registry notices that are not part of the decision itself.
    unwanted_texts = (
        "Доступ до Реєстру здійснюється в тестовому (обмеженому) режимі.",
        "З метою упередження перешкоджанню стабільній роботі Реєстру",
    )

    kept_paragraphs = []
    for paragraph in soup.find_all('p'):
        text = paragraph.get_text(separator="\n").strip()
        if not any(unwanted_text in text for unwanted_text in unwanted_texts):
            kept_paragraphs.append(text)

    return "\n".join(kept_paragraphs).strip()
|
|
|
|
|
def _format_search_results(header, nodes):
    """Render retrieved nodes as a Markdown list placed under ``header``."""
    parts = [header]
    for index, node in enumerate(nodes, start=1):
        metadata = node.node.metadata
        source_title = metadata.get('title', 'Невідомий заголовок')
        links = get_links_html(metadata.get('doc_id'))
        links_lp = get_links_html_lp(metadata.get('lp_id'))
        parts.append(
            f"\n[{index}] *{source_title}* ⚖️ {links_lp} | {links} 👉 Score: {node.score} \n"
        )
    return "".join(parts)


async def search_without_ai_action(url):
    """Search legal positions using the text of the decision at ``url``.

    Args:
        url: Link to a court decision page.

    Returns:
        ``(markdown, nodes)`` on success, or ``(error_message, None)`` when
        fetching, parsing or retrieval fails.
    """
    # Imported locally because the module does not import asyncio itself.
    import asyncio

    try:
        # The page fetch is blocking; run it in the default executor so the
        # event loop is not stalled while waiting on the network.
        loop = asyncio.get_running_loop()
        court_decision_text = await loop.run_in_executor(
            None, extract_court_decision_text, url
        )
        if not court_decision_text:
            # Report an empty page through the normal error path instead of
            # running a search with an empty query.
            raise ValueError("no decision text could be extracted from the page")

        nodes = await retriever_bm25.aretrieve(court_decision_text)
        search_output_content = _format_search_results(
            "**Результати пошуку (наявні правові позиції ВС) за посиланням:** \n\n",
            nodes,
        )
        return search_output_content, nodes
    except Exception as e:
        return f"Error during search: {str(e)}", None


async def search_without_ai_action_text(question_input):
    """Search legal positions for a free-text query.

    Args:
        question_input: Query text entered by the user.

    Returns:
        ``(markdown, nodes)`` on success, or ``(error_message, None)`` when
        retrieval fails.
    """
    try:
        nodes = await retriever_bm25.aretrieve(question_input)
        search_output_content = _format_search_results(
            "**Результати пошуку (наявні правові позиції ВС) за текстовим запитом:** \n\n",
            nodes,
        )
        return search_output_content, nodes
    except Exception as e:
        return f"Error during search: {str(e)}", None
|
|
|
|
|
def create_gradio_interface():
    """Build the Gradio UI for searching Supreme Court legal positions.

    The interface has one text box that accepts either free text or a link
    to a decision, a search button whose label/state follows the input, a
    hidden warning for unsupported URLs, and a Markdown result area.

    Returns:
        The assembled ``gr.Blocks`` application (not yet launched).
    """
    # Single definition of a supported decision URL, shared by both
    # callbacks so validation and dispatch cannot drift apart.
    decision_url_pattern = re.compile(r"https://reyestr\.court\.gov\.ua/Review/\d+")

    with gr.Blocks() as app:
        gr.Markdown("# Знаходьте правові позиції Верховного Суду")

        input_field = gr.Textbox(label="Введіть текст або посилання на судове рішення", lines=1)
        search_button = gr.Button("Пошук", interactive=False)
        warning_message = gr.Markdown(visible=False)

        search_output = gr.Markdown(label="Результат пошуку")

        state_nodes = gr.State()

        async def search_action(input_text):
            """Dispatch to URL-based or text-based search."""
            candidate_url = (input_text or "").strip()
            if decision_url_pattern.fullmatch(candidate_url):
                # Pass the stripped value: the URL was validated without the
                # surrounding whitespace, so it must be fetched without it.
                return await search_without_ai_action(candidate_url)
            return await search_without_ai_action_text(input_text)

        def update_button_state(text):
            """Return updates for the search button and the warning message."""
            text = (text or "").strip()
            if not text:
                return gr.update(value="Пошук", interactive=False), gr.update(visible=False)
            if decision_url_pattern.fullmatch(text):
                return gr.update(value="Пошук за URL", interactive=True), gr.update(visible=False)
            if text.startswith("http"):
                # Looks like a link but not to a supported decision page.
                return (
                    gr.update(value="Пошук", interactive=False),
                    gr.update(
                        value="Неправильний формат URL. Використовуйте посилання формату https://reyestr.court.gov.ua/Review/{doc_id}",
                        visible=True,
                    ),
                )
            return gr.update(value="Пошук за текстом", interactive=True), gr.update(visible=False)

        search_button.click(
            fn=search_action,
            inputs=input_field,
            outputs=[search_output, state_nodes],
        )

        input_field.change(
            fn=update_button_state,
            inputs=input_field,
            outputs=[search_button, warning_message],
        )

    return app
|
|
|
if __name__ == "__main__":
    # Bail out early when the index could not be loaded.
    if not initialize_components():
        print("Failed to initialize components. Please check the paths and try again.", file=sys.stderr)
        sys.exit(1)

    print("Components initialized successfully!")
    app = create_gradio_interface()
    app.launch(share=True)
|
|