# mangrove_monitor.py
"""Two-date vegetation-loss detection from per-band Sentinel-2 GeoTIFFs.

The script reads a YAML config, loads bands B03/B04/B08/B11 for two scene
directories onto a common grid, derives NDVI/NDWI/NBR, masks clouds and open
water, thresholds the negative change and writes rasters, PNG quicklooks and
a GeoJSON of loss polygons into ``cfg["out_dir"]``.

Config keys read by ``main``: ``out_dir``, ``aoi_path``, ``date1_dir``,
``date2_dir`` and the optional ``use_scl``, ``use_qa60``,
``pixel_scale_factor``, ``ndwi_threshold``, ``change_index`` and
``min_patch_area_m2``.
"""
import json
import os

import geopandas as gpd
import matplotlib.pyplot as plt
import numpy as np
import rasterio
import yaml
from rasterio.enums import Resampling
from rasterio.features import shapes
from rasterio.io import MemoryFile
from rasterio.mask import mask as rio_mask
from rasterio.warp import calculate_default_transform, reproject
from shapely.geometry import Polygon, mapping
from shapely.geometry import shape as shp_shape
from skimage.filters import threshold_otsu

# Profile keys that together define a raster grid.
_GRID_KEYS = ("transform", "crs", "width", "height")

# Sentinel-2 L2A scene-classification (SCL) classes treated as unusable:
# 0 = no data, 1 = saturated/defective, 3 = cloud shadow,
# 8/9 = cloud (medium/high probability), 10 = thin cirrus, 11 = snow/ice.
_SCL_MASKED_CLASSES = (0, 1, 3, 8, 9, 10, 11)

# Band files (``<name>.tif``) expected in every scene directory.
_BAND_NAMES = ("B03", "B04", "B08", "B11")


def read_cfg(path):
    """Load the YAML configuration file at *path* and return its content."""
    with open(path, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)


def _grid_differs(profile, target_profile):
    """Return True when the two profiles do not describe the same grid."""
    return any(profile[key] != target_profile[key] for key in _GRID_KEYS)


def load_band(path, target_profile=None, resampling=Resampling.bilinear):
    """Read band 1 of *path* as float32, optionally warped onto a target grid.

    Args:
        path: Raster file to read.
        target_profile: Optional rasterio profile; when given and its
            transform, CRS, width or height differ from the file's, the data
            is reprojected onto that grid.
        resampling: Resampling method used for the reprojection.

    Returns:
        ``(array, profile)``; the profile is a copy of *target_profile* when
        a reprojection happened, otherwise the file's own profile.
    """
    # The context manager guarantees the dataset is closed even when the
    # read fails.
    with rasterio.open(path) as src:
        arr = src.read(1).astype(np.float32)
        profile = src.profile

    if target_profile and _grid_differs(profile, target_profile):
        dst = np.empty(
            (target_profile["height"], target_profile["width"]),
            dtype=np.float32,
        )
        reproject(
            arr,
            dst,
            src_transform=profile["transform"],
            src_crs=profile["crs"],
            dst_transform=target_profile["transform"],
            dst_crs=target_profile["crs"],
            resampling=resampling,
        )
        arr = dst
        profile = target_profile.copy()
    return arr, profile


def build_target_profile(reference_path, upscale_20m_to_10m=False):
    """Return a copy of the rasterio profile of *reference_path*.

    ``upscale_20m_to_10m`` is accepted for interface compatibility but is
    currently ignored: the native grid of the reference file is always kept.
    """
    with rasterio.open(reference_path) as ref:
        return ref.profile.copy()


def safe_div(a, b, eps=1e-6):
    """Return the normalized difference ``(a - b) / max(a + b, eps)``.

    Despite its name this is not a plain division; the denominator is
    clamped to *eps* so that a zero sum does not divide by zero.
    """
    return (a - b) / (np.maximum(a + b, eps))


def compute_indices(bands):
    """Compute NDVI, NDWI and NBR from a mapping of band arrays.

    Args:
        bands: Mapping with the keys ``"B03"`` (green), ``"B04"`` (red),
            ``"B08"`` (NIR) and ``"B11"`` (SWIR).  The caller in this file
            passes values already divided by the pixel scale factor.

    Returns:
        Dict with the keys ``"NDVI"``, ``"NDWI"`` and ``"NBR"``.
    """
    red = bands["B04"]
    nir = bands["B08"]
    green = bands["B03"]
    swir = bands["B11"]
    return {
        "NDVI": safe_div(nir, red),
        # McFeeters NDWI: (Green - NIR) / (Green + NIR)
        "NDWI": safe_div(green, nir),
        # Normalized Burn Ratio: (NIR - SWIR) / (NIR + SWIR)
        "NBR": safe_div(nir, swir),
    }


