# Anki.py
# Description: Functions for Anki card generation
#
# Imports
import json | |
import zipfile | |
import sqlite3 | |
import tempfile | |
import os | |
import shutil | |
import base64 | |
import time | |
from datetime import datetime | |
from pathlib import Path | |
from typing import Dict, Tuple, Optional, Any, List | |
import re | |
from html.parser import HTMLParser | |
#
# External Imports
#from outlines import models, prompts
# Local Imports
#
############################################################################################################
#
# Functions:
class HTMLImageExtractor(HTMLParser):
    """Collect the ``src`` value of every ``<img>`` tag fed to the parser.

    After ``feed()`` returns, ``images`` holds one string per ``<img>`` tag
    that carried a ``src`` attribute, in document order. Tags without a
    ``src`` attribute are ignored.
    """

    def __init__(self):
        super().__init__()
        self.images = []

    def handle_starttag(self, tag, attrs):
        """Record the image source when *tag* is an ``<img>`` element."""
        if tag != 'img':
            return
        attrs_dict = dict(attrs)
        if 'src' in attrs_dict:
            # HTMLParser reports a valueless attribute (``<img src>``) as
            # None; store "" instead so every entry is a str and callers can
            # safely use string methods such as ``startswith``.
            self.images.append(attrs_dict['src'] or "")
def sanitize_html(content: str) -> str:
    """Strip inline event handlers and ``javascript:`` URLs from HTML.

    This is a best-effort, regex-based filter, not a whitelist sanitizer:
    tags are left in place (``<script>`` elements included) and only the
    following are removed:

    * ``on*=`` attributes whose value is double- or single-quoted, anywhere
      in the text;
    * ``on*=`` attributes with an unquoted value, but only inside something
      that looks like a tag, so plain text such as ``onset = 3`` survives;
    * every ``javascript:`` scheme prefix, case-insensitively.

    Args:
        content: HTML fragment to clean. ``None`` and ``""`` are accepted.

    Returns:
        The cleaned fragment, or ``""`` when *content* is empty.
    """
    if not content:
        return ""

    # The look-behind keeps attribute names that merely contain "on..."
    # (``content=``, ``data-onfoo=``) from being mistaken for handlers.
    handler_name = r'(?<![\w-])on\w+\s*=\s*'
    quoted_handler = re.compile(
        handler_name + r'''(?:"[^"]*"|'[^']*')''', re.IGNORECASE)
    bare_handler = re.compile(handler_name + r'''[^\s>"']+''', re.IGNORECASE)
    tag_like = re.compile(r'<[A-Za-z][^>]*(?:>|$)')
    js_scheme = re.compile(r'javascript:', re.IGNORECASE)

    content = quoted_handler.sub('', content)
    content = tag_like.sub(
        lambda match: bare_handler.sub('', match.group(0)), content)

    # Repeat until stable so that removing one occurrence cannot splice a
    # new one together (e.g. "javajavascript:script:").
    previous = None
    while previous != content:
        previous = content
        content = js_scheme.sub('', content)
    return content
