File size: 9,341 Bytes
d45488b
 
 
 
 
 
ce1be2b
d45488b
 
237d44d
d45488b
 
 
 
 
 
ce1be2b
d45488b
 
 
 
 
ce1be2b
 
 
 
 
 
 
 
 
d45488b
ce1be2b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d45488b
ce1be2b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d45488b
ce1be2b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
134fb84
 
 
 
 
 
 
 
 
 
ce1be2b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
134fb84
ce1be2b
 
 
 
 
 
 
d45488b
ce1be2b
 
d45488b
ce1be2b
 
 
 
d45488b
ce1be2b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
134fb84
ce1be2b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d45488b
 
 
 
 
ce1be2b
 
d45488b
ce1be2b
d45488b
ce1be2b
 
 
 
d45488b
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
import gradio as gr
import requests
import pandas as pd
from sklearn.linear_model import LogisticRegression
from fastai.vision.all import *
import os
from datetime import datetime, timedelta

# Load the trained model for image-based fog classification.
# NOTE(review): load_learner arrives via the fastai star-import and, per the
# fastai docs, unpickles the file — 'fog_classifier.pkl' is resolved relative
# to the working directory and should be a trusted artifact. Confirm.
learn = load_learner('fog_classifier.pkl')
# Class vocabulary of the learner; predict_image indexes probabilities by it.
labels = learn.dls.vocab

# OpenWeatherMap API key, read from the environment (None when the variable is unset).
API_KEY = os.environ.get("OPENWEATHER_API_KEY")
# Common prefix of the OpenWeatherMap endpoints requested by this module.
BASE_URL = 'https://api.openweathermap.org/data/2.5/'

def predict_image(img):
    """Run the fog classifier on an image and report per-label confidence.

    The input is wrapped in a ``PILImage``, resized to 512x512 and handed to
    the module-level learner.

    Returns:
        dict mapping each entry of ``labels`` to its probability as a float.
    """
    resized = PILImage.create(img).resize((512, 512))
    # Only the probability vector is needed; the decoded class and its index
    # are discarded.
    _, _, class_probs = learn.predict(resized)

    confidences = {}
    for position, label in enumerate(labels):
        confidences[label] = float(class_probs[position])
    return confidences

def calculate_fog_risk_score(weather_data):
    """Fold five weather readings into a single weighted fog risk value.

    ``weather_data`` must provide the keys 'temperature', 'humidity',
    'visibility', 'wind_speed' and 'pressure'.  Every reading is turned into
    a factor score that is capped at 1, multiplied by its weight (the weights
    add up to 1.0) and the products are summed.

    Returns:
        The weighted sum of the five factor scores.
    """
    temperature = weather_data['temperature']
    humidity = weather_data['humidity']

    # Simple dew point approximation: subtract one unit of temperature for
    # every 5 points of humidity below 100.
    estimated_dew_point = temperature - ((100 - humidity) / 5.0)
    spread = abs(temperature - estimated_dew_point)

    # (weight, score) pairs in the order: humidity, dew point spread,
    # visibility, wind speed, pressure deviation.
    weighted_factors = (
        (0.3, min(humidity / 100, 1)),
        (0.3, 1 - min(spread / 5, 1)),  # smaller spread -> higher score
        (0.2, 1 - min(weather_data['visibility'] / 10, 1)),  # lower visibility -> higher score
        (0.1, 1 - min(weather_data['wind_speed'] / 10, 1)),  # calmer wind -> higher score
        (0.1, min(abs(weather_data['pressure'] - 1013.25) / 50, 1)),  # distance from 1013.25
    )

    # Accumulate strictly left to right so the floating-point result matches
    # a plain chained addition of the five products.
    fog_risk = 0.0
    for weight, score in weighted_factors:
        fog_risk += weight * score
    return fog_risk

def get_weather_data(location, timeout=10):
    """Fetch the current weather for ``location`` from OpenWeatherMap.

    Args:
        location: Free-text place name, sent as the ``q`` query parameter.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        dict with the keys 'temperature', 'feels_like', 'description',
        'wind_speed', 'pressure', 'humidity', 'visibility' (the API value
        divided by 1000) and 'timestamp' (a ``datetime`` built from the
        response's ``dt`` field).  Missing numeric fields default to 0,
        a missing visibility to 10000 before the division.

    Raises:
        Exception: if the request fails, times out or returns an error
            status.  The message never contains the API key.
    """
    try:
        # Let requests build the query string so user-supplied text (spaces,
        # '&', '#', non-ASCII names) is URL-encoded instead of being spliced
        # into the URL verbatim.
        response = requests.get(
            f'{BASE_URL}weather',
            params={'q': location, 'appid': API_KEY, 'units': 'metric'},
            timeout=timeout,  # without a timeout a stalled connection hangs the UI
        )
        response.raise_for_status()
        data = response.json()

        return {
            'temperature': data['main'].get('temp', 0),
            'feels_like': data['main'].get('feels_like', 0),
            'description': data['weather'][0].get('description', ''),
            'wind_speed': data['wind'].get('speed', 0),
            'pressure': data['main'].get('pressure', 0),
            'humidity': data['main'].get('humidity', 0),
            'visibility': data.get('visibility', 10000) / 1000,
            'timestamp': datetime.fromtimestamp(data['dt'])
        }
    except requests.exceptions.RequestException as e:
        # requests puts the full request URL (including appid) into its error
        # text, and this message is shown to the end user, so redact the key.
        detail = str(e)
        if API_KEY:
            detail = detail.replace(API_KEY, '***')
        raise Exception(f"Failed to fetch weather data: {detail}") from e

