File size: 9,341 Bytes
d45488b ce1be2b d45488b 237d44d d45488b ce1be2b d45488b ce1be2b d45488b ce1be2b d45488b ce1be2b d45488b ce1be2b 134fb84 ce1be2b 134fb84 ce1be2b d45488b ce1be2b d45488b ce1be2b d45488b ce1be2b 134fb84 ce1be2b d45488b ce1be2b d45488b ce1be2b d45488b ce1be2b d45488b |
|
import gradio as gr
import requests
import pandas as pd
from sklearn.linear_model import LogisticRegression
from fastai.vision.all import *
import os
from datetime import datetime, timedelta
# OpenWeatherMap settings. API_KEY is None when the environment variable
# OPENWEATHER_API_KEY is not set.
BASE_URL = 'https://api.openweathermap.org/data/2.5/'
API_KEY = os.getenv("OPENWEATHER_API_KEY")

# Trained learner used for image-based fog classification, together with
# the class labels taken from its data loaders' vocabulary.
learn = load_learner('fog_classifier.pkl')
labels = learn.dls.vocab
def predict_image(img):
    """Classify fog conditions in an image.

    The input is converted with ``PILImage.create``, resized to 512x512 and
    passed to the module-level ``learn`` object.

    Returns:
        dict mapping every entry of the module-level ``labels`` to the
        corresponding predicted probability, converted to a plain ``float``.
    """
    resized = PILImage.create(img).resize((512, 512))
    # predict() yields a 3-tuple; only the last element (the per-class
    # probabilities) is used here.
    _, _, probabilities = learn.predict(resized)
    return {
        label: float(probabilities[position])
        for position, label in enumerate(labels)
    }
def calculate_fog_risk_score(weather_data):
    """Compute a heuristic fog risk score from a set of weather readings.

    Args:
        weather_data: mapping providing 'temperature', 'humidity',
            'visibility', 'wind_speed' and 'pressure'.

    Returns:
        The weighted sum of five factor scores. Each factor is capped at 1
        and the weights add up to 1.0, so the result is intended to lie in
        the 0-1 range.
    """
    temperature = weather_data['temperature']
    humidity = weather_data['humidity']

    # Dew point estimate: temperature minus (100 - humidity) / 5.
    dew_point = temperature - ((100 - humidity) / 5.0)
    dew_point_gap = abs(temperature - dew_point)

    # (weight, normalized score) pairs, one per factor.
    weighted_factors = (
        # Humidity: higher humidity = higher score.
        (0.3, min(humidity / 100, 1)),
        # Dew point spread: closer to dew point = higher score.
        (0.3, 1 - min(dew_point_gap / 5, 1)),
        # Visibility: lower visibility = higher score.
        (0.2, 1 - min(weather_data['visibility'] / 10, 1)),
        # Wind: lower wind speed = higher score.
        (0.1, 1 - min(weather_data['wind_speed'] / 10, 1)),
        # Pressure: deviation from 1013.25, saturating at a deviation of 50.
        (0.1, min(abs(weather_data['pressure'] - 1013.25) / 50, 1)),
    )
    return sum(weight * score for weight, score in weighted_factors)
def get_weather_data(location):
    """Fetch the current weather for *location* from the OpenWeatherMap API.

    Args:
        location: place name sent as the ``q`` query parameter.

    Returns:
        dict with the keys 'temperature', 'feels_like', 'description',
        'wind_speed', 'pressure', 'humidity', 'visibility' (the API value
        divided by 1000) and 'timestamp' (a ``datetime`` built from the
        response's 'dt' field). Missing numeric fields default to 0 and a
        missing visibility defaults to 10000 before the division.

    Raises:
        Exception: if the request fails, times out or returns an HTTP
            error status.
    """
    try:
        # Passing the query through ``params`` lets requests URL-encode the
        # user-supplied location (spaces, '&', '#', ...) instead of splicing
        # it raw into the URL.
        response = requests.get(
            f'{BASE_URL}weather',
            params={'q': location, 'appid': API_KEY, 'units': 'metric'},
            timeout=10,  # requests waits indefinitely unless a timeout is given
        )
        response.raise_for_status()
        data = response.json()
    except requests.exceptions.HTTPError as e:
        # Report only the status code: the exception text may include the
        # full request URL, and that URL carries the API key.
        status = e.response.status_code if e.response is not None else 'unknown'
        raise Exception(f"Failed to fetch weather data: HTTP {status}") from e
    except requests.exceptions.RequestException as e:
        # Same reasoning as above: expose the error type, not its message.
        raise Exception(f"Failed to fetch weather data: {type(e).__name__}") from e

    main = data['main']
    return {
        'temperature': main.get('temp', 0),
        'feels_like': main.get('feels_like', 0),
        'description': data['weather'][0].get('description', ''),
        'wind_speed': data['wind'].get('speed', 0),
        'pressure': main.get('pressure', 0),
        'humidity': main.get('humidity', 0),
        'visibility': data.get('visibility', 10000) / 1000,
        'timestamp': datetime.fromtimestamp(data['dt']),
    }
