Source code for ecoscope.analysis.proximity
import itertools
import typing
import uuid
from dataclasses import dataclass, field
import geopandas as gpd
import pandas as pd
from shapely.geometry import Point
@dataclass
class SpatialFeature:
    """
    A spatial geometry with an associated name and unique ID. Becomes a useful construct in several movdata
    calculations.

    Attributes
    ----------
    name : str
        Human-readable label for the feature. Defaults to an empty string.
    unique_id : typing.Any
        Identifier of the feature. When not supplied, a fresh ``uuid.uuid4()`` is generated for each instance.
    geometry : typing.Any
        The geometry of the feature. Defaults to ``None``.
    """

    name: str = ""
    # default_factory is required here: a plain ``uuid.uuid4()`` default is evaluated once, when the
    # class body runs, and that single UUID would then be shared by every instance.
    unique_id: typing.Any = field(default_factory=uuid.uuid4)
    geometry: typing.Any = None
@dataclass
class ProximityProfile:
    """
    Settings for a proximity calculation.

    Attributes
    ----------
    spatial_features : list of SpatialFeature
        The spatial features to measure proximity against. Defaults to a new, empty list per instance.
    """

    # default_factory=list gives each profile its own empty list; ``default=list`` would make the
    # default value the ``list`` type itself, which cannot be iterated.
    spatial_features: typing.List["SpatialFeature"] = field(default_factory=list)
class Proximity:
    """Proximity analysis between subject trajectories and spatial features."""

    @classmethod
    def calculate_proximity(cls, proximity_profile, trajectory):
        """
        A function to analyze the trajectory of a subject in relation to a set of spatial features and regions to
        determine where/when the subject was proximal to the spatial feature.

        Parameters
        ----------
        proximity_profile: ProximityProfile
            proximity setting for performing calculation
        trajectory: ecoscope.base.Trajectory
            Geodataframe stores geometry, speed_kmhr, heading etc. for each subject.
            The columns ``groupby_col``, ``speed_kmhr``, ``heading``, ``segment_start`` and
            ``geometry`` are read.

        Returns
        -------
        pd.DataFrame
            One row per (trajectory segment, spatial feature) pair with the columns ``groupby_col``,
            ``speed_kmhr``, ``heading``, ``proximity_distance``, ``proximal_fix``, ``estimated_time``,
            ``geometry``, ``spatialfeature_id`` and ``spatialfeature_name``. An empty frame with these
            columns is returned when there are no spatial features or no trajectory rows.
        """
        base_columns = ["groupby_col", "speed_kmhr", "heading"]
        derived_columns = [
            "proximity_distance",
            "proximal_fix",
            "estimated_time",
            "geometry",
            "spatialfeature_id",
            "spatialfeature_name",
        ]
        proximity_events = []

        # Walk the groups explicitly instead of calling ``groupby.apply`` purely for its side
        # effects: iteration always hands over the full group (grouping column included) and
        # visits every group exactly once.
        for _, traj in trajectory.groupby("groupby_col"):
            n_rows = len(traj)
            for sf in proximity_profile.spatial_features:
                # Reuse the group's index so the column assignment below lines up row by row;
                # a default 0..n-1 index would be aligned by label and end up misplaced or NaN.
                start_fix = gpd.GeoSeries(
                    [Point(g.coords[0]) for g in traj.geometry],
                    index=traj.index,
                )

                # ``assign`` works on a copy, so the caller's trajectory is never written to.
                pr = traj[base_columns].assign(
                    proximity_distance=traj.geometry.distance(sf.geometry),
                    # TODO: figure out the estimated fix interpolated along the seg
                    proximal_fix=start_fix,
                    estimated_time=traj.segment_start,
                    geometry=traj.geometry,
                    spatialfeature_id=[sf.unique_id] * n_rows,
                    spatialfeature_name=[sf.name] * n_rows,
                )
                proximity_events.append(pr)

        # ``pd.concat`` raises on an empty list, so return an empty frame with the same columns.
        if not proximity_events:
            return pd.DataFrame(columns=base_columns + derived_columns)

        return pd.concat(proximity_events).reset_index(drop=True)