import gradio as gr import requests import pandas as pd from sklearn.linear_model import LogisticRegression from fastai.vision.all import * import os from datetime import datetime, timedelta # Load the trained model for image-based fog classification learn = load_learner('fog_classifier.pkl') labels = learn.dls.vocab API_KEY = os.environ.get("OPENWEATHER_API_KEY") BASE_URL = 'https://api.openweathermap.org/data/2.5/' def predict_image(img): """Predict fog conditions from image and return confidence scores""" img = PILImage.create(img) img = img.resize((512, 512)) pred, pred_idx, probs = learn.predict(img) return {labels[i]: float(probs[i]) for i in range(len(labels))} def calculate_fog_risk_score(weather_data): """Calculate a fog risk score (0-1) based on weather conditions""" # Normalized weights for each factor weights = { 'humidity': 0.3, 'dew_point_temp_diff': 0.3, 'visibility': 0.2, 'wind_speed': 0.1, 'pressure_change': 0.1 } # Calculate dew point dew_point = weather_data['temperature'] - ((100 - weather_data['humidity']) / 5.0) dew_point_temp_diff = abs(weather_data['temperature'] - dew_point) # Normalize each factor to 0-1 scale humidity_score = min(weather_data['humidity'] / 100, 1) dew_point_score = 1 - min(dew_point_temp_diff / 5, 1) # Closer to dew point = higher score visibility_score = 1 - min(weather_data['visibility'] / 10, 1) # Lower visibility = higher score wind_score = 1 - min(weather_data['wind_speed'] / 10, 1) # Lower wind = higher score pressure_score = min(abs(weather_data['pressure'] - 1013.25) / 50, 1) # Deviation from normal pressure # Calculate weighted score fog_risk = ( weights['humidity'] * humidity_score + weights['dew_point_temp_diff'] * dew_point_score + weights['visibility'] * visibility_score + weights['wind_speed'] * wind_score + weights['pressure_change'] * pressure_score ) return fog_risk def get_weather_data(location): """Get current weather data with enhanced error handling""" try: current_weather_url = f'{BASE_URL}weather?q={location}&appid={API_KEY}&units=metric' response = requests.get(current_weather_url) response.raise_for_status() data = response.json() return { 'temperature': data['main'].get('temp', 0), 'feels_like': data['main'].get('feels_like', 0), 'description': data['weather'][0].get('description', ''), 'wind_speed': data['wind'].get('speed', 0), 'pressure': data['main'].get('pressure', 0), 'humidity': data['main'].get('humidity', 0), 'visibility': data.get('visibility', 10000) / 1000, 'timestamp': datetime.fromtimestamp(data['dt']) } except requests.exceptions.RequestException as e: raise Exception(f"Failed to fetch weather data: {str(e)}") def get_forecast_data(location): """Get 5-day forecast with enhanced error handling""" try: forecast_url = f'{BASE_URL}forecast?q={location}&appid={API_KEY}&units=metric' response = requests.get(forecast_url) response.raise_for_status() data = response.json() forecasts = [] for item in data['list']: forecasts.append({ 'temperature': item['main'].get('temp', 0), 'humidity': item['main'].get('humidity', 0), 'description': item['weather'][0].get('description', ''), 'wind_speed': item['wind'].get('speed', 0), 'pressure': item['main'].get('pressure', 0), 'visibility': item.get('visibility', 10000) / 1000, 'timestamp': datetime.fromtimestamp(item['dt']) }) return forecasts except requests.exceptions.RequestException as e: raise Exception(f"Failed to fetch forecast data: {str(e)}") def determine_transmission_power(image_prediction, weather_data, forecast_data=None): """ Determine transmission power based on current conditions and forecast Returns: (power_level, duration, explanation) """ # Get fog confidence from image image_fog_confidence = max( image_prediction.get('Dense_Fog', 0), image_prediction.get('Moderate_Fog', 0) * 0.6 ) # Calculate weather-based fog risk current_fog_risk = calculate_fog_risk_score(weather_data) # Combine image and weather predictions with weighted average # Give slightly more weight to image prediction as it's more reliable combined_fog_risk = (image_fog_confidence * 0.6) + (current_fog_risk * 0.4) # Initialize explanation explanation = [] # Determine base power level from current conditions if combined_fog_risk > 0.7: power_level = "High" explanation.append(f"High fog risk detected (Risk score: {combined_fog_risk:.2f})") elif combined_fog_risk > 0.4: power_level = "Medium" explanation.append(f"Moderate fog risk detected (Risk score: {combined_fog_risk:.2f})") else: power_level = "Normal" explanation.append(f"Low fog risk detected (Risk score: {combined_fog_risk:.2f})") # Analyze forecast data if available duration = timedelta(hours=1) # Default duration if forecast_data: future_risks = [] for forecast in forecast_data[:40]: # 5 days of 3-hour forecasts risk = calculate_fog_risk_score(forecast) future_risks.append(risk) # Find periods of high risk high_risk_periods = [risk > 0.6 for risk in future_risks] if any(high_risk_periods): # Find the last high-risk timestamp last_high_risk_idx = len(high_risk_periods) - 1 - high_risk_periods[::-1].index(True) duration = forecast_data[last_high_risk_idx]['timestamp'] - weather_data['timestamp'] explanation.append(f"High fog risk predicted to continue for {duration.days} days and {duration.hours} hours") # Adjust power level based on forecast if sum(high_risk_periods) / len(high_risk_periods) > 0.5: power_level = "High" explanation.append("Power level set to High due to sustained fog risk in forecast") return power_level, duration, explanation def integrated_prediction(image, location): """Main function to process image and weather data""" try: # Get image prediction image_prediction = predict_image(image) # Get current weather current_weather = get_weather_data(location) # Get forecast forecast_data = get_forecast_data(location) # Determine transmission power power_level, duration, explanation = determine_transmission_power( image_prediction, current_weather, forecast_data ) # Format result result = [ f"Current Conditions ({current_weather['timestamp'].strftime('%Y-%m-%d %H:%M')})", f"Temperature: {current_weather['temperature']:.1f}°C", f"Humidity: {current_weather['humidity']}%", f"Visibility: {current_weather['visibility']:.1f} km", f"Wind Speed: {current_weather['wind_speed']} m/s", "", "Analysis Results:", *explanation, "", f"Recommended Power Level: {power_level}", f"Duration: {duration.days} days and {duration.hours} hours", "", "5-Day Forecast Summary:" ] # Add daily forecast summary current_date = current_weather['timestamp'].date() for day in range(5): forecast_date = current_date + timedelta(days=day) day_forecasts = [f for f in forecast_data if f['timestamp'].date() == forecast_date] if day_forecasts: avg_risk = sum(calculate_fog_risk_score(f) for f in day_forecasts) / len(day_forecasts) result.append(f"{forecast_date.strftime('%Y-%m-%d')}: " f"Fog Risk: {'High' if avg_risk > 0.6 else 'Moderate' if avg_risk > 0.3 else 'Low'} " f"({avg_risk:.2f})") return "\n".join(result) except Exception as e: return f"Error: {str(e)}" # Gradio interface with gr.Blocks() as demo: gr.Markdown("# Enhanced Fog Prediction and Transmission Power System") with gr.Row(): image_input = gr.Image(label="Upload Current Conditions Image") location_input = gr.Textbox(label="Enter Location") predict_button = gr.Button("Analyze and Determine Transmission Power") output = gr.Textbox(label="Analysis Results", lines=15) predict_button.click(integrated_prediction, inputs=[image_input, location_input], outputs=output) demo.launch()