def scl_cloud_mask(scl_arr):
    """Return a boolean mask that is True where the SCL class is usable.

    Pixels whose class is in ``_SCL_MASKED_CLASSES`` (no data, defective,
    cloud shadow, clouds, cirrus, snow/ice) are reported as False.
    """
    return ~np.isin(scl_arr, _SCL_MASKED_CLASSES)


def qa60_cloud_mask(qa_arr):
    """Return a boolean mask that is True where QA60 bits 10 and 11 are unset.

    The code treats a set bit 10 or bit 11 as cloudy (clouds / cirrus).
    """
    cloud_bits = (1 << 10) | (1 << 11)
    return (qa_arr & cloud_bits) == 0


def clip_to_aoi(arr, profile, aoi_geom):
    """Clip *arr* to *aoi_geom* and return ``(clipped_array, transform)``.

    Thin wrapper around ``clip_profile_and_array`` for callers that only
    need the affine transform of the clipped window.
    """
    clipped, clipped_profile = clip_profile_and_array(arr, profile, aoi_geom)
    return clipped, clipped_profile["transform"]


def clip_profile_and_array(arr, profile, aoi):
    """Crop a 2-D array to a geometry, filling outside pixels with NaN.

    Args:
        arr: 2-D array on the grid described by *profile*.  It is converted
            to float32 so that NaN can mark pixels outside the geometry,
            whatever the input dtype (including boolean / uint8 masks).
        profile: rasterio profile providing ``crs`` and ``transform``.
        aoi: Geometry (in the raster CRS) to clip to.

    Returns:
        ``(clipped, new_profile)`` where ``clipped`` is a float32 2-D array
        and ``new_profile`` is *profile* with updated height, width and
        transform.

    Raises:
        ValueError: If *arr* is not two-dimensional.
    """
    data = np.asarray(arr, dtype=np.float32)
    if data.ndim != 2:
        raise ValueError(f"expected a 2-D array, got shape {data.shape}")

    # Build a minimal float32 profile instead of reusing the reference
    # band's dtype/nodata, which would truncate index values and make the
    # band's nodata value (often 0) mask legitimate pixels.
    tmp_profile = {
        "driver": "GTiff",
        "height": data.shape[0],
        "width": data.shape[1],
        "count": 1,
        "dtype": "float32",
        "crs": profile["crs"],
        "transform": profile["transform"],
        "nodata": np.nan,
    }
    # MemoryFile releases the in-memory dataset on exit; the data is written
    # first and then re-opened for reading, as rasterio.mask expects.
    with MemoryFile() as memfile:
        with memfile.open(**tmp_profile) as tmp:
            tmp.write(data, 1)
        with memfile.open() as src:
            out_img, out_transform = rio_mask(
                src, [aoi], crop=True, filled=True, nodata=np.nan
            )

    new_profile = profile.copy()
    new_profile.update(
        {
            "height": out_img.shape[1],
            "width": out_img.shape[2],
            "transform": out_transform,
        }
    )
    return out_img[0], new_profile


def save_geotiff(path, arr, profile, nodata=np.nan, dtype=rasterio.float32):
    """Write *arr* as a single-band, LZW-compressed raster.

    Args:
        path: Output file path.
        arr: 2-D array to write; it is cast to *dtype*.
        profile: Base rasterio profile (grid, CRS, driver).
        nodata: Nodata value stored in the file.  The NaN default is only
            meaningful for floating-point dtypes.
        dtype: Output data type; defaults to float32.
    """
    prof = profile.copy()
    prof.update(dtype=dtype, count=1, compress="lzw", nodata=nodata)
    with rasterio.open(path, "w", **prof) as dst:
        dst.write(arr.astype(dtype), 1)


def quicklook_png(path, arr, vmin=None, vmax=None, cmap="RdYlGn"):
    """Render *arr* with a colorbar and save it as an image at *path*."""
    fig, ax = plt.subplots(figsize=(8, 6))
    try:
        image = ax.imshow(arr, cmap=cmap, vmin=vmin, vmax=vmax)
        fig.colorbar(image, ax=ax, shrink=0.7)
        ax.set_axis_off()
        fig.tight_layout()
        fig.savefig(path, dpi=200)
    finally:
        # Always release the figure, even when saving fails.
        plt.close(fig)


