File size: 9,341 Bytes
d45488b ce1be2b d45488b 237d44d d45488b ce1be2b d45488b ce1be2b d45488b ce1be2b d45488b ce1be2b d45488b ce1be2b 134fb84 ce1be2b 134fb84 ce1be2b d45488b ce1be2b d45488b ce1be2b d45488b ce1be2b 134fb84 ce1be2b d45488b ce1be2b d45488b ce1be2b d45488b ce1be2b d45488b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 |
import gradio as gr
import requests
import pandas as pd
from sklearn.linear_model import LogisticRegression
from fastai.vision.all import *
import os
from datetime import datetime, timedelta
# OpenWeatherMap settings. The key is read from the environment (None when the
# variable is unset) so the secret never has to live in the source file.
API_KEY = os.getenv("OPENWEATHER_API_KEY")
BASE_URL = "https://api.openweathermap.org/data/2.5/"

# Image-based fog classifier and the class labels it can emit.
# NOTE(review): load_learner deserialises a pickled learner - only point it at
# a trusted file.
learn = load_learner("fog_classifier.pkl")
labels = learn.dls.vocab
def predict_image(img):
    """Run the fog classifier on an image and return per-label probabilities.

    Args:
        img: Anything ``PILImage.create`` accepts (presumably the array or
            path handed over by the Gradio image component - confirm).

    Returns:
        dict mapping each entry of the module-level ``labels`` vocabulary to
        its predicted probability as a plain ``float``.
    """
    # The image is resized to 512x512 before being handed to the learner.
    resized = PILImage.create(img).resize((512, 512))
    # learn.predict returns (decoded class, class index, probabilities);
    # only the probabilities are needed here.
    _, _, class_probs = learn.predict(resized)
    return {label: float(prob) for label, prob in zip(labels, class_probs)}
def calculate_fog_risk_score(weather_data):
    """Combine five weather factors into a single weighted fog risk score.

    Args:
        weather_data: Mapping providing 'temperature', 'humidity',
            'visibility', 'wind_speed' and 'pressure'.

    Returns:
        Weighted sum of the factor scores. Every score is capped at 1 and the
        weights add up to 1.0, so the result is nominally on a 0-1 scale.

    Raises:
        KeyError: If any of the required keys is missing.
    """
    temperature = weather_data['temperature']
    humidity = weather_data['humidity']

    # Rule-of-thumb dew point: one degree below air temperature for every
    # 5 points of relative humidity short of 100.
    dew_point = temperature - ((100 - humidity) / 5.0)
    spread = abs(temperature - dew_point)

    # (weight, score) pairs, kept in the order they are accumulated below.
    weighted_factors = (
        (0.3, min(humidity / 100, 1)),                       # humidity
        (0.3, 1 - min(spread / 5, 1)),                       # nearer dew point -> higher
        (0.2, 1 - min(weather_data['visibility'] / 10, 1)),  # lower visibility -> higher
        (0.1, 1 - min(weather_data['wind_speed'] / 10, 1)),  # calmer wind -> higher
        (0.1, min(abs(weather_data['pressure'] - 1013.25) / 50, 1)),  # deviation from 1013.25
    )

    # Plain left-to-right accumulation keeps the floating-point result stable.
    fog_risk = 0.0
    for weight, score in weighted_factors:
        fog_risk += weight * score
    return fog_risk
def get_weather_data(location):
    """Fetch the current weather for a location from OpenWeatherMap.

    Args:
        location: Free-text place name sent as the ``q`` query parameter.

    Returns:
        dict with 'temperature', 'feels_like', 'description', 'wind_speed',
        'pressure', 'humidity', 'visibility' (API value divided by 1000) and
        'timestamp' (naive local ``datetime`` built from the API's ``dt``).

    Raises:
        Exception: If the HTTP request fails, times out, returns an error
            status or returns a body that is not valid JSON.
    """
    try:
        # Passing the query through `params` lets requests URL-encode the
        # user-supplied location (spaces, '&', '#', ...) instead of splicing
        # it raw into the URL. The timeout stops a stalled connection from
        # hanging the UI indefinitely.
        response = requests.get(
            f'{BASE_URL}weather',
            params={'q': location, 'appid': API_KEY, 'units': 'metric'},
            timeout=10,
        )
        response.raise_for_status()
        data = response.json()
    except requests.exceptions.RequestException as e:
        # requests puts the full URL (including the appid secret) in its
        # error text; scrub the key before the message reaches the UI.
        message = str(e)
        if API_KEY:
            message = message.replace(API_KEY, '***')
        raise Exception(f"Failed to fetch weather data: {message}") from e

    main = data['main']
    return {
        'temperature': main.get('temp', 0),
        'feels_like': main.get('feels_like', 0),
        'description': data['weather'][0].get('description', ''),
        'wind_speed': data['wind'].get('speed', 0),
        'pressure': main.get('pressure', 0),
        'humidity': main.get('humidity', 0),
        'visibility': data.get('visibility', 10000) / 1000,
        'timestamp': datetime.fromtimestamp(data['dt']),
    }