def extract_media_from_apkg(zip_path: Any, temp_dir: str) -> Dict[str, str]:
    """Return the images stored in an APKG archive as base64 data URIs.

    The archive's ``media`` member is read as a JSON object mapping archive
    member names to original filenames. Every mapped member whose filename
    ends in ``.jpg``, ``.jpeg``, ``.png`` or ``.gif`` is encoded straight
    from the archive into a ``data:image/...;base64,...`` URI.

    Nothing is written to disk. The mapped filenames come from the archive
    itself and are therefore untrusted; they are used only as dictionary
    keys, never as filesystem paths.

    Args:
        zip_path: Path to the archive (``str`` or ``os.PathLike``), or an
            object exposing the path as a ``name`` attribute.
        temp_dir: Unused; kept so existing callers keep working.

    Returns:
        Mapping of original filename to data URI. Empty when the archive has
        no ``media`` member or cannot be read; errors are printed, not raised.
    """
    media_files = {}
    mime_by_ext = {
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.png': 'image/png',
        '.gif': 'image/gif',
    }
    try:
        # Check for real paths first: pathlib.Path also has a ``name``
        # attribute, but it holds only the final path component.
        if isinstance(zip_path, (str, os.PathLike)):
            file_name = os.fspath(zip_path)
        elif hasattr(zip_path, 'name'):
            file_name = zip_path.name
        else:
            file_name = str(zip_path)

        with zipfile.ZipFile(file_name, 'r') as zip_ref:
            members = set(zip_ref.namelist())
            if 'media' not in members:
                return media_files
            media_json = json.loads(zip_ref.read('media').decode('utf-8'))
            for file_id, filename in media_json.items():
                member = str(file_id)
                if member not in members:
                    continue
                extension = os.path.splitext(str(filename))[1].lower()
                media_type = mime_by_ext.get(extension)
                if media_type is None:
                    continue
                try:
                    encoded = base64.b64encode(zip_ref.read(member)).decode('ascii')
                    media_files[filename] = f"data:{media_type};base64,{encoded}"
                except Exception as e:
                    # One unreadable member must not discard the others.
                    print(f"Error processing image {filename}: {str(e)}")
    except Exception as e:
        print(f"Error processing media: {str(e)}")
    return media_files
def validate_card_content(card: Dict[str, Any], seen_ids: set) -> list:
    """Check one card dictionary and return a list of problem descriptions.

    Missing or non-string ``front``/``back`` values are treated as empty
    text, so a malformed card yields a complete list of issues instead of
    raising part-way through.

    Args:
        card: The card to check.
        seen_ids: IDs already encountered. A new, unique ID is added to this
            set as a side effect so later cards can be flagged as duplicates.

    Returns:
        Human-readable issue strings; empty when the card is valid.
    """
    if not isinstance(card, dict):
        return ["Card must be a JSON object"]

    issues = []

    # Required fields
    if 'id' not in card:
        issues.append("Missing ID")
    elif card['id'] in seen_ids:
        issues.append("Duplicate ID")
    else:
        seen_ids.add(card['id'])

    card_type = card.get('type')
    if card_type not in ('basic', 'cloze', 'reverse'):
        issues.append("Invalid card type")

    # Normalise both sides to str once so every later check is safe.
    sides = {
        side: card[side] if isinstance(card.get(side), str) else ""
        for side in ('front', 'back')
    }
    if not sides['front'].strip():
        issues.append("Missing front content")
    if not sides['back'].strip():
        issues.append("Missing back content")
    if not card.get('tags'):
        issues.append("Missing tags")

    # Content-specific validation
    if card_type == 'cloze' and '{{c1::' not in sides['front']:
        issues.append("Invalid cloze format")

    # Image validation: only embedded data URIs and http(s) URLs are accepted.
    for field in ('front', 'back'):
        if '<img' not in sides[field]:
            continue
        extractor = HTMLImageExtractor()
        extractor.feed(sides[field])
        for img_src in extractor.images:
            is_valid_src = isinstance(img_src, str) and img_src.startswith(
                ('data:image/', 'http'))
            if not is_valid_src:
                issues.append(f"Invalid image source in {field}")
    return issues
def _load_apkg_rows(db_path: str) -> Tuple[Dict[str, Any], List[tuple]]:
    """Read deck metadata and the note/card rows from a collection database."""
    db_conn = sqlite3.connect(db_path)
    try:
        cursor = db_conn.cursor()
        try:
            # Get collection info
            cursor.execute("SELECT decks, models FROM col")
            col_row = cursor.fetchone()
            if col_row is None:
                raise ValueError("collection table 'col' is empty")
            decks_json, models_json = col_row
            deck_info = {
                "decks": json.loads(decks_json),
                "models": json.loads(models_json)
            }
            try:
                # Pick the query by whether a 'notetypes' table exists.
                cursor.execute(
                    "SELECT name FROM sqlite_master WHERE type='table' AND name='notetypes'")
                has_notetypes = cursor.fetchone() is not None
                if has_notetypes:
                    cursor.execute("""
                        SELECT
                        n.id, n.flds, n.tags, c.type, n.mid,
                        m.name, n.sfld, m.flds, m.tmpls
                        FROM notes n
                        JOIN cards c ON c.nid = n.id
                        JOIN notetypes m ON m.id = n.mid
                        """)
                else:
                    cursor.execute("""
                        SELECT
                        n.id, n.flds, n.tags, c.type, n.mid,
                        m.name, n.sfld, m.flds, m.tmpls
                        FROM notes n
                        JOIN cards c ON c.nid = n.id
                        JOIN col AS m ON m.id = 1 AND json_extract(m.models, '$.' || n.mid) IS NOT NULL
                        """)
                rows = cursor.fetchall()
            except sqlite3.Error:
                # NOTE(review): the two queries above rely on name/flds/tmpls
                # columns; when the schema lacks them this model-less
                # fallback is what actually runs - confirm against real decks.
                cursor.execute("""
                    SELECT
                    n.id, n.flds, n.tags, c.type, n.mid,
                    '', n.sfld, '[]', '[]'
                    FROM notes n
                    JOIN cards c ON c.nid = n.id
                    """)
                rows = cursor.fetchall()
            return deck_info, rows
        finally:
            cursor.close()
    finally:
        db_conn.close()


