import itertools
import warnings
from copy import deepcopy
import geopandas as gpd # type: ignore[import-untyped]
import numpy as np
import pandas as pd
import shapely
from pyproj import Geod
from shapely import MultiPolygon, Polygon
from ecoscope import Relocations
from ecoscope.base import EcoDataFrame
from ecoscope.base._dataclasses import (
ProximityProfile,
TrajSegFilter,
)
from ecoscope.base.straightrack import StraightTrackProperties
[docs]
def get_displacement(gdf: gpd.GeoDataFrame):
"""
Get displacement in meters between first and final fixes.
"""
if not gdf["segment_start"].is_monotonic_increasing:
gdf = gdf.sort_values("segment_start")
start = gdf.geometry.iloc[0].coords[0]
end = gdf.geometry.iloc[-1].coords[1]
return Geod(ellps="WGS84").inv(start[0], start[1], end[0], end[1])[2]
[docs]
def get_tortuosity(gdf: gpd.GeoDataFrame):
"""
Get tortuosity for dataframe defined as distance traveled divided by displacement between first and final
points.
"""
return gdf["dist_meters"].sum() / get_displacement(gdf)
[docs]
class Trajectory(EcoDataFrame):
"""
A trajectory represents a time-ordered collection of segments.
Currently only straight track segments exist.
It is based on an underlying relocs object that is the point representation
"""
[docs]
@classmethod
def from_relocations(cls, relocs: Relocations, copy: bool = True):
"""
Create Trajectory class from Relocation dataframe.
Parameters
----------
relocs: Relocations
A `Relocations` instance.
copy : bool, optional
Whether or not to copy the `gdf`. Defaults to `True`.
Returns
-------
Trajectory
"""
assert isinstance(relocs, Relocations)
assert {"groupby_col", "fixtime", "geometry"}.issubset(relocs.gdf)
if copy:
relocs = Relocations(relocs.gdf.copy())
original_crs = relocs.gdf.crs
relocs.gdf.to_crs(4326, inplace=True)
relocs.gdf = (
relocs.gdf.groupby("groupby_col")[relocs.gdf.columns].apply(cls._create_multitraj).droplevel(level=0)
)
relocs.gdf.to_crs(original_crs, inplace=True)
relocs.gdf.sort_values("segment_start", inplace=True)
return cls(gdf=relocs.gdf)
[docs]
@staticmethod
def _create_multitraj(gdf: gpd.GeoDataFrame):
if len(gdf) == 1:
warnings.warn(
f"Subject id {gdf.get('groupby_col')} has only one relocation "
"and will be excluded from trajectory creation"
)
return None
gdf["_geometry"] = gdf["geometry"].shift(-1)
gdf["_fixtime"] = gdf["fixtime"].shift(-1)
return Trajectory._create_trajsegments(gdf[:-1])
[docs]
@staticmethod
def _create_trajsegments(gdf: gpd.GeoDataFrame):
track_properties = StraightTrackProperties(gdf)
coords = np.column_stack(
(
np.column_stack(track_properties.start_fixes),
np.column_stack(track_properties.end_fixes),
)
).reshape(gdf.shape[0], 2, 2)
df = gpd.GeoDataFrame(
{
"groupby_col": gdf.groupby_col,
"segment_start": gdf.fixtime,
"segment_end": gdf._fixtime,
"timespan_seconds": track_properties.timespan_seconds,
"dist_meters": track_properties.dist_meters,
"speed_kmhr": track_properties.speed_kmhr,
"heading": track_properties.heading,
"geometry": shapely.linestrings(coords),
"junk_status": gdf.junk_status,
"nsd": track_properties.nsd,
},
crs=4326,
index=gdf.index,
)
gdf.drop(["fixtime", "_fixtime", "_geometry"], axis=1, inplace=True)
extra_cols = gdf.columns.difference(df.columns)
gdf = gdf[extra_cols]
extra_cols = extra_cols[~extra_cols.str.startswith("extra_")]
gdf.rename(columns=dict(zip(extra_cols, "extra__" + extra_cols)), inplace=True)
return df.join(gdf, how="left")
[docs]
def apply_traj_filter(self, traj_seg_filter: TrajSegFilter, inplace: bool = False):
if not self.gdf["segment_start"].is_monotonic_increasing:
self.gdf.sort_values("segment_start", inplace=True)
assert self.gdf["segment_start"].is_monotonic_increasing
if inplace:
traj = self
else:
traj = deepcopy(self)
assert type(traj_seg_filter) is TrajSegFilter
traj.gdf.loc[
(traj.gdf["dist_meters"] < traj_seg_filter.min_length_meters)
| (traj.gdf["dist_meters"] > traj_seg_filter.max_length_meters)
| (traj.gdf["timespan_seconds"] < traj_seg_filter.min_time_secs)
| (traj.gdf["timespan_seconds"] > traj_seg_filter.max_time_secs)
| (traj.gdf["speed_kmhr"] < traj_seg_filter.min_speed_kmhr)
| (traj.gdf["speed_kmhr"] > traj_seg_filter.max_speed_kmhr),
"junk_status",
] = True
if not inplace:
return traj
[docs]
def get_turn_angle(self):
if not self.gdf["segment_start"].is_monotonic_increasing:
self.gdf.sort_values("segment_start", inplace=True)
assert self.gdf["segment_start"].is_monotonic_increasing
def turn_angle(traj):
return ((traj["heading"].diff() + 540) % 360 - 180)[traj["segment_end"].shift(1) == traj["segment_start"]]
uniq = self.gdf.groupby_col.nunique()
angles = (
self.gdf.groupby("groupby_col")[self.gdf.columns].apply(turn_angle, include_groups=False).droplevel(0)
if uniq > 1
else turn_angle(self.gdf)
)
return angles.rename("turn_angle").reindex(self.gdf.index)
[docs]
def upsample(self, freq: str | pd.Timedelta | pd.DateOffset):
"""
Interpolate to create upsampled Relocations
Parameters
----------
freq : str, pd.Timedelta or pd.DateOffset
Sampling frequency for new Relocations object
Returns
-------
relocs : ecoscope.Relocations
"""
freq = pd.tseries.frequencies.to_offset(freq) # type: ignore[arg-type, assignment]
if not self.gdf["segment_start"].is_monotonic_increasing:
self.gdf.sort_values("segment_start", inplace=True)
def f(gdf):
assert gdf.crs
times = pd.date_range(gdf["segment_start"].iat[0], gdf["segment_end"].iat[-1], freq=freq)
start_i = gdf["segment_start"].searchsorted(times, side="right") - 1
end_i = gdf["segment_end"].searchsorted(times, side="left")
valid_i = (start_i == end_i) | (times == gdf["segment_start"].iloc[start_i])
gdf = gdf.iloc[start_i[valid_i]].reset_index(drop=True)
times = times[valid_i]
return gpd.GeoDataFrame(
{"fixtime": times},
geometry=shapely.line_interpolate_point(
gdf["geometry"].values,
(times - gdf["segment_start"]) / (gdf["segment_end"] - gdf["segment_start"]),
normalized=True,
),
crs=gdf.crs,
)
return Relocations.from_gdf(
self.gdf.groupby("groupby_col")[self.gdf.columns].apply(f, include_groups=False).reset_index(level=0)
)
[docs]
def to_relocations(self):
"""
Converts a Trajectory object to a Relocations object.
Returns
-------
ecoscope.Relocations
"""
def f(gdf):
assert gdf.crs
points = np.concatenate([shapely.get_point(gdf.geometry, 0), shapely.get_point(gdf.geometry, 1)])
times = np.concatenate([gdf["segment_start"], gdf["segment_end"]])
return (
gpd.GeoDataFrame(
{"fixtime": times},
geometry=points,
crs=gdf.crs,
)
.drop_duplicates(subset=["fixtime"])
.sort_values("fixtime")
)
return Relocations.from_gdf(
self.gdf.groupby("groupby_col")[self.gdf.columns].apply(f, include_groups=False).reset_index(drop=True)
)
[docs]
def downsample(
self,
freq: str | pd.Timedelta | pd.DateOffset,
tolerance: str | pd.Timedelta | pd.DateOffset = "0S",
interpolation: bool = False,
):
"""
Function to downsample relocations.
Parameters
----------
freq: str, pd.Timedelta or pd.DateOffset
Downsampling frequency for new Relocations object
tolerance: str, pd.Timedelta or pd.DateOffset
Tolerance on the downsampling frequency
interpolation: bool, optional
If true, interpolates locations on the whole trajectory
Returns
-------
ecoscope.Relocations
"""
if interpolation:
return self.upsample(freq)
else:
freq = pd.tseries.frequencies.to_offset(freq) # type: ignore[arg-type, assignment]
tolerance = pd.tseries.frequencies.to_offset(tolerance) # type: ignore[arg-type, assignment]
def f(relocs_ind):
relocs_ind.crs = self.gdf.crs
fixtime = relocs_ind["fixtime"]
k = 1
i = 0
n = len(relocs_ind)
out = np.full(n, -1)
out[i] = k
while i < (n - 1):
t_min = fixtime.iloc[i] + freq - tolerance
t_max = fixtime.iloc[i] + freq + tolerance
j = i + 1
while (j < (n - 1)) and (fixtime.iloc[j] < t_min):
j += 1
i = j
if j == (n - 1):
break
elif (fixtime.iloc[j] >= t_min) and (fixtime.iloc[j] <= t_max):
out[j] = k
else:
k += 1
out[j] = k
relocs_ind["extra__burst"] = np.array(out, dtype=np.int64)
relocs_ind.drop(relocs_ind.loc[relocs_ind["extra__burst"] == -1].index, inplace=True)
return relocs_ind
relocs = self.to_relocations()
relocs.gdf = relocs.gdf.groupby("groupby_col")[relocs.gdf.columns].apply(f).reset_index(drop=True)
return relocs
[docs]
def calculate_proximity(
self,
proximity_profile: ProximityProfile,
) -> pd.DataFrame:
"""
A function to analyze the trajectory of a subject in relation to a set of spatial features and regions to
determine where/when the subject was proximal to the spatial feature.
Parameters
----------
proximity_profile: ProximityProfile
proximity setting for performing calculation
Returns
-------
pd.DataFrame
"""
proximity_events = []
def analysis(gdf):
for sf in proximity_profile.spatial_features:
proximity_dist = gdf.geometry.distance(sf.geometry)
start_fix = gpd.GeoSeries([shapely.Point(g.coords[0]) for g in gdf.geometry])
pr = gdf[["groupby_col", "speed_kmhr", "heading"]]
pr["proximity_distance"] = proximity_dist
pr["proximal_fix"] = start_fix # TODO: figure out the estimated fix interpolated along the seg
pr["estimated_time"] = gdf.segment_start
pr["geometry"] = gdf.geometry
pr["spatialfeature_id"] = list(itertools.repeat(sf.unique_id, pr.shape[0]))
pr["spatialfeature_name"] = list(itertools.repeat(sf.name, pr.shape[0]))
proximity_events.append(pr)
self.gdf.groupby("groupby_col")[self.gdf.columns].apply(analysis, include_groups=False)
return pd.concat(proximity_events).reset_index(drop=True)
[docs]
def apply_spatial_classification(
self, spatial_regions: gpd.GeoDataFrame, output_column_name: str = "spatial_index"
) -> gpd.GeoDataFrame:
if spatial_regions.crs.is_projected and spatial_regions.crs.axis_info[0].unit_name != "metre":
raise ValueError("Projected spatial_regions crs must be in metres")
if any(
[
not isinstance(feature, Polygon) and not isinstance(feature, MultiPolygon)
for feature in spatial_regions.geometry
]
):
raise ValueError("spatial_regions must only contain polygon or multipolygon geometry")
segments_to_overlay = self.gdf.to_crs(spatial_regions.crs)
spatial_regions[output_column_name] = spatial_regions.index
overlay = spatial_regions.overlay(segments_to_overlay, how="intersection", keep_geom_type=False)
if overlay.empty:
return overlay
# Drop anything that isn't a line
overlay = overlay[overlay.geometry.type == "LineString"]
# use the length of the linestring if our crs is projected
# otherwise get great circle distance from Geod
if overlay.crs.is_projected:
overlay["dist_meters"] = overlay.length
else:
overlay["dist_meters"] = overlay.geometry.apply(
lambda x: Geod(ellps="WGS84").inv(*x.coords[0], *x.coords[1])[2]
)
# fragment_distance is in metres so convert speed to 'meters per hour'
# then multiply by 3600 to get time in seconds
overlay["timespan_seconds"] = (overlay["dist_meters"] / (overlay["speed_kmhr"] * 1000)) * 3600
return overlay