def get_forecast_data(location):
    """Fetch the forecast for *location* from the OpenWeatherMap API.

    Args:
        location: place name sent as the ``q`` query parameter.

    Returns:
        list of dicts, one per entry of the response's 'list' field, each
        with the keys 'temperature', 'humidity', 'description',
        'wind_speed', 'pressure', 'visibility' (the API value divided by
        1000) and 'timestamp' (a ``datetime`` built from the entry's 'dt').

    Raises:
        Exception: if the request fails, times out or returns an HTTP
            error status.
    """
    try:
        # Passing the query through ``params`` lets requests URL-encode the
        # user-supplied location instead of splicing it raw into the URL.
        response = requests.get(
            f'{BASE_URL}forecast',
            params={'q': location, 'appid': API_KEY, 'units': 'metric'},
            timeout=10,  # requests waits indefinitely unless a timeout is given
        )
        response.raise_for_status()
        data = response.json()
    except requests.exceptions.HTTPError as e:
        # Report only the status code: the exception text may include the
        # full request URL, and that URL carries the API key.
        status = e.response.status_code if e.response is not None else 'unknown'
        raise Exception(f"Failed to fetch forecast data: HTTP {status}") from e
    except requests.exceptions.RequestException as e:
        # Same reasoning as above: expose the error type, not its message.
        raise Exception(f"Failed to fetch forecast data: {type(e).__name__}") from e

    return [
        {
            'temperature': item['main'].get('temp', 0),
            'humidity': item['main'].get('humidity', 0),
            'description': item['weather'][0].get('description', ''),
            'wind_speed': item['wind'].get('speed', 0),
            'pressure': item['main'].get('pressure', 0),
            'visibility': item.get('visibility', 10000) / 1000,
            'timestamp': datetime.fromtimestamp(item['dt']),
        }
        for item in data['list']
    ]
