# fog / app.py
# CatherineProtas's picture
# Update app.py
# 1713d64 verified
import gradio as gr
import requests
import pandas as pd
from sklearn.linear_model import LogisticRegression
from fastai.vision.all import *
import os
from datetime import datetime, timedelta
# Image-based fog classifier, loaded once at import time; its vocab supplies
# the class labels reported with every prediction.
# NOTE(review): load_learner presumably unpickles the file - only ship a
# trusted 'fog_classifier.pkl' alongside this app.
learn = load_learner('fog_classifier.pkl')
labels = learn.dls.vocab

# OpenWeatherMap access. The key comes from the environment and is None when
# the variable is not set.
BASE_URL = 'https://api.openweathermap.org/data/2.5/'
API_KEY = os.getenv("OPENWEATHER_API_KEY")
def predict_image(img):
    """Classify an image with the fog model and return per-label confidences.

    The input is converted with ``PILImage.create``, resized to 512x512 and
    handed to ``learn.predict``. The result maps every entry of ``labels`` to
    its probability as a plain float.
    """
    resized = PILImage.create(img).resize((512, 512))
    # Only the probability vector is needed; the decoded class and its index
    # are discarded.
    _, _, probs = learn.predict(resized)
    return {label: float(probs[idx]) for idx, label in enumerate(labels)}
def calculate_fog_risk_score(weather_data):
    """Return a weighted fog risk score (0-1) for one weather sample.

    Reads the 'temperature', 'humidity', 'visibility', 'wind_speed' and
    'pressure' entries of ``weather_data``. Each reading is mapped to a factor
    capped at 1, and the factors are blended with fixed weights that sum to 1.
    """
    temperature = weather_data['temperature']
    humidity = weather_data['humidity']
    visibility = weather_data['visibility']
    wind_speed = weather_data['wind_speed']
    pressure = weather_data['pressure']

    # Rule-of-thumb dew point: one degree below the air temperature for every
    # five points of relative humidity below 100.
    dew_point = temperature - ((100 - humidity) / 5.0)
    spread = abs(temperature - dew_point)

    # (weight, factor) pairs, in the order they are accumulated.
    weighted_factors = (
        (0.3, min(humidity / 100, 1)),                      # more humid -> higher
        (0.3, 1 - min(spread / 5, 1)),                      # nearer dew point -> higher
        (0.2, 1 - min(visibility / 10, 1)),                 # lower visibility -> higher
        (0.1, 1 - min(wind_speed / 10, 1)),                 # calmer wind -> higher
        (0.1, min(abs(pressure - 1013.25) / 50, 1)),        # deviation from standard pressure
    )

    fog_risk = 0
    for weight, factor in weighted_factors:
        fog_risk += weight * factor
    return fog_risk
def get_weather_data(location):
    """Fetch the current weather for ``location`` from OpenWeatherMap.

    Args:
        location: Free-text place name, sent as the ``q`` query parameter.

    Returns:
        dict with 'temperature', 'feels_like', 'description', 'wind_speed',
        'pressure', 'humidity', 'visibility' (API value divided by 1000) and
        'timestamp' (naive local ``datetime`` built from the response ``dt``).

    Raises:
        Exception: if the request fails, times out or returns an error status.
        KeyError / IndexError: if the response lacks the expected sections.
    """
    try:
        # Let requests build the query string so user-supplied text (spaces,
        # '&', '#', non-ASCII) is URL-encoded rather than spliced into the URL.
        response = requests.get(
            f'{BASE_URL}weather',
            params={'q': location, 'appid': API_KEY, 'units': 'metric'},
            timeout=10,  # without a timeout a stalled server hangs the UI forever
        )
        response.raise_for_status()
        data = response.json()
    except requests.exceptions.RequestException as e:
        # requests error messages embed the request URL, which carries the
        # API key; redact it because this text is shown to the end user.
        detail = str(e)
        if API_KEY:
            detail = detail.replace(API_KEY, '***')
        raise Exception(f"Failed to fetch weather data: {detail}") from e

    main = data['main']
    return {
        'temperature': main.get('temp', 0),
        'feels_like': main.get('feels_like', 0),
        'description': data['weather'][0].get('description', ''),
        'wind_speed': data['wind'].get('speed', 0),
        'pressure': main.get('pressure', 0),
        'humidity': main.get('humidity', 0),
        'visibility': data.get('visibility', 10000) / 1000,  # Still needed for calculations
        'timestamp': datetime.fromtimestamp(data['dt']),
    }