def _apkg_row_to_card(row: tuple, media_files: Dict[str, str]) -> Dict[str, Any]:
    """Convert one database row into the card dictionary used by the editor."""
    note_id, fields, tags = row[0], row[1], row[2]
    model_name = row[5] if row[5] else "Unknown Model"
    try:
        fields_config = json.loads(row[7]) if row[7] else []
        templates = json.loads(row[8]) if row[8] else []
    except (json.JSONDecodeError, TypeError):
        fields_config = []
        templates = []

    # Note fields are separated by the 0x1f character; inline any media the
    # archive provided as data URIs, then sanitize.
    processed_fields = []
    for field_html in fields.split('\x1f'):
        for filename, base64_data in media_files.items():
            field_html = field_html.replace(
                f'<img src="{filename}"',
                f'<img src="{base64_data}"'
            )
        processed_fields.append(sanitize_html(field_html))

    # Determine card type (simplified logic)
    converted_type = 'basic'
    if any('cloze' in str(t).lower() for t in templates):
        converted_type = 'cloze'
    elif any('{{FrontSide}}' in str(t) for t in templates):
        converted_type = 'reverse'

    return {
        "id": f"APKG_{note_id}",
        "type": converted_type,
        "front": processed_fields[0] if processed_fields else "",
        "back": processed_fields[1] if len(processed_fields) > 1 else "",
        # split() without arguments never yields empty tags.
        "tags": tags.split() or ["imported"],
        "note": f"Imported from deck: {model_name}",
        "has_media": any('<img' in field for field in processed_fields),
        "model_name": model_name,
        "field_names": [f.get('name', f'Field_{i}') for i, f in enumerate(fields_config)],
        "template_names": [t.get('name', f'Template_{i}') for i, t in enumerate(templates)]
    }


def _remove_apkg_temp_dir(temp_dir: str) -> None:
    """Delete the temporary directory, never raising on failure."""
    for root, _dirs, files in os.walk(temp_dir):
        for fname in files:
            try:
                # Files can be read-only, which blocks deletion on some
                # platforms; make them writable first.
                os.chmod(os.path.join(root, fname), 0o777)
            except OSError:
                pass
    shutil.rmtree(temp_dir, ignore_errors=True)


