Source code for ecoscope.relocations

import warnings
from copy import deepcopy
from functools import cached_property

import geopandas as gpd  # type: ignore[import-untyped]
import numpy as np
import pandas as pd
from pyproj import Geod

from ecoscope.base import EcoDataFrame
from ecoscope.base._dataclasses import (
    RelocsCoordinateFilter,
    RelocsDateRangeFilter,
    RelocsDistFilter,
    RelocsFilterType,
    RelocsSpeedFilter,
)
from ecoscope.base.straightrack import StraightTrackProperties


class Relocations(EcoDataFrame):
    """
    Relocations is a model for a set of fixes from a given subject.

    Because fixes are temporal, they can be ordered ascending or descending.
    The additional_data dict can contain info specific to the subject and
    relocations: name, type, region, sex etc. These values are applicable to
    all fixes in the relocations array. If they vary, then they should be put
    into each fix's additional_data dict.
    """

    @classmethod
    def from_gdf(
        cls,
        gdf: gpd.GeoDataFrame,
        groupby_col: str | None = None,
        time_col: str = "fixtime",
        uuid_col: str | None = None,
        copy: bool = True,
    ):
        """
        Build a `Relocations` object from a GeoDataFrame of observations.

        The frame is normalised so that it always carries the columns
        `groupby_col`, `fixtime`, `junk_status` and `geometry`; every other
        column is renamed with an `extra__` prefix (columns that already carry
        the prefix are left alone).

        Parameters
        ----------
        gdf : GeoDataFrame
            Observations data. Must contain `geometry` and `time_col`.
        groupby_col : str, optional
            Name of `gdf` column of identities to treat as separate individuals. Usually `subject_id`. Default is
            treating the gdf as being of a single individual.
        time_col : str, optional
            Name of `gdf` column containing relocation times. Default is 'fixtime'.
        uuid_col : str, optional
            Name of `gdf` column of row identities. Used as index. Default is existing index.
        copy : bool, optional
            Whether or not to copy the `gdf`. Defaults to `True`. When `False`, the passed frame is modified
            in place.

        Returns
        -------
        Relocations
            Instance created as `cls(gdf=gdf)` from the normalised frame.

        Raises
        ------
        AssertionError
            If `geometry` or `time_col` is missing, or if prefixing a column with `extra__` would collide
            with an existing column.
        """
        assert {"geometry", time_col}.issubset(gdf), f"`gdf` must contain 'geometry' and '{time_col}' columns"

        if copy:
            gdf = gdf.copy()

        # Every row belongs to group 0 unless the caller names a grouping column
        # (or the frame already has a `groupby_col` column).
        if groupby_col is None:
            if "groupby_col" not in gdf:
                gdf["groupby_col"] = 0
        else:
            gdf["groupby_col"] = gdf.loc[:, groupby_col]

        if time_col != "fixtime":
            gdf["fixtime"] = gdf.loc[:, time_col]

        if not pd.api.types.is_datetime64_any_dtype(gdf["fixtime"]):
            warnings.warn(
                f"{time_col} is not of type datetime64. Attempting to automatically infer format and timezone. "
                "Results may be incorrect.",
                stacklevel=2,
            )
            gdf["fixtime"] = pd.to_datetime(gdf["fixtime"])

        if gdf["fixtime"].dt.tz is None:
            warnings.warn(
                f"{time_col} is not timezone aware. Assuming datetime are in UTC.",
                stacklevel=2,
            )
            gdf["fixtime"] = gdf["fixtime"].dt.tz_localize(tz="UTC")

        if gdf.crs is None:
            warnings.warn("CRS was not set. Assuming geometries are in WGS84.", stacklevel=2)
            gdf.set_crs(4326, inplace=True)

        if uuid_col is not None:
            gdf.set_index(uuid_col, drop=False, inplace=True)

        # Freshly loaded fixes start out as valid; filters flip this flag later.
        gdf["junk_status"] = False

        default_cols = ["groupby_col", "fixtime", "junk_status", "geometry"]
        extra_cols = gdf.columns.difference(default_cols)
        extra_cols = extra_cols[~extra_cols.str.startswith("extra__")]

        assert gdf.columns.intersection("extra__" + extra_cols).empty, "Column names overlap with existing `extra`"

        gdf.rename(columns=dict(zip(extra_cols, "extra__" + extra_cols)), inplace=True)
        return cls(gdf=gdf)

    @staticmethod
    def _apply_speedfilter(df: pd.DataFrame, fix_filter: RelocsSpeedFilter):
        """
        Flag fixes whose speed to the following fix exceeds `fix_filter.max_speed_kmhr`.

        Each row is paired with the next row (time, geometry, junk flag). A fix
        is marked junk only when neither it nor the following fix is already
        junk and the speed reported by `StraightTrackProperties` is above the
        limit.

        NOTE: the last row of `df` has no successor and is not part of the
        returned frame.

        Parameters
        ----------
        df : DataFrame
            Fixes of a single group with `fixtime`, `geometry` and `junk_status` columns.
        fix_filter : RelocsSpeedFilter
            Supplies `max_speed_kmhr`.

        Returns
        -------
        DataFrame
            `df` without its last row and with `junk_status` updated.
        """
        gdf = df.assign(
            _fixtime=df["fixtime"].shift(-1),
            _geometry=df["geometry"].shift(-1),
            # fill_value keeps the shifted flag boolean; a plain shift would
            # introduce NaN and turn it into an object column, on which `~`
            # is no longer a logical negation.
            _junk_status=df["junk_status"].shift(-1, fill_value=False),
        )[:-1]

        straight_track = StraightTrackProperties(gdf)
        gdf["speed_kmhr"] = straight_track.speed_kmhr

        too_fast = gdf["speed_kmhr"] > fix_filter.max_speed_kmhr
        gdf.loc[
            (~gdf["junk_status"]) & (~gdf["_junk_status"]) & too_fast,
            "junk_status",
        ] = True

        # Remove the helper columns so the caller gets the original schema back.
        gdf.drop(
            ["_fixtime", "_geometry", "_junk_status", "speed_kmhr"],
            axis=1,
            inplace=True,
        )
        return gdf

    @staticmethod
    def _apply_distfilter(df: pd.DataFrame, fix_filter: RelocsDistFilter):
        """
        Flag fixes whose geodesic distance to the following fix is out of range.

        The distance is computed on the WGS84 ellipsoid with `pyproj.Geod.inv`,
        so geometries are treated as longitude/latitude points. A fix is marked
        junk only when neither it nor the following fix is already junk and the
        distance is below `fix_filter.min_dist_km` or above
        `fix_filter.max_dist_km`.

        NOTE: the last row of `df` has no successor and is not part of the
        returned frame.

        Parameters
        ----------
        df : DataFrame
            Fixes of a single group with `geometry` and `junk_status` columns.
        fix_filter : RelocsDistFilter
            Supplies `min_dist_km` and `max_dist_km`.

        Returns
        -------
        DataFrame
            `df` without its last row and with `junk_status` updated.
        """
        gdf = df.assign(
            # fill_value keeps the shifted flag boolean (see _apply_speedfilter).
            _junk_status=df["junk_status"].shift(-1, fill_value=False),
            _geometry=df["geometry"].shift(-1),
        )[:-1]

        _, _, distance_m = Geod(ellps="WGS84").inv(
            gdf["geometry"].x, gdf["geometry"].y, gdf["_geometry"].x, gdf["_geometry"].y
        )
        gdf["distance_km"] = distance_m / 1000

        # Parenthesised on purpose: `&` binds tighter than `|`, so without the
        # grouping the "too far" test would bypass the junk-status guards.
        out_of_range = (gdf["distance_km"] < fix_filter.min_dist_km) | (
            gdf["distance_km"] > fix_filter.max_dist_km
        )
        gdf.loc[
            (~gdf["junk_status"]) & (~gdf["_junk_status"]) & out_of_range,
            "junk_status",
        ] = True

        gdf.drop(["_geometry", "_junk_status", "distance_km"], axis=1, inplace=True)
        return gdf

    def apply_reloc_filter(self, fix_filter: RelocsFilterType | None = None, inplace: bool = False):
        """
        Apply a given filter by marking the fix junk_status based on the conditions of a filter.

        Parameters
        ----------
        fix_filter : RelocsFilterType, optional
            One of `RelocsCoordinateFilter`, `RelocsDateRangeFilter`,
            `RelocsSpeedFilter` or `RelocsDistFilter`. Any other value
            (including `None`) marks nothing.
        inplace : bool, optional
            If `True`, modify this object and return `None`. If `False`
            (default), work on a deep copy and return it; this object is left
            untouched.

        Returns
        -------
        Relocations or None
            The filtered copy when `inplace` is `False`, otherwise `None`.
        """
        relocs = self if inplace else deepcopy(self)

        # The pairwise (speed / distance) filters compare each fix with the
        # next one, so fixes must be in chronological order. Sorting the
        # working object keeps a non-inplace call from mutating `self`.
        if not relocs.gdf["fixtime"].is_monotonic_increasing:
            relocs.gdf.sort_values("fixtime", inplace=True)
        assert relocs.gdf["fixtime"].is_monotonic_increasing

        # Identify junk fixes based on location coordinate x,y ranges or that match specific coordinates
        if isinstance(fix_filter, RelocsCoordinateFilter):
            geometry = relocs.gdf["geometry"]
            relocs.gdf.loc[
                (geometry.x < fix_filter.min_x)
                | (geometry.x > fix_filter.max_x)
                | (geometry.y < fix_filter.min_y)
                | (geometry.y > fix_filter.max_y)
                | (geometry.isin(fix_filter.filter_point_coords)),
                "junk_status",
            ] = True

        # Mark fixes outside this date range as junk
        elif isinstance(fix_filter, RelocsDateRangeFilter):
            if fix_filter.start is not None:
                relocs.gdf.loc[relocs.gdf["fixtime"] < fix_filter.start, "junk_status"] = True

            if fix_filter.end is not None:
                relocs.gdf.loc[relocs.gdf["fixtime"] > fix_filter.end, "junk_status"] = True

        else:
            # The pairwise filters work on longitude/latitude, so reproject to
            # WGS84 for the computation and restore the original CRS afterwards.
            # `inplace=True` is required: `to_crs` otherwise returns a new
            # frame and leaves `relocs.gdf` unchanged.
            crs = relocs.gdf.crs
            relocs.gdf.to_crs(4326, inplace=True)

            if isinstance(fix_filter, RelocsSpeedFilter):
                group_filter = self._apply_speedfilter
            elif isinstance(fix_filter, RelocsDistFilter):
                group_filter = self._apply_distfilter
            else:
                group_filter = None

            if group_filter is not None:
                # Run the filter per individual, then drop the group level that
                # groupby/apply adds to the index.
                relocs.gdf._update_inplace(
                    relocs.gdf.groupby("groupby_col")[relocs.gdf.columns]
                    .apply(group_filter, fix_filter=fix_filter)
                    .droplevel(["groupby_col"])
                )

            relocs.gdf.to_crs(crs, inplace=True)

        if not inplace:
            return relocs

    @cached_property
    def distance_from_centroid(self):
        """
        Distance from every fix to the centroid of all fixes.

        Geometries are projected to the UTM CRS estimated for the data before
        measuring, so distances are in that CRS's units (presumably metres).
        The value is cached after the first access.
        """
        gs = self.gdf.geometry.to_crs(crs=self.gdf.estimate_utm_crs())
        # `unary_union` is deprecated in newer geopandas in favour of
        # `union_all()`; fall back for versions that do not have it yet.
        merged = gs.union_all() if hasattr(gs, "union_all") else gs.unary_union
        return gs.distance(merged.centroid)

    @cached_property
    def cluster_radius(self):
        """
        The cluster radius is the largest distance between a point in the
        relocations and the centroid of the relocations.
        """
        distance = self.distance_from_centroid
        return distance.max()

    @cached_property
    def cluster_std_dev(self):
        """
        The cluster standard deviation is the standard deviation of the radii
        from the centroid to each point making up the cluster.
        """
        distance = self.distance_from_centroid
        return np.std(distance)

    def threshold_point_count(self, threshold_dist: float):
        """
        Counts the number of points in the cluster that are within a threshold
        distance of the geographic centre.

        Parameters
        ----------
        threshold_dist : float
            Maximum distance (inclusive), in the units of `distance_from_centroid`.

        Returns
        -------
        int
            Number of fixes whose distance from the centroid is `<= threshold_dist`.
        """
        distance = self.distance_from_centroid
        return distance[distance <= threshold_dist].size

    def apply_threshold_filter(self, threshold_dist_meters: float = float("Inf")):
        """
        Mark fixes farther than `threshold_dist_meters` from the centroid as junk.

        Modifies `junk_status` on the underlying geodataframe in place and
        returns nothing. With the default of infinity no fix is marked.

        Parameters
        ----------
        threshold_dist_meters : float, optional
            Fixes with a distance from the centroid strictly greater than this are flagged.
        """
        # Apply filter to the underlying geodataframe.
        distance = self.distance_from_centroid
        _filter = distance > threshold_dist_meters
        self.gdf.loc[_filter, "junk_status"] = True