def get_forecast_data(location):
    """Fetch the 5-day forecast for ``location`` from OpenWeatherMap.

    Args:
        location: Free-text place name, sent as the ``q`` query parameter.

    Returns:
        list of dicts, one per entry of the response ``list``, each with
        'temperature', 'humidity', 'description', 'wind_speed', 'pressure',
        'visibility' (API value divided by 1000) and 'timestamp' (naive local
        ``datetime`` built from the entry's ``dt``).

    Raises:
        Exception: if the request fails, times out or returns an error status.
        KeyError / IndexError: if the response lacks the expected sections.
    """
    try:
        # Let requests build the query string so user-supplied text (spaces,
        # '&', '#', non-ASCII) is URL-encoded rather than spliced into the URL.
        response = requests.get(
            f'{BASE_URL}forecast',
            params={'q': location, 'appid': API_KEY, 'units': 'metric'},
            timeout=10,  # without a timeout a stalled server hangs the UI forever
        )
        response.raise_for_status()
        data = response.json()
    except requests.exceptions.RequestException as e:
        # requests error messages embed the request URL, which carries the
        # API key; redact it because this text is shown to the end user.
        detail = str(e)
        if API_KEY:
            detail = detail.replace(API_KEY, '***')
        raise Exception(f"Failed to fetch forecast data: {detail}") from e

    return [
        {
            'temperature': item['main'].get('temp', 0),
            'humidity': item['main'].get('humidity', 0),
            'description': item['weather'][0].get('description', ''),
            'wind_speed': item['wind'].get('speed', 0),
            'pressure': item['main'].get('pressure', 0),
            'visibility': item.get('visibility', 10000) / 1000,  # Still needed for calculations
            'timestamp': datetime.fromtimestamp(item['dt']),
        }
        for item in data['list']
    ]