def get_forecast_data(location):
    """Fetch the forecast entries for a location from OpenWeatherMap.

    Args:
        location: Free-text place name sent as the ``q`` query parameter.

    Returns:
        list of dicts, one per entry of the response's ``list`` field, each
        with 'temperature', 'humidity', 'description', 'wind_speed',
        'pressure', 'visibility' (API value divided by 1000) and 'timestamp'
        (naive local ``datetime`` built from the entry's ``dt``).

    Raises:
        Exception: If the HTTP request fails, times out, returns an error
            status or returns a body that is not valid JSON.
    """
    try:
        # `params` URL-encodes the user-supplied location; the timeout keeps
        # a stalled connection from blocking the UI forever.
        response = requests.get(
            f'{BASE_URL}forecast',
            params={'q': location, 'appid': API_KEY, 'units': 'metric'},
            timeout=10,
        )
        response.raise_for_status()
        data = response.json()
    except requests.exceptions.RequestException as e:
        # The error text from requests contains the request URL with the
        # appid secret; scrub it before it can be shown to the user.
        message = str(e)
        if API_KEY:
            message = message.replace(API_KEY, '***')
        raise Exception(f"Failed to fetch forecast data: {message}") from e

    return [
        {
            'temperature': item['main'].get('temp', 0),
            'humidity': item['main'].get('humidity', 0),
            'description': item['weather'][0].get('description', ''),
            'wind_speed': item['wind'].get('speed', 0),
            'pressure': item['main'].get('pressure', 0),
            'visibility': item.get('visibility', 10000) / 1000,
            'timestamp': datetime.fromtimestamp(item['dt']),
        }
        for item in data['list']
    ]
