import json
import logging
import uuid
import geopandas as gpd # type: ignore[import-untyped]
import pandas as pd
import requests
import shapely
from ecoscope.io.utils import clean_time_cols
from ecoscope.relocations import Relocations
logger = logging.getLogger(__name__)
[docs]
class SmartIO:
def __init__(self, **kwargs):
self._urlBase = kwargs.get("urlBase")
self._username = kwargs.get("username")
self._password = kwargs.get("password")
self._session = requests.Session()
self._token = kwargs.get("token")
self._verify_ssl = kwargs.get("verify_ssl")
self.login()
[docs]
def login(self) -> None:
login_data = {
"username": self._username,
"password": self._password,
}
if not self._token:
self._session = requests.Session()
response = self._session.post(f"{self._urlBase}token", data=login_data, verify=self._verify_ssl)
response.raise_for_status()
self._token = response.json()["access_token"]
return
[docs]
def query_data(self, url: str, params: dict | None = None) -> pd.DataFrame:
headers = {
"Authorization": f"Bearer {self._token}",
}
if params is None:
params = {}
session = requests.Session()
r = session.get(
f"{self._urlBase}{url}",
verify=self._verify_ssl,
params=params,
headers=headers,
)
r.raise_for_status()
return pd.DataFrame(r.json())
[docs]
def query_geojson_data(self, url: str, params: dict | None = None) -> gpd.GeoDataFrame | None:
headers = {
"Authorization": f"Bearer {self._token}",
}
if params is None:
params = {}
session = requests.Session()
r = session.get(
f"{self._urlBase}{url}",
verify=self._verify_ssl,
params=params,
headers=headers,
)
r.raise_for_status()
return gpd.GeoDataFrame.from_features(r.json(), crs=4326)
[docs]
def get_patrols_list(
self,
ca_uuid: str,
language_uuid: str,
start: str,
end: str,
patrol_mandate: str | None,
patrol_transport: str | None,
station_uuid: str | None = None,
) -> gpd.GeoDataFrame | None:
params: dict[str, str | None] = {}
params["ca_uuid"] = ca_uuid
params["language_uuid"] = language_uuid
params["start_date"] = start
params["end_date"] = end
params["patrol_mandate"] = patrol_mandate
params["patrol_transport"] = patrol_transport
params["station_uuid"] = station_uuid
return self.query_geojson_data(url="patrol/", params=params)
[docs]
def process_patrols_gdf(self, df: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""
Process multiple geometries in a vectorized way.
Args:
df: Input DataFrame with geometry column containing MULTILINESTRING Z data
Returns:
gpd.GeoDataFrame: Processed GeoDataFrame with expanded coordinate data
"""
if df.empty:
return gpd.GeoDataFrame()
all_coords = []
for index, row in df.iterrows():
geometry = row["geometry"]
if geometry is None:
continue
longitudes, latitudes, timestamps = self.extract_coordinates(geometry)
if not longitudes:
logger.error(f"Warning: No valid coordinates found in geometry at index {index}")
continue
times = pd.to_datetime(timestamps, unit="ms", utc=True)
coords_data = pd.DataFrame({"longitude": longitudes, "latitude": latitudes, "fixtime": times})
static_data = {col: row[col] for col in df.columns}
for col, value in static_data.items():
coords_data[col] = value
all_coords.append(coords_data)
if not all_coords:
return gpd.GeoDataFrame()
result = pd.concat(all_coords, ignore_index=True)
result_df = gpd.GeoDataFrame(
result,
geometry=gpd.points_from_xy(x=result["longitude"], y=result["latitude"]),
crs="EPSG:4326",
)
result_df = result_df.rename(
columns={
"uuid": "patrol_id",
"patrol_leg_day_start": "patrol_start_time",
"patrol_leg_day_end": "patrol_end_time",
"id": "groupby_col",
}
)
result_df["patrol_type__display"] = result_df["patrol_mandate"]
return result_df
[docs]
def get_patrol_observations(
self,
ca_uuid: str,
language_uuid: str,
start: str,
end: str,
patrol_mandate: str | None = None,
patrol_transport: str | None = None,
station_uuid: str | None = None,
) -> Relocations | None:
start_dt = pd.to_datetime(start)
end_dt = pd.to_datetime(end)
patrols = self.get_patrols_list(
ca_uuid=ca_uuid,
language_uuid=language_uuid,
# SMART API throws error if the start/end time is not at 00:00:00
start=pd.Timestamp(start_dt.date()).isoformat(),
end=pd.Timestamp(end_dt.date()).isoformat(),
patrol_mandate=patrol_mandate,
patrol_transport=patrol_transport,
station_uuid=station_uuid,
)
try:
patrols_df = self.process_patrols_gdf(patrols)
if patrols_df.empty:
return None
patrols_relocs = Relocations.from_gdf(
patrols_df,
groupby_col="groupby_col",
uuid_col="patrol_id",
time_col="fixtime",
)
patrols_relocs.gdf.reset_index()
patrols_relocs.gdf.columns = [col.replace("extra__", "") for col in patrols_relocs.gdf.columns]
patrols_relocs.gdf = patrols_relocs.gdf.assign(
id=[str(uuid.uuid4()) for _ in range(len(patrols_relocs.gdf))]
).set_index("id")
patrols_relocs.gdf = clean_time_cols(patrols_relocs.gdf)
return patrols_relocs
except Exception as e:
logger.error(f"Error processing patrol data: {e}")
return None
[docs]
def get_events(
self,
ca_uuid: str,
language_uuid: str,
start: str,
end: str,
) -> gpd.GeoDataFrame:
params = {}
params["ca_uuid"] = ca_uuid
params["language_uuid"] = language_uuid
params["start_date"] = start
params["end_date"] = end
events_df = self.query_geojson_data(url="observation/", params=params)
events_df = gpd.GeoDataFrame(
events_df,
geometry=gpd.points_from_xy(x=events_df["X"], y=events_df["Y"]), # type: ignore[index]
crs="EPSG:4326",
)
if "datetime" in events_df.columns:
events_df = events_df.rename(columns={"datetime": "time"})
else:
logger.warning('"datetime" column does not exist in events_df')
if "category" in events_df.columns:
events_df = events_df.rename(columns={"category": "event_type"})
else:
logger.warning('"category" column does not exist in events_df')
events_df["extracted_attributes"] = events_df["attributes"].apply(self.extract_event_attributes)
events_df = clean_time_cols(events_df)
return events_df