def process_apkg_file(file_path: str) -> Tuple[Optional[Dict], Optional[Dict], str]:
    """Convert an APKG archive into the editor's card JSON structure.

    Only the ``collection.anki2`` member is extracted, into a temporary
    directory that is removed before returning. The query joins notes to
    cards, so a note with several cards appears in several rows; it is
    emitted once, because every row would produce the same ``APKG_<note id>``
    identifier and the same content.

    Args:
        file_path: Path to the archive (``str`` or ``os.PathLike``), or an
            object exposing the path as a ``name`` attribute.

    Returns:
        ``(cards_data, deck_info, message)``. On failure the first two items
        are ``None`` and *message* describes the problem; nothing is raised.
    """
    if not file_path:
        return None, None, "No file provided"

    # Check for real paths first: pathlib.Path also has a ``name``
    # attribute, but it holds only the final path component.
    if isinstance(file_path, (str, os.PathLike)):
        file_name = os.fspath(file_path)
    elif hasattr(file_path, 'name'):
        file_name = file_path.name
    else:
        file_name = str(file_path)

    temp_dir = None
    try:
        temp_dir = tempfile.mkdtemp()
        # Extract media files first
        media_files = extract_media_from_apkg(file_name, temp_dir)

        with zipfile.ZipFile(file_name, 'r') as zip_ref:
            if 'collection.anki2' not in zip_ref.namelist():
                # Without this check sqlite3.connect would silently create
                # an empty database and fail later with a misleading error.
                return None, None, "Error processing APKG file: collection.anki2 not found in archive"
            db_path = zip_ref.extract('collection.anki2', temp_dir)

        deck_info, rows = _load_apkg_rows(db_path)

        cards_data = {"cards": []}
        seen_note_ids = set()
        for row in rows:
            if row[0] in seen_note_ids:
                continue
            seen_note_ids.add(row[0])
            cards_data["cards"].append(_apkg_row_to_card(row, media_files))

        if not cards_data["cards"]:
            return None, None, "No cards found in the APKG file"
        return cards_data, deck_info, "APKG file processed successfully!"
    except sqlite3.Error as e:
        return None, None, f"Database error: {str(e)}"
    except json.JSONDecodeError as e:
        return None, None, f"JSON parsing error: {str(e)}"
    except Exception as e:
        return None, None, f"Error processing APKG file: {str(e)}"
    finally:
        if temp_dir and os.path.exists(temp_dir):
            _remove_apkg_temp_dir(temp_dir)
def validate_flashcards(content: str) -> Tuple[bool, str]:
    """Validate a JSON document of flashcards.

    Args:
        content: JSON text whose root object must hold a ``cards`` list.

    Returns:
        ``(is_valid, message)``. *message* lists one line per problematic
        card, or ``"All cards are valid!"`` when nothing is wrong. Cards
        without an ``id`` are reported by their 1-based position.
    """
    try:
        data = json.loads(content)
        if not isinstance(data, dict) or 'cards' not in data:
            return False, "Invalid JSON format. Must contain 'cards' array."
        if not isinstance(data['cards'], list):
            return False, "Invalid JSON format. Must contain 'cards' array."

        seen_ids = set()
        validation_results = []
        for idx, card in enumerate(data['cards']):
            card_issues = validate_card_content(card, seen_ids)
            if not card_issues:
                continue
            # "Missing ID" is itself one of the reportable issues, so the
            # label must not assume the key exists.
            if isinstance(card, dict) and 'id' in card:
                label = card['id']
            else:
                label = f"#{idx + 1}"
            validation_results.append(f"Card {label}: {', '.join(card_issues)}")

        if validation_results:
            return False, "\n".join(validation_results)
        return True, "All cards are valid!"
    except json.JSONDecodeError:
        return False, "Invalid JSON format"
    except Exception as e:
        return False, f"Validation error: {str(e)}"
def enhanced_file_upload(file: Any, input_type: str) -> Tuple[Optional[str], Optional[Dict], str, List[str]]:
    """Load an uploaded APKG or JSON file and validate the result.

    Args:
        file: Uploaded file object (or path, for APKG input).
        input_type: ``"APKG"`` to convert an Anki package; any other value
            reads *file* as JSON text.

    Returns:
        ``(content, deck_info, status_message, card_choices)``. On failure
        *content* and *deck_info* are ``None`` and the choice list is empty.
        The status message starts with a success or failure symbol.
    """
    if not file:
        return None, None, "❌ No file uploaded", []
    try:
        if input_type == "APKG":
            cards_data, deck_info, message = process_apkg_file(file)
            if cards_data:
                content = json.dumps(cards_data, indent=2)
                choices = update_card_choices(content)
                # Validate the converted content
                validation_msg = handle_validation(content, "APKG")
                return content, deck_info, validation_msg, choices
            return None, None, f"❌ {message}", []

        # JSON upload: accept both binary- and text-mode file objects.
        raw = file.read()
        content = raw.decode('utf-8') if isinstance(raw, bytes) else raw
        json.loads(content)  # Validate JSON
        return content, None, "✅ JSON file loaded successfully!", update_card_choices(content)
    except Exception as e:
        return None, None, f"❌ Error processing file: {str(e)}", []
