import logging
import ee
import numpy as np
import pandas
import shapely
import sklearn.mixture
from scipy.stats import norm
from sklearn.preprocessing import LabelEncoder
logger = logging.getLogger(__name__)
[docs]
def _min_max_scaler(x):
x = np.array(x, dtype=np.float64)
x_min = np.nanmin(x)
x_max = np.nanmax(x)
x_std = (x - x_min) / (x_max - x_min) # in the range 0..1
return x_std
[docs]
def std_ndvi_vals(aoi=None, img_coll=None, band=None, img_scale=1, start=None, end=None):
coll = (
ee.ImageCollection(img_coll)
.select(band)
.filterDate(start, end)
.map(lambda x: x.multiply(ee.Image(img_scale)).set("system:time_start", x.get("system:time_start")))
)
if aoi:
geo = ee.Feature(shapely.geometry.mapping(aoi)).geometry()
else:
geo = None
ndvi_vals = coll.toBands().reduceRegion("mean", geo, bestEffort=True).values().getInfo()
df = pandas.DataFrame(
{
"img_date": pandas.to_datetime(coll.aggregate_array("system:time_start").getInfo(), unit="ms", utc=True),
"NDVI": ndvi_vals,
}
).dropna(axis=0)
df["NDVI"] = _min_max_scaler(df["NDVI"])
return df
[docs]
def val_cuts(vals, num_seasons=2):
distr = sklearn.mixture.GaussianMixture(n_components=num_seasons, max_iter=500)
vals = vals["NDVI"].to_numpy().reshape(-1, 1)
distr.fit(vals)
mu_vars = np.array(
sorted(
zip(distr.means_.flatten(), distr.covariances_.flatten()),
key=lambda x: x[0],
)
)
cuts = []
x = np.linspace(0, 1.0, 1000)
for i in range(1, len(mu_vars)):
cuts.append(
np.max(
x[
tuple(
[
norm.sf(
x,
loc=mu_vars[tuple([i - 1, 0])],
scale=np.sqrt(mu_vars[tuple([i - 1, 1])]),
)
> norm.cdf(
x,
loc=mu_vars[tuple([i, 0])],
scale=np.sqrt(mu_vars[tuple([i, 1])]),
)
]
)
]
)
)
cuts.append(float("inf"))
cuts.append(float("-inf"))
return sorted(cuts)
[docs]
def seasonal_windows(ndvi_vals, cuts, season_labels):
enc = LabelEncoder()
ndvi_vals["season"] = pandas.cut(ndvi_vals["NDVI"], bins=cuts, labels=season_labels)
ndvi_vals["season_code"] = enc.fit_transform(ndvi_vals["season"])
ndvi_vals["unique_season"] = (ndvi_vals["season_code"].diff(1) != 0).astype("int").cumsum()
ndvi_vals["end"] = ndvi_vals["img_date"].shift(-1)
# Note: There is a slight shift that needs to be done with the date-ranges returned by GEE. What happens is that
# the image daterange use 12:00 a.m on the given day rather than midnight so it introduces day long gaps in the
# time windows. Therefore need to add 1 day to each of the end dates to make them align exactly with the next
# successive start date
grpd = ndvi_vals.groupby("unique_season")
return pandas.DataFrame(
{
"start": pandas.Series([grp["img_date"].iloc[0] for _, grp in grpd]),
"end": pandas.Series([grp["img_date"].iloc[-1] for _, grp in grpd]).apply(
lambda x: x + pandas.Timedelta(days=1)
),
"season": pandas.Series(grp["season"].iloc[0] for name, grp in grpd),
"unique_season": pandas.Series([name for name, _ in grpd]),
}
).set_index("unique_season")
[docs]
def add_seasonal_index(
df, index_name, start_date, end_date, aoi_geom_filter=None, seasons=2, season_labels=["dry", "wet"]
):
aoi_ = None
try:
aoi_ = aoi_geom_filter.dissolve().iloc[0]["geometry"]
except:
aoi_ = aoi_geom_filter
if len(season_labels) != seasons:
raise Exception(
f"Parameter value 'seasons' ({seasons}) must match the number of 'season_labels' elements ({season_labels})"
)
# extract the standardized NDVI ndvi_vals within the AOI
ndvi_vals = std_ndvi_vals(aoi_, start=since_filter.isoformat(), end=until_filter.isoformat())
# calculate the seasonal transition point
cuts = val_cuts(ndvi_vals, seasons)
# determine the seasonal time windows
windows = seasonal_windows(ndvi_vals, cuts, season_labels)
# Categorize the fixtime values according to the season
bins = pandas.IntervalIndex(data=windows.apply(lambda x: pandas.Interval(x["start"], x["end"]), axis=1))
labels = windows.season
df[index_name] = pandas.cut(df[time_col], bins=bins).map(dict(zip(bins, labels)))
# set the index
return df.set_index(index_name, append=True)