def vectorize_change(change_mask, profile, min_area_m2=200):
    """Polygonize the pixels equal to 1 and drop patches below an area.

    Args:
        change_mask: 2-D boolean, integer or float array; NaN pixels in a
            float array are ignored.
        profile: rasterio profile providing ``transform`` and ``crs``.
        min_area_m2: Minimum polygon area to keep.

    Returns:
        GeoDataFrame in the raster CRS with the columns ``value``,
        ``area_m2`` and ``geometry``.  For a geographic CRS the area is
        measured in an estimated UTM zone; otherwise it is measured in the
        CRS's own units, which are assumed to be metres.
    """
    transform = profile["transform"]
    crs = profile["crs"]

    raw = np.asarray(change_mask)
    if raw.dtype.kind == "f":
        # Zero out NaN before the integer cast, which is undefined for NaN.
        raw = np.where(np.isnan(raw), 0, raw)
    binary = raw.astype(np.uint8)

    geoms = []
    vals = []
    # Restricting the polygonization to value-1 pixels yields the same
    # polygons as filtering afterwards, without tracing the background.
    for geom, val in shapes(binary, mask=binary == 1, transform=transform):
        poly = shp_shape(geom)
        if poly.is_valid and not poly.is_empty:
            geoms.append(poly)
            vals.append(val)

    if not geoms:
        return gpd.GeoDataFrame(
            {"value": [], "area_m2": []}, geometry=[], crs=crs
        )

    gdf = gpd.GeoDataFrame({"value": vals}, geometry=geoms, crs=crs)
    if gdf.crs is not None and gdf.crs.is_geographic:
        # Degrees are not an area unit, so measure in a local UTM zone.  The
        # geometries themselves are left untouched, avoiding a lossy round
        # trip through another CRS.
        gdf["area_m2"] = gdf.to_crs(gdf.estimate_utm_crs()).geometry.area
    else:
        gdf["area_m2"] = gdf.geometry.area

    return gdf[gdf["area_m2"] >= min_area_m2].reset_index(drop=True)


def _load_scene(scene_dir, ref_profile, have_scl=True, have_qa=False,
                scale=10000.0):
    """Load the scaled bands and the clear-sky mask of one scene directory.

    Returns ``(bands, mask_clear)``; every array lies on the *ref_profile*
    grid.  The mask is all True when neither SCL.tif nor QA60.tif is used.
    """
    bands = {}
    for name in _BAND_NAMES:
        arr, _ = load_band(
            os.path.join(scene_dir, f"{name}.tif"), target_profile=ref_profile
        )
        bands[name] = arr / scale

    mask_clear = np.ones_like(bands["B04"], dtype=bool)
    scl_path = os.path.join(scene_dir, "SCL.tif")
    qa_path = os.path.join(scene_dir, "QA60.tif")
    # Class and bit rasters are categorical, hence nearest-neighbour.
    if have_scl and os.path.exists(scl_path):
        scl, _ = load_band(
            scl_path, target_profile=ref_profile,
            resampling=Resampling.nearest,
        )
        mask_clear = scl_cloud_mask(scl.astype(np.int16))
    elif have_qa and os.path.exists(qa_path):
        qa, _ = load_band(
            qa_path, target_profile=ref_profile,
            resampling=Resampling.nearest,
        )
        mask_clear = qa60_cloud_mask(qa.astype(np.uint16))
    return bands, mask_clear