def format_duration(duration):
    """Format a timedelta as whole days and hours, e.g. "2 days and 3 hours".

    Partial hours are dropped. Negative durations are reported as "0 hours".

    Args:
        duration: ``datetime.timedelta`` to format.

    Returns:
        "<d> day(s) and <h> hour(s)" when at least one full day is present,
        otherwise "<h> hour(s)"; units are singular when the count is 1.
    """
    # Clamp at zero: flooring a negative value would give a negative day
    # count plus a misleading positive hour remainder (-1h -> "23 hours").
    total_hours = max(int(duration.total_seconds() // 3600), 0)
    days, hours = divmod(total_hours, 24)

    hours_text = f"{hours} hour" if hours == 1 else f"{hours} hours"
    if days > 0:
        days_text = f"{days} day" if days == 1 else f"{days} days"
        return f"{days_text} and {hours_text}"
    return hours_text
def determine_transmission_power(image_prediction, weather_data, forecast_data=None):
    """Pick a transmission power level from the image, weather and forecast.

    Args:
        image_prediction: Mapping of class label to probability; only the
            'Dense_Fog' and 'Moderate_Fog' entries are read (missing -> 0).
        weather_data: Current weather mapping accepted by
            ``calculate_fog_risk_score``; its 'timestamp' is also read when a
            high-risk forecast entry exists.
        forecast_data: Optional sequence of forecast mappings with the same
            keys; at most the first 40 entries are examined.

    Returns:
        Tuple ``(power_level, duration, explanation)`` where power_level is
        "High", "Medium" or "Normal", duration is a ``timedelta`` (one hour
        unless the forecast contains a high-risk entry) and explanation is a
        list of human-readable strings.
    """
    # Moderate fog is discounted to 60% of its confidence before comparing.
    dense_conf = image_prediction.get('Dense_Fog', 0)
    moderate_conf = image_prediction.get('Moderate_Fog', 0) * 0.6
    image_fog_confidence = max(dense_conf, moderate_conf)

    # Blend the two signals: 60% image, 40% weather.
    weather_risk = calculate_fog_risk_score(weather_data)
    combined_fog_risk = (image_fog_confidence * 0.6) + (weather_risk * 0.4)

    if combined_fog_risk > 0.7:
        power_level, risk_label = "High", "High"
    elif combined_fog_risk > 0.4:
        power_level, risk_label = "Medium", "Moderate"
    else:
        power_level, risk_label = "Normal", "Low"
    explanation = [f"{risk_label} fog risk detected (Risk score: {combined_fog_risk:.2f})"]

    duration = timedelta(hours=1)  # used when the forecast adds nothing
    if not forecast_data:
        return power_level, duration, explanation

    # One flag per forecast slot: does its risk exceed the 0.6 threshold?
    high_risk_flags = [
        calculate_fog_risk_score(entry) > 0.6 for entry in forecast_data[:40]
    ]
    if not any(high_risk_flags):
        return power_level, duration, explanation

    # The duration runs from the current observation to the LAST flagged slot.
    last_flagged = max(idx for idx, flagged in enumerate(high_risk_flags) if flagged)
    duration = forecast_data[last_flagged]['timestamp'] - weather_data['timestamp']
    explanation.append(f"High fog risk predicted to continue for {format_duration(duration)}")

    # Escalate when more than half of the examined slots are high risk.
    if sum(high_risk_flags) / len(high_risk_flags) > 0.5:
        power_level = "High"
        explanation.append("Power level set to High due to sustained fog risk in forecast")

    return power_level, duration, explanation
def integrated_prediction(image, location):
    """Build the full text report for an uploaded image and a location.

    Runs the image classifier, fetches current weather and forecast, derives
    the recommended transmission power and appends a per-day fog risk summary
    for the five calendar days starting at the observation date.

    Args:
        image: Image passed on to ``predict_image``.
        location: Place name passed on to the weather lookups.

    Returns:
        The multi-line report, or "Error: <message>" if any step raises.
    """
    try:
        image_prediction = predict_image(image)
        current_weather = get_weather_data(location)
        forecast_data = get_forecast_data(location)

        power_level, duration, explanation = determine_transmission_power(
            image_prediction, current_weather, forecast_data
        )

        observed_at = current_weather['timestamp']
        report = [
            f"Current Conditions ({observed_at.strftime('%Y-%m-%d %H:%M')})",
            f"Temperature: {current_weather['temperature']:.1f}°C",
            f"Humidity: {current_weather['humidity']}%",
            f"Visibility: {current_weather['visibility']:.1f} km",
            f"Wind Speed: {current_weather['wind_speed']} m/s",
            "",
            "Analysis Results:",
        ]
        report.extend(explanation)
        report.extend([
            "",
            f"Recommended Power Level: {power_level}",
            f"Duration: {format_duration(duration)}",
            "",
            "5-Day Forecast Summary:",
        ])

        # Average the fog risk of all forecast slots falling on each day;
        # days without any forecast slot are skipped.
        first_day = observed_at.date()
        for offset in range(5):
            forecast_date = first_day + timedelta(days=offset)
            day_risks = [
                calculate_fog_risk_score(entry)
                for entry in forecast_data
                if entry['timestamp'].date() == forecast_date
            ]
            if not day_risks:
                continue

            avg_risk = sum(day_risks) / len(day_risks)
            if avg_risk > 0.6:
                risk_label = 'High'
            elif avg_risk > 0.3:
                risk_label = 'Moderate'
            else:
                risk_label = 'Low'
            report.append(
                f"{forecast_date.strftime('%Y-%m-%d')}: "
                f"Fog Risk: {risk_label} "
                f"({avg_risk:.2f})"
            )

        return "\n".join(report)
    except Exception as e:
        # UI boundary: surface the failure as text instead of crashing the app.
        return f"Error: {str(e)}"
# Gradio interface: an image upload and a location box feed
# integrated_prediction, whose text report is shown in the output box.
# NOTE(review): the original nesting was lost; placing only the two inputs
# inside the Row is a reconstruction - confirm against the intended layout.
with gr.Blocks() as demo:
    gr.Markdown("# Enhanced Fog Prediction and Transmission Power System")
    with gr.Row():
        image_input = gr.Image(label="Upload Current Conditions Image")
        location_input = gr.Textbox(label="Enter Location")
    predict_button = gr.Button("Analyze and Determine Transmission Power")
    output = gr.Textbox(label="Analysis Results", lines=15)
    predict_button.click(
        integrated_prediction,
        inputs=[image_input, location_input],
        outputs=output,
    )

# Start the web server (the stray trailing "|" that made this line a syntax
# error has been removed).
demo.launch()