def get_forecast_data(location, timeout=10):
    """Fetch the forecast entries for ``location`` from OpenWeatherMap.

    Args:
        location: Free-text place name, sent as the ``q`` query parameter.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        list of dicts, one per item under the response's ``list`` key, each
        with the keys 'temperature', 'humidity', 'description', 'wind_speed',
        'pressure', 'visibility' (the API value divided by 1000) and
        'timestamp' (a ``datetime`` built from the item's ``dt`` field).
        Missing numeric fields default to 0, a missing visibility to 10000
        before the division.

    Raises:
        Exception: if the request fails, times out or returns an error
            status.  The message never contains the API key.
    """
    try:
        # Let requests build the query string so user-supplied text (spaces,
        # '&', '#', non-ASCII names) is URL-encoded instead of being spliced
        # into the URL verbatim.
        response = requests.get(
            f'{BASE_URL}forecast',
            params={'q': location, 'appid': API_KEY, 'units': 'metric'},
            timeout=timeout,  # without a timeout a stalled connection hangs the UI
        )
        response.raise_for_status()
        data = response.json()

        return [
            {
                'temperature': item['main'].get('temp', 0),
                'humidity': item['main'].get('humidity', 0),
                'description': item['weather'][0].get('description', ''),
                'wind_speed': item['wind'].get('speed', 0),
                'pressure': item['main'].get('pressure', 0),
                'visibility': item.get('visibility', 10000) / 1000,
                'timestamp': datetime.fromtimestamp(item['dt']),
            }
            for item in data['list']
        ]
    except requests.exceptions.RequestException as e:
        # requests puts the full request URL (including appid) into its error
        # text, and this message is shown to the end user, so redact the key.
        detail = str(e)
        if API_KEY:
            detail = detail.replace(API_KEY, '***')
        raise Exception(f"Failed to fetch forecast data: {detail}") from e