def main(cfg_path="config.yaml"):
    """Run the full two-date change-detection workflow.

    Args:
        cfg_path: Path of the YAML configuration file.

    Raises:
        ValueError: If the AOI file carries no CRS.
    """
    cfg = read_cfg(cfg_path)
    out_dir = cfg["out_dir"]
    os.makedirs(out_dir, exist_ok=True)

    # The date-2 B04 band defines the common grid for every raster.
    ref_band_path = os.path.join(cfg["date2_dir"], "B04.tif")
    ref_profile = build_target_profile(ref_band_path)

    # Reproject the AOI straight into the raster CRS rather than through a
    # hard-coded UTM zone that may not match the study area.
    aoi = gpd.read_file(cfg["aoi_path"])
    if aoi.crs is None:
        raise ValueError(
            "AOI file has no CRS; cannot align it with the rasters."
        )
    aoi = aoi.to_crs(ref_profile["crs"])
    # union_all() supersedes unary_union in newer geopandas releases.
    aoi_geom = aoi.union_all() if hasattr(aoi, "union_all") else aoi.unary_union

    scene_opts = {
        "have_scl": cfg.get("use_scl", True),
        "have_qa": cfg.get("use_qa60", False),
        "scale": cfg.get("pixel_scale_factor", 10000.0),
    }
    bands1, clear1 = _load_scene(cfg["date1_dir"], ref_profile, **scene_opts)
    bands2, clear2 = _load_scene(cfg["date2_dir"], ref_profile, **scene_opts)

    idx1 = compute_indices(bands1)
    idx2 = compute_indices(bands2)

    def clip(arr):
        return clip_profile_and_array(arr, ref_profile, aoi_geom)

    ndvi1, prof_clip = clip(idx1["NDVI"])
    ndvi2, _ = clip(idx2["NDVI"])
    ndwi2, _ = clip(idx2["NDWI"])
    nbr1, _ = clip(idx1["NBR"])
    nbr2, _ = clip(idx2["NBR"])

    # Clipped masks hold 1.0 (clear), 0.0 (cloudy) or NaN (outside the AOI).
    # Comparing with 1 keeps NaN from being cast to True.
    clear1_c, _ = clip(clear1)
    clear2_c, _ = clip(clear2)
    clear_mask = (clear1_c == 1) & (clear2_c == 1)

    for layer in (ndvi1, ndvi2, ndwi2, nbr1, nbr2):
        layer[~clear_mask] = np.nan

    # A high NDWI on date 2 is treated as open water and excluded.
    water_mask = ndwi2 > cfg.get("ndwi_threshold", 0.2)

    change_index = cfg.get("change_index", "NDVI").upper()
    change_arr = (ndvi2 - ndvi1) if change_index == "NDVI" else (nbr2 - nbr1)

    valid = ~np.isnan(change_arr) & ~water_mask
    vals = change_arr[valid]
    if vals.size == 0:
        print("No valid pixels after masks; check inputs.")
        return

    try:
        t_otsu = threshold_otsu(vals)
    except (ValueError, RuntimeError):
        # Otsu can fail on degenerate distributions; use the 10th percentile.
        t_otsu = np.percentile(vals, 10)

    # A pixel counts as loss when it is below the threshold and also clearly
    # below the scene mean (z-score under -0.75).
    mu = vals.mean()
    std = vals.std() + 1e-6
    z = (change_arr - mu) / std
    loss_mask = (change_arr < t_otsu) & (z < -0.75) & valid

    delta_name = f"{change_index.lower()}_delta"
    save_geotiff(os.path.join(out_dir, "ndvi_date1.tif"), ndvi1, prof_clip)
    save_geotiff(os.path.join(out_dir, "ndvi_date2.tif"), ndvi2, prof_clip)
    save_geotiff(
        os.path.join(out_dir, f"{delta_name}.tif"), change_arr, prof_clip
    )
    save_geotiff(
        os.path.join(out_dir, "loss_mask.tif"),
        loss_mask,
        prof_clip,
        nodata=0,
        dtype="uint8",
    )

    quicklook_png(
        os.path.join(out_dir, "ndvi_date1.png"), ndvi1, vmin=0, vmax=1
    )
    quicklook_png(
        os.path.join(out_dir, "ndvi_date2.png"), ndvi2, vmin=0, vmax=1
    )
    quicklook_png(
        os.path.join(out_dir, f"{delta_name}.png"),
        change_arr,
        vmin=-0.6,
        vmax=0.6,
        cmap="RdBu_r",
    )
    quicklook_png(
        os.path.join(out_dir, "loss_mask.png"),
        loss_mask.astype(float),
        vmin=0,
        vmax=1,
        cmap="Reds",
    )

    gdf = vectorize_change(
        loss_mask, prof_clip, min_area_m2=cfg.get("min_patch_area_m2", 200)
    )
    if gdf.empty:
        print("No change polygons detected after filtering.")
    else:
        # vectorize_change already provides the area_m2 column.
        gdf["id"] = np.arange(1, len(gdf) + 1)
        gdf.to_file(
            os.path.join(out_dir, "mangrove_loss_hotspots.geojson"),
            driver="GeoJSON",
        )
        print(f"Saved {len(gdf)} hotspot polygons.")

    print(f"Done. Outputs written to '{out_dir}'.")


if __name__ == "__main__":
    main()