import math
import os
import typing
from dataclasses import dataclass
import numba as nb
import numpy as np
import scipy
import sklearn
from scipy.optimize import minimize
from scipy.stats import weibull_min
from ecoscope.base import Trajectory
from ecoscope.io import raster
@nb.cfunc("double(intc, CPointer(double))")
def __etd__(_, a):
s = a[0]
k = a[1]
el = a[2]
m = a[3]
return np.exp(-((s / el) ** k)) * s ** (k - 2) / np.sqrt(s**2 - m**2)
_etd = scipy.LowLevelCallable(__etd__.ctypes)
[docs]
class WeibullPDF:
[docs]
@staticmethod
def fit(data, floc=0):
# returns estimates of shape and scale parameters from data; keeping the location fixed
shape, _, scale = weibull_min.fit(data, floc=floc)
return shape, scale
[docs]
@staticmethod
def pdf(data, shape, location=0, scale=1):
# probability density function.
return weibull_min.pdf(x=data, c=shape, loc=location, scale=scale)
[docs]
@staticmethod
def cdf(data, shape, location=0, scale=1):
# cumulative distribution function.
return weibull_min.cdf(x=data, c=shape, loc=location, scale=scale)
[docs]
@staticmethod
def nelder_mead(func, x0, args=(), **kwargs):
# minimization of scalar function of one or more variables using the Nelder-Mead algorithm.
return minimize(fun=func, x0=x0, args=args, method="Nelder-Mead", **kwargs)
[docs]
@staticmethod
def expected_func(speed, shape, scale, time, distance):
# time-density expectation function for two-parameter weibull distribution.
_funcs = [
4 * shape / (math.pi * scale * speed),
math.pow((speed / scale), shape - 1),
math.exp(-1 * math.pow(speed / scale, shape))
/ math.sqrt(np.square(speed) * np.square(time) - np.square(distance)),
]
return math.prod(_funcs)
[docs]
@dataclass
class Weibull2Parameter(WeibullPDF):
shape: float = 1.0
scale: float = 1.0
[docs]
@dataclass
class Weibull3Parameter(WeibullPDF):
# Weibull parameterization using scale=a(t^b)(c^t)
shape: float = 1.0
a: float = 1.0
b: float = 1.0
c: float = 1.0
[docs]
def calculate_etd_range(
trajectory_gdf: Trajectory,
output_path: typing.Union[str, bytes, os.PathLike],
max_speed_kmhr: float = 0.0,
max_speed_percentage: float = 0.9999,
raster_profile: raster.RasterProfile = None,
expansion_factor: float = 1.3,
weibull_pdf: typing.Union[Weibull2Parameter, Weibull3Parameter] = Weibull2Parameter(),
) -> None:
"""
The ETDRange class provides a trajectory-based, nonparametric approach to estimate the utilization distribution (UD)
of an animal, using model parameters derived directly from the movement behaviour of the species.
The model builds on the theory of "time-geography" whereby elliptical constrain- ing regions are established
between temporally adjacent recorded locations.
Parameters
----------
trajectory_gdf : geopandas.GeoDataFrame
output_path : str or PathLike
max_speed_kmhr : float
max_speed_percentage : 0.999
raster_profile : raster.RasterProfile
expansion_factor : float
weibull_pdf : Weibull2Parameter or Weibull3Parameter
Returns
-------
output_path : str
"""
# if two-parameter weibull has default values; run an optimization routine to auto-determine parameters
if isinstance(weibull_pdf, Weibull2Parameter) and all([weibull_pdf.shape == 1.0, weibull_pdf.scale == 1.0]):
speed_kmhr = trajectory_gdf.speed_kmhr
shape, scale = weibull_pdf.fit(speed_kmhr)
# update the shape/scale parameters
weibull_pdf.shape = shape
weibull_pdf.scale = scale
# reproject trajseg to desired crs
trajectory_gdf = trajectory_gdf.to_crs(raster_profile.crs)
# determine envelope of trajectory
x_min, y_min, x_max, y_max = trajectory_gdf.geometry.total_bounds
# apply expansion factor on the trajectory total bound.
if expansion_factor > 1.0:
dx = (x_max - x_min) * (expansion_factor - 1.0) / 2.0
dy = (y_max - y_min) * (expansion_factor - 1.0) / 2.0
x_min -= dx
x_max += dx
y_min -= dy
y_max += dy
# update the raster extent for the raster profile
raster_profile.raster_extent = raster.RasterExtent(x_min=x_min, x_max=x_max, y_min=y_min, y_max=y_max)
# determine the output raster size
num_rows, num_columns = (
raster_profile.rows,
raster_profile.columns,
)
# maximum trajectory segment speed value
max_trajseg_speed = trajectory_gdf.speed_kmhr.max()
# determine max-speed value
if max_speed_kmhr > 0.0:
maxspeed = max_speed_kmhr
else:
# Use a value calculated from the CDF
maxspeed = weibull_pdf.scale * math.pow(
-1 * math.log(1.0 - max_speed_percentage),
1.0 / weibull_pdf.shape,
)
if maxspeed <= max_trajseg_speed:
raise ValueError(
f"ETD maximum speed value: {maxspeed} should be greater than "
f"trajectory maximum speed value {max_trajseg_speed}"
)
# Define the affine transform to get grid pixel centroids as geographic coordinates.
grid_centroids = np.array(raster_profile.transform.to_gdal()).reshape(2, 3)
grid_centroids[0, 0] = x_min + raster_profile.pixel_size * 0.5
grid_centroids[1, 0] = y_max - raster_profile.pixel_size * 0.5
centroids_coords = np.dot(grid_centroids, np.mgrid[1:2, :num_columns, :num_rows].T.reshape(-1, 3, 1))
tr = sklearn.neighbors.KDTree(centroids_coords.squeeze().T)
del centroids_coords
points = np.stack(trajectory_gdf.geometry.apply(lambda x: np.array(x.coords.xy)))
time = trajectory_gdf.timespan_seconds.values / 3600
r = maxspeed * time * 1000
start = tr.query_radius(points[:, :, 0], r=r, return_distance=True)
end = tr.query_radius(points[:, :, 1], r=r, return_distance=True)
del tr, points, r
shape = weibull_pdf.shape
scale = weibull_pdf.scale
x = np.arange(0.001, maxspeed, 0.001)
y = (4 * shape * scale ** (-shape) / np.pi) * np.array(
[scipy.integrate.quad(_etd, m, maxspeed, args=(shape, scale, m))[0] for m in x]
)
raster_ndarray = np.zeros(num_rows * num_columns, dtype=np.float64)
for k in range(len(start[0])):
a, b, c = np.intersect1d(start[0][k], end[0][k], return_indices=True)
speeds = (start[1][k][b] + end[1][k][c]) * 0.001 / time[k]
i = speeds < maxspeed
vals = y[np.digitize(speeds[i], x[:-1])] / time[k]
raster_ndarray[a[i]] += vals / vals.sum() / time[k]
# Normalize the grid values
raster_ndarray = raster_ndarray / raster_ndarray.sum()
# Set the null data values
raster_ndarray[raster_ndarray == 0] = raster_profile.nodata_value
# write raster_ndarray to GeoTIFF file.
raster.RasterPy.write(
ndarray=raster_ndarray.reshape(num_rows, num_columns),
fp=output_path,
**raster_profile,
)