Source code for ecoscope.io.earthranger

import datetime
import json
import math
import typing

import geopandas as gpd
import numpy as np
import pandas as pd
import pytz
import requests
from erclient.client import ERClient, ERClientException, ERClientNotFound
from shapely.geometry import shape
from tqdm.auto import tqdm

import ecoscope
from ecoscope.io.earthranger_utils import (
    clean_kwargs,
    clean_time_cols,
    dataframe_to_dict,
    format_iso_time,
    to_gdf,
)
from ecoscope.io.utils import pack_columns, to_hex


[docs] class EarthRangerIO(ERClient): def __init__(self, sub_page_size=4000, tcp_limit=5, **kwargs): if "server" in kwargs: server = kwargs.pop("server") kwargs["service_root"] = f"{server}/api/v1.0" kwargs["token_url"] = f"{server}/oauth2/token" self.sub_page_size = sub_page_size self.tcp_limit = tcp_limit kwargs["client_id"] = kwargs.get("client_id", "das_web_client") super().__init__(**kwargs) try: self.login() except ERClientNotFound: raise ERClientNotFound("Failed login. Check Stack Trace for specific reason.")
[docs] def _token_request(self, payload): response = requests.post(self.token_url, data=payload) if response.ok: self.auth = json.loads(response.text) expires_in = int(self.auth["expires_in"]) - 5 * 60 self.auth_expires = pytz.utc.localize(datetime.datetime.utcnow()) + datetime.timedelta(seconds=expires_in) return True self.auth = None self.auth_expires = pytz.utc.localize(datetime.datetime.min) raise ERClientNotFound(json.loads(response.text)["error_description"])
""" GET Functions """
[docs] def get_sources( self, manufacturer_id=None, provider_key=None, provider=None, id=None, **addl_kwargs, ): """ Parameters ---------- manufacturer_id provider_key provider id Returns ------- sources : pd.DataFrame DataFrame of queried sources """ params = clean_kwargs( addl_kwargs, manufacturer_id=manufacturer_id, provider_key=provider_key, provider=provider, id=id, ) df = pd.DataFrame( self.get_objects_multithreaded( object="sources/", threads=self.tcp_limit, page_size=self.sub_page_size, **params, ) ) return df
[docs] def get_subjects( self, include_inactive=None, bbox=None, subject_group_id=None, name=None, updated_since=None, tracks=None, id=None, updated_until=None, subject_group_name=None, max_ids_per_request=50, **addl_kwargs, ): """ Parameters ---------- include_inactive: Include inactive subjects in list. bbox: Include subjects having track data within this bounding box defined by a 4-tuple of coordinates marking west, south, east, north. subject_group_id: Indicate a subject group id for which Subjects should be listed. This is translated to the subject_group parameter in the ER backend name : Find subjects with the given name updated_since: Return Subject that have been updated since the given timestamp. tracks: Indicate whether to render each subject's recent tracks. id: A comma-delimited list of Subject IDs. updated_until subject_group_name: A subject group name for which Subjects should be listed. This is translated to the group_name parameter in the ER backend Returns ------- subjects : pd.DataFrame """ params = clean_kwargs( addl_kwargs, include_inactive=include_inactive, bbox=bbox, subject_group=subject_group_id, name=name, updated_since=updated_since, tracks=tracks, id=id, updated_until=updated_until, group_name=subject_group_name, ) assert params.get("subject_group") is None or params.get("group_name") is None if params.get("group_name") is not None: try: params["subject_group"] = self._get( "subjectgroups/", params={ "group_name": params.pop("group_name"), "include_inactive": True, "include_hidden": True, "flat": True, }, )[0]["id"] except IndexError: raise KeyError("`group_name` not found") if params.get("id") is not None: params["id"] = params.get("id").split(",") def partial_subjects(subjects): params["id"] = ",".join(subjects) return pd.DataFrame( self.get_objects_multithreaded( object="subjects/", threads=self.tcp_limit, page_size=self.sub_page_size, **params, ) ) df = pd.concat( [ partial_subjects(s) for s in np.array_split(params["id"], math.ceil(len(params["id"]) / max_ids_per_request)) ], ignore_index=True, ) else: df = pd.DataFrame( self.get_objects_multithreaded( object="subjects/", threads=self.tcp_limit, page_size=self.sub_page_size, **params, ) ) if not df.empty: df["hex"] = df["additional"].str["rgb"].map(to_hex) if "additional" in df else "#ff0000" df = clean_time_cols(df) return df
[docs] def get_subjectsources(self, subjects=None, sources=None, **addl_kwargs): """ Parameters ---------- subjects: A comma-delimited list of Subject IDs. sources: A comma-delimited list of Source IDs. Returns ------- subjectsources : pd.DataFrame """ params = clean_kwargs(addl_kwargs, sources=sources, subjects=subjects) df = pd.DataFrame( self.get_objects_multithreaded( object="subjectsources/", threads=self.tcp_limit, page_size=self.sub_page_size, **params, ) ) df = clean_time_cols(df) return df
[docs] def _get_observations( self, source_ids=None, subject_ids=None, subjectsource_ids=None, tz="UTC", since=None, until=None, filter=None, include_details=None, created_after=None, **addl_kwargs, ): """ Return observations matching queries. If `subject_id`, `source_id`, or `subjectsource_id` is specified, the index is set to the provided value. Parameters ---------- subject_ids: filter to a single subject source_ids: filter to a single source subjectsource_ids: filter to a subjectsource_id, rather than source_id + time range since: get observations after this ISO8061 date, include timezone until:get observations up to this ISO8061 date, include timezone filter filter using exclusion_flags for an observation. filter=None returns everything filter=0 filters out everything but rows with exclusion flag 0 (i.e, passes back clean data) filter=1 filters out everything but rows with exclusion flag 1 (i.e, passes back manually filtered data) filter=2, filters out everything but rows with exclusion flag 2 (i.e., passes back automatically filtered data) filter=3, filters out everything but rows with exclusion flag 2 or 1 (i.e., passes back both manual and automatically filtered data) include_details: one of [true,false], default is false. This brings back the observation additional field created_after: get observations created (saved in EarthRanger) after this ISO8061 date, include timezone Returns ------- observations : gpd.GeoDataFrame """ assert (source_ids, subject_ids, subjectsource_ids).count(None) == 2 params = clean_kwargs( addl_kwargs, since=since, until=until, filter=filter, include_details=include_details, created_after=created_after, ) if source_ids: id_name, ids = "source_id", source_ids elif subject_ids: id_name, ids = "subject_id", subject_ids else: id_name, ids = "subjectsource_id", subjectsource_ids observations = [] pbar = tqdm([ids] if isinstance(ids, str) else ids) for _id in pbar: params[id_name] = _id pbar.set_description(f"Downloading Observations for {id_name}={_id}") dataframe = pd.DataFrame( self.get_objects_multithreaded( object="observations/", threads=self.tcp_limit, page_size=self.sub_page_size, **params, ) ) dataframe[id_name] = _id observations.append(dataframe) observations = pd.concat(observations) if observations.empty: return gpd.GeoDataFrame() observations = clean_time_cols(observations) observations["created_at"] = observations["created_at"].dt.tz_convert(tz) observations["recorded_at"] = observations["recorded_at"].dt.tz_convert(tz) observations.sort_values("recorded_at", inplace=True) return to_gdf(observations)
[docs] def get_source_observations(self, source_ids, include_source_details=False, relocations=True, **kwargs): """ Get observations for each listed source and create a `Relocations` object. Parameters ---------- source_ids : str or list[str] List of source UUIDs include_source_details : bool, optional Whether to merge source info into dataframe kwargs Additional arguments to pass in the request to EarthRanger. See the docstring of `_get_observations` for info. Returns ------- relocations : ecoscope.base.Relocations Observations in `Relocations` format """ if isinstance(source_ids, str): source_ids = [source_ids] observations = self._get_observations(source_ids=source_ids, **kwargs) if observations.empty: return gpd.GeoDataFrame() if include_source_details: observations = observations.merge( pd.DataFrame(self.get_sources(id=",".join(observations["source"].unique()))).add_prefix("source__"), left_on="source", right_on="source__id", ) if relocations: return ecoscope.base.Relocations.from_gdf( observations, groupby_col="source", uuid_col="id", time_col="recorded_at", ) else: return observations
[docs] def get_subject_observations( self, subject_ids, include_source_details=False, include_subject_details=False, include_subjectsource_details=False, relocations=True, **kwargs, ): """ Get observations for each listed subject and create a `Relocations` object. Parameters ---------- subject_ids : str or list[str] or pd.DataFrame List of subject UUIDs, or a DataFrame of subjects include_source_details : bool, optional Whether to merge source info into dataframe include_subject_details : bool, optional Whether to merge subject info into dataframe include_subjectsource_details : bool, optional Whether to merge subjectsource info into dataframe kwargs Additional arguments to pass in the request to EarthRanger. See the docstring of `__get_observations` for info. Returns ------- relocations : ecoscope.base.Relocations Observations in `Relocations` format """ if isinstance(subject_ids, str): subject_ids = [subject_ids] elif isinstance(subject_ids, pd.DataFrame): subject_ids = subject_ids.id.tolist() elif not isinstance(subject_ids, list): raise ValueError(f"subject_ids must be either a str or list[str] or pd.DataFrame, not {type(subject_ids)}") observations = self._get_observations(subject_ids=subject_ids, **kwargs) if observations.empty: return gpd.GeoDataFrame() if include_source_details: observations = observations.merge( self.get_sources(id=",".join(observations["source"].unique())).add_prefix("source__"), left_on="source", right_on="source__id", ) if include_subject_details: if isinstance(subject_ids, pd.DataFrame): observations = observations.merge( subject_ids.add_prefix("subject__"), left_on="subject_id", right_on="subject__id", ) else: observations = observations.merge( self.get_subjects(id=",".join(subject_ids), include_inactive=True).add_prefix("subject__"), left_on="subject_id", right_on="subject__id", ) if include_subjectsource_details: observations = observations.merge( self.get_subjectsources(subjects=",".join(observations["subject_id"].unique())).add_prefix( "subjectsource__" ), left_on=["subject_id", "source"], right_on=["subjectsource__subject", "subjectsource__source"], ) if relocations: return ecoscope.base.Relocations.from_gdf( observations, groupby_col="subject_id", uuid_col="id", time_col="recorded_at", ) else: return observations
[docs] def get_subjectsource_observations( self, subjectsource_ids, include_source_details=False, relocations=True, **kwargs, ): """ Get observations for each listed subjectsource and create a `Relocations` object. Parameters ---------- subjectsource_ids : str or list[str] List of subjectsource UUIDs include_source_details : bool, optional Whether to merge source info into dataframe kwargs Additional arguments to pass in the request to EarthRanger. See the docstring of `__get_observations` for info. Returns ------- relocations : ecoscope.base.Relocations Observations in `Relocations` format """ if isinstance(subjectsource_ids, str): subjectsource_ids = [subjectsource_ids] observations = self._get_observations(subjectsource_ids=subjectsource_ids, **kwargs) if observations.empty: return gpd.GeoDataFrame() if include_source_details: observations = observations.merge( pd.DataFrame(self.get_sources(id=",".join(observations["source"].unique()))).add_prefix("source__"), left_on="source", right_on="source__id", ) if relocations: return ecoscope.base.Relocations.from_gdf( observations, groupby_col="subjectsource_id", uuid_col="id", time_col="recorded_at", ) else: return observations
[docs] def get_subjectgroup_observations( self, subject_group_id=None, subject_group_name=None, include_inactive=True, **kwargs, ): """ Parameters ---------- subject_group_id : str UUID of subject group to filter by subject_group_name : str Common name of subject group to filter by include_inactive : bool, optional Whether to get observations for Subjects marked inactive by EarthRanger kwargs Additional arguments to pass in the request to `get_subject_observations`. See the docstring of `get_subject_observations` for info. Returns ------- relocations : ecoscope.base.Relocations Observations in `Relocations` format """ assert (subject_group_id is None) != (subject_group_name is None) if subject_group_id: subjects = self.get_subjects(subject_group_id=subject_group_id, include_inactive=include_inactive) else: subjects = self.get_subjects(subject_group_name=subject_group_name, include_inactive=include_inactive) return self.get_subject_observations(subjects, **kwargs)
[docs] def get_event_types(self, include_inactive=False, **addl_kwargs): params = clean_kwargs(addl_kwargs, include_inactive=include_inactive) return pd.DataFrame(self._get("activity/events/eventtypes", **params))
[docs] def get_events( self, is_collection=None, updated_size=None, event_ids=None, bbox=None, sort_by=None, patrol_segment=None, state=None, event_type=None, include_updates=False, include_details=False, include_notes=False, include_related_events=False, include_files=False, max_results=None, oldest_update_date=None, exclude_contained=None, updated_since=None, event_category=None, since=None, until=None, **addl_kwargs, ): """ Parameters ---------- is_collection true/false whether to filter on is_collection updated_since date-string to limit on updated_at event_ids : array[string] Event IDs, comma-separated bbox bounding box including four coordinate values, comma-separated. Ex. bbox=-122.4,48.4,-122.95,49.0 (west, south, east, north). sort_by Sort by (use 'event_time', 'updated_at', 'created_at', 'serial_number') with optional minus ('-') prefix to reverse order. patrol_segment ID of patrol segment to filter on state Comma-separated list of 'scheduled'/'active'/'overdue'/'done'/'cancelled' event_type Comma-separated list of event type uuids include_updates Boolean value include_details Boolean value include_notes Boolean value include_related_events Boolean value include_files Boolean value max_results oldest_update_date exclude_contained event_category since until Returns ------- events : gpd.GeoDataFrame GeoDataFrame of queried events """ params = clean_kwargs( addl_kwargs, is_collection=is_collection, updated_size=updated_size, event_ids=event_ids, bbox=bbox, sort_by=sort_by, patrol_segment=patrol_segment, state=state, event_type=event_type, include_updates=include_updates, include_details=include_details, include_notes=include_notes, include_related_events=include_related_events, include_files=include_files, max_results=max_results, oldest_update_date=oldest_update_date, exclude_contained=exclude_contained, updated_since=updated_since, event_category=event_category, ) filter = {"date_range": {}} if since is not None: filter["date_range"]["lower"] = since params["filter"] = json.dumps(filter) if until is not None: filter["date_range"]["upper"] = until params["filter"] = json.dumps(filter) df = pd.DataFrame( self.get_objects_multithreaded( object="activity/events/", threads=self.tcp_limit, page_size=self.sub_page_size, **params, ) ) gdf = gpd.GeoDataFrame(df) if not df.empty: df = clean_time_cols(df) if gdf.loc[0, "location"] is not None: gdf.loc[~gdf["geojson"].isna(), "geometry"] = gpd.GeoDataFrame.from_features( gdf.loc[~gdf["geojson"].isna(), "geojson"] )["geometry"] gdf.set_geometry("geometry", inplace=True) gdf.set_crs(4326, inplace=True) gdf.sort_values("time", inplace=True) gdf.set_index("id", inplace=True) return gdf
[docs] def get_patrol_types(self): df = pd.DataFrame(self._get("activity/patrols/types")) return df.set_index("id")
[docs] def get_patrols(self, since=None, until=None, patrol_type=None, patrol_type_value=None, status=None, **addl_kwargs): """ Parameters ---------- since: Lower time range until: Upper time range patrol_type: A patrol type UUID or a list of UUIDs patrol_type_value: A patrol type value or a list of patrol type values status 'scheduled'/'active'/'overdue'/'done'/'cancelled' Accept a status string or a list of statuses Returns ------- patrols : pd.DataFrame DataFrame of queried patrols """ patrol_type_value_list = [patrol_type_value] if isinstance(patrol_type_value, str) else patrol_type_value params = clean_kwargs( addl_kwargs, status=status, patrol_type=[patrol_type] if isinstance(patrol_type, str) else patrol_type, patrol_type_value=patrol_type_value_list, return_data=True, ) filter = {"date_range": {}, "patrol_type": []} if since is not None: filter["date_range"]["lower"] = format_iso_time(since) if until is not None: filter["date_range"]["upper"] = format_iso_time(until) if patrol_type is not None: filter["patrol_type"] = params["patrol_type"] if patrol_type_value_list is not None: patrol_types = self.get_patrol_types() matching_rows = patrol_types[patrol_types["value"].isin(patrol_type_value_list)] missing_values = set(patrol_type_value_list) - set(matching_rows["value"]) if missing_values: raise ValueError(f"Failed to find IDs for values: {missing_values}") filter["patrol_type"] = matching_rows.index.tolist() params["filter"] = json.dumps(filter) df = pd.DataFrame( self.get_objects_multithreaded( object="activity/patrols", threads=self.tcp_limit, page_size=self.sub_page_size, **params, ) ) if "serial_number" in df.columns: df = df.sort_values(by="serial_number").reset_index(drop=True) df = clean_time_cols(df) return df
[docs] def get_patrol_events( self, since=None, until=None, patrol_type=None, patrol_type_value=None, status=None, **addl_kwargs ): """ Parameters ---------- since: Lower time range until: Upper time range patrol_type: A patrol type UUID or a list of UUIDs patrol_type_value: A patrol type value or a list of patrol type values status 'scheduled'/'active'/'overdue'/'done'/'cancelled' Accept a status string or a list of statuses Returns ------- events : pd.DataFrame DataFrame of queried patrols """ patrol_df = self.get_patrols( since=since, until=until, patrol_type=patrol_type, patrol_type_value=patrol_type_value, status=status, **addl_kwargs, ) events = [] for _, row in patrol_df.iterrows(): if row["patrol_segments"]: for segment in row["patrol_segments"]: for event in segment.get("events", []): event["patrol_id"] = row.get("id") event["patrol_segment_id"] = segment.get("id") events.append(event) events_df = pd.DataFrame(events) events_df = clean_time_cols(events_df) events_df["geometry"] = events_df["geojson"].apply(lambda x: shape(x.get("geometry"))) events_df["time"] = events_df["geojson"].apply( lambda x: datetime.datetime.strptime(x.get("properties").get("datetime"), "%Y-%m-%dT%H:%M:%S%z") ) return gpd.GeoDataFrame(events_df, geometry="geometry", crs=4326)
[docs] def get_patrol_segments_from_patrol_id(self, patrol_id, **addl_kwargs): """ Download patrols for a given `patrol id`. Parameters ---------- patrol_id : Patrol UUID. kwargs Additional parameters to pass to `_get`. Returns ------- dataframe : Dataframe of patrols. """ params = clean_kwargs(addl_kwargs) object = f"activity/patrols/{patrol_id}/" df = self._get(object, **params) df["patrol_segments"][0].pop("updates") df.pop("updates") df = clean_time_cols(df) return pd.DataFrame(dict([(k, pd.Series(v)) for k, v in df.items()]))
[docs] def get_patrol_segments(self): object = "activity/patrols/segments/" return pd.DataFrame( self.get_objects_multithreaded(object=object, threads=self.tcp_limit, page_size=self.sub_page_size) )
[docs] def get_patrol_observations_with_patrol_filter( self, since=None, until=None, patrol_type=None, patrol_type_value=None, status=None, include_patrol_details=False, **kwargs, ): """ Download observations for patrols with provided filters. Parameters ---------- since: Lower time range until: Upper time range patrol_type: A patrol type UUID or a list of UUIDs patrol_type_value: A patrol type value or a list of patrol type values status 'scheduled'/'active'/'overdue'/'done'/'cancelled' Accept a status string or a list of statuses include_patrol_details : bool, optional Whether to merge patrol details into dataframe kwargs Additional parameters to pass to `get_subject_observations`. Returns ------- relocations : ecoscope.base.Relocations """ patrols_df = self.get_patrols( since=since, until=until, patrol_type=patrol_type, patrol_type_value=patrol_type_value, status=status, **kwargs, ) return self.get_patrol_observations(patrols_df, include_patrol_details=include_patrol_details, **kwargs)
[docs] def get_patrol_observations(self, patrols_df, include_patrol_details=False, **kwargs): """ Download observations for provided `patrols_df`. Parameters ---------- patrols_df : pd.DataFrame Data returned from a call to `get_patrols`. include_patrol_details : bool, optional Whether to merge patrol details into dataframe kwargs Additional parameters to pass to `get_subject_observations`. Returns ------- relocations : ecoscope.base.Relocations """ observations = [] df_pt = self.get_patrol_types() for _, patrol in patrols_df.iterrows(): for patrol_segment in patrol["patrol_segments"]: subject_id = (patrol_segment.get("leader") or {}).get("id") patrol_start_time = (patrol_segment.get("time_range") or {}).get("start_time") patrol_end_time = (patrol_segment.get("time_range") or {}).get("end_time") patrol_type = df_pt[df_pt["value"] == patrol_segment.get("patrol_type")].reset_index()["id"][0] if None in {subject_id, patrol_start_time}: continue try: observation = self.get_subject_observations( subject_ids=[subject_id], since=patrol_start_time, until=patrol_end_time, **kwargs, ) if include_patrol_details: observation["patrol_id"] = patrol["id"] observation["patrol_serial_number"] = patrol["serial_number"] observation["patrol_start_time"] = patrol_start_time observation["patrol_end_time"] = patrol_end_time observation["patrol_type"] = patrol_type observation = ( observation.reset_index() .merge( pd.DataFrame(df_pt).add_prefix("patrol_type__"), left_on="patrol_type", right_on="id", ) .drop( columns=[ "patrol_type__ordernum", "patrol_type__icon_id", "patrol_type__default_priority", "patrol_type__is_active", ] ) ) if len(observation) > 0: observations.append(observation) except Exception as e: print( f"Getting observations for subject_id={subject_id} start_time={patrol_start_time}" f"end_time={patrol_end_time} failed for: {e}" ) df = pd.concat(observations) df = clean_time_cols(df) df = ecoscope.base.Relocations(df) if include_patrol_details: return df.set_index("id") return df
[docs] def get_patrol_segment_events( self, patrol_segment_id=None, include_details=False, include_files=False, include_related_events=False, include_notes=False, **addl_kwargs, ): params = clean_kwargs( addl_kwargs, patrol_segment_id=patrol_segment_id, include_details=include_details, include_files=include_files, include_related_events=include_related_events, include_notes=include_notes, ) object = f"activity/patrols/segments/{patrol_segment_id}/events/" df = pd.DataFrame( self.get_objects_multithreaded( object=object, threads=self.tcp_limit, page_size=self.sub_page_size, **params, ) ) df = clean_time_cols(df) return df
[docs] def get_spatial_features_group(self, spatial_features_group_id=None, **addl_kwargs): """ Download spatial features in a spatial features group for a given `spatial features group id`. Parameters ---------- spatial_features_group_id : Spatial Features Group UUID. kwargs Additional parameters to pass to `_get`. Returns ------- dataframe : GeoDataFrame of spatial features in a spatial features group. """ params = clean_kwargs(addl_kwargs, spatial_features_group_id=spatial_features_group_id) object = f"spatialfeaturegroup/{spatial_features_group_id}/" spatial_features_group = self._get(object, **params) spatial_features = [] for spatial_feature in spatial_features_group["features"]: spatial_features.append(spatial_feature["features"][0]) return gpd.GeoDataFrame.from_features(spatial_features)
[docs] def get_spatial_feature(self, spatial_feature_id=None, **addl_kwargs): """ Download spatial feature for a given `spatial feature id`. Parameters ---------- spatial_feature_id : Spatial Feature UUID. kwargs Additional parameters to pass to `_get`. Returns ------- dataframe : GeoDataFrame of spatial feature. """ params = clean_kwargs(addl_kwargs, spatial_feature_id=spatial_feature_id) object = f"spatialfeature/{spatial_feature_id}/" spatial_feature = self._get(object, **params) return gpd.GeoDataFrame.from_features(spatial_feature["features"])
""" POST Functions """
[docs] def post_source( self, source_type: str, manufacturer_id: str, model_name: str, provider: str = "default", additional: typing.Dict = {}, **kwargs, ) -> pd.DataFrame: """ Parameters ---------- source_type manufacturer_id model_name provider additional Returns ------- pd.DataFrame """ payload = { "source_type": source_type, "manufacturer_id": manufacturer_id, "model_name": model_name, "additional": additional, "provider": provider, } if kwargs: payload.update(kwargs) response = self._post("sources", payload=payload) return pd.DataFrame([response])
[docs] def post_subject( self, subject_name: str, subject_type: str, subject_subtype: str, is_active: bool = True, **kwargs, ) -> pd.DataFrame: """ Parameters ---------- subject_name subject_subtype is_active Returns ------- pd.DataFrame """ payload = { "name": subject_name, "subject_subtype": subject_subtype, "is_active": is_active, } if kwargs: payload.update(kwargs) response = self._post("subjects", payload=payload) return pd.DataFrame([response])
[docs] def post_subjectsource( self, subject_id: str, source_id: str, lower_bound_assigned_range: datetime.datetime, upper_bound_assigned_range: datetime.datetime, additional: typing.Dict = None, ) -> pd.DataFrame: """ Parameters ---------- subject_id source_id lower_bound_assigned_range upper_bound_assigned_range additional Returns ------- pd.DataFrame """ if additional is None: additional = {} payload = { "source": source_id, "assigned_range": { "lower": lower_bound_assigned_range, "upper": upper_bound_assigned_range, }, "additional": additional, } urlpath = f"subject/{subject_id}/sources" response = self._post(urlpath, payload=payload) return pd.DataFrame([response])
[docs] def post_observations( self, observations: gpd.GeoDataFrame, source_id_col: str = "source", recorded_at_col: str = "recorded_at", ) -> pd.DataFrame: """ Parameters ---------- observations : gpd.GeoDataFrame observation data to be uploaded source_id_col : str The source column in the observation dataframe recorded_at_col : str The observation recorded time column in the dataframe Returns ------- None """ def upload(obs): try: obs = obs.rename(columns={source_id_col: "source", recorded_at_col: "recorded_at"}) obs["location"] = pd.DataFrame({"longitude": obs.geometry.x, "latitude": obs.geometry.y}).to_dict( "records" ) del obs["geometry"] obs = pack_columns(obs, columns=["source", "recorded_at", "location"]) post_data = obs.to_dict("records") results = super(EarthRangerIO, self).post_observation(post_data) except ERClientException as exc: self.logger.error(exc) except requests.exceptions.RequestException as exc: self.logger.error(exc) else: return pd.DataFrame(results) return observations.groupby(source_id_col, group_keys=False).apply(upload)
[docs] def post_event( self, events: typing.Union[gpd.GeoDataFrame, pd.DataFrame, typing.Dict, typing.List[typing.Dict]], ) -> pd.DataFrame: """ Parameters ---------- events Returns ------- pd.DataFrame: New events created in EarthRanger. """ events = dataframe_to_dict(events) results = super().post_event(event=events) results = results if isinstance(results, list) else [results] return pd.DataFrame(results)
[docs] def post_patrol(self, priority: int, **kwargs) -> pd.DataFrame: """ Parameters ---------- priority Returns ------- pd.DataFrame """ payload = {"priority": priority} if kwargs: payload.update(kwargs) response = self._post("activity/patrols", payload=payload) return pd.DataFrame([response])
[docs] def post_patrol_segment( self, patrol_id: str, patrol_segment_id: str, patrol_type: str = None, tracked_subject_id: str = None, scheduled_start: str = None, scheduled_end: str = None, start_time: str = None, end_time: str = None, start_location: typing.Tuple[float, float] = None, end_location: typing.Tuple[float, float] = None, **kwargs, ) -> pd.DataFrame: """ Parameters ---------- patrol_id patrol_segment_id patrol_type tracked_subject_id scheduled_start scheduled_end start_time end_time start_location end_location Returns ------- pd.DataFrame """ payload = { "patrol": patrol_id, "patrol_segment": patrol_segment_id, "scheduled_start": scheduled_start, "scheduled_end": scheduled_end, "time_range": {"start_time": start_time, "end_time": end_time}, } if tracked_subject_id is not None: payload.update( { "leader": { "content_type": "observations.subject", "id": tracked_subject_id, } } ) else: payload.update({"leader": None}) if start_location is not None: payload.update( { "start_location": { "latitude": start_location[0], "longitude": start_location[1], } } ) else: payload.update({"start_location": None}) if end_location is not None: payload.update( { "end_location": { "latitude": end_location[0], "longitude": end_location[1], } } ) else: payload.update({"end_location": None}) if patrol_type is not None: payload.update({"patrol_type": patrol_type}) if kwargs: payload.update(kwargs) response = self._post("activity/patrols/segments/", payload=payload) return pd.DataFrame([response])
[docs] def post_patrol_segment_event( self, patrol_segment_id: str, event_type: str, **addl_kwargs, ) -> pd.DataFrame: """ Parameters ---------- patrol_segment_id event_type Returns ------- pd.DataFrame """ payload = { "patrol_segment": patrol_segment_id, "event_type": event_type, } if addl_kwargs: payload.update(addl_kwargs) response = self._post(f"activity/patrols/segments/{patrol_segment_id}/events/", payload=payload) return pd.DataFrame([response])
""" PATCH Functions """
[docs] def patch_event( self, event_id: str, events: typing.Union[gpd.GeoDataFrame, pd.DataFrame, typing.Dict, typing.List[typing.Dict]], ) -> pd.DataFrame: """ Parameters ---------- event_id UUID for the event that will be updated. events Returns ------- pd.DataFrame: Updated events in EarthRanger. """ events = dataframe_to_dict(events) if isinstance(events, list): results = [self._patch(f"activity/event/{event_id}", payload=event) for event in events] else: results = [self._patch(f"activity/event/{event_id}", payload=events)] return pd.DataFrame(results)
""" DELETE Functions """
[docs] def delete_observation(self, observation_id: str): """ Parameters ---------- observation_id ------- """ self._delete("observation/" + observation_id + "/")