def format_duration(duration):
    """Format a timedelta as a days-and-hours string.

    Partial hours are truncated. Negative durations are reported as
    "0 hours".

    Args:
        duration: ``datetime.timedelta`` to format.

    Returns:
        "<D> day(s) and <H> hour(s)" when at least one full day is present,
        otherwise "<H> hour(s)"; units are singular when the value is 1.
    """
    # Clamp at zero: with a negative value, floor division and modulo wrap
    # around (e.g. -1 hour would come out as "23 hours").
    total_hours = int(max(duration.total_seconds(), 0) // 3600)
    days, hours = divmod(total_hours, 24)

    hours_text = f"{hours} hour" if hours == 1 else f"{hours} hours"
    if days > 0:
        days_text = f"{days} day" if days == 1 else f"{days} days"
        return f"{days_text} and {hours_text}"
    return hours_text
def determine_transmission_power(image_prediction, weather_data, forecast_data=None):
    """
    Choose a transmission power level from the image classification, the
    current weather and, when supplied, the forecast.

    Args:
        image_prediction: mapping of label to probability; the 'Dense_Fog'
            and 'Moderate_Fog' entries are read (missing ones count as 0).
        weather_data: current readings, including a 'timestamp' entry.
        forecast_data: optional list of forecast entries; only the first 40
            are evaluated.

    Returns: (power_level, duration, explanation)
    """
    # Image-derived fog confidence: dense fog counts fully, moderate fog at 60%.
    dense_confidence = image_prediction.get('Dense_Fog', 0)
    moderate_confidence = image_prediction.get('Moderate_Fog', 0) * 0.6
    image_fog_confidence = max(dense_confidence, moderate_confidence)

    # Blend image (60%) and weather (40%) evidence into one risk figure.
    weather_risk = calculate_fog_risk_score(weather_data)
    combined_fog_risk = (image_fog_confidence * 0.6) + (weather_risk * 0.4)

    # Base power level from the current conditions.
    if combined_fog_risk > 0.7:
        power_level = "High"
        explanation = [f"High fog risk detected (Risk score: {combined_fog_risk:.2f})"]
    elif combined_fog_risk > 0.4:
        power_level = "Medium"
        explanation = [f"Moderate fog risk detected (Risk score: {combined_fog_risk:.2f})"]
    else:
        power_level = "Normal"
        explanation = [f"Low fog risk detected (Risk score: {combined_fog_risk:.2f})"]

    duration = timedelta(hours=1)  # Default duration
    if not forecast_data:
        return power_level, duration, explanation

    # Flag each of the first 40 forecast entries whose risk exceeds 0.6.
    high_risk_flags = [
        calculate_fog_risk_score(entry) > 0.6 for entry in forecast_data[:40]
    ]
    high_risk_positions = [
        position for position, flagged in enumerate(high_risk_flags) if flagged
    ]

    if high_risk_positions:
        # Duration runs from the current reading to the last flagged entry.
        last_position = high_risk_positions[-1]
        duration = forecast_data[last_position]['timestamp'] - weather_data['timestamp']
        explanation.append(f"High fog risk predicted to continue for {format_duration(duration)}")

        # More than half of the evaluated entries flagged: force High power.
        if len(high_risk_positions) / len(high_risk_flags) > 0.5:
            power_level = "High"
            explanation.append("Power level set to High due to sustained fog risk in forecast")

    return power_level, duration, explanation
def integrated_prediction(image, location):
    """Run the full analysis for one image and one location.

    Classifies the image, fetches the current weather and the forecast,
    determines the transmission power and renders everything as text.

    Args:
        image: image to classify; ``None`` is rejected.
        location: place name for the weather lookup; empty or
            whitespace-only values are rejected.

    Returns:
        A multi-line report string, or a string starting with "Error: "
        when an input is missing or any step raises.
    """
    # Reject missing inputs before touching the model or the network, so
    # the user gets a clear message rather than a downstream failure.
    if image is None:
        return "Error: Please upload an image of the current conditions"
    if not location or not location.strip():
        return "Error: Please enter a location"
    location = location.strip()

    try:
        image_prediction = predict_image(image)
        current_weather = get_weather_data(location)
        forecast_data = get_forecast_data(location)

        power_level, duration, explanation = determine_transmission_power(
            image_prediction,
            current_weather,
            forecast_data,
        )

        result = [
            f"Current Conditions ({current_weather['timestamp'].strftime('%Y-%m-%d %H:%M')})",
            f"Temperature: {current_weather['temperature']:.1f}°C",
            f"Humidity: {current_weather['humidity']}%",
            f"Visibility: {current_weather['visibility']:.1f} km",
            f"Wind Speed: {current_weather['wind_speed']} m/s",
            "",
            "Analysis Results:",
            *explanation,
            "",
            f"Recommended Power Level: {power_level}",
            f"Duration: {format_duration(duration)}",
            "",
            "5-Day Forecast Summary:",
        ]

        # One summary line per calendar day, starting with the day of the
        # current reading; days without forecast entries are skipped.
        current_date = current_weather['timestamp'].date()
        for day in range(5):
            forecast_date = current_date + timedelta(days=day)
            day_forecasts = [
                f for f in forecast_data if f['timestamp'].date() == forecast_date
            ]
            if not day_forecasts:
                continue
            avg_risk = (
                sum(calculate_fog_risk_score(f) for f in day_forecasts)
                / len(day_forecasts)
            )
            if avg_risk > 0.6:
                risk_label = 'High'
            elif avg_risk > 0.3:
                risk_label = 'Moderate'
            else:
                risk_label = 'Low'
            result.append(
                f"{forecast_date.strftime('%Y-%m-%d')}: "
                f"Fog Risk: {risk_label} "
                f"({avg_risk:.2f})"
            )

        return "\n".join(result)
    except Exception as e:
        # UI boundary: report the failure as text instead of raising.
        return f"Error: {str(e)}"
# Gradio interface: an image and a location go in, the text report produced
# by integrated_prediction comes out.
# NOTE(review): the original indentation was lost; the Row is assumed to
# hold the two inputs, with the button and output below it - confirm.
with gr.Blocks() as demo:
    gr.Markdown("# Enhanced Fog Prediction and Transmission Power System")
    with gr.Row():
        image_input = gr.Image(label="Upload Current Conditions Image")
        location_input = gr.Textbox(label="Enter Location")
    predict_button = gr.Button("Analyze and Determine Transmission Power")
    output = gr.Textbox(label="Analysis Results", lines=15)
    predict_button.click(integrated_prediction, inputs=[image_input, location_input], outputs=output)

demo.launch()