import datetime
import json
import math
import typing
from contextlib import contextmanager
from typing import Any, Literal
import geopandas as gpd # type: ignore[import-untyped]
import numpy as np
import pandas as pd
import pytz
import requests
from erclient.client import ERClient, ERClientException, ERClientNotFound # type: ignore[import-untyped]
from tqdm.auto import tqdm
from ecoscope.base.utils import BoundingBox
from ecoscope.io.earthranger_utils import (
clean_kwargs,
dataframe_to_dict_or_list,
format_iso_time,
geometry_from_event_geojson,
pack_columns,
to_gdf,
to_hex,
unpack_events_from_patrols_df,
)
from ecoscope.io.utils import clean_time_cols
from ecoscope.relocations import Relocations
EventSortOptions = Literal[
"event_time",
"updated_at",
"created_at",
"serial_number",
"-event_time",
"-updated_at",
"-created_at",
"-serial_number",
]
StatusOptions = Literal["scheduled", "active", "overdue", "done", "cancelled"]
ApiVersionSelection = Literal["v1", "v2", "both"]
AppendCategorySelection = Literal["duplicates", "always", "never"]
SAFE_QUERY_PARAM_LIST_SIZE = 50
[docs]
class EarthRangerIO(ERClient):
def __init__(self, sub_page_size: int = 4000, tcp_limit: int = 5, **kwargs):
if "server" in kwargs:
self.server = kwargs.pop("server")
kwargs["service_root"] = f"{self.server}/api/v1.0"
kwargs["token_url"] = f"{self.server}/oauth2/token"
self.token = kwargs.get("token")
self.sub_page_size = sub_page_size
self.tcp_limit = tcp_limit
kwargs["client_id"] = kwargs.get("client_id", "das_web_client")
super().__init__(**kwargs)
if not self.auth:
try:
self.login()
except ERClientNotFound:
raise ERClientNotFound("Failed login. Check Stack Trace for specific reason.")
else:
try:
self.get_me()
except ERClientException:
raise ERClientException("Authorization token is invalid or expired.")
[docs]
def _token_request(self, payload):
response = requests.post(self.token_url, data=payload)
if response.ok:
self.auth = response.json()
expires_in = int(self.auth["expires_in"]) - 5 * 60
self.auth_expires = pytz.utc.localize(datetime.datetime.utcnow()) + datetime.timedelta(seconds=expires_in)
return True
self.auth = None
self.auth_expires = pytz.utc.localize(datetime.datetime.min)
raise ERClientNotFound(f"{response.status_code}, {response.text}")
[docs]
@contextmanager
def _use_v2_api(self):
"""
Context manager to safely handle switching the internal client
service root to the v2 api base where required, and then switch back to v1
"""
self.service_root = f"{self.server}/api/v2.0"
try:
yield
finally:
self.service_root = f"{self.server}/api/v1.0"
"""
GET Functions
"""
[docs]
def get_sources(
self,
manufacturer_id: str | None = None,
provider_key: str | None = None,
provider: str | None = None,
id: str | None = None,
sub_page_size: int | None = None,
**addl_kwargs,
) -> pd.DataFrame:
"""
Parameters
----------
manufacturer_id
provider_key
provider
id
sub_page_size: int | None
Optionally set a specific sub_page_size for this request, instead of using the client default
Returns
-------
sources : pd.DataFrame
DataFrame of queried sources
"""
params = clean_kwargs(
addl_kwargs,
manufacturer_id=manufacturer_id,
provider_key=provider_key,
provider=provider,
id=id,
)
df = pd.DataFrame(
self.get_objects_multithreaded(
object="sources/",
threads=self.tcp_limit,
page_size=sub_page_size or self.sub_page_size,
**params,
)
)
return df
[docs]
def get_subjects(
self,
include_inactive: bool | None = None,
bbox: BoundingBox | None = None,
subject_group_id: str | None = None,
name: str | None = None,
updated_since: str | None = None,
tracks: bool | None = None,
id: str | None = None,
updated_until: str | None = None,
subject_group_name: str | None = None,
max_ids_per_request: int = 50,
sub_page_size: int | None = None,
**addl_kwargs,
) -> pd.DataFrame:
"""
Parameters
----------
include_inactive: Include inactive subjects in list.
bbox: Include subjects having track data within this bounding box defined by a 4-tuple of coordinates marking
west, south, east, north.
subject_group_id: Indicate a subject group id for which Subjects should be listed.
This is translated to the subject_group parameter in the ER backend
name : Find subjects with the given name
updated_since: Return Subject that have been updated since the given timestamp.
tracks: Indicate whether to render each subject's recent tracks.
id: A comma-delimited list of Subject IDs.
updated_until
subject_group_name: A subject group name for which Subjects should be listed.
This is translated to the group_name parameter in the ER backend
sub_page_size: int | None
Optionally set a specific sub_page_size for this request, instead of using the client default
Returns
-------
subjects : pd.DataFrame
"""
params = clean_kwargs(
addl_kwargs,
include_inactive=include_inactive,
bbox=bbox,
subject_group=subject_group_id,
name=name,
updated_since=updated_since,
tracks=tracks,
id=id,
updated_until=updated_until,
group_name=subject_group_name,
)
assert params.get("subject_group") is None or params.get("group_name") is None
if params.get("group_name") is not None:
try:
params["subject_group"] = self._get(
"subjectgroups/",
params={
"group_name": params.pop("group_name"),
"include_inactive": True,
"include_hidden": True,
"flat": True,
},
)[0]["id"]
except IndexError:
raise KeyError("`group_name` not found")
if params.get("id") is not None:
params["id"] = str(params.get("id")).split(",")
def partial_subjects(subjects):
params["id"] = ",".join(subjects)
return pd.DataFrame(
self.get_objects_multithreaded(
object="subjects/",
threads=self.tcp_limit,
page_size=sub_page_size or self.sub_page_size,
**params,
)
)
df = pd.concat(
[
partial_subjects(s)
for s in np.array_split(params["id"], math.ceil(len(params["id"]) / max_ids_per_request))
],
ignore_index=True,
)
else:
df = pd.DataFrame(
self.get_objects_multithreaded(
object="subjects/",
threads=self.tcp_limit,
page_size=sub_page_size or self.sub_page_size,
**params,
)
)
if not df.empty:
df["hex"] = df["additional"].str["rgb"].map(to_hex) if "additional" in df else "#ff0000"
df = clean_time_cols(df)
return df
[docs]
def get_subjectsources(
self, subjects: str | None = None, sources: str | None = None, sub_page_size: int | None = None, **addl_kwargs
) -> pd.DataFrame:
"""
Parameters
----------
subjects: A comma-delimited list of Subject IDs.
sources: A comma-delimited list of Source IDs.
sub_page_size: int | None
Optionally set a specific sub_page_size for this request, instead of using the client default
Returns
-------
subjectsources : pd.DataFrame
"""
params = clean_kwargs(addl_kwargs, sources=sources, subjects=subjects)
df = pd.DataFrame(
self.get_objects_multithreaded(
object="subjectsources/",
threads=self.tcp_limit,
page_size=sub_page_size or self.sub_page_size,
**params,
)
)
df = clean_time_cols(df)
return df
[docs]
def _get_observations(
self,
source_ids: str | list[str] | None = None,
subject_ids: str | list[str] | None = None,
subjectsource_ids: str | list[str] | None = None,
tz="UTC",
since: str | None = None,
until: str | None = None,
filter: int | None = None,
include_details: bool | None = None,
created_after: str | None = None,
sub_page_size: int | None = None,
**addl_kwargs,
) -> gpd.GeoDataFrame:
"""
Return observations matching queries. If `subject_id`, `source_id`, or `subjectsource_id` is specified, the
index is set to the provided value.
Parameters
----------
subject_ids: filter to a single subject
source_ids: filter to a single source
subjectsource_ids: filter to a subjectsource_id, rather than source_id + time range
since: get observations after this ISO8061 date, include timezone
until:get observations up to this ISO8061 date, include timezone
filter
filter using exclusion_flags for an observation.
filter=None returns everything
filter=0 filters out everything but rows with exclusion flag 0 (i.e, passes back clean data)
filter=1 filters out everything but rows with exclusion flag 1 (i.e, passes back manually filtered data)
filter=2, filters out everything but rows with exclusion flag 2 (i.e., passes back automatically filtered
data)
filter=3, filters out everything but rows with exclusion flag 2 or 1 (i.e., passes back both manual and
automatically filtered data)
include_details: one of [true,false], default is false. This brings back the observation additional field
created_after: get observations created (saved in EarthRanger) after this ISO8061 date, include timezone
sub_page_size: int | None
Optionally set a specific sub_page_size for this request, instead of using the client default
Returns
-------
observations : gpd.GeoDataFrame
"""
assert (source_ids, subject_ids, subjectsource_ids).count(None) == 2
params = clean_kwargs(
addl_kwargs,
since=since,
until=until,
filter=filter,
include_details=include_details,
created_after=created_after,
)
if source_ids:
id_name, ids = "source_id", source_ids
elif subject_ids:
id_name, ids = "subject_id", subject_ids
elif subjectsource_ids:
id_name, ids = "subjectsource_id", subjectsource_ids
observations = []
pbar = tqdm([ids] if isinstance(ids, str) else ids)
for _id in pbar:
params[id_name] = _id
pbar.set_description(f"Downloading Observations for {id_name}={_id}")
dataframe = pd.DataFrame(
self.get_objects_multithreaded(
object="observations/",
threads=self.tcp_limit,
page_size=sub_page_size or self.sub_page_size,
**params,
)
)
dataframe[id_name] = _id
observations.append(dataframe)
observations_df = pd.concat(observations)
if observations_df.empty:
return gpd.GeoDataFrame()
observations_df = clean_time_cols(observations_df)
observations_df["created_at"] = observations_df["created_at"].dt.tz_convert(tz)
observations_df["recorded_at"] = observations_df["recorded_at"].dt.tz_convert(tz)
observations_df.sort_values("recorded_at", inplace=True)
return to_gdf(observations_df)
[docs]
def get_source_observations(
self,
source_ids: str | list[str],
include_source_details: bool = False,
relocations: bool = True,
**kwargs,
) -> Relocations | gpd.GeoDataFrame:
"""
Get observations for each listed source and create a `Relocations` object.
Parameters
----------
source_ids : str or list[str]
List of source UUIDs
include_source_details : bool, optional
Whether to merge source info into dataframe
sub_page_size: int | None
Optionally set a specific sub_page_size for this request, instead of using the client default
kwargs
Additional arguments to pass in the request to EarthRanger. See the docstring of `_get_observations` for
info.
Returns
-------
relocations : ecoscope.Relocations
Observations in `Relocations` format
"""
if isinstance(source_ids, str):
source_ids = [source_ids]
observations = self._get_observations(source_ids=source_ids, **kwargs)
if observations.empty:
return Relocations(gdf=gpd.GeoDataFrame()) if relocations else gpd.GeoDataFrame()
if include_source_details:
observations = observations.merge(
pd.DataFrame(self.get_sources(id=",".join(observations["source"].unique()))).add_prefix("source__"),
left_on="source",
right_on="source__id",
)
if relocations:
return Relocations.from_gdf(
observations,
groupby_col="source",
uuid_col="id",
time_col="recorded_at",
)
else:
return observations
[docs]
def get_subject_observations(
self,
subject_ids: str | list[str] | pd.DataFrame,
include_source_details: bool = False,
include_subject_details: bool = False,
include_subjectsource_details: bool = False,
relocations: bool = True,
sub_page_size: int | None = None,
**kwargs,
) -> Relocations | gpd.GeoDataFrame:
"""
Get observations for each listed subject and create a `Relocations` object.
Parameters
----------
subject_ids : str or list[str] or pd.DataFrame
List of subject UUIDs, or a DataFrame of subjects
include_source_details : bool, optional
Whether to merge source info into dataframe
include_subject_details : bool, optional
Whether to merge subject info into dataframe
include_subjectsource_details : bool, optional
Whether to merge subjectsource info into dataframe
sub_page_size: int | None
Optionally set a specific sub_page_size for this request, instead of using the client default
kwargs
Additional arguments to pass in the request to EarthRanger. See the docstring of `__get_observations` for
info.
Returns
-------
relocations : ecoscope.Relocations
Observations in `Relocations` format
"""
if isinstance(subject_ids, str):
subject_ids = [subject_ids]
elif isinstance(subject_ids, pd.DataFrame):
subject_ids = subject_ids.id.tolist()
elif not isinstance(subject_ids, list):
raise ValueError(f"subject_ids must be either a str or list[str] or pd.DataFrame, not {type(subject_ids)}")
observations = self._get_observations(subject_ids=subject_ids, sub_page_size=sub_page_size, **kwargs)
if observations.empty:
return Relocations(gdf=gpd.GeoDataFrame()) if relocations else gpd.GeoDataFrame()
if include_source_details:
observations = observations.merge(
self.get_sources(id=",".join(observations["source"].unique())).add_prefix("source__"),
left_on="source",
right_on="source__id",
)
if include_subject_details:
if isinstance(subject_ids, pd.DataFrame):
observations = observations.merge(
subject_ids.add_prefix("subject__"),
left_on="subject_id",
right_on="subject__id",
)
else:
observations = observations.merge(
self.get_subjects(
id=",".join(subject_ids), include_inactive=True, sub_page_size=sub_page_size
).add_prefix("subject__"),
left_on="subject_id",
right_on="subject__id",
)
if include_subjectsource_details:
observations = observations.merge(
self.get_subjectsources(
subjects=",".join(observations["subject_id"].unique()), sub_page_size=sub_page_size
).add_prefix("subjectsource__"),
left_on=["subject_id", "source"],
right_on=["subjectsource__subject", "subjectsource__source"],
)
if relocations:
return Relocations.from_gdf(
observations,
groupby_col="subject_id",
uuid_col="id",
time_col="recorded_at",
)
else:
return observations
[docs]
def get_subjectsource_observations(
self,
subjectsource_ids: str | list[str],
include_source_details: bool = False,
relocations: bool = True,
**kwargs,
) -> Relocations | gpd.GeoDataFrame:
"""
Get observations for each listed subjectsource and create a `Relocations` object.
Parameters
----------
subjectsource_ids : str or list[str]
List of subjectsource UUIDs
include_source_details : bool, optional
Whether to merge source info into dataframe
kwargs
Additional arguments to pass in the request to EarthRanger. See the docstring of `__get_observations` for
info.
Returns
-------
relocations : ecoscope.Relocations
Observations in `Relocations` format
"""
if isinstance(subjectsource_ids, str):
subjectsource_ids = [subjectsource_ids]
observations = self._get_observations(subjectsource_ids=subjectsource_ids, **kwargs)
if observations.empty:
return Relocations(gdf=gpd.GeoDataFrame()) if relocations else gpd.GeoDataFrame()
if include_source_details:
observations = observations.merge(
pd.DataFrame(self.get_sources(id=",".join(observations["source"].unique()))).add_prefix("source__"),
left_on="source",
right_on="source__id",
)
if relocations:
return Relocations.from_gdf(
observations,
groupby_col="subjectsource_id",
uuid_col="id",
time_col="recorded_at",
)
else:
return observations
[docs]
def get_subjectgroup_observations(
self,
subject_group_id: str | None = None,
subject_group_name: str | None = None,
include_inactive: bool = True,
sub_page_size: int | None = None,
**kwargs,
) -> Relocations | gpd.GeoDataFrame:
"""
Parameters
----------
subject_group_id : str
UUID of subject group to filter by
subject_group_name : str
Common name of subject group to filter by
include_inactive : bool, optional
Whether to get observations for Subjects marked inactive by EarthRanger
sub_page_size: int | None
Optionally set a specific sub_page_size for this request, instead of using the client default
kwargs
Additional arguments to pass in the request to `get_subject_observations`. See the docstring of
`get_subject_observations` for info.
Returns
-------
relocations : ecoscope.Relocations
Observations in `Relocations` format
"""
assert (subject_group_id is None) != (subject_group_name is None)
if subject_group_id:
subjects = self.get_subjects(
subject_group_id=subject_group_id, include_inactive=include_inactive, sub_page_size=sub_page_size
)
else:
subjects = self.get_subjects(
subject_group_name=subject_group_name, include_inactive=include_inactive, sub_page_size=sub_page_size
)
if subjects.empty:
return subjects
return self.get_subject_observations(subjects, sub_page_size=sub_page_size, **kwargs)
[docs]
def get_event_types(
self, include_inactive: bool = False, api_version: ApiVersionSelection = "both", **addl_kwargs
) -> pd.DataFrame:
"""
Return dataframe of ER event types
Parameters
----------
include_inactive: bool, default False
Whether to include inactive event types
api_version: "v1","v2", or "both", default "both"
Whether to fetch v1 or v2 event types, or both
Returns
-------
pd.DataFrame
"""
params = clean_kwargs(addl_kwargs, include_inactive=include_inactive)
results = []
if api_version == "v1" or api_version == "both":
results.append(pd.DataFrame(self._get("activity/events/eventtypes", params=params)))
if api_version == "v2" or api_version == "both":
with self._use_v2_api():
results.append(pd.DataFrame(self._get("activity/eventtypes", params=params)))
return pd.concat(results)
[docs]
def get_choices_from_v2_event_type(self, event_type: str, choice_field: str) -> dict[str, str]:
"""
Retrieve choice options for a specific field from a v2 event type schema.
Parameters
----------
event_type : str
The event type identifier.
choice_field : str
The name of the choice field to extract options from.
Returns
-------
dict[str, str]
A dictionary mapping choice values (const) to their display titles.
"""
choices: dict[str, str] = {}
with self._use_v2_api():
schema = self._get(f"activity/eventtypes/{event_type}/schema")
for prop_name, prop in schema.get("json", {}).get("properties", {}).items():
if prop_name == choice_field:
for definition in prop.get("anyOf", {}):
try:
schema = self._get(definition.get("$ref"), params={"s_format": "oneOf"})
choices |= {choice.get("const"): choice.get("title") for choice in schema.get("oneOf")}
except:
continue
return choices
[docs]
def get_fields_from_event_type_schema(self, event_type: str) -> dict[str, str]:
"""
Retrieve all fields from an event type schema.
Attempts to fetch from the v2 API first, falling back to the v1 API
if the event type is not found in v2.
Parameters
----------
event_type : str
The event type identifier.
Returns
-------
dict[str, str]
A dictionary mapping field names to their display titles.
"""
fields: dict[str, str] = {}
try:
with self._use_v2_api():
schema = self._get(f"activity/eventtypes/{event_type}/schema")
fields = {
prop_name: prop.get("title", prop_name)
for prop_name, prop in schema.get("json", {}).get("properties", {}).items()
}
except ERClientNotFound:
schema = self._get(f"activity/events/schema/eventtype/{event_type}")
fields = {
prop_name: prop.get("title", prop_name)
for prop_name, prop in schema.get("schema", {}).get("properties", {}).items()
}
return fields
[docs]
def get_events(
self,
is_collection: bool | None = None,
updated_size: str | None = None,
event_ids: list[str] | None = None,
bbox: BoundingBox | None = None,
sort_by: EventSortOptions | None = None,
patrol_segment: str | None = None,
state: list[StatusOptions] | None = None,
event_type: list[str] | None = None,
include_updates: bool = False,
include_details: bool = False,
include_notes: bool = False,
include_related_events: bool = False,
include_files: bool = False,
max_results: int | None = None,
oldest_update_date: str | None = None,
exclude_contained: bool | None = None,
updated_since: str | None = None,
event_category: str | None = None,
since: str | None = None,
until: str | None = None,
force_point_geometry: bool = True,
drop_null_geometry: bool = True,
sub_page_size: int | None = None,
**addl_kwargs,
) -> gpd.GeoDataFrame:
"""
Parameters
----------
is_collection
true/false whether to filter on is_collection
updated_since
date-string to limit on updated_at
event_ids : array[string]
Event IDs, comma-separated
bbox
bounding box including four coordinate values, comma-separated. Ex. bbox=-122.4,48.4,-122.95,49.0
(west, south, east, north).
sort_by
Sort by (use 'event_time', 'updated_at', 'created_at', 'serial_number') with optional minus ('-') prefix to
reverse order.
patrol_segment
ID of patrol segment to filter on
state
Comma-separated list of 'scheduled'/'active'/'overdue'/'done'/'cancelled'
event_type
Comma-separated list of event type uuids
include_updates
Boolean value
include_details
Boolean value
include_notes
Boolean value
include_related_events
Boolean value
include_files
Boolean value
max_results
oldest_update_date
exclude_contained
event_category
since
until
force_point_geometry: bool, default True
If true, non point geometry (ie polys) will be converted to a single point via Shape.centroid
drop_null_geometry: bool, default True
If true, events with no geometry will be removed from output
sub_page_size: int | None
Optionally set a specific sub_page_size for this request, instead of using the client default
Returns
-------
events : gpd.GeoDataFrame
GeoDataFrame of queried events
"""
if event_type is None:
event_type = []
params = clean_kwargs(
addl_kwargs,
is_collection=is_collection,
updated_size=updated_size,
event_ids=event_ids,
bbox=bbox,
sort_by=sort_by,
patrol_segment=patrol_segment,
state=state,
event_type=event_type,
include_updates=include_updates,
include_details=include_details,
include_notes=include_notes,
include_related_events=include_related_events,
include_files=include_files,
max_results=max_results,
oldest_update_date=oldest_update_date,
exclude_contained=exclude_contained,
updated_since=updated_since,
event_category=event_category,
)
filter: dict[str, Any] = {"date_range": {}}
if since is not None:
filter["date_range"]["lower"] = since
params["filter"] = json.dumps(filter)
if until is not None:
filter["date_range"]["upper"] = until
params["filter"] = json.dumps(filter)
if len(event_type) > SAFE_QUERY_PARAM_LIST_SIZE:
event_type_chunks = [
event_type[i : i + SAFE_QUERY_PARAM_LIST_SIZE]
for i in range(0, len(event_type), SAFE_QUERY_PARAM_LIST_SIZE)
]
params.pop("event_type")
df = pd.concat(
[
pd.DataFrame(
self.get_objects_multithreaded(
object="activity/events/",
threads=self.tcp_limit,
page_size=sub_page_size or self.sub_page_size,
event_type=chunk,
**params,
)
)
for chunk in event_type_chunks
]
)
else:
df = pd.DataFrame(
self.get_objects_multithreaded(
object="activity/events/",
threads=self.tcp_limit,
page_size=sub_page_size or self.sub_page_size,
**params,
)
)
if not df.empty:
df = clean_time_cols(df)
df = geometry_from_event_geojson(
df, force_point_geometry=force_point_geometry, drop_null_geometry=drop_null_geometry
)
gdf = gpd.GeoDataFrame(df, geometry="geometry", crs=4326)
gdf.sort_values("time", inplace=True)
gdf.set_index("id", inplace=True)
return gdf
return gpd.GeoDataFrame()
[docs]
def get_event_type_display_names_from_events(
self,
events_gdf: gpd.GeoDataFrame,
append_category_names: AppendCategorySelection = "never",
) -> gpd.GeoDataFrame:
"""
For the provided events_gdf, append an "event_type_display" column
containing the display names for each event type,
optionally including the corresponding event category name
Parameters
----------
events_gdf : gpd.GeoDataFrame
The events dataframe to add display names to
append_category_names : "always","duplicates", or "never", default "never"
Whether to append the event category to the event type display name
If appended, the format value will be: "Event Type (Event Category)"
If set to always, the event category value will be appended in all cases
If set to duplicates, the event category value will be appended
only for event types with overlapping display names
If set to never, no category names will be appended
Event types that are present on events but missing from the ER
event-type registry (e.g. deleted/orphan types still attached to
historical events) fall back to their raw event_type value as the
display, rather than raising KeyError.
Returns
-------
gpd.GeoDataFrame
"""
assert "event_type" in events_gdf.columns
event_types = self.get_event_types(include_inactive=True)
event_type_lookup = dict(zip(event_types["value"], event_types["display"]))
events_gdf["event_type_display"] = (
events_gdf["event_type"].map(event_type_lookup).fillna(events_gdf["event_type"])
)
has_duplicates = len(events_gdf["event_type_display"].unique()) != len(events_gdf["event_type"].unique())
do_append = append_category_names == "always" or (append_category_names == "duplicates" and has_duplicates)
if not do_append:
return events_gdf
event_categories_display_lookup = {
category["value"]: category["display"] for category in self.get_event_categories()
}
# In V1 event types, event category information is bundled as a nested dict
# In V2 event types we only get the event category value at the same "category" key
# So we need to safely handle that difference here
categories_to_lookup = event_types["category"].apply(
lambda category: category["value"] if isinstance(category, dict) else category
)
category_display_values = categories_to_lookup.map(event_categories_display_lookup)
event_type_to_category_display_lookup = dict(zip(event_types["value"], category_display_values))
if append_category_names == "duplicates":
is_duplicate_display = events_gdf.groupby("event_type_display")["event_type"].transform("nunique") > 1
category_display = events_gdf.loc[is_duplicate_display, "event_type"].map(
event_type_to_category_display_lookup
)
events_gdf.loc[is_duplicate_display, "event_type_display"] = (
events_gdf.loc[is_duplicate_display, "event_type_display"] + " (" + category_display + ")"
)
else:
category_display = events_gdf["event_type"].map(event_type_to_category_display_lookup)
events_gdf["event_type_display"] = events_gdf["event_type_display"] + " (" + category_display + ")"
return events_gdf
[docs]
def get_patrol_types(self) -> pd.DataFrame:
df = pd.DataFrame(self._get("activity/patrols/types"))
if not df.empty:
df = df.set_index("id")
return df
[docs]
def get_patrols(
self,
since: str | None = None,
until: str | None = None,
patrol_type: str | list[str] | None = None,
patrol_type_value: str | list[str] | None = None,
status: list[StatusOptions] | None = None,
sub_page_size: int | None = None,
patrols_overlap_daterange: bool = True,
**addl_kwargs,
) -> pd.DataFrame:
"""
Parameters
----------
since:
Lower time range
until:
Upper time range
patrol_type:
A patrol type UUID or a list of UUIDs
patrol_type_value:
A patrol type value or a list of patrol type values
status
'scheduled'/'active'/'overdue'/'done'/'cancelled'
Accept a status string or a list of statuses
sub_page_size: int | None
Optionally set a specific sub_page_size for this request, instead of using the client default
patrols_overlap_daterange: bool, default True
If false, restricts patrols to only those that start within the bounds of the provided time range
Returns
-------
patrols : pd.DataFrame
DataFrame of queried patrols
"""
patrol_type_value_list = [patrol_type_value] if isinstance(patrol_type_value, str) else patrol_type_value
params = clean_kwargs(
addl_kwargs,
status=status,
patrol_type=[patrol_type] if isinstance(patrol_type, str) else patrol_type,
patrol_type_value=patrol_type_value_list,
return_data=True,
)
filter: dict[str, Any] = {"date_range": {}, "patrol_type": []}
if since is not None:
filter["date_range"]["lower"] = format_iso_time(since)
if until is not None:
filter["date_range"]["upper"] = format_iso_time(until)
if patrol_type is not None:
filter["patrol_type"] = params["patrol_type"]
if patrol_type_value_list is not None:
patrol_types = self.get_patrol_types()
matching_rows = patrol_types[patrol_types["value"].isin(patrol_type_value_list)]
missing_values = set(patrol_type_value_list) - set(matching_rows["value"])
if missing_values:
raise ValueError(f"Failed to find IDs for values: {missing_values}")
filter["patrol_type"] = matching_rows.index.tolist()
if filter.get("date_range", False):
filter["patrols_overlap_daterange"] = patrols_overlap_daterange
params["filter"] = json.dumps(filter)
df = pd.DataFrame(
self.get_objects_multithreaded(
object="activity/patrols",
threads=self.tcp_limit,
page_size=sub_page_size or self.sub_page_size,
**params,
)
)
if "serial_number" in df.columns:
df = df.sort_values(by="serial_number").reset_index(drop=True)
df = clean_time_cols(df)
return df
[docs]
def get_patrol_events(
self,
since: str | None = None,
until: str | None = None,
patrol_type: str | list[str] | None = None,
patrol_type_value: str | list[str] | None = None,
event_type: list[str] | None = None,
status: list[StatusOptions] | None = None,
force_point_geometry: bool = True,
drop_null_geometry: bool = True,
sub_page_size: int | None = None,
patrols_overlap_daterange: bool = True,
**addl_kwargs,
) -> gpd.GeoDataFrame | pd.DataFrame:
"""
Parameters
----------
since:
Lower time range
until:
Upper time range
patrol_type:
A patrol type UUID or a list of UUIDs
patrol_type_value:
A patrol type value or a list of patrol type values
status
'scheduled'/'active'/'overdue'/'done'/'cancelled'
Accept a status string or a list of statuses
force_point_geometry: bool, default True
If true, non point geometry (ie polys) will be converted to a single point via Shape.centroid
drop_null_geometry: bool, default True
If true, events with no geometry will be removed from output
sub_page_size: int | None
Optionally set a specific sub_page_size for this request, instead of using the client default
patrols_overlap_daterange: bool, default True
If false, restricts patrols to only those that start within the bounds of the provided time range
Returns
-------
events : pd.DataFrame
DataFrame of queried patrols
"""
patrol_df = self.get_patrols(
since=since,
until=until,
patrol_type=patrol_type,
patrol_type_value=patrol_type_value,
status=status,
sub_page_size=sub_page_size,
patrols_overlap_daterange=patrols_overlap_daterange,
**addl_kwargs,
)
return unpack_events_from_patrols_df(patrol_df, event_type, force_point_geometry, drop_null_geometry)
[docs]
def get_patrol_segments_from_patrol_id(self, patrol_id: str, **addl_kwargs) -> pd.DataFrame:
"""
Download patrols for a given `patrol id`.
Parameters
----------
patrol_id :
Patrol UUID.
kwargs
Additional parameters to pass to `_get`.
Returns
-------
dataframe : Dataframe of patrols.
"""
params = clean_kwargs(addl_kwargs)
object = f"activity/patrols/{patrol_id}/"
df = self._get(object, **params)
df["patrol_segments"][0].pop("updates")
df.pop("updates")
df = clean_time_cols(df)
return pd.DataFrame(dict([(k, pd.Series(v)) for k, v in df.items()]))
[docs]
def get_patrol_segments(self) -> pd.DataFrame:
object = "activity/patrols/segments/"
return pd.DataFrame(
self.get_objects_multithreaded(object=object, threads=self.tcp_limit, page_size=self.sub_page_size)
)
[docs]
def get_patrol_observations_with_patrol_filter(
self,
since: str | None = None,
until: str | None = None,
patrol_type: str | list[str] | None = None,
patrol_type_value: str | list[str] | None = None,
status: list[StatusOptions] | None = None,
include_patrol_details: bool = False,
sub_page_size: int | None = None,
patrols_overlap_daterange: bool = True,
**kwargs,
) -> Relocations | pd.DataFrame:
"""
Download observations for patrols with provided filters.
Parameters
----------
since:
Lower time range
until:
Upper time range
patrol_type:
A patrol type UUID or a list of UUIDs
patrol_type_value:
A patrol type value or a list of patrol type values
status
'scheduled'/'active'/'overdue'/'done'/'cancelled'
Accept a status string or a list of statuses
include_patrol_details : bool, optional
Whether to merge patrol details into dataframe
sub_page_size: int | None
Optionally set a specific sub_page_size for this request, instead of using the client default
patrols_overlap_daterange: bool, default True
If false, restricts patrols to only those that start within the bounds of the provided time range
kwargs
Additional parameters to pass to `get_subject_observations`.
Returns
-------
relocations : ecoscope.Relocations
"""
patrols_df = self.get_patrols(
since=since,
until=until,
patrol_type=patrol_type,
patrol_type_value=patrol_type_value,
status=status,
sub_page_size=sub_page_size,
patrols_overlap_daterange=patrols_overlap_daterange,
**kwargs,
)
return self.get_patrol_observations(
patrols_df, include_patrol_details=include_patrol_details, sub_page_size=sub_page_size, **kwargs
)
[docs]
def get_patrol_observations(
self, patrols_df: pd.DataFrame, include_patrol_details: bool = False, sub_page_size: int | None = None, **kwargs
) -> Relocations | pd.DataFrame:
"""
Download observations for provided `patrols_df`.
Parameters
----------
patrols_df : pd.DataFrame
Data returned from a call to `get_patrols`.
include_patrol_details : bool, optional
Whether to merge patrol details into dataframe
sub_page_size: int | None
Optionally set a specific sub_page_size for this request, instead of using the client default
kwargs
Additional parameters to pass to `get_subject_observations`.
Returns
-------
relocations : ecoscope.Relocations
"""
observations = []
df_pt = self.get_patrol_types()
for _, patrol in patrols_df.iterrows():
for patrol_segment in patrol["patrol_segments"]:
subject_id = (patrol_segment.get("leader") or {}).get("id")
subject_name = (patrol_segment.get("leader") or {}).get("name")
patrol_start_time = (patrol_segment.get("time_range") or {}).get("start_time")
patrol_end_time = (patrol_segment.get("time_range") or {}).get("end_time")
patrol_type = (
df_pt[df_pt["value"] == patrol_segment.get("patrol_type")].reset_index()["id"][0]
if patrol_segment.get("patrol_type")
else None
)
if None in {subject_id, patrol_start_time}:
continue
try:
observation = self.get_subject_observations(
subject_ids=[subject_id], # type: ignore[list-item]
since=patrol_start_time,
until=patrol_end_time,
sub_page_size=sub_page_size,
**kwargs,
)
if len(observation.gdf) > 0:
observation.gdf["groupby_col"] = patrol["id"]
if include_patrol_details:
observation.gdf["patrol_id"] = patrol["id"]
observation.gdf["patrol_title"] = patrol["title"]
observation.gdf["patrol_serial_number"] = patrol["serial_number"]
observation.gdf["patrol_start_time"] = patrol_start_time
observation.gdf["patrol_end_time"] = patrol_end_time
observation.gdf["patrol_type"] = patrol_type
observation.gdf["patrol_status"] = patrol["state"]
observation.gdf["patrol_subject"] = subject_name
observation.gdf = (
observation.gdf.reset_index()
.merge(
pd.DataFrame(df_pt).add_prefix("patrol_type__"),
left_on="patrol_type",
right_on="id",
)
.drop(
columns=[
"patrol_type__ordernum",
"patrol_type__icon_id",
"patrol_type__default_priority",
"patrol_type__is_active",
]
)
)
observations.append(observation)
except Exception as e:
print(
f"Getting observations for subject_id={subject_id} start_time={patrol_start_time}"
f"end_time={patrol_end_time} failed for: {e}"
)
if not observations:
return pd.DataFrame()
gdf = pd.concat(observation.gdf for observation in observations)
gdf = clean_time_cols(gdf)
if include_patrol_details:
gdf = gdf.set_index("id")
relocs = Relocations(gdf=gdf)
return relocs
[docs]
def get_patrol_segment_events(
self,
patrol_segment_id: str | None = None,
include_details: bool = False,
include_files: bool = False,
include_related_events: bool = False,
include_notes: bool = False,
**addl_kwargs,
) -> pd.DataFrame:
params = clean_kwargs(
addl_kwargs,
patrol_segment_id=patrol_segment_id,
include_details=include_details,
include_files=include_files,
include_related_events=include_related_events,
include_notes=include_notes,
)
object = f"activity/patrols/segments/{patrol_segment_id}/events/"
df = pd.DataFrame(
self.get_objects_multithreaded(
object=object,
threads=self.tcp_limit,
page_size=self.sub_page_size,
**params,
)
)
df = clean_time_cols(df)
return df
[docs]
def get_spatial_features_group(
self,
spatial_features_group_name: str | None = None,
spatial_features_group_id: str | None = None,
with_group_data: bool = False,
**addl_kwargs,
) -> gpd.GeoDataFrame | dict[str, str | gpd.GeoDataFrame]:
"""
Download spatial features in a spatial features group for a given `spatial features group id`.
Parameters
----------
spatial_features_group_id :
Spatial Features Group UUID.
with_group_data :
Whether or not to return group data
kwargs
Additional parameters to pass to `_get`.
Returns
-------
If with_group_data is False this will return a GDF of the features within the requested group
If with_group_data is True this will return a dict containing the group data and a "features"
field containing a GDF of the features within the requested group
"""
assert (spatial_features_group_name is None) != (
spatial_features_group_id is None
), "Either spatial_features_group_name or spatial_features_group_id must be provided, not both."
if spatial_features_group_id is None and spatial_features_group_name is not None:
spatial_feature_groups = self._get("spatialfeaturegroup")
for sfg in spatial_feature_groups:
if spatial_features_group_name == sfg.get("name", None):
spatial_features_group_id = sfg.get("id")
if spatial_features_group_id is None:
raise ERClientNotFound(f"No Spatial Feature Group with name {spatial_features_group_name} found")
params = clean_kwargs(addl_kwargs, spatial_features_group_id=spatial_features_group_id)
object = f"spatialfeaturegroup/{spatial_features_group_id}/"
spatial_features_group = self._get(object, **params)
spatial_features = []
for fc in spatial_features_group["features"]:
crs = fc.get("crs", {}).get("properties", {}).get("name", None)
if crs is None:
raise ValueError(
f"CRS information missing for spatial feature group {spatial_features_group.get('id')}"
)
feature = gpd.GeoDataFrame.from_features(fc, crs=crs)
feature = feature.to_crs(4326)
spatial_features.append(feature)
features_gdf = pd.concat(spatial_features)
return {**spatial_features_group, "features": features_gdf} if with_group_data else features_gdf
[docs]
def get_spatial_feature(self, spatial_feature_id: str | None = None, **addl_kwargs) -> gpd.GeoDataFrame:
"""
Download spatial feature for a given `spatial feature id`.
Parameters
----------
spatial_feature_id :
Spatial Feature UUID.
kwargs
Additional parameters to pass to `_get`.
Returns
-------
dataframe : GeoDataFrame of spatial feature.
"""
params = clean_kwargs(addl_kwargs, spatial_feature_id=spatial_feature_id)
object = f"spatialfeature/{spatial_feature_id}/"
spatial_feature = self._get(object, **params)
return gpd.GeoDataFrame.from_features(spatial_feature["features"])
"""
POST Functions
"""
[docs]
def post_source(
self,
source_type: str,
manufacturer_id: str,
model_name: str,
provider: str = "default",
additional: typing.Dict | None = None,
**kwargs,
) -> pd.DataFrame:
"""
Parameters
----------
source_type
manufacturer_id
model_name
provider
additional
Returns
-------
pd.DataFrame
"""
payload = {
"source_type": source_type,
"manufacturer_id": manufacturer_id,
"model_name": model_name,
"additional": additional if additional else {},
"provider": provider,
}
if kwargs:
payload.update(kwargs)
response = self._post("sources", payload=payload)
return pd.DataFrame([response])
[docs]
def post_sourceproviders(
self,
provider_key: str,
display_name: str,
additional: typing.Dict | None = None,
**kwargs,
) -> pd.DataFrame:
"""
Parameters
----------
provider_key
display_name
Returns
-------
pd.DataFrame
"""
payload = {
"provider_key": provider_key,
"display_name": display_name,
"additional": additional if additional else {},
}
if kwargs:
payload.update(kwargs)
response = self._post("sourceproviders", payload=payload)
return pd.DataFrame([response])
[docs]
def post_subject(
self,
subject_name: str,
subject_type: str,
subject_subtype: str,
is_active: bool = True,
**kwargs,
) -> pd.DataFrame:
"""
Parameters
----------
subject_name
subject_subtype
is_active
Returns
-------
pd.DataFrame
"""
payload = {
"name": subject_name,
"subject_subtype": subject_subtype,
"is_active": is_active,
}
if kwargs:
payload.update(kwargs)
response = self._post("subjects", payload=payload)
return pd.DataFrame([response])
[docs]
def post_subjectsource(
self,
subject_id: str,
source_id: str,
lower_bound_assigned_range: datetime.datetime,
upper_bound_assigned_range: datetime.datetime,
additional: typing.Dict | None = None,
) -> pd.DataFrame:
"""
Parameters
----------
subject_id
source_id
lower_bound_assigned_range
upper_bound_assigned_range
additional
Returns
-------
pd.DataFrame
"""
if additional is None:
additional = {}
payload = {
"source": source_id,
"assigned_range": {
"lower": lower_bound_assigned_range,
"upper": upper_bound_assigned_range,
},
"additional": additional,
}
urlpath = f"subject/{subject_id}/sources"
response = self._post(urlpath, payload=payload)
return pd.DataFrame([response])
[docs]
def post_observations(
self,
observations: gpd.GeoDataFrame,
source_id_col: str = "source",
recorded_at_col: str = "recorded_at",
) -> pd.DataFrame:
"""
Parameters
----------
observations : gpd.GeoDataFrame
observation data to be uploaded
source_id_col : str
The source column in the observation dataframe
recorded_at_col : str
The observation recorded time column in the dataframe
Returns
-------
None
"""
def upload(obs):
try:
obs = obs.rename(columns={source_id_col: "source", recorded_at_col: "recorded_at"})
if "location" not in obs.columns:
obs["location"] = pd.DataFrame({"longitude": obs.geometry.x, "latitude": obs.geometry.y}).to_dict(
"records"
)
if "geometry" in obs.columns:
del obs["geometry"]
obs = pack_columns(
obs,
columns=[
"source",
"recorded_at",
"location",
"exclusion_flags",
"additional",
"device_status_properties",
],
)
post_data = obs.to_dict("records")
results = super(EarthRangerIO, self).post_observation(post_data)
except ERClientException as exc:
self.logger.error(exc)
except requests.exceptions.RequestException as exc:
self.logger.error(exc)
else:
return pd.DataFrame(results)
return observations.groupby(source_id_col, group_keys=False).apply(upload)
[docs]
def post_event(
self,
events: typing.Union[gpd.GeoDataFrame, pd.DataFrame, typing.Dict, list[typing.Dict]],
) -> pd.DataFrame:
"""
Parameters
----------
events
Returns
-------
pd.DataFrame:
New events created in EarthRanger.
"""
events = dataframe_to_dict_or_list(events)
results = super().post_event(event=events)
results = results if isinstance(results, list) else [results]
return pd.DataFrame(results)
[docs]
def post_patrol(self, priority: int, **kwargs) -> pd.DataFrame:
"""
Parameters
----------
priority
Returns
-------
pd.DataFrame
"""
payload = {"priority": priority}
if kwargs:
payload.update(kwargs)
response = self._post("activity/patrols", payload=payload)
return pd.DataFrame([response])
[docs]
def post_patrol_segment(
self,
patrol_id: str,
patrol_segment_id: str,
patrol_type: str | None = None,
tracked_subject_id: str | None = None,
scheduled_start: str | None = None,
scheduled_end: str | None = None,
start_time: str | None = None,
end_time: str | None = None,
start_location: typing.Tuple[float, float] | None = None,
end_location: typing.Tuple[float, float] | None = None,
**kwargs,
) -> pd.DataFrame:
"""
Parameters
----------
patrol_id
patrol_segment_id
patrol_type
tracked_subject_id
scheduled_start
scheduled_end
start_time
end_time
start_location
end_location
Returns
-------
pd.DataFrame
"""
payload: dict[str, str | None | dict[str, str | float | None]] = {
"patrol": patrol_id,
"patrol_segment": patrol_segment_id,
"scheduled_start": scheduled_start,
"scheduled_end": scheduled_end,
"time_range": {"start_time": start_time, "end_time": end_time},
}
if tracked_subject_id is not None:
payload.update(
{
"leader": {
"content_type": "observations.subject",
"id": tracked_subject_id,
}
}
)
else:
payload.update({"leader": None})
if start_location is not None:
payload.update(
{
"start_location": {
"latitude": start_location[0],
"longitude": start_location[1],
}
}
)
else:
payload.update({"start_location": None})
if end_location is not None:
payload.update(
{
"end_location": {
"latitude": end_location[0],
"longitude": end_location[1],
}
}
)
else:
payload.update({"end_location": None})
if patrol_type is not None:
payload.update({"patrol_type": patrol_type})
if kwargs:
payload.update(kwargs)
response = self._post("activity/patrols/segments/", payload=payload)
return pd.DataFrame([response])
[docs]
def post_patrol_segment_event(
self,
patrol_segment_id: str,
event_type: str,
**addl_kwargs,
) -> pd.DataFrame:
"""
Parameters
----------
patrol_segment_id
event_type
Returns
-------
pd.DataFrame
"""
payload = {
"patrol_segment": patrol_segment_id,
"event_type": event_type,
}
if addl_kwargs:
payload.update(addl_kwargs)
response = self._post(f"activity/patrols/segments/{patrol_segment_id}/events/", payload=payload)
return pd.DataFrame([response])
"""
PATCH Functions
"""
[docs]
def patch_event(
self,
event_id: str,
events: typing.Union[gpd.GeoDataFrame, pd.DataFrame, typing.Dict, list[typing.Dict]],
) -> pd.DataFrame:
"""
Parameters
----------
event_id
UUID for the event that will be updated.
events
Returns
-------
pd.DataFrame:
Updated events in EarthRanger.
"""
events_payload = dataframe_to_dict_or_list(events)
if isinstance(events_payload, list):
results = [self._patch(f"activity/event/{event_id}", payload=event) for event in events_payload]
else:
results = [self._patch(f"activity/event/{event_id}", payload=events_payload)]
return pd.DataFrame(results)
"""
DELETE Functions
"""
[docs]
def delete_observation(self, observation_id: str):
"""
Parameters
----------
observation_id
-------
"""
self._delete("observation/" + observation_id + "/")