def format_duration(duration):
    """Render a ``timedelta`` as whole days and hours.

    Partial hours are dropped.  Units are singular when the count is 1
    ("1 day and 1 hour").  A negative duration is reported as "0 hours";
    floor division and modulo on a negative hour count would otherwise
    produce misleading output such as "23 hours" for minus thirty minutes.

    Args:
        duration: The ``timedelta`` to format.

    Returns:
        "<d> day(s) and <h> hour(s)" when at least one full day is covered,
        otherwise "<h> hour(s)".
    """
    # Clamp before splitting so days and hours are both non-negative.
    whole_hours = max(int(duration.total_seconds() // 3600), 0)
    days, hours = divmod(whole_hours, 24)

    def _count(amount, noun):
        # Pluralise every count except exactly one.
        return f"{amount} {noun}" if amount == 1 else f"{amount} {noun}s"

    if days > 0:
        return f"{_count(days, 'day')} and {_count(hours, 'hour')}"
    return _count(hours, 'hour')

def determine_transmission_power(image_prediction, weather_data, forecast_data=None):
    """Pick a transmission power level from image, weather and forecast input.

    Args:
        image_prediction: dict of label -> confidence; the 'Dense_Fog' and
            'Moderate_Fog' entries are read, absent ones count as 0.
        weather_data: current weather dict accepted by
            ``calculate_fog_risk_score``; its 'timestamp' is the reference
            point for the returned duration.
        forecast_data: optional sequence of forecast dicts with the same
            weather keys plus 'timestamp'; only the first 40 are examined.

    Returns:
        Tuple ``(power_level, duration, explanation)`` where ``power_level``
        is "High", "Medium" or "Normal", ``duration`` is a ``timedelta``
        (one hour unless the forecast contains a high-risk entry) and
        ``explanation`` is a list of human-readable strings.
    """
    # Moderate fog is discounted to 60 % before competing with dense fog.
    dense_confidence = image_prediction.get('Dense_Fog', 0)
    discounted_moderate = image_prediction.get('Moderate_Fog', 0) * 0.6
    image_fog_confidence = max(dense_confidence, discounted_moderate)

    weather_risk = calculate_fog_risk_score(weather_data)

    # The image classifier gets the larger share of the blend (60/40).
    combined_fog_risk = (image_fog_confidence * 0.6) + (weather_risk * 0.4)

    if combined_fog_risk > 0.7:
        power_level = "High"
        explanation = [f"High fog risk detected (Risk score: {combined_fog_risk:.2f})"]
    elif combined_fog_risk > 0.4:
        power_level = "Medium"
        explanation = [f"Moderate fog risk detected (Risk score: {combined_fog_risk:.2f})"]
    else:
        power_level = "Normal"
        explanation = [f"Low fog risk detected (Risk score: {combined_fog_risk:.2f})"]

    duration = timedelta(hours=1)  # used whenever the forecast adds nothing
    if not forecast_data:
        return power_level, duration, explanation

    window = forecast_data[:40]
    high_risk_flags = [calculate_fog_risk_score(entry) > 0.6 for entry in window]
    high_risk_positions = [
        position for position, flagged in enumerate(high_risk_flags) if flagged
    ]
    if not high_risk_positions:
        return power_level, duration, explanation

    # The duration runs from the current observation to the last forecast
    # entry that is still flagged as high risk.
    final_high_risk = window[high_risk_positions[-1]]
    duration = final_high_risk['timestamp'] - weather_data['timestamp']
    explanation.append(f"High fog risk predicted to continue for {format_duration(duration)}")

    # More than half of the examined entries at high risk forces "High".
    if len(high_risk_positions) / len(high_risk_flags) > 0.5:
        power_level = "High"
        explanation.append("Power level set to High due to sustained fog risk in forecast")

    return power_level, duration, explanation

def integrated_prediction(image, location):
    """Produce the full text report for an uploaded image and a location.

    Runs the image classifier, fetches current weather and forecast, derives
    the recommended transmission power and appends a per-day fog risk
    summary for the observation date and the four following dates.

    Returns:
        The report as a newline-joined string, or ``"Error: <message>"`` if
        any step raises.
    """
    try:
        image_prediction = predict_image(image)
        current_weather = get_weather_data(location)
        forecast_data = get_forecast_data(location)

        power_level, duration, explanation = determine_transmission_power(
            image_prediction, current_weather, forecast_data
        )

        observed_at = current_weather['timestamp']

        # Section 1: current readings.
        report = [
            f"Current Conditions ({observed_at.strftime('%Y-%m-%d %H:%M')})",
            f"Temperature: {current_weather['temperature']:.1f}°C",
            f"Humidity: {current_weather['humidity']}%",
            f"Visibility: {current_weather['visibility']:.1f} km",
            f"Wind Speed: {current_weather['wind_speed']} m/s",
            "",
            "Analysis Results:",
        ]
        # Section 2: reasoning followed by the recommendation.
        report.extend(explanation)
        report.extend([
            "",
            f"Recommended Power Level: {power_level}",
            f"Duration: {format_duration(duration)}",
            "",
            "5-Day Forecast Summary:",
        ])

        # Section 3: average risk per calendar date; dates without any
        # forecast entry are left out.
        first_day = observed_at.date()
        for offset in range(5):
            target_day = first_day + timedelta(days=offset)
            day_risks = [
                calculate_fog_risk_score(entry)
                for entry in forecast_data
                if entry['timestamp'].date() == target_day
            ]
            if not day_risks:
                continue

            avg_risk = sum(day_risks) / len(day_risks)
            if avg_risk > 0.6:
                risk_label = 'High'
            elif avg_risk > 0.3:
                risk_label = 'Moderate'
            else:
                risk_label = 'Low'
            report.append(
                f"{target_day.strftime('%Y-%m-%d')}: Fog Risk: {risk_label} ({avg_risk:.2f})"
            )

        return "\n".join(report)

    except Exception as e:
        # UI boundary: surface the failure as text instead of raising.
        return f"Error: {str(e)}"

# Gradio interface: one row with the image upload and the location box, a
# button below it, and a multi-line text box that receives the report.
with gr.Blocks() as demo:
    gr.Markdown("# Enhanced Fog Prediction and Transmission Power System")
    
    # Inputs, placed side by side.
    with gr.Row():
        image_input = gr.Image(label="Upload Current Conditions Image")
        location_input = gr.Textbox(label="Enter Location")
    
    predict_button = gr.Button("Analyze and Determine Transmission Power")
    output = gr.Textbox(label="Analysis Results", lines=15)
    
    # Clicking the button passes (image, location) to integrated_prediction
    # and writes the string it returns into the output box.
    predict_button.click(integrated_prediction, inputs=[image_input, location_input], outputs=output)

# Starts the app as soon as this module is executed (no __main__ guard).
demo.launch()