def handle_file_upload(file: Any, input_type: str) -> Tuple[Optional[str], Optional[Dict], str, List[str]]:
    """Load an uploaded APKG or JSON file and build the card dropdown choices.

    Args:
        file: Uploaded file object (or path, for APKG input).
        input_type: ``"APKG"`` to convert an Anki package; any other value
            reads *file* as JSON text.

    Returns:
        ``(content, deck_info, status_message, card_choices)``. On failure
        *content* and *deck_info* are ``None`` and the choice list is empty.
        The status message starts with a success or failure symbol.
    """
    if not file:
        return None, None, "❌ No file uploaded", []

    if input_type == "APKG":
        cards_data, deck_info, message = process_apkg_file(file)
        if not cards_data:
            return None, None, f"❌ {message}", []
        content = json.dumps(cards_data, indent=2)
        return content, deck_info, f"✅ {message}", update_card_choices(content)

    # JSON
    try:
        # Accept both binary- and text-mode file objects.
        raw = file.read()
        content = raw.decode('utf-8') if isinstance(raw, bytes) else raw
        json.loads(content)  # Validate JSON
        return (
            content,
            None,
            "✅ JSON file loaded successfully!",
            update_card_choices(content)
        )
    except Exception as e:
        return None, None, f"❌ Error loading JSON file: {str(e)}", []
def update_card_choices(content: str) -> List[str]:
    """Build the dropdown labels for every card in *content*.

    Each label is ``"<id> - <first 50 characters of front>..."``.

    Args:
        content: JSON text with a top-level ``cards`` list.

    Returns:
        One label per card, or an empty list when *content* cannot be parsed
        or any card lacks ``id``/``front``.
    """
    try:
        data = json.loads(content)
        return [f"{card['id']} - {card['front'][:50]}..." for card in data['cards']]
    except Exception:
        # Best-effort for the UI: malformed input just yields no choices.
        # (Unlike a bare ``except``, this lets KeyboardInterrupt/SystemExit
        # propagate.)
        return []
def update_card_content(
    current_content: str,
    card_id: str,
    card_type: str,
    front: str,
    back: str,
    tags: str,
    notes: str
) -> Tuple[str, str]:
    """Update one card inside the JSON document.

    Args:
        current_content: JSON text with a top-level ``cards`` list.
        card_id: ID of the card to update. It is compared as text, so
            numeric IDs in the JSON match the string coming from the UI.
        card_type: New value for the card's ``type``.
        front: New front HTML (sanitized before it is stored).
        back: New back HTML (sanitized before it is stored).
        tags: Comma-separated tag list; blank entries are dropped.
        notes: New value for the card's ``note``.

    Returns:
        ``(json_text, status_message)``. The original text is returned
        unchanged when the card is not found or an error occurs.
    """
    try:
        data = json.loads(current_content)
        for card in data['cards']:
            if str(card.get('id')) != str(card_id):
                continue
            card['type'] = card_type
            # Sanitize input content
            card['front'] = sanitize_html(front)
            card['back'] = sanitize_html(back)
            # Skip blanks so "" or "a,,b" cannot produce empty-string tags.
            card['tags'] = [tag.strip() for tag in tags.split(',') if tag.strip()]
            card['note'] = notes
            # Update media status
            card['has_media'] = '<img' in front or '<img' in back
            return json.dumps(data, indent=2), "Card updated successfully!"
        return current_content, "Card not found!"
    except Exception as e:
        return current_content, f"Error updating card: {str(e)}"
