Source code for Dust_Storm_Modules

"""
Dust_Storm_Modules
------------------
This module includes processors for MERRA-2 dust and AODANA data,
storm detection, fusion storm tracking, and monthly analysis.

Classes:
- MERRA2AODProcessor
- MERRA2AODANAProcessor
- MonthlyDustAnalyzer
- StormDetector
- FusionStormDetector
"""

import os
import numpy as np
import pandas as pd
import xarray as xr
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
from tqdm import tqdm
import requests
from tqdm.auto import tqdm as auto_tqdm
from geopy.distance import geodesic
import os
import pandas as pd
from datetime import datetime
from tqdm import tqdm
import os
import pandas as pd
import numpy as np
from scipy.ndimage import label
from tqdm import tqdm
import os
import numpy as np
import pandas as pd
from tqdm import tqdm
from scipy.ndimage import label, center_of_mass
from skimage.measure import regionprops
import os
import subprocess
import shutil
import pandas as pd
from tqdm import tqdm

[docs] class MERRA2AODProcessor: def __init__(self, start_date, end_date, region_bounds, output_dir="data/merra2"): self.start_date = start_date self.end_date = end_date self.lat_min, self.lat_max, self.lon_min, self.lon_max = region_bounds if self.lon_min < 0: self.lon_min += 360 if self.lon_max < 0: self.lon_max += 360 self.output_dir = output_dir os.makedirs(self.output_dir, exist_ok=True) os.environ["NETRC"] = r"C:\\Users\\y46144ma\\login_netrc"
[docs] def download_files(self): print(" Downloading MERRA-2 NetCDF files...") home = os.path.expanduser("~") # Prefer Windows-style _netrc; fall back to .netrc if that's what you created netrc_candidates = [os.path.join(home, "_netrc"), os.path.join(home, ".netrc")] netrc_path = next((p for p in netrc_candidates if os.path.exists(p)), None) cookies_path = os.path.join(home, ".urs_cookies") os.makedirs(self.output_dir, exist_ok=True) env = os.environ.copy() env["NETRC"] = r"C:\Users\y46144ma\login_netrc" def have(cmd): return shutil.which(cmd) is not None use_curl = have("curl") use_wget = have("wget") if not use_curl and not use_wget: raise RuntimeError("Neither 'curl' nor 'wget' found on PATH.") if use_curl and netrc_path: raise RuntimeError( "No netrc file found. On Windows create C:\\Users\\<you>\\_netrc " "or use .netrc and pass --netrc-file." ) for date in tqdm(pd.date_range(self.start_date, self.end_date), desc="Downloading"): year, month, day = date.year, date.month, date.day base_url = "https://goldsmr4.gesdisc.eosdis.nasa.gov/data/MERRA2/M2T1NXAER.5.12.4" filename = f"MERRA2_400.tavg1_2d_aer_Nx.{year}{month:02d}{day:02d}.nc4" url = f"{base_url}/{year}/{month:02d}/{filename}" file_path = os.path.join(self.output_dir, filename) if os.path.exists(file_path): print(f" File already exists: {filename}") continue print(f" Downloading {filename} ...") try: if use_curl: # Use explicit --netrc-file to work with either _netrc or .netrc cmd = [ "curl", "-L", "--netrc-file",r"C:\Users\y46144ma\login_netrc", "--retry", "3", "--retry-delay", "2", "-c", cookies_path, "-b", cookies_path, url, "-o", file_path, ] else: cmd = [ "wget", "--load-cookies", cookies_path, "--save-cookies", cookies_path, "--keep-session-cookies", "--tries", "3", "--waitretry", "2", url, "-O", file_path, ] subprocess.run(cmd, check=True) except subprocess.CalledProcessError as e: print(f" Download failed for {filename}: {e}")
[docs] def convert_to_csv(self): print(" Converting NetCDF files to CSV...") files = sorted([f for f in os.listdir(self.output_dir) if f.endswith(".nc4")]) # Skip if the CSV already exists for filename in tqdm(files, desc="Converting"): try: output_csv = os.path.join( self.output_dir, f"{os.path.splitext(filename)[0]}.csv" ) if os.path.exists(output_csv): print(f"CSV already exists for {filename}, skipping.") continue file_path = os.path.join(self.output_dir, filename) ds = xr.open_dataset(file_path, engine="netcdf4") aod = ds["DUSMASS"][:, :, :] # shape: (time, lat, lon) aod_region = aod.sel( lat=slice(self.lat_min, self.lat_max), lon=slice(self.lon_min, self.lon_max), ) lat_vals = aod_region["lat"].values lon_vals = aod_region["lon"].values time_vals = aod_region["time"].values data = aod_region.values # shape: (time, lat, lon) rows = [] for t_idx, t in enumerate(time_vals): for i, lat in enumerate(lat_vals): for j, lon in enumerate(lon_vals): val = data[t_idx, i, j] if np.isfinite(val): rows.append( [ pd.to_datetime(str(t)).strftime( "%d/%m/%Y %H:%M" ), lat, lon, val, ] ) df = pd.DataFrame( rows, columns=["time", "latitude", "longitude", "dust_mass"], ) out_csv = os.path.join( self.output_dir, f"{os.path.splitext(filename)[0]}.csv" ) df.to_csv(out_csv, index=False) except Exception as e: print(f" Error processing {filename}: {e}")
[docs] class MERRA2AODANAProcessor: def __init__(self, start_date, end_date, region_bounds, output_dir): self.start_date = start_date self.end_date = end_date self.lat_min, self.lat_max, self.lon_min, self.lon_max = region_bounds if self.lon_min < 0: self.lon_min += 360 if self.lon_max < 0: self.lon_max += 360 self.output_dir = output_dir os.makedirs(self.output_dir, exist_ok=True) os.environ["NETRC"] = r"C:\\Users\\aqeel\\login_netrc"
[docs] def download_files(self): print(" Downloading MERRA-2 AODANA data (M2I3NXGAS)...") for date in tqdm( pd.date_range(self.start_date, self.end_date), desc="Downloading AODANA" ): year, month, day = date.year, date.month, date.day base_url = ( "https://goldsmr4.gesdisc.eosdis.nasa.gov/data/MERRA2/M2I3NXGAS.5.12.4" ) filename = f"MERRA2_400.inst3_2d_gas_Nx.{year}{month:02d}{day:02d}.nc4" url = f"{base_url}/{year}/{month:02d}/{filename}" file_path = os.path.join(self.output_dir, filename) if os.path.exists(file_path): continue try: with requests.Session() as session: session.trust_env = True response = session.get(url, stream=True, timeout=60) response.raise_for_status() with open(file_path, "wb") as f: for chunk in response.iter_content(chunk_size=8192): if chunk: f.write(chunk) except Exception as e: print(f" Failed to download {filename}: {e}")
[docs] def process_files(self): print(" Processing AODANA data into CSV...") all_files = sorted( [f for f in os.listdir(self.output_dir) if f.endswith(".nc4")] ) for filename in tqdm(all_files, desc="Processing AOD Files"): output_csv = os.path.join( self.output_dir, f"{os.path.splitext(filename)[0]}.csv" ) if os.path.exists(output_csv): print(f"CSV already exists for {filename}, skipping conversion.") continue try: file_path = os.path.join(self.output_dir, filename) ds = xr.open_dataset(file_path) aod = ds["AODANA"][:, :, :] # time, lat, lon aod_region = aod.sel( lat=slice(self.lat_min, self.lat_max), lon=slice(self.lon_min, self.lon_max), ) time_vals = aod_region["time"].values lat_vals = aod_region["lat"].values lon_vals = aod_region["lon"].values data = aod_region.values # shape: (time, lat, lon) rows = [] for t_idx, t in enumerate(time_vals): for i, lat in enumerate(lat_vals): for j, lon in enumerate(lon_vals): val = data[t_idx, i, j] if pd.notna(val): rows.append( [ pd.to_datetime(str(t)).strftime( "%d/%m/%Y %H:%M" ), lat, lon, val, ] ) df = pd.DataFrame( rows, columns=["time", "latitude", "longitude", "AODANA"] ) csv_name = os.path.splitext(filename)[0] + ".csv" output_csv = os.path.join( self.output_dir, f"{os.path.splitext(filename)[0]}_AODANA.csv" ) if os.path.exists(output_csv): print(f"CSV already exists for {filename}, skipping conversion.") continue df.to_csv(os.path.join(self.output_dir, csv_name), index=False) except Exception as e: print(f" Error processing {filename}: {e}")
[docs] def run(self): self.download_files() self.process_files()
[docs] class MonthlyDustAnalyzer: def __init__(self, output_dir, project_name): self.output_dir = output_dir self.project_name = project_name self.all_data = []
[docs] def analyze(self, summary_csv_path): print("Analyzing dust data with time resolution...") for file in tqdm(sorted(os.listdir(self.output_dir))): if not file.endswith(".csv") or not file.startswith("MERRA2_400"): continue try: file_path = os.path.join(self.output_dir, file) df = pd.read_csv(file_path) if {"time", "latitude", "longitude", "dust_mass"}.issubset(df.columns): df["time"] = pd.to_datetime(df["time"], format="%d/%m/%Y %H:%M") self.all_data.append(df) except Exception as e: print(f"Error processing {file}: {e}") if not self.all_data: print("No valid CSVs found.") return # Combine all time-resolved entries full_df = pd.concat(self.all_data, ignore_index=True) # Extract hour + minute to preserve time-level patterns full_df["hour_minute"] = full_df["time"].dt.strftime("%H:%M") # Group by lat, lon, and time of day to calculate monthly mean and std stats_df = ( full_df.groupby(["latitude", "longitude", "hour_minute"])["dust_mass"] .agg(monthly_mean="mean", std_dev="std") .reset_index() ) # Merge back to get comparison at the same (lat, lon, hour:minute) full_df["hour_minute"] = full_df["time"].dt.strftime("%H:%M") merged = full_df.merge(stats_df, on=["latitude", "longitude", "hour_minute"]) # Apply 2× std threshold merged["threshold"] = merged["monthly_mean"] + 2 * merged["std_dev"] merged["threshold"] = 500/1e9 #This is setting a hard limit make sure to change this merged["above_monthly_avg"] = merged["dust_mass"] > merged["threshold"] # Save results output_filename = f"{self.project_name}_monthly_dust_summary.csv" output_path = os.path.join(os.path.dirname(summary_csv_path), output_filename) merged.to_csv(output_path, index=False) print(f" Time-resolved monthly summary saved to: {output_path}")
[docs] class StormDetector: def __init__(self, input_dir, project_name, airport_csv_path, csv_output_path): self.input_dir = input_dir self.project_name = project_name self.results = [] self.airport_csv_path = airport_csv_path self.csv_output_path = csv_output_path self.airports_df = pd.read_csv(airport_csv_path) self.airports_df = self.airports_df[self.airports_df["type"] == "large_airport"]
[docs] def detect_storms(self, threshold_factor=1): print("Detecting storm blobs and tracking across days...") storm_id_counter = 1 previous_day_blobs = [] for file in tqdm(sorted(os.listdir(self.input_dir))): if not file.endswith(".csv") or not file.startswith("MERRA2_400"): continue file_path = os.path.join(self.input_dir, file) df = pd.read_csv(file_path) if {"latitude", "longitude", "dust_mass"}.issubset(df.columns) is False: continue df["time"] = pd.to_datetime(df["time"], format="%d/%m/%Y %H:%M") grouped = ( df.groupby(["latitude", "longitude"])["dust_mass"].mean().reset_index() ) lat_vals = sorted(df["latitude"].unique()) lon_vals = sorted(df["longitude"].unique()) data_array = np.full((len(lat_vals), len(lon_vals)), np.nan) for _, row in grouped.iterrows(): i = lat_vals.index(row["latitude"]) j = lon_vals.index(row["longitude"]) data_array[i, j] = row["dust_mass"] mean = np.nanmean(data_array) std = np.nanstd(data_array) threshold = mean + threshold_factor * std mask = (data_array > threshold).astype(int) labeled, num_features = label(mask) day_results = [] for blob_id in range(1, num_features + 1): indices = np.argwhere(labeled == blob_id) if len(indices) == 0: continue lats = [lat_vals[i] for i, _ in indices] lons = [lon_vals[j] for _, j in indices] vals = [data_array[i, j] for i, j in indices] avg_mass = np.mean(vals) max_mass = np.max(vals) center_i, center_j = center_of_mass(mask, labeled, blob_id) center_lat = float(np.interp(center_i, range(len(lat_vals)), lat_vals)) center_lon = float(np.interp(center_j, range(len(lon_vals)), lon_vals)) airport_info = self._find_nearest_large_airport(center_lat, center_lon) if airport_info is None: continue icao, distance_km = airport_info if not (0.01 <= distance_km <= 80): continue best_match = None best_score = float("inf") for prev_blob in previous_day_blobs: prev_lat = prev_blob["center_lat"] prev_lon = prev_blob["center_lon"] prev_size = prev_blob["blob_size"] dist_km = geodesic( (prev_lat, prev_lon), (center_lat, center_lon) ).km size_diff = abs(len(indices) - prev_size) if dist_km < 50 and size_diff < 0.6 * prev_size: score = dist_km + 0.1 * size_diff if score < best_score: best_score = score best_match = prev_blob if best_match: matched_id = best_match["storm_id"] else: matched_id = storm_id_counter storm_id_counter += 1 blob_info = { "file": file, "storm_id": matched_id, "blob_size": len(indices), "avg_dust_mass": avg_mass, "max_dust_mass": max_mass, "center_lat": center_lat, "center_lon": center_lon, "nearest_airport_icao": icao, "distance_to_airport_km": distance_km, } day_results.append(blob_info) previous_day_blobs = day_results self.results.extend(day_results)
def _find_nearest_large_airport(self, lat, lon): min_dist = float("inf") nearest_airport = None for _, row in self.airports_df.iterrows(): airport_coords = (row["latitude_deg"], row["longitude_deg"]) dist = geodesic((lat, lon), airport_coords).km if dist < min_dist: min_dist = dist nearest_airport = row if nearest_airport is None: return None return nearest_airport["icao_code"], min_dist
[docs] def save_results(self): if not self.results: print("No storms tracked.") return df = pd.DataFrame(self.results) output_path = os.path.join( self.csv_output_path, f"{self.project_name}_DUSMASS_storm_lifecycle.csv" ) df.to_csv(output_path, index=False) print(f"Storm tracking results saved to {output_path}")
[docs] class FusionStormDetector: def __init__(self, aodana_dir, airport_csv_path, csv_output_path, project_name): self.aodana_dir = aodana_dir self.csv_output_path = csv_output_path self.project_name = project_name self.results = [] self.daily_records = [] self.storm_id_counter = 1 self.previous_day_blobs = [] self.airports_df = pd.read_csv(airport_csv_path) self.airports_df = self.airports_df[self.airports_df["type"] == "large_airport"]
[docs] def detect_storms(self): aod_files = sorted( [f for f in os.listdir(self.aodana_dir) if f.endswith(".csv")] ) for file in tqdm(aod_files, desc="AODANA Storm Tracking"): file_path = os.path.join(self.aodana_dir, file) aod_df = pd.read_csv(file_path) aod_df["time"] = pd.to_datetime(aod_df["time"], format="%d/%m/%Y %H:%M") self.daily_records.append(aod_df) filename_core = os.path.splitext(file)[0] parts = filename_core.split(".") date_str = parts[-1] if parts[-1].isdigit() else filename_core grouped = ( aod_df.groupby(["latitude", "longitude"])["AODANA"].mean().reset_index() ) lat_vals = sorted(grouped["latitude"].unique()) lon_vals = sorted(grouped["longitude"].unique()) aod_arr = np.full((len(lat_vals), len(lon_vals)), np.nan) for _, row in grouped.iterrows(): i = lat_vals.index(row["latitude"]) j = lon_vals.index(row["longitude"]) aod_arr[i, j] = row["AODANA"] if np.all(np.isnan(aod_arr)): print(f"{file}: Empty AOD array. Skipping.") continue aod_thresh = np.nanmean(aod_arr) + 1 * np.nanstd(aod_arr) aod_mask = aod_arr > aod_thresh labeled, num_features = label(aod_mask) regions = regionprops(labeled) day_results = [] for region in regions: coords = region.coords avg_aod = np.mean([aod_arr[i, j] for i, j in coords]) max_aod = np.max([aod_arr[i, j] for i, j in coords]) center_i, center_j = center_of_mass(aod_mask, labeled, region.label) center_lat = float( np.interp(center_i, np.arange(len(lat_vals)), lat_vals) ) center_lon = float( np.interp(center_j, np.arange(len(lon_vals)), lon_vals) ) airport_info = self._find_nearest_large_airport(center_lat, center_lon) if airport_info is None: continue icao, distance_km = airport_info if not (0.01 <= distance_km <= 80): continue best_match = None best_score = float("inf") for prev_blob in self.previous_day_blobs: prev_lat = prev_blob["center_lat"] prev_lon = prev_blob["center_lon"] prev_size = prev_blob["blob_size"] dist_km = geodesic( (prev_lat, prev_lon), (center_lat, center_lon) ).km size_diff = abs(region.area - prev_size) if dist_km < 50 and size_diff < 0.6 * prev_size: score = dist_km + 0.1 * size_diff # weighted score if score < best_score: best_score = score best_match = prev_blob if best_match: matched_id = best_match["storm_id"] else: matched_id = self.storm_id_counter self.storm_id_counter += 1 blob_info = { "date": date_str, "storm_id": matched_id, "blob_size": region.area, "avg_AODANA": avg_aod, "max_AODANA": max_aod, "center_lat": center_lat, "center_lon": center_lon, "nearest_airport_icao": icao, "distance_to_airport_km": distance_km, } day_results.append(blob_info) self.previous_day_blobs = day_results self.results.extend(day_results) self.save_results() self.generate_monthly_summary()
def _find_nearest_large_airport(self, lat, lon): min_dist = float("inf") nearest_airport = None for _, row in self.airports_df.iterrows(): airport_coords = (row["latitude_deg"], row["longitude_deg"]) dist = geodesic((lat, lon), airport_coords).km if dist < min_dist: min_dist = dist nearest_airport = row if nearest_airport is None: return None return nearest_airport["icao_code"], min_dist
[docs] def save_results(self): if not self.results: print("No storms detected.") return df = pd.DataFrame(self.results) output_path = os.path.join( self.csv_output_path, f"{self.project_name}_AOD_storm_lifecycle.csv" ) df.to_csv(output_path, index=False) print(f"AOD-based storm tracking results saved to {output_path}")
[docs] def generate_monthly_summary(self): print("Generating monthly AODANA summary...") if not self.daily_records: print("No fusion records to analyze.") return full_df = pd.concat(self.daily_records, ignore_index=True) full_df["hour_minute"] = full_df["time"].dt.strftime("%H:%M") stats_df = ( full_df.groupby("hour_minute") .agg( regional_mean_aod=("AODANA", "mean"), regional_std_aod=("AODANA", "std"), ) .reset_index() ) merged = full_df.merge(stats_df, on="hour_minute") merged["threshold_aod"] = ( merged["regional_mean_aod"] + 2 * merged["regional_std_aod"] ) merged["above_monthly_avg_aod"] = merged["AODANA"] > merged["threshold_aod"] output_filename = f"{self.project_name}_monthly_AODANA_summary.csv" output_path = os.path.join(self.csv_output_path, output_filename) merged.to_csv(output_path, index=False) print(f"Monthly AODANA summary saved to: {output_path}")