|
import gradio as gr |
|
import requests |
|
import pandas as pd |
|
from sklearn.linear_model import LogisticRegression |
|
from fastai.vision.all import * |
|
import os |
|
from datetime import datetime, timedelta |
|
|
|
|
|
learn = load_learner('fog_classifier.pkl') |
|
labels = learn.dls.vocab |
|
|
|
API_KEY = os.environ.get("OPENWEATHER_API_KEY") |
|
BASE_URL = 'https://api.openweathermap.org/data/2.5/' |
|
|
|
def predict_image(img): |
|
"""Predict fog conditions from image and return confidence scores""" |
|
img = PILImage.create(img) |
|
img = img.resize((512, 512)) |
|
pred, pred_idx, probs = learn.predict(img) |
|
return {labels[i]: float(probs[i]) for i in range(len(labels))} |
|
|
|
def calculate_fog_risk_score(weather_data): |
|
"""Calculate a fog risk score (0-1) based on weather conditions""" |
|
|
|
weights = { |
|
'humidity': 0.3, |
|
'dew_point_temp_diff': 0.3, |
|
'visibility': 0.2, |
|
'wind_speed': 0.1, |
|
'pressure_change': 0.1 |
|
} |
|
|
|
|
|
dew_point = weather_data['temperature'] - ((100 - weather_data['humidity']) / 5.0) |
|
dew_point_temp_diff = abs(weather_data['temperature'] - dew_point) |
|
|
|
|
|
humidity_score = min(weather_data['humidity'] / 100, 1) |
|
dew_point_score = 1 - min(dew_point_temp_diff / 5, 1) |
|
visibility_score = 1 - min(weather_data['visibility'] / 10, 1) |
|
wind_score = 1 - min(weather_data['wind_speed'] / 10, 1) |
|
pressure_score = min(abs(weather_data['pressure'] - 1013.25) / 50, 1) |
|
|
|
|
|
fog_risk = ( |
|
weights['humidity'] * humidity_score + |
|
weights['dew_point_temp_diff'] * dew_point_score + |
|
weights['visibility'] * visibility_score + |
|
weights['wind_speed'] * wind_score + |
|
weights['pressure_change'] * pressure_score |
|
) |
|
|
|
return fog_risk |
|
|
|
def get_weather_data(location): |
|
"""Get current weather data with enhanced error handling""" |
|
try: |
|
current_weather_url = f'{BASE_URL}weather?q={location}&appid={API_KEY}&units=metric' |
|
response = requests.get(current_weather_url) |
|
response.raise_for_status() |
|
data = response.json() |
|
|
|
return { |
|
'temperature': data['main'].get('temp', 0), |
|
'feels_like': data['main'].get('feels_like', 0), |
|
'description': data['weather'][0].get('description', ''), |
|
'wind_speed': data['wind'].get('speed', 0), |
|
'pressure': data['main'].get('pressure', 0), |
|
'humidity': data['main'].get('humidity', 0), |
|
'visibility': data.get('visibility', 10000) / 1000, |
|
'timestamp': datetime.fromtimestamp(data['dt']) |
|
} |
|
except requests.exceptions.RequestException as e: |
|
raise Exception(f"Failed to fetch weather data: {str(e)}") |
|
|
|
def get_forecast_data(location): |
|
"""Get 5-day forecast with enhanced error handling""" |
|
try: |
|
forecast_url = f'{BASE_URL}forecast?q={location}&appid={API_KEY}&units=metric' |
|
response = requests.get(forecast_url) |
|
response.raise_for_status() |
|
data = response.json() |
|
|
|
forecasts = [] |
|
for item in data['list']: |
|
forecasts.append({ |
|
'temperature': item['main'].get('temp', 0), |
|
'humidity': item['main'].get('humidity', 0), |
|
'description': item['weather'][0].get('description', ''), |
|
'wind_speed': item['wind'].get('speed', 0), |
|
'pressure': item['main'].get('pressure', 0), |
|
'visibility': item.get('visibility', 10000) / 1000, |
|
'timestamp': datetime.fromtimestamp(item['dt']) |
|
}) |
|
return forecasts |
|
except requests.exceptions.RequestException as e: |
|
raise Exception(f"Failed to fetch forecast data: {str(e)}") |
|
|
|
def format_duration(duration): |
|
"""Format timedelta into days and hours string""" |
|
total_hours = duration.total_seconds() / 3600 |
|
days = int(total_hours // 24) |
|
hours = int(total_hours % 24) |
|
|
|
if days > 0: |
|
return f"{days} days and {hours} hours" |
|
return f"{hours} hours" |
|
|
|
def determine_transmission_power(image_prediction, weather_data, forecast_data=None): |
|
""" |
|
Determine transmission power based on current conditions and forecast |
|
Returns: (power_level, duration, explanation) |
|
""" |
|
|
|
image_fog_confidence = max( |
|
image_prediction.get('Dense_Fog', 0), |
|
image_prediction.get('Moderate_Fog', 0) * 0.6 |
|
) |
|
|
|
|
|
current_fog_risk = calculate_fog_risk_score(weather_data) |
|
|
|
|
|
|
|
combined_fog_risk = (image_fog_confidence * 0.6) + (current_fog_risk * 0.4) |
|
|
|
|
|
explanation = [] |
|
|
|
|
|
if combined_fog_risk > 0.7: |
|
power_level = "High" |
|
explanation.append(f"High fog risk detected (Risk score: {combined_fog_risk:.2f})") |
|
elif combined_fog_risk > 0.4: |
|
power_level = "Medium" |
|
explanation.append(f"Moderate fog risk detected (Risk score: {combined_fog_risk:.2f})") |
|
else: |
|
power_level = "Normal" |
|
explanation.append(f"Low fog risk detected (Risk score: {combined_fog_risk:.2f})") |
|
|
|
|
|
duration = timedelta(hours=1) |
|
if forecast_data: |
|
future_risks = [] |
|
for forecast in forecast_data[:40]: |
|
risk = calculate_fog_risk_score(forecast) |
|
future_risks.append(risk) |
|
|
|
|
|
high_risk_periods = [risk > 0.6 for risk in future_risks] |
|
if any(high_risk_periods): |
|
|
|
last_high_risk_idx = len(high_risk_periods) - 1 - high_risk_periods[::-1].index(True) |
|
duration = forecast_data[last_high_risk_idx]['timestamp'] - weather_data['timestamp'] |
|
|
|
explanation.append(f"High fog risk predicted to continue for {format_duration(duration)}") |
|
|
|
|
|
if sum(high_risk_periods) / len(high_risk_periods) > 0.5: |
|
power_level = "High" |
|
explanation.append("Power level set to High due to sustained fog risk in forecast") |
|
|
|
return power_level, duration, explanation |
|
|
|
def integrated_prediction(image, location): |
|
"""Main function to process image and weather data""" |
|
try: |
|
|
|
image_prediction = predict_image(image) |
|
|
|
|
|
current_weather = get_weather_data(location) |
|
|
|
|
|
forecast_data = get_forecast_data(location) |
|
|
|
|
|
power_level, duration, explanation = determine_transmission_power( |
|
image_prediction, |
|
current_weather, |
|
forecast_data |
|
) |
|
|
|
|
|
result = [ |
|
f"Current Conditions ({current_weather['timestamp'].strftime('%Y-%m-%d %H:%M')})", |
|
f"Temperature: {current_weather['temperature']:.1f}°C", |
|
f"Humidity: {current_weather['humidity']}%", |
|
f"Visibility: {current_weather['visibility']:.1f} km", |
|
f"Wind Speed: {current_weather['wind_speed']} m/s", |
|
"", |
|
"Analysis Results:", |
|
*explanation, |
|
"", |
|
f"Recommended Power Level: {power_level}", |
|
f"Duration: {format_duration(duration)}", |
|
"", |
|
"5-Day Forecast Summary:" |
|
] |
|
|
|
|
|
current_date = current_weather['timestamp'].date() |
|
for day in range(5): |
|
forecast_date = current_date + timedelta(days=day) |
|
day_forecasts = [f for f in forecast_data if f['timestamp'].date() == forecast_date] |
|
|
|
if day_forecasts: |
|
avg_risk = sum(calculate_fog_risk_score(f) for f in day_forecasts) / len(day_forecasts) |
|
result.append(f"{forecast_date.strftime('%Y-%m-%d')}: " |
|
f"Fog Risk: {'High' if avg_risk > 0.6 else 'Moderate' if avg_risk > 0.3 else 'Low'} " |
|
f"({avg_risk:.2f})") |
|
|
|
return "\n".join(result) |
|
|
|
except Exception as e: |
|
return f"Error: {str(e)}" |
|
|
|
|
|
with gr.Blocks() as demo: |
|
gr.Markdown("# Enhanced Fog Prediction and Transmission Power System") |
|
|
|
with gr.Row(): |
|
image_input = gr.Image(label="Upload Current Conditions Image") |
|
location_input = gr.Textbox(label="Enter Location") |
|
|
|
predict_button = gr.Button("Analyze and Determine Transmission Power") |
|
output = gr.Textbox(label="Analysis Results", lines=15) |
|
|
|
predict_button.click(integrated_prediction, inputs=[image_input, location_input], outputs=output) |
|
|
|
demo.launch() |