def load_card_for_editing(card_selection: str, current_content: str) -> tuple:
    """
    Load a card for editing and generate previews.

    The card ID is the text before the first " - " in *card_selection*. IDs
    are compared as text, so numeric IDs in the JSON still match. Missing
    optional keys fall back to empty values instead of making the card
    unloadable.

    Args:
        card_selection (str): Selected card ID and preview text
        current_content (str): Current JSON content
    Returns:
        tuple: (card_type, front_content, back_content, tags, notes, front_preview, back_preview);
        the defaults ("basic" plus six empty strings) when nothing matches
        or an error occurs.
    """
    empty = ("basic", "", "", "", "", "", "")
    if not card_selection or not current_content:
        return empty
    try:
        data = json.loads(current_content)
        selected_id = card_selection.split(" - ")[0]
        for card in data['cards']:
            if str(card.get('id')) != selected_id:
                continue
            front = card.get('front', '')
            back = card.get('back', '')
            # Return all required fields with preview content
            return (
                card.get('type', 'basic'),
                front,
                back,
                ", ".join(card.get('tags') or []),
                card.get('note', ''),
                sanitize_html(front),
                sanitize_html(back)
            )
        return empty
    except Exception as e:
        print(f"Error loading card: {str(e)}")
        return empty
def export_cards(content: str, format_type: str) -> Tuple[str, Optional[Tuple[str, str, str]]]:
    """Export cards in the specified format.

    Args:
        content: JSON text with a top-level ``cards`` list; it must pass
            ``validate_flashcards`` before anything is exported.
        format_type: ``"Anki CSV"``, ``"JSON"``, or anything else for plain
            text.

    Returns:
        ``(status_message, file_info)`` where *file_info* is
        ``(filename, file_content, mime_type)``, or ``None`` when validation
        fails or an error occurs.
    """
    # Not imported at module level in this file; kept local so the block is
    # self-contained.
    import csv
    import io

    try:
        is_valid, validation_message = validate_flashcards(content)
        if not is_valid:
            return "Please fix validation issues before exporting.", None
        data = json.loads(content)

        if format_type == "Anki CSV":
            buffer = io.StringIO()
            buffer.write("Front,Back,Tags,Type,Note\n")
            # The csv module doubles embedded quotes; card HTML such as
            # <img src="..."> would otherwise break the row.
            writer = csv.writer(buffer, quoting=csv.QUOTE_ALL, lineterminator="\n")
            for card in data['cards']:
                writer.writerow([
                    card["front"],
                    card["back"],
                    " ".join(card["tags"]),
                    card["type"],
                    card.get("note", ""),
                ])
            return "Cards exported successfully!", ("anki_cards.csv", buffer.getvalue(), "text/csv")

        if format_type == "JSON":
            return "Cards exported successfully!", ("anki_cards.json", content, "application/json")

        # Plain Text
        entries = []
        for card in data['cards']:
            # Replace image tags with placeholders
            front = re.sub(r'<img[^>]+>', '[IMG]', card['front'])
            back = re.sub(r'<img[^>]+>', '[IMG]', card['back'])
            entries.append(f"Q: {front}\nA: {back}\nTags: {', '.join(card['tags'])}\n\n")
        return "Cards exported successfully!", ("anki_cards.txt", "".join(entries), "text/plain")
    except Exception as e:
        return f"Export error: {str(e)}", None
def generate_card_choices(content: str) -> list:
    """Generate choices for the card selector dropdown.

    Each choice is ``"<id> - <first 50 characters of front>..."``.

    Args:
        content: JSON text with a top-level ``cards`` list.

    Returns:
        One choice per card, or an empty list when *content* cannot be
        parsed or any card lacks ``id``/``front``.
    """
    try:
        data = json.loads(content)
        return [f"{card['id']} - {card['front'][:50]}..." for card in data['cards']]
    except Exception:
        # Best-effort for the UI: malformed input just yields no choices.
        # (Unlike a bare ``except``, this lets KeyboardInterrupt/SystemExit
        # propagate.)
        return []
def format_validation_result(content: str) -> str:
    """Format validation results for display in a Markdown component.

    Args:
        content: JSON text to validate with ``validate_flashcards``.

    Returns:
        The validation message prefixed with a success symbol when the
        content is valid and a failure symbol otherwise.
    """
    try:
        is_valid, message = validate_flashcards(content)
    except Exception as e:
        return f"❌ Error during validation: {str(e)}"
    # The two outcomes must be visually distinct.
    return f"✅ {message}" if is_valid else f"❌ {message}"
