Source code for ecoscope.base.base

import warnings
from functools import cached_property

import geopandas as gpd
import numpy as np
import pandas as pd
import shapely
from pyproj import Geod

from ecoscope.base._dataclasses import (
    RelocsCoordinateFilter,
    RelocsDateRangeFilter,
    RelocsDistFilter,
    RelocsSpeedFilter,
    TrajSegFilter,
)


class EcoDataFrame(gpd.GeoDataFrame):
    """
    `EcoDataFrame` extends `geopandas.GeoDataFrame` to provide customizations
    and allow for simpler extension.
    """

    @property
    def _constructor(self):
        return type(self)

    def __init__(self, data=None, *args, **kwargs):
        if kwargs.get("geometry") is None:
            # Load geometry from data if not specified in kwargs
            if hasattr(data, "geometry"):
                kwargs["geometry"] = data.geometry.name
        if kwargs.get("crs") is None:
            # Load crs from data if not specified in kwargs
            if hasattr(data, "crs"):
                kwargs["crs"] = data.crs
        super().__init__(data, *args, **kwargs)
    def __getitem__(self, key):
        result = super().__getitem__(key)
        if isinstance(key, (list, slice, np.ndarray, pd.Series)):
            result.__class__ = self._constructor
        return result

    @classmethod
    def from_file(cls, filename, **kwargs):
        result = gpd.GeoDataFrame.from_file(filename, **kwargs)
        result.__class__ = cls
        return result

    @classmethod
    def from_features(cls, features, **kwargs):
        result = gpd.GeoDataFrame.from_features(features, **kwargs)
        result.__class__ = cls
        return result

    def __finalize__(self, *args, **kwargs):
        result = super().__finalize__(*args, **kwargs)
        result.__class__ = self._constructor
        return result

    def astype(self, *args, **kwargs):
        result = super().astype(*args, **kwargs)
        result.__class__ = self._constructor
        return result

    def merge(self, *args, **kwargs):
        result = super().merge(*args, **kwargs)
        result.__class__ = self._constructor
        return result

    def dissolve(self, *args, **kwargs):
        result = super().dissolve(*args, **kwargs)
        result.__class__ = self._constructor
        return result

    def explode(self, *args, **kwargs):
        result = super().explode(*args, **kwargs)
        result.__class__ = self._constructor
        return result
    def plot(self, *args, **kwargs):
        if self._geometry_column_name in self:
            return gpd.GeoDataFrame.plot(self, *args, **kwargs)
        else:
            return pd.DataFrame(self).plot(*args, **kwargs)
    def reset_filter(self, inplace=False):
        if inplace:
            frame = self
        else:
            frame = self.copy()

        frame["junk_status"] = False

        if not inplace:
            return frame
    def remove_filtered(self, inplace=False):
        if inplace:
            frame = self
        else:
            frame = self.copy()

        if not frame["junk_status"].dtype == bool:
            warnings.warn(
                f"junk_status column is of type {frame['junk_status'].dtype}, expected `bool`. "
                "Attempting to automatically convert."
            )
            frame["junk_status"] = frame["junk_status"].astype(bool)

        frame.query("~junk_status", inplace=True)

        if not inplace:
            return frame
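# Usage sketch (not part of the module): because `_constructor` and the overrides
# above return the subclass, ordinary pandas/GeoPandas operations preserve the
# EcoDataFrame type. The file path is hypothetical, and this assumes EcoDataFrame
# is re-exported from `ecoscope.base`:
#
#     from ecoscope.base import EcoDataFrame
#
#     edf = EcoDataFrame.from_file("observations.gpkg")
#     subset = edf.iloc[:10]                    # round-trips through __finalize__
#     exploded = edf.explode(index_parts=False)
#     assert isinstance(subset, EcoDataFrame) and isinstance(exploded, EcoDataFrame)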
class Relocations(EcoDataFrame):
    """
    Relocations is a model for a set of fixes from a given subject. Because
    fixes are temporal, they can be ordered ascending or descending. The
    additional_data dict can contain info specific to the subject and
    relocations: name, type, region, sex etc. These values apply to all fixes
    in the relocations array. If they vary, they should be placed in each
    fix's additional_data dict instead.
    """
    @classmethod
    def from_gdf(cls, gdf, groupby_col=None, time_col="fixtime", uuid_col=None, **kwargs):
        """
        Parameters
        ----------
        gdf : GeoDataFrame
            Observations data
        groupby_col : str, optional
            Name of `gdf` column of identities to treat as separate individuals. Usually `subject_id`.
            Default is to treat the gdf as a single individual.
        time_col : str, optional
            Name of `gdf` column containing relocation times. Default is 'fixtime'.
        uuid_col : str, optional
            Name of `gdf` column of row identities. Used as index. Default is the existing index.
        """
        assert {"geometry", time_col}.issubset(gdf)

        if kwargs.get("copy") is not False:
            gdf = gdf.copy()

        if groupby_col is None:
            if "groupby_col" not in gdf:
                gdf["groupby_col"] = 0
        else:
            gdf["groupby_col"] = gdf.loc[:, groupby_col]

        if time_col != "fixtime":
            gdf["fixtime"] = gdf.loc[:, time_col]

        if not pd.api.types.is_datetime64_any_dtype(gdf["fixtime"]):
            warnings.warn(
                f"{time_col} is not of type datetime64. Attempting to automatically infer format and timezone. "
                "Results may be incorrect."
            )
            gdf["fixtime"] = pd.to_datetime(gdf["fixtime"])

        if gdf["fixtime"].dt.tz is None:
            warnings.warn(f"{time_col} is not timezone aware. Assuming datetimes are in UTC.")
            gdf["fixtime"] = gdf["fixtime"].dt.tz_localize(tz="UTC")

        if gdf.crs is None:
            warnings.warn("CRS was not set. Assuming geometries are in WGS84.")
            gdf.set_crs(4326, inplace=True)

        if uuid_col is not None:
            gdf.set_index(uuid_col, drop=False, inplace=True)

        gdf["junk_status"] = False

        default_cols = ["groupby_col", "fixtime", "junk_status", "geometry"]
        extra_cols = gdf.columns.difference(default_cols)
        extra_cols = extra_cols[~extra_cols.str.startswith("extra__")]

        assert gdf.columns.intersection("extra__" + extra_cols).empty, "Column names overlap with existing `extra`"

        gdf.rename(columns=dict(zip(extra_cols, "extra__" + extra_cols)), inplace=True)

        return cls(gdf, **kwargs)
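    # Illustrative sketch of `from_gdf` (the column names "subject_id" and
    # "recorded_at" are hypothetical):
    #
    #     import geopandas as gpd
    #     import pandas as pd
    #     from shapely.geometry import Point
    #
    #     gdf = gpd.GeoDataFrame(
    #         {
    #             "subject_id": ["a", "a"],
    #             "recorded_at": pd.to_datetime(["2023-01-01 00:00", "2023-01-01 01:00"], utc=True),
    #         },
    #         geometry=[Point(36.80, -1.30), Point(36.90, -1.25)],
    #         crs=4326,
    #     )
    #     relocs = Relocations.from_gdf(gdf, groupby_col="subject_id", time_col="recorded_at")
    #     # Non-default columns survive with an "extra__" prefix, e.g. "extra__subject_id".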
    @staticmethod
    def _apply_speedfilter(df, fix_filter):
        with warnings.catch_warnings():
            # Note: this warning filter can be removed once the version of GeoPandas
            # on Colab is updated to one that fixes this bug.
            warnings.filterwarnings("ignore", message="CRS not set for some of the concatenation inputs")
            gdf = df.assign(
                _fixtime=df["fixtime"].shift(-1),
                _geometry=df["geometry"].shift(-1),
                _junk_status=df["junk_status"].shift(-1),
            )[:-1]

            straight_track = Trajectory._straighttrack_properties(gdf)
            gdf["speed_kmhr"] = straight_track.speed_kmhr

            gdf.loc[
                (~gdf["junk_status"])
                & (~gdf["_junk_status"])
                & (gdf["speed_kmhr"] > fix_filter.max_speed_kmhr),
                "junk_status",
            ] = True

            gdf.drop(
                ["_fixtime", "_geometry", "_junk_status", "speed_kmhr"],
                axis=1,
                inplace=True,
            )
        return gdf
    @staticmethod
    def _apply_distfilter(df, fix_filter):
        with warnings.catch_warnings():
            # Note: this warning filter can be removed once the version of GeoPandas
            # on Colab is updated to one that fixes this bug.
            warnings.filterwarnings("ignore", message="CRS not set for some of the concatenation inputs")
            gdf = df.assign(
                _junk_status=df["junk_status"].shift(-1),
                _geometry=df["geometry"].shift(-1),
            )[:-1]

            _, _, distance_m = Geod(ellps="WGS84").inv(
                gdf["geometry"].x, gdf["geometry"].y, gdf["_geometry"].x, gdf["_geometry"].y
            )
            gdf["distance_km"] = distance_m / 1000

            # Mark as junk only when neither endpoint is already junk and the
            # segment length falls outside the [min_dist_km, max_dist_km] bounds.
            gdf.loc[
                (~gdf["junk_status"])
                & (~gdf["_junk_status"])
                & ((gdf["distance_km"] < fix_filter.min_dist_km) | (gdf["distance_km"] > fix_filter.max_dist_km)),
                "junk_status",
            ] = True

            gdf.drop(["_geometry", "_junk_status", "distance_km"], axis=1, inplace=True)
        return gdf
    def apply_reloc_filter(self, fix_filter=None, inplace=False):
        """Apply a given filter by marking the fix junk_status based on the conditions of a filter"""

        if not self["fixtime"].is_monotonic_increasing:
            self.sort_values("fixtime", inplace=True)
        assert self["fixtime"].is_monotonic_increasing

        if inplace:
            frame = self
        else:
            frame = self.copy()

        # Identify junk fixes based on location coordinate x,y ranges or that match specific coordinates
        if isinstance(fix_filter, RelocsCoordinateFilter):
            frame.loc[
                (frame["geometry"].x < fix_filter.min_x)
                | (frame["geometry"].x > fix_filter.max_x)
                | (frame["geometry"].y < fix_filter.min_y)
                | (frame["geometry"].y > fix_filter.max_y)
                | (frame["geometry"].isin(fix_filter.filter_point_coords)),
                "junk_status",
            ] = True

        # Mark fixes outside this date range as junk
        elif isinstance(fix_filter, RelocsDateRangeFilter):
            if fix_filter.start is not None:
                frame.loc[frame["fixtime"] < fix_filter.start, "junk_status"] = True

            if fix_filter.end is not None:
                frame.loc[frame["fixtime"] > fix_filter.end, "junk_status"] = True

        else:
            crs = frame.crs
            frame.to_crs(4326, inplace=True)
            if isinstance(fix_filter, RelocsSpeedFilter):
                frame._update_inplace(
                    frame.groupby("groupby_col")[frame.columns]
                    .apply(self._apply_speedfilter, fix_filter=fix_filter)
                    .droplevel(["groupby_col"])
                )
            elif isinstance(fix_filter, RelocsDistFilter):
                frame._update_inplace(
                    frame.groupby("groupby_col")[frame.columns]
                    .apply(self._apply_distfilter, fix_filter=fix_filter)
                    .droplevel(["groupby_col"])
                )
            frame.to_crs(crs, inplace=True)

        if not inplace:
            return frame
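    # Filtering sketch (the speed threshold is made up):
    #
    #     from ecoscope.base import RelocsSpeedFilter
    #
    #     relocs.apply_reloc_filter(RelocsSpeedFilter(max_speed_kmhr=10.0), inplace=True)
    #     relocs.remove_filtered(inplace=True)  # drop the fixes marked as junk above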
    @cached_property
    def distance_from_centroid(self):
        # calculate the distance between the centroid and each fix
        gs = self.geometry.to_crs(crs=self.estimate_utm_crs())
        return gs.distance(gs.unary_union.centroid)

    @cached_property
    def cluster_radius(self):
        """
        The cluster radius is the largest distance between a point in the
        relocations and the centroid of the relocations.
        """
        distance = self.distance_from_centroid
        return distance.max()

    @cached_property
    def cluster_std_dev(self):
        """
        The cluster standard deviation is the standard deviation of the radii
        from the centroid to each point making up the cluster.
        """
        distance = self.distance_from_centroid
        return np.std(distance)
    def threshold_point_count(self, threshold_dist):
        """Counts the number of points in the cluster that are within a threshold distance of the geographic centre"""
        distance = self.distance_from_centroid
        return distance[distance <= threshold_dist].size
    def apply_threshold_filter(self, threshold_dist_meters=float("Inf")):
        # Apply filter to the underlying geodataframe.
        distance = self.distance_from_centroid
        _filter = distance > threshold_dist_meters
        self.loc[_filter, "junk_status"] = True
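    # The cluster helpers above compose as follows (the 500 m threshold is
    # illustrative):
    #
    #     radius_m = relocs.cluster_radius             # max distance from centroid
    #     n_close = relocs.threshold_point_count(500)  # fixes within 500 m of centroid
    #     relocs.apply_threshold_filter(500)           # mark fixes beyond 500 m as junk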
class Trajectory(EcoDataFrame):
    """
    A trajectory represents a time-ordered collection of segments. Currently
    only straight track segments exist. It is based on an underlying relocs
    object that is the point representation.
    """
    @classmethod
    def from_relocations(cls, gdf, *args, **kwargs):
        """
        Create a Trajectory from a Relocations dataframe.

        Parameters
        ----------
        gdf : Geodataframe
            Relocations geodataframe with relevant columns
        args
        kwargs

        Returns
        -------
        Trajectory
        """
        assert isinstance(gdf, Relocations)
        assert {"groupby_col", "fixtime", "geometry"}.issubset(gdf)

        if kwargs.get("copy"):
            gdf = gdf.copy()

        gdf = EcoDataFrame(gdf)
        crs = gdf.crs
        gdf.to_crs(4326, inplace=True)
        gdf = gdf.groupby("groupby_col")[gdf.columns].apply(cls._create_multitraj).droplevel(level=0)
        gdf.to_crs(crs, inplace=True)
        gdf.sort_values("segment_start", inplace=True)
        return cls(gdf, *args, **kwargs)
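    # Typical construction, assuming `relocs` was built with Relocations.from_gdf:
    #
    #     traj = Trajectory.from_relocations(relocs)
    #     # Each row is one straight segment with segment_start/segment_end,
    #     # dist_meters, speed_kmhr, heading and nsd from _straighttrack_properties.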
    def get_displacement(self):
        """
        Get displacement in meters between first and final fixes.
        """
        if not self["segment_start"].is_monotonic_increasing:
            self = self.sort_values("segment_start")

        start = self.geometry.iloc[0].coords[0]
        end = self.geometry.iloc[-1].coords[1]
        return Geod(ellps="WGS84").inv(start[0], start[1], end[0], end[1])[2]
    def get_tortuosity(self):
        """
        Get tortuosity for dataframe defined as distance traveled divided by
        displacement between first and final points.
        """
        return self["dist_meters"].sum() / self.get_displacement()
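    # Worked example of the definition above: segments summing to 12,000 m with a
    # straight-line displacement of 3,000 m give a tortuosity of 12000 / 3000 = 4.0;
    # a perfectly straight path has tortuosity 1.0.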
    @staticmethod
    def _create_multitraj(df):
        if len(df) == 1:
            warnings.warn(
                f"Subject id {df['groupby_col'].iat[0]} has only one relocation "
                "and will be excluded from trajectory creation"
            )
            return None

        with warnings.catch_warnings():
            # Note: this warning filter can be removed once the version of GeoPandas
            # on Colab is updated to one that fixes this bug.
            warnings.filterwarnings("ignore", message="CRS not set for some of the concatenation inputs")
            df["_geometry"] = df["geometry"].shift(-1)
            df["_fixtime"] = df["fixtime"].shift(-1)
            return Trajectory._create_trajsegments(df[:-1])
    @staticmethod
    def _create_trajsegments(gdf):
        track_properties = Trajectory._straighttrack_properties(gdf)

        coords = np.column_stack(
            (
                np.column_stack(track_properties.start_fixes),
                np.column_stack(track_properties.end_fixes),
            )
        ).reshape(gdf.shape[0], 2, 2)

        df = gpd.GeoDataFrame(
            {
                "groupby_col": gdf.groupby_col,
                "segment_start": gdf.fixtime,
                "segment_end": gdf._fixtime,
                "timespan_seconds": track_properties.timespan_seconds,
                "dist_meters": track_properties.dist_meters,
                "speed_kmhr": track_properties.speed_kmhr,
                "heading": track_properties.heading,
                "geometry": shapely.linestrings(coords),
                "junk_status": gdf.junk_status,
                "nsd": track_properties.nsd,
            },
            crs=4326,
            index=gdf.index,
        )

        gdf.drop(["fixtime", "_fixtime", "_geometry"], axis=1, inplace=True)
        extra_cols = gdf.columns.difference(df.columns)
        gdf = gdf[extra_cols]

        extra_cols = extra_cols[~extra_cols.str.startswith("extra_")]
        gdf.rename(columns=dict(zip(extra_cols, "extra__" + extra_cols)), inplace=True)

        return df.join(gdf, how="left")
    def apply_traj_filter(self, traj_seg_filter, inplace=False):
        if not self["segment_start"].is_monotonic_increasing:
            self.sort_values("segment_start", inplace=True)
        assert self["segment_start"].is_monotonic_increasing

        if inplace:
            frame = self
        else:
            frame = self.copy()

        assert type(traj_seg_filter) is TrajSegFilter
        frame.loc[
            (frame["dist_meters"] < traj_seg_filter.min_length_meters)
            | (frame["dist_meters"] > traj_seg_filter.max_length_meters)
            | (frame["timespan_seconds"] < traj_seg_filter.min_time_secs)
            | (frame["timespan_seconds"] > traj_seg_filter.max_time_secs)
            | (frame["speed_kmhr"] < traj_seg_filter.min_speed_kmhr)
            | (frame["speed_kmhr"] > traj_seg_filter.max_speed_kmhr),
            "junk_status",
        ] = True

        if not inplace:
            return frame
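    # Segment-filtering sketch (all bounds are made up; the TrajSegFilter fields
    # match the attributes referenced above):
    #
    #     from ecoscope.base import TrajSegFilter
    #
    #     traj.apply_traj_filter(
    #         TrajSegFilter(
    #             min_length_meters=0.0,
    #             max_length_meters=5_000.0,
    #             min_time_secs=0.0,
    #             max_time_secs=4 * 3600,
    #             min_speed_kmhr=0.0,
    #             max_speed_kmhr=8.0,
    #         ),
    #         inplace=True,
    #     )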
    def get_turn_angle(self):
        if not self["segment_start"].is_monotonic_increasing:
            self.sort_values("segment_start", inplace=True)
        assert self["segment_start"].is_monotonic_increasing

        def turn_angle(traj):
            return ((traj["heading"].diff() + 540) % 360 - 180)[
                traj["segment_end"].shift(1) == traj["segment_start"]
            ]

        uniq = self.groupby_col.nunique()
        angles = (
            self.groupby("groupby_col")[self.columns].apply(turn_angle, include_groups=False).droplevel(0)
            if uniq > 1
            else turn_angle(self)
        )

        return angles.rename("turn_angle").reindex(self.index)
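    # The expression `(diff + 540) % 360 - 180` wraps heading differences into
    # (-180, 180]. For consecutive headings 350° and 10°, diff = -340 and
    # (-340 + 540) % 360 - 180 = 200 - 180 = 20, i.e. a 20° right turn rather
    # than -340°. Angles are kept only where segments are contiguous, i.e. the
    # previous segment_end equals the next segment_start.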
    def upsample(self, freq):
        """
        Interpolate to create upsampled Relocations

        Parameters
        ----------
        freq : str, pd.Timedelta or pd.DateOffset
            Sampling frequency for new Relocations object

        Returns
        -------
        relocs : ecoscope.base.Relocations
        """
        freq = pd.tseries.frequencies.to_offset(freq)

        if not self["segment_start"].is_monotonic_increasing:
            self.sort_values("segment_start", inplace=True)

        def f(traj):
            traj.crs = self.crs  # Lost in groupby-apply due to GeoPandas bug

            times = pd.date_range(traj["segment_start"].iat[0], traj["segment_end"].iat[-1], freq=freq)

            start_i = traj["segment_start"].searchsorted(times, side="right") - 1
            end_i = traj["segment_end"].searchsorted(times, side="left")
            valid_i = (start_i == end_i) | (times == traj["segment_start"].iloc[start_i])

            traj = traj.iloc[start_i[valid_i]].reset_index(drop=True)
            times = times[valid_i]

            return gpd.GeoDataFrame(
                {"fixtime": times},
                geometry=shapely.line_interpolate_point(
                    traj["geometry"].values,
                    (times - traj["segment_start"]) / (traj["segment_end"] - traj["segment_start"]),
                    normalized=True,
                ),
                crs=traj.crs,
            )

        return Relocations.from_gdf(
            self.groupby("groupby_col")[self.columns].apply(f, include_groups=False).reset_index(level=0)
        )
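    # Usage sketch:
    #
    #     hourly_relocs = traj.upsample("1H")  # Relocations at 1-hour spacing
    #
    # Sample times falling in a temporal gap between segments are dropped by the
    # `valid_i` mask rather than interpolated across the gap.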
    def to_relocations(self):
        """
        Converts a Trajectory object to a Relocations object.

        Returns
        -------
        ecoscope.base.Relocations
        """

        def f(traj):
            traj.crs = self.crs
            points = np.concatenate([shapely.get_point(traj.geometry, 0), shapely.get_point(traj.geometry, 1)])
            times = np.concatenate([traj["segment_start"], traj["segment_end"]])

            return (
                gpd.GeoDataFrame(
                    {"fixtime": times},
                    geometry=points,
                    crs=traj.crs,
                )
                .drop_duplicates(subset=["fixtime"])
                .sort_values("fixtime")
            )

        return Relocations.from_gdf(
            self.groupby("groupby_col")[self.columns].apply(f, include_groups=False).reset_index(drop=True)
        )
    def downsample(self, freq, tolerance="0S", interpolation=False):
        """
        Function to downsample relocations.

        Parameters
        ----------
        freq : str, pd.Timedelta or pd.DateOffset
            Downsampling frequency for new Relocations object
        tolerance : str, pd.Timedelta or pd.DateOffset
            Tolerance on the downsampling frequency
        interpolation : bool, optional
            If true, interpolates locations on the whole trajectory

        Returns
        -------
        ecoscope.base.Relocations
        """

        if interpolation:
            return self.upsample(freq)
        else:
            freq = pd.tseries.frequencies.to_offset(freq)
            tolerance = pd.tseries.frequencies.to_offset(tolerance)

            def f(relocs_ind):
                relocs_ind.crs = self.crs
                fixtime = relocs_ind["fixtime"]

                k = 1
                i = 0
                n = len(relocs_ind)
                out = np.full(n, -1)
                out[i] = k
                while i < (n - 1):
                    t_min = fixtime.iloc[i] + freq - tolerance
                    t_max = fixtime.iloc[i] + freq + tolerance

                    j = i + 1
                    while (j < (n - 1)) and (fixtime.iloc[j] < t_min):
                        j += 1

                    i = j

                    if j == (n - 1):
                        break
                    elif (fixtime.iloc[j] >= t_min) and (fixtime.iloc[j] <= t_max):
                        out[j] = k
                    else:
                        k += 1
                        out[j] = k

                relocs_ind["extra__burst"] = np.array(out, dtype=np.int64)
                relocs_ind.drop(relocs_ind.loc[relocs_ind["extra__burst"] == -1].index, inplace=True)
                return relocs_ind

            relocs = self.to_relocations()
            return relocs.groupby("groupby_col")[relocs.columns].apply(f).reset_index(drop=True)
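    # Usage sketch: keep roughly one fix per day, tolerating 2 hours of jitter.
    # Fixes that consecutively satisfy the spacing share an `extra__burst` label:
    #
    #     daily = traj.downsample("1D", tolerance="2H")
    #     interpolated = traj.downsample("1D", interpolation=True)  # delegates to upsample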
    @staticmethod
    def _straighttrack_properties(df: gpd.GeoDataFrame):
        """Private function used by Trajectory class."""

        class Properties:
            @property
            def start_fixes(self):
                # unpack xy-coordinates of start fixes
                return df["geometry"].x, df["geometry"].y

            @property
            def end_fixes(self):
                # unpack xy-coordinates of end fixes
                return df["_geometry"].x, df["_geometry"].y

            @property
            def inverse_transformation(self):
                # use pyproj geodesic inverse function to compute vectorized distance & heading calculations
                return Geod(ellps="WGS84").inv(*self.start_fixes, *self.end_fixes)

            @property
            def heading(self):
                # Forward azimuth(s)
                forward_azimuth, _, _ = self.inverse_transformation
                forward_azimuth[forward_azimuth < 0] += 360
                return forward_azimuth

            @property
            def dist_meters(self):
                _, _, distance = self.inverse_transformation
                return distance

            @property
            def nsd(self):
                start_point = df["geometry"].iloc[0]
                geod = Geod(ellps="WGS84")
                geod_displacement = [
                    geod.inv(start_point.x, start_point.y, geo.x, geo.y)[2] for geo in df["_geometry"]
                ]
                # net squared displacement: squared meters converted to square kilometers
                return [(x**2) / (1000**2) for x in geod_displacement]

            @property
            def timespan_seconds(self):
                return (df["_fixtime"] - df["fixtime"]).dt.total_seconds()

            @property
            def speed_kmhr(self):
                # meters per second converted to kilometers per hour
                return (self.dist_meters / self.timespan_seconds) * 3.6

        instance = Properties()
        return instance