def format_duration(duration):
    """Format a timedelta as a days-and-hours string.

    Partial hours are truncated. A negative duration (for example a forecast
    slot that lies before the current observation) is reported as "0 hours"
    instead of wrapping around to a misleading positive value. Units are
    singular when the count is exactly 1.

    Args:
        duration: ``datetime.timedelta`` to describe.

    Returns:
        "<h> hour(s)" when shorter than a day, otherwise
        "<d> day(s) and <h> hour(s)".
    """
    # Clamp before splitting: floor division and modulo on a negative number
    # of hours would yield e.g. -1 days / 22 hours.
    total_hours = int(max(duration.total_seconds(), 0) // 3600)
    days, hours = divmod(total_hours, 24)

    hours_text = f"{hours} hour" if hours == 1 else f"{hours} hours"
    if days > 0:
        days_text = f"{days} day" if days == 1 else f"{days} days"
        return f"{days_text} and {hours_text}"
    return hours_text
def determine_transmission_power(image_prediction, weather_data, forecast_data=None):
    """
    Choose a transmission power level from the image classification, the
    current weather and, when supplied, the forecast.
    Returns: (power_level, duration, explanation)
    """
    # Image evidence: dense fog counts in full, moderate fog at 60%.
    dense_conf = image_prediction.get('Dense_Fog', 0)
    moderate_conf = image_prediction.get('Moderate_Fog', 0) * 0.6
    image_fog_confidence = max(dense_conf, moderate_conf)

    # Weather evidence, blended 60/40 in favour of the image.
    weather_risk = calculate_fog_risk_score(weather_data)
    combined_fog_risk = (image_fog_confidence * 0.6) + (weather_risk * 0.4)

    # Base power level from the present conditions.
    if combined_fog_risk > 0.7:
        power_level = "High"
        explanation = [f"High fog risk detected (Risk score: {combined_fog_risk:.2f})"]
    elif combined_fog_risk > 0.4:
        power_level = "Medium"
        explanation = [f"Moderate fog risk detected (Risk score: {combined_fog_risk:.2f})"]
    else:
        power_level = "Normal"
        explanation = [f"Low fog risk detected (Risk score: {combined_fog_risk:.2f})"]

    duration = timedelta(hours=1)  # used when the forecast adds nothing
    if not forecast_data:
        return power_level, duration, explanation

    # At most 40 entries are scored (5 days of 3-hour forecasts).
    high_flags = [
        calculate_fog_risk_score(entry) > 0.6 for entry in forecast_data[:40]
    ]
    if not any(high_flags):
        return power_level, duration, explanation

    # The duration runs from the current observation to the last high-risk slot.
    last_high_idx = max(idx for idx, flagged in enumerate(high_flags) if flagged)
    duration = forecast_data[last_high_idx]['timestamp'] - weather_data['timestamp']
    explanation.append(f"High fog risk predicted to continue for {format_duration(duration)}")

    # Escalate when more than half of the scored slots are high risk.
    if sum(high_flags) / len(high_flags) > 0.5:
        power_level = "High"
        explanation.append("Power level set to High due to sustained fog risk in forecast")

    return power_level, duration, explanation
def integrated_prediction(image, location):
    """Run the full analysis for the UI and return it as display text.

    Combines the image classification, the current weather and the forecast
    into a power recommendation followed by a per-day fog risk summary.

    Args:
        image: Image supplied by the UI; ``None`` when nothing was uploaded.
        location: Place name typed by the user.

    Returns:
        A multi-line report, or a single "Error: ..." line when an input is
        missing or any step fails. Exceptions are never propagated, because
        the return value is written straight into the output textbox.
    """
    # Reject missing inputs up front so the user gets an actionable message
    # instead of a library or HTTP error.
    if image is None:
        return "Error: Please upload an image of the current conditions."
    if not location or not location.strip():
        return "Error: Please enter a location."
    location = location.strip()

    try:
        # Get image prediction
        image_prediction = predict_image(image)
        # Get current weather
        current_weather = get_weather_data(location)
        # Get forecast
        forecast_data = get_forecast_data(location)
        # Determine transmission power
        power_level, duration, explanation = determine_transmission_power(
            image_prediction,
            current_weather,
            forecast_data
        )
        # Format result
        result = [
            f"Current Conditions ({current_weather['timestamp'].strftime('%Y-%m-%d %H:%M')})",
            f"Temperature: {current_weather['temperature']:.1f}°C",
            f"Humidity: {current_weather['humidity']}%",
            f"Wind Speed: {current_weather['wind_speed']} m/s",
            "",
            "Analysis Results:",
            *explanation,
            "",
            f"Recommended Power Level: {power_level}",
            f"Duration: {format_duration(duration)}",
            "",
            "5-Day Forecast Summary:"
        ]
        # Add daily forecast summary: average risk of the slots on each of the
        # five calendar days starting today; days without slots are skipped.
        current_date = current_weather['timestamp'].date()
        for day in range(5):
            forecast_date = current_date + timedelta(days=day)
            day_forecasts = [f for f in forecast_data if f['timestamp'].date() == forecast_date]
            if day_forecasts:
                avg_risk = sum(calculate_fog_risk_score(f) for f in day_forecasts) / len(day_forecasts)
                result.append(f"{forecast_date.strftime('%Y-%m-%d')}: "
                              f"Fog Risk: {'High' if avg_risk > 0.6 else 'Moderate' if avg_risk > 0.3 else 'Low'} "
                              f"({avg_risk:.2f})")
        return "\n".join(result)
    except Exception as e:
        # UI boundary: surface the failure as text rather than crashing.
        return f"Error: {str(e)}"
# Gradio interface
# NOTE(review): the original indentation was lost; this layout assumes only
# the two inputs sit inside the row - confirm against the deployed app.
with gr.Blocks() as demo:
    gr.Markdown("# Enhanced Fog Prediction and Transmission Power System")

    # Inputs: the photo and the place name.
    with gr.Row():
        image_input = gr.Image(label="Upload Current Conditions Image")
        location_input = gr.Textbox(label="Enter Location")

    predict_button = gr.Button("Analyze and Determine Transmission Power")
    output = gr.Textbox(label="Analysis Results", lines=15)

    # A button press runs the whole pipeline and writes the report to the textbox.
    predict_button.click(
        fn=integrated_prediction,
        inputs=[image_input, location_input],
        outputs=output,
    )

demo.launch()