def validate_for_ui(content: str) -> str:
    """Validate flashcards and return a formatted string for UI display.

    Structural problems (empty input, malformed JSON, missing or empty
    ``cards`` list) are reported directly; otherwise the per-card result of
    ``validate_flashcards`` is returned.

    Args:
        content: JSON text entered by the user.

    Returns:
        A one-line or multi-line message prefixed with a success or failure
        symbol.
    """
    if not content or not content.strip():
        return "❌ No content to validate. Please enter some flashcard data."
    try:
        # First try to parse the JSON
        try:
            data = json.loads(content)
        except json.JSONDecodeError as je:
            # ``str(je)`` already embeds the position, so build the message
            # from ``je.msg`` to avoid printing the location twice.
            return f"❌ Invalid JSON format: {je.msg} (line {je.lineno}, column {je.colno})"

        # Check basic structure
        if not isinstance(data, dict):
            return "❌ Invalid format: Root element must be a JSON object"
        if "cards" not in data:
            return '❌ Invalid format: Missing "cards" array in root object'
        if not isinstance(data["cards"], list):
            return '❌ Invalid format: "cards" must be an array'
        if not data["cards"]:
            return "❌ No cards found in the data"

        # If we get here, perform the full validation
        is_valid, message = validate_flashcards(content)
        return f"✅ {message}" if is_valid else f"❌ {message}"
    except Exception as e:
        return f"❌ Validation error: {str(e)}"
def update_card_with_validation(
    current_content: str,
    card_selection: str,
    card_type: str,
    front: str,
    back: str,
    tags: str,
    notes: str
) -> Tuple[str, str, List[str]]:
    """Update a card and return the new content, a status line and choices.

    Args:
        current_content: JSON text with a top-level ``cards`` list.
        card_selection: Dropdown label; the card ID is the text before the
            first " - ".
        card_type: New card type.
        front: New front HTML.
        back: New back HTML.
        tags: Comma-separated tag list.
        notes: New note text.

    Returns:
        ``(content, status_message, card_choices)``. On failure *content* is
        the unchanged input and the message starts with a failure symbol.
    """
    try:
        # Unpack the tuple returned by update_card_content
        updated_content, message = update_card_content(
            current_content,
            card_selection.split(" - ")[0],
            card_type,
            front,
            back,
            tags,
            notes
        )
    except Exception as e:
        return (
            current_content,
            f"❌ Error updating card: {str(e)}",
            update_card_choices(current_content)
        )

    # update_card_content signals success only through its message text.
    if "successfully" in message:
        return updated_content, f"✅ {message}", update_card_choices(updated_content)
    return current_content, f"❌ {message}", update_card_choices(current_content)
def handle_validation(content: str, input_format: str) -> str:
    """Handle validation for both JSON and APKG formats.

    APKG-derived content only gets the structural checks and a card count;
    any other format is additionally run through ``validate_flashcards``.

    Args:
        content: JSON text with a top-level ``cards`` list.
        input_format: ``"APKG"`` or any other value for plain JSON input.

    Returns:
        A status message prefixed with a success or failure symbol.
    """
    if not content or not content.strip():
        return "❌ No content to validate"
    try:
        data = json.loads(content)
        if not isinstance(data, dict):
            return "❌ Invalid format: Root element must be a JSON object"
        if "cards" not in data:
            return '❌ Invalid format: Missing "cards" array in root object'
        if not isinstance(data["cards"], list):
            return '❌ Invalid format: "cards" must be an array'
        if not data["cards"]:
            return "❌ No cards found in the data"

        if input_format == "APKG":
            card_count = len(data["cards"])
            return f"✅ Successfully imported and validated {card_count} cards from APKG file"

        # For JSON input, perform additional validation
        is_valid, message = validate_flashcards(content)
        return f"✅ {message}" if is_valid else f"❌ {message}"
    except json.JSONDecodeError as je:
        # ``str(je)`` already embeds the position, so build the message from
        # ``je.msg`` to avoid printing the location twice.
        return f"❌ Invalid JSON format: {je.msg} (line {je.lineno}, column {je.colno})"
    except Exception as e:
        return f"❌ Validation error: {str(e)}"
#
# End of Anki.py
############################################################################################################