import gradio as gr import torch from transformers import Qwen2VLForConditionalGeneration, AutoProcessor from qwen_vl_utils import process_vision_info from PIL import Image import os from datetime import datetime import numpy as np import json import math import spaces import logging from functools import lru_cache import requests from io import BytesIO logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) TEST_REPORT_URL = "https://www.eyenews.uk.com/media/9694/eyeam18-case-in-point-figure-2.png" DESCRIPTION = """ # IOL Report Analyzer Upload an IOL Master biometry report for AI-powered analysis and IOL calculations. """ DEFAULT_SETTINGS = { "manufacturer": "Alcon", "lens_model": "SN60WF", "a_constant": 118.7, "target_refraction": 0.0 } IOL_CONSTANTS = { "Alcon": { "SN60WF": 118.7, "SA60AT": 118.4, "MA60MA": 118.9, "SN6AT": 119.1 }, "Johnson & Johnson": { "ZCB00": 119.3, "PCB00": 119.3, "ZA9003": 119.1, "AR40e": 118.7 }, "Zeiss": { "CT LUCIA 611P": 118.6, "AT LISA tri": 118.3, "AT TORBI": 118.2 }, "Bausch & Lomb": { "enVista": 119.1, "LI61AO": 118.0, "EyeCee One": 118.9 } } FORMULAS_DOC = """ # IOL Power Calculation Formulas ## 1. SRK/T Formula A complex vergence formula that considers: - Corneal height - Effective lens position (ELP) - Retinal thickness - Adjusted axial length Key variables: AL, K readings, A-constant ## 2. Barrett Universal II Formula A proprietary formula developed by Dr. Graham Barrett that considers: - Adjusted axial length - Lens factor - Effective lens position - Optional ACD measurement Note: Implementation uses published approximations ## 3. Haigis Formula ELP = a0 + (a1 × ACD) + (a2 × AL) P = (n_vitreous * (AL - ELP)) - (n_aqueous * ELP) / (ELP * (AL - ELP)) Where: - ELP = Effective lens position - ACD = Anterior chamber depth - AL = Axial length ## 4. Holladay 1 Formula P = (1336 / (AL - ELP)) - (1336 / (337.5 / K)) Where: - P = IOL power (D) - AL = Axial length (mm) - ELP = Effective lens position - K = Mean keratometry (D) ## 5. EVO Formula A proprietary formula developed by VSY Biotechnology that considers: - Axial length - Keratometry - Anterior chamber depth - Lens thickness - Target refraction Note: Implementation uses published approximations ## 6. Kane Formula A proprietary formula developed by Dr. Graham Barrett that considers: - Axial length - Keratometry - Anterior chamber depth - Lens thickness - Gender (optional) Note: Implementation uses published approximations """ expected_measurements = {'axial_length': ('Axial Length', 'mm'), 'k1': ('K1', 'D'), 'k2': ('K2', 'D'), 'acd': ('ACD', 'mm'), 'lens_thickness': ('Lens Thickness', 'mm'), 'white_to_white': ('WTW', 'mm'), 'pupil_size': ('Pupil Size', 'mm'), 'astigmatism': ('Astigmatism', 'D'), 'axis': ('Axis', '°')} class IOLCalculator: @staticmethod def srkt_formula(al, k1, k2, a_const, target_ref=0.0): try: k_avg = (k1 + k2) / 2 r = 337.5 / k_avg h = r - math.sqrt(r**2 - (0.0725**2)) acd = 0.62467 * r - 6.8 rt = 0.65696 - 0.02029 * al elp = h + acd - rt v = 1336.3 / (337.5 / k_avg) iol_power = (1.336 / (0.001 * (al - elp - 0.1))) + (1336.3 - v) / (v * (al - elp - 0.1)) return round(iol_power + (a_const - 118.4) - (target_ref * 1.458), 2) except: return None @staticmethod def barrett_universal_2(al, k1, k2, acd=None, lcf=1.67, target_ref=0.0): try: k_avg = (k1 + k2) / 2 r = 337.5 / k_avg acd = acd if acd is not None else (3.0 + (0.1 * (al - 23.5)) + (0.05 * (k_avg - 43.5)) + (0.1 * (lcf - 1.67))) lfa = lcf * (al / 23.5) * (1 + 0.02 * abs(al - 23.5)) lfa *= 0.98 if al > 26 else 1.02 if al < 22 else 1 elp = (lfa * acd + 0.1 * k_avg - 3.4) elp *= 0.97 if k_avg > 46 else 1.03 if k_avg < 42 else 1 rt = 0.65696 - 0.02029 * al iol_power = ((1.336 * 1000 / (al - elp - rt)) - ((1.3375 - 1) / (r / 1000)) - (target_ref * 1.458)) iol_power *= 0.98 if al > 25 else 1.02 if al < 22 else 1 return round(iol_power, 2) except: return None @staticmethod def haigis_formula(al, acd, k_avg, a0=0.87, a1=0.2, a2=0.4, target_ref=0.0): try: n_aqueous = 1.336 n_vitreous = 1.336 elp = a0 + (a1 * acd) + (a2 * al) p = ((n_vitreous * (al - elp)) - (n_aqueous * elp)) / (elp * (al - elp)) return round(p * 1000 - (target_ref * 1.458), 2) except: return None @staticmethod def holladay1_formula(al, k_avg, a_const, target_ref=0.0): try: sf = a_const - 68.4; acd = 0.56 + (sf * 0.65) + (0.4 * math.log(k_avg)) r = 337.5 / k_avg; h = r - math.sqrt(r**2 - (0.0725**2)); elp = h + sf return round(((1336 / (al - elp)) - (1336 / (337.5 / k_avg))) * 1000 - (target_ref * 1.458), 2) except: return None @staticmethod def get_barrett_lcf(a_const): return (a_const - 115.8) / 1.2 @staticmethod def calculate_retinal_thickness(al): return 0.65696 - 0.02029 * al @staticmethod def calculate_corneal_height(k_avg): r = 337.5 / k_avg; return r - math.sqrt(r**2 - (0.0725**2)) @staticmethod def evo_formula(al, k1, k2, acd, lt, target_ref=0.0): """ EVO Formula implementation with input validation and error handling Args: al (float): Axial length in mm (valid: 20-30) k1 (float): K1 reading in diopters (valid: 39-48) k2 (float): K2 reading in diopters (valid: 39-48) acd (float): Anterior chamber depth in mm (valid: 2.0-4.5) lt (float): Lens thickness in mm (valid: 3.0-6.0) target_ref (float): Target refraction in diopters Returns: float or None: Calculated IOL power in diopters """ try: # Input validation if any(x is None for x in [al, k1, k2, acd, lt]): return None # Type conversion and numpy array handling al = float(al) k1 = float(k1) k2 = float(k2) acd = float(acd) lt = float(lt) target_ref = float(target_ref) # Range validation if not (20 <= al <= 30): print("AL outside valid range") return None if not (39 <= k1 <= 48) or not (39 <= k2 <= 48): print("K readings outside valid range") return None if not (2.0 <= acd <= 4.5): print("ACD outside valid range") return None if not (3.0 <= lt <= 6.0): print("LT outside valid range") return None # Constants n_lens = 1.47 n_cornea = 1.376 # Calculate TCP (Total Corneal Power) tcp = (k1 + k2) / 2 # Calculate ELP (Effective Lens Position) elp = (acd + (0.4 * lt)) * (1.0 - (0.02 * abs(al - 23.5))) # Apply the formula p_iol = ((n_lens - n_cornea) / (al - elp)) * (tcp - target_ref) return round(p_iol * 1000, 2) except Exception as e: print(f"Error in EVO formula calculation: {str(e)}") return None @staticmethod def kane_formula(al, k1, k2, acd, lt, gender='neutral', target_ref=0.0): try: avg_k = (k1 + k2) / 2 gender_factor = {'male': 1.12, 'female': 1.06, 'neutral': 1.09}.get(gender.lower(), 1.09) # Calculate adjustments al_adj = 0.96 if al > 25 else 1.04 if al < 22 else 1.0 k_adj = 0.98 if avg_k > 46 else 1.02 if avg_k < 42 else 1.0 acd_adj = 0.95 if acd > 3.5 else 1.05 if acd < 2.8 else 1.0 # Calculate power power = ((al * 0.67 * al_adj) + (avg_k * 0.84 * k_adj * gender_factor) - (acd * 0.42 * acd_adj) - (lt * 0.12) - (target_ref * 1.458)) return round(power, 2) except: return None def calculate_iol_powers(eye_data, settings): required_fields = ['axial_length', 'k1', 'k2'] if not all(eye_data.get(field) is not None for field in required_fields): return {"error": "Missing required measurements"} calculator = IOLCalculator() results = {"measurements": eye_data, "calculations": {}, "recommendations": {"warnings": []}} calculations = {} lcf = calculator.get_barrett_lcf(settings['a_constant']) barrett = calculator.barrett_universal_2( eye_data['axial_length'], eye_data['k1'], eye_data['k2'], acd=eye_data.get('acd'), lcf=lcf, target_ref=settings['target_refraction'] ) if barrett is not None: calculations['barrett'] = barrett # Rest of the function remains the same... def ensure_temp_directory(): """Create temp directory if it doesn't exist""" os.makedirs("temp", exist_ok=True) @spaces.GPU def check_environment(): """Check and verify basic environment setup for model execution""" try: # 1. Basic PyTorch check print(f"PyTorch version: {torch.__version__}") # 2. Device check and info if torch.cuda.is_available(): device_info = f"GPU: {torch.cuda.get_device_name(0)}" else: device_info = "CPU (CUDA not available)" print(f"Running on: {device_info}") # 3. Model environment verification model_id = "Qwen/Qwen2-VL-7B-Instruct" processor = AutoProcessor.from_pretrained(model_id) print("✓ Processor loaded successfully") return True except Exception as e: print(f"Environment check failed: {str(e)}") return False @lru_cache(maxsize=1) def get_model(model_id): try: if not torch.cuda.is_available(): print("Warning: CUDA not available, using CPU") device_map = "cpu" else: device_map = "auto" model = Qwen2VLForConditionalGeneration.from_pretrained( model_id, torch_dtype=torch.float16, device_map=device_map ) return model.eval() except Exception as e: error_msg = f"Error loading model: {str(e)}" print(error_msg) raise RuntimeError(error_msg) def get_processor(model_name): """Load the processor with error handling""" try: return AutoProcessor.from_pretrained(model_name) except Exception as e: raise RuntimeError(f"Failed to load processor {model_name}: {str(e)}") def generate_extraction_prompt(): """Generate the prompt for measurement extraction""" return """Please analyze this IOL Master report and extract the measurements in JSON format with the following structure: { "right_eye": { "axial_length": float, "k1": float, "k2": float, "acd": float, "lens_thickness": float, "white_to_white": float, "pupil_size": float, "astigmatism": float, "axis": float }, "left_eye": { "axial_length": float, "k1": float, "k2": float, "acd": float, "lens_thickness": float, "white_to_white": float, "pupil_size": float, "astigmatism": float, "axis": float } }""" def format_eye_data(measurements, results): """Simplified formatting""" if not measurements or not results: return "No data available", "No calculations available", "" # Format measurements meas_text = "| Parameter | Value | Unit |\n|-----------|--------|------|\n" for key, (label, unit) in expected_measurements.items(): if key in measurements: value = measurements[key] meas_text += f"| {label} | {value:.2f} | {unit} |\n" # Format calculations calc_text = "| Formula | Power |\n|----------|--------|\n" if 'calculations' in results: for formula, power in results['calculations'].items(): calc_text += f"| {formula.title()} | {power} D |\n" # Format warnings warnings = "" if 'recommendations' in results and results['recommendations'].get('warnings'): warnings = "\n### ⚠️ Warnings\n" + "\n".join(f"- {w}" for w in results['recommendations']['warnings']) return meas_text, calc_text, warnings MAX_IMAGE_SIZE = 5 * 1024 * 1024 # 5MB SUPPORTED_FORMATS = ['.jpg', '.jpeg', '.png'] MAX_GPU_MEMORY_THRESHOLD = 2 * 1024 * 1024 * 1024 # 2GB def validate_inputs(image, manufacturer, lens_model, a_constant, target_refraction): if image is None: raise ValueError("No image provided") # For numpy arrays, we need to check the shape and content if isinstance(image, np.ndarray): if len(image.shape) < 2: # Must be at least 2D raise ValueError("Invalid image format: Image must be 2D or 3D array") if image.shape[0] == 0 or image.shape[1] == 0: raise ValueError("Invalid image dimensions") # Check if array is empty using any() if not np.any(image): raise ValueError("Empty image array") # Validate image format and size using proper file attributes if hasattr(image, 'format'): if image.format.lower() not in [fmt.strip('.') for fmt in SUPPORTED_FORMATS]: raise ValueError(f"Unsupported image format. Supported formats: {', '.join(SUPPORTED_FORMATS)}") # Check file size if available if hasattr(image, 'size_bytes'): if image.size_bytes > MAX_IMAGE_SIZE: raise ValueError(f"Image size exceeds maximum allowed size of {MAX_IMAGE_SIZE/1024/1024}MB") # Validate numeric inputs if not isinstance(target_refraction, (int, float)): raise ValueError("Target refraction must be a number") if not isinstance(a_constant, (int, float)): raise ValueError("A-constant must be a number") # Validate a_constant range if not 110 <= a_constant <= 125: raise ValueError("A-constant must be between 110 and 125") if manufacturer not in IOL_CONSTANTS: raise ValueError("Invalid manufacturer selected") if not lens_model or lens_model not in IOL_CONSTANTS[manufacturer]: raise ValueError("Invalid lens model selected") @spaces.GPU def run_analysis(image, manufacturer, lens_model, a_constant, target_refraction): try: settings = { "manufacturer": manufacturer, "lens_model": lens_model, "a_constant": a_constant, "target_refraction": target_refraction } # Define image_path before use image_path = f"temp/image_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png" # Create temp directory and save image try: if not isinstance(image, (np.ndarray, list)): return "Error: Invalid image format" Image.fromarray(np.uint8(image)).save(image_path) except Exception as e: return f"Error handling image: {str(e)}" # Load model and processor model = get_model("Qwen/Qwen2-VL-7B-Instruct") processor = get_processor("Qwen/Qwen2-VL-7B-Instruct") # Prepare messages for analysis messages = [{ "role": "user", "content": [ {"type": "image", "image": image_path}, {"type": "text", "text": generate_extraction_prompt()} ] }] # Process the image text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) image_inputs, video_inputs = process_vision_info(messages) inputs = processor(text=[text], images=image_inputs, videos=video_inputs, return_tensors="pt").to(model.device) # Generate analysis generated_ids = model.generate( **inputs, max_new_tokens=1024, temperature=0.1, do_sample=False ) # Process output ai_output = processor.batch_decode([generated_ids[0][len(inputs.input_ids[0]):]], skip_special_tokens=True)[0] try: # Parse measurements measurements = json.loads(ai_output) if not isinstance(measurements, dict): return "Error: Invalid measurements format" # Calculate results for both eyes right_results = calculate_iol_powers(measurements.get('right_eye', {}), settings) if measurements.get('right_eye') else None right_meas, right_calc, right_warn = format_eye_data(measurements.get('right_eye'), right_results) left_results = calculate_iol_powers(measurements.get('left_eye', {}), settings) if measurements.get('left_eye') else None left_meas, left_calc, left_warn = format_eye_data(measurements.get('left_eye'), left_results) # Format report report = f"""# IOL Analysis Report Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')} ## Right Eye Analysis ### 📊 Measurements {right_meas} ### 💡 IOL Calculations {right_calc} {right_warn} ## Left Eye Analysis ### 📊 Measurements {left_meas} ### 💡 IOL Calculations {left_calc} {left_warn} ------------------- ### Device Settings | Parameter | Value | |:----------|:-------| | Manufacturer | {manufacturer} | | Lens Model | {lens_model} | | A-Constant | {a_constant} | | Target Refraction | {target_refraction} D | ------------------- *Report generated by [Auto-IOL AI Tool](https://luigi12345-auto-iol.hf.space)*""" return report except json.JSONDecodeError: return "Error: Could not parse AI output" finally: # Cleanup if 'image_path' in locals() and os.path.exists(image_path): try: os.remove(image_path) except: pass except Exception as e: return f"Error: {str(e)}" with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="indigo")) as demo: gr.Markdown(DESCRIPTION) with gr.Tabs() as tabs: with gr.Tab("📊 Analysis", id="analysis"): with gr.Row(): with gr.Column(scale=1): input_img = gr.Image(label="IOL Report Image", type="numpy", value=TEST_REPORT_URL, height=350, width="100%", sources=["upload", "webcam", "clipboard"]) with gr.Group(): with gr.Row(): manufacturer = gr.Dropdown(choices=list(IOL_CONSTANTS.keys()), value=DEFAULT_SETTINGS["manufacturer"], label="Manufacturer", info="Select IOL manufacturer", scale=1) lens_model = gr.Dropdown(choices=list(IOL_CONSTANTS[DEFAULT_SETTINGS["manufacturer"]].keys()), value=DEFAULT_SETTINGS["lens_model"], label="Lens Model", info="Select specific lens model", scale=1) with gr.Row(): a_constant = gr.Number(value=DEFAULT_SETTINGS["a_constant"], label="A-Constant", info="Auto-updated based on lens selection", interactive=False, scale=1) target_refraction = gr.Number(value=DEFAULT_SETTINGS["target_refraction"], label="Target Refraction (D)", info="Desired postoperative refraction", minimum=-10, maximum=10, step=0.25, scale=1) analyze_btn = gr.Button("Analyze Report", variant="primary", size="lg", icon="🔍") with gr.Column(scale=2): output_text = gr.Markdown(label="Analysis Results", show_label=True, value="Upload an IOL report image and click 'Analyze Report' to begin...") with gr.Tab("📚 Documentation", id="docs"): gr.Markdown(FORMULAS_DOC) analyze_btn.click(fn=run_analysis, inputs=[input_img, manufacturer, lens_model, a_constant, target_refraction], outputs=output_text, api_name="analyze", show_progress="full") gr.Examples([[TEST_REPORT_URL, DEFAULT_SETTINGS["manufacturer"], DEFAULT_SETTINGS["lens_model"], DEFAULT_SETTINGS["a_constant"], DEFAULT_SETTINGS["target_refraction"]]], inputs=[input_img, manufacturer, lens_model, a_constant, target_refraction], outputs=output_text, fn=run_analysis, cache_examples=True, label="Example Report") manufacturer.change(fn=lambda m: list(IOL_CONSTANTS[m].keys()), inputs=[manufacturer], outputs=[lens_model]) lens_model.change(fn=lambda m, l: IOL_CONSTANTS[m][l], inputs=[manufacturer, lens_model], outputs=[a_constant]) if __name__ == "__main__": # Initialize environment check_environment() ensure_temp_directory() # Launch the demo demo.queue(max_size=1, api_open=False) demo.launch(debug=True, show_error=True, share=False, server_name="0.0.0.0", server_port=7860)