Skip to content

IO Tasks

ecoscope.platform.tasks.io

Attributes

SpatialFeaturesConfig module-attribute

Classes

EarthRangerSpatialFeatures

Bases: BaseModel

Attributes
data_source instance-attribute
data_source: Annotated[str, Field(title='Data Source', description='Select one of your configured EarthRanger data sources.')]
model_config class-attribute instance-attribute
model_config = ConfigDict(json_schema_extra={'title': 'EarthRanger'})
source class-attribute instance-attribute
source: Annotated[Literal['earthranger'], Field(exclude=True)] = 'earthranger'
spatial_features_group_name instance-attribute
spatial_features_group_name: Annotated[str, Field(description='Name of the spatial features group in EarthRanger')]

LocalFileSpatialFeatures

Bases: BaseModel

Attributes
file_path instance-attribute
file_path: Annotated[str, Field(description='Path to geoparquet (.parquet) or geopackage (.gpkg) file')]
layer class-attribute instance-attribute
layer: Annotated[str | SkipJsonSchema[None], AdvancedField(default=None, description='Layer name (only applicable to geopackage files)')] = None
model_config class-attribute instance-attribute
model_config = ConfigDict(json_schema_extra={'title': 'Local File'})
name_column instance-attribute
name_column: Annotated[str, Field(description='Column to use as region name')]
source class-attribute instance-attribute
source: Annotated[Literal['local_file'], Field(exclude=True)] = 'local_file'
Methods:
validate_file_extension
validate_file_extension() -> LocalFileSpatialFeatures
Source code in ecoscope/platform/tasks/io/_spatial_features.py
@model_validator(mode="after")
def validate_file_extension(self) -> "LocalFileSpatialFeatures":
    """Validate the extension of ``file_path`` and the use of ``layer``.

    The extension check is case-insensitive (the path is lower-cased first).

    Returns:
        The model instance itself, unchanged.

    Raises:
        ValueError: If ``file_path`` does not end with one of ``VALID_EXTENSIONS``,
            or if ``layer`` is set while ``file_path`` does not end with one of
            ``GEOPACKAGE_EXTENSIONS``.
    """
    path = self.file_path.lower()
    if not path.endswith(VALID_EXTENSIONS):
        # Each adjacent literal needs its own f-prefix; without it the
        # placeholders in the second half were emitted verbatim.
        raise ValueError(
            f"File must be geoparquet {GEOPARQUET_EXTENSIONS} or "
            f"geopackage {GEOPACKAGE_EXTENSIONS}. Got: {self.file_path}"
        )
    if self.layer is not None and not path.endswith(GEOPACKAGE_EXTENSIONS):
        raise ValueError("Layer can only be specified for geopackage (.gpkg) files")
    return self

RemoteFileSpatialFeatures

Bases: BaseModel

Attributes
layer class-attribute instance-attribute
layer: Annotated[str | SkipJsonSchema[None], AdvancedField(default=None, description='Layer name (only applicable for geopackage files)')] = None
model_config class-attribute instance-attribute
model_config = ConfigDict(json_schema_extra={'title': 'Remote URL'})
name_column instance-attribute
name_column: Annotated[str, Field(description='Column to use as region name')]
source class-attribute instance-attribute
source: Annotated[Literal['remote_file'], Field(exclude=True)] = 'remote_file'
url instance-attribute
url: Annotated[str, Field(description='URL to geoparquet (.parquet) or geopackage (.gpkg) file')]
Methods:
validate_url_extension
validate_url_extension() -> RemoteFileSpatialFeatures
Source code in ecoscope/platform/tasks/io/_spatial_features.py
@model_validator(mode="after")
def validate_url_extension(self) -> "RemoteFileSpatialFeatures":
    """Validate the extension of ``url`` and the use of ``layer``.

    Everything from the first ``?`` onwards is ignored and the remainder is
    lower-cased before the extension is checked.

    Returns:
        The model instance itself, unchanged.

    Raises:
        ValueError: If the URL (without query string) does not end with one of
            ``VALID_EXTENSIONS``, or if ``layer`` is set while the URL does not
            end with one of ``GEOPACKAGE_EXTENSIONS``.
    """
    path = self.url.split("?")[0].lower()
    if not path.endswith(VALID_EXTENSIONS):
        # Each adjacent literal needs its own f-prefix; without it the
        # placeholders in the second half were emitted verbatim.
        raise ValueError(
            f"URL must point to geoparquet {GEOPARQUET_EXTENSIONS} or "
            f"geopackage {GEOPACKAGE_EXTENSIONS}. Got: {self.url}"
        )
    if self.layer is not None and not path.endswith(GEOPACKAGE_EXTENSIONS):
        raise ValueError("Layer can only be specified for geopackage (.gpkg) files")
    return self

Functions:

calculate_ndvi_range

calculate_ndvi_range(client: EarthEngineClient, roi: AnyGeoDataFrame, time_range: Annotated[TimeRange, Field(description='Current time range for trend analysis')], image_size: Annotated[int, Field(description='Number of historical satellite images to fetch. Set to a large value to retrieve all available historical data. Only used when baseline_time_range is not provided.')] = 1000000000, baseline_time_range: Annotated[Optional[BaselineTimeRange], Field(description='Explicit historical baseline and current time range. When provided, overrides time_range and image_size.')] = None, ndvi_method: Annotated[NDVIMethod, Field(description="Method to obtain NDVI values. 'MODIS MYD13A1 16-Day Composite': Uses pre-calculated NDVI from 16-day composites. Provides quality-filtered 'best pixel' values at 500m resolution with ~0.025 accuracy. Better for phenology studies but may saturate in dense canopies. 'MODIS MCD43A4 Daily NBAR': Uses daily nadir BRDF-adjusted reflectance. Computes NDVI from NIR/Red bands with view-angle correction for consistent measurements. Higher temporal resolution but more susceptible to cloud gaps.")] = 'MODIS MYD13A1 16-Day Composite', grouping_unit: Annotated[GroupingUnit, Field(description="Temporal unit for grouping historical data when calculating statistics. 'month': Compare against same calendar month (1-12). 'week': Compare against same ISO week number (1-53). 'day_of_year': Compare against same day of year (1-366). 'modis_16_day': Compare against same MODIS 16-day composite period (0-22).")] = 'month') -> AnyDataFrame

Calculate NDVI values over an ROI using Earth Engine Image Collections.

Compares current NDVI values against historical statistics (min, max, mean) grouped by a configurable temporal unit to identify trends and anomalies.

Supports two modes
  • Mode A (default): Uses time_range + image_size to fetch N historical images before the current period. Deprecated; will be removed in a future release.
  • Mode B: Uses baseline_time_range with explicit historical_start, current_start, current_end for precise control over both periods.

Methods
  • MODIS MYD13A1 16-Day Composite: Uses pre-calculated NDVI band
  • MODIS MCD43A4 Daily NBAR: Computes NDVI from (NIR - Red) / (NIR + Red)

Grouping
  • month: Compare against same calendar month across historical years
  • week: Compare against same ISO week number across historical years
  • day_of_year: Compare against same day of year across historical years
  • modis_16_day: Compare against same MODIS 16-day composite period (0-22) across historical years
Source code in ecoscope/platform/tasks/io/_earthengine.py
@register(tags=["io"])
def calculate_ndvi_range(
    client: EarthEngineClient,
    roi: AnyGeoDataFrame,
    time_range: Annotated[TimeRange, Field(description="Current time range for trend analysis")],
    image_size: Annotated[
        int,
        Field(
            description="Number of historical satellite images to fetch. "
            "Set to a large value to retrieve all available historical data. "
            "Only used when baseline_time_range is not provided."
        ),
    ] = 1_000_000_000,
    baseline_time_range: Annotated[
        Optional[BaselineTimeRange],
        Field(
            description="Explicit historical baseline and current time range. "
            "When provided, overrides time_range and image_size."
        ),
    ] = None,
    ndvi_method: Annotated[
        NDVIMethod,
        Field(
            description="Method to obtain NDVI values. "
            "'MODIS MYD13A1 16-Day Composite': Uses pre-calculated NDVI from 16-day composites. "
            "Provides quality-filtered 'best pixel' values at 500m resolution with ~0.025 accuracy. "
            "Better for phenology studies but may saturate in dense canopies. "
            "'MODIS MCD43A4 Daily NBAR': Uses daily nadir BRDF-adjusted reflectance. "
            "Computes NDVI from NIR/Red bands with view-angle correction for consistent measurements. "
            "Higher temporal resolution but more susceptible to cloud gaps."
        ),
    ] = "MODIS MYD13A1 16-Day Composite",
    grouping_unit: Annotated[
        GroupingUnit,
        Field(
            description="Temporal unit for grouping historical data when calculating statistics. "
            "'month': Compare against same calendar month (1-12). "
            "'week': Compare against same ISO week number (1-53). "
            "'day_of_year': Compare against same day of year (1-366). "
            "'modis_16_day': Compare against same MODIS 16-day composite period (0-22)."
        ),
    ] = "month",
) -> AnyDataFrame:
    """Calculate NDVI values over an ROI using Earth Engine Image Collections.

    Compares current NDVI values against historical statistics (min, max, mean)
    grouped by a configurable temporal unit to identify trends and anomalies.

    Supports two modes:
        - Mode A (default): Uses time_range + image_size to fetch N historical images
          before the current period. Deprecated; will be removed in a future release.
        - Mode B: Uses baseline_time_range with explicit historical_start, current_start,
          current_end for precise control over both periods.

    Methods:
        - MODIS MYD13A1 16-Day Composite: Uses pre-calculated NDVI band
        - MODIS MCD43A4 Daily NBAR: Computes NDVI from (NIR - Red) / (NIR + Red)

    Grouping:
        - month: Compare against same calendar month across historical years
        - week: Compare against same ISO week number across historical years
        - day_of_year: Compare against same day of year across historical years
        - modis_16_day: Compare against same MODIS 16-day composite period (0-22) across historical years

    Returns:
        A dataframe with the columns ``min``, ``max``, ``mean``, ``NDVI`` and
        ``img_date``: one row per row of current-period data, where ``min``,
        ``max`` and ``mean`` are taken over the historical rows sharing the
        same grouping key. An empty dataframe with those columns is returned
        when no row falls in the current period.

    Raises:
        AssertionError: If the pre-calculated method yields a frame that does
            not have exactly three columns after ``reset_index``.
    """
    import ee
    import pandas as pd

    from ecoscope.io import eetools  # type: ignore[import-untyped]

    # NOTE(review): ``client`` is not referenced below; presumably constructing it
    # initializes the Earth Engine session -- confirm.

    if baseline_time_range is not None:
        # Mode B: explicit three-date configuration
        filter_start = baseline_time_range.historical_start.isoformat()
        filter_end = baseline_time_range.current_end.isoformat()
        current_since = baseline_time_range.current_start
        current_until = baseline_time_range.current_end
        historical_since = baseline_time_range.historical_start
        historical_until = baseline_time_range.current_start
        n_images = None  # fetch all images in the filtered range
    else:
        # Mode A: time_range + image_size
        filter_start = _MODIS_DATA_START.isoformat()
        filter_end = time_range.until.isoformat()
        current_since = time_range.since
        current_until = time_range.until
        historical_since = _MODIS_DATA_START
        historical_until = time_range.since
        n_images = image_size

    if ndvi_method == "MODIS MYD13A1 16-Day Composite":
        img_coll = (
            ee.ImageCollection(_MODIS_PRECOMPUTED)
            .filterDate(filter_start, filter_end)
            .select("NDVI")
            .map(
                # The properties are re-set explicitly after the multiply;
                # presumably 0.0001 is the product's NDVI scale factor -- confirm.
                lambda img: img.multiply(0.0001)
                .set("system:time_start", img.get("system:time_start"))
                .set("id", img.get("id"))
            )
            .sort("system:time_start")
        )
    else:  # calculated
        img_coll = (
            ee.ImageCollection(_MODIS_CALCULATED)
            .filterDate(filter_start, filter_end)
            .select([_NIR_BAND, _RED_BAND])
            .sort("system:time_start")
        )

    # Avoid mutating input - work with a copy
    roi_with_time = roi.copy()
    roi_with_time["tmp_time"] = current_until

    # In Mode B no image count is given, so request as many images as the
    # filtered collection contains.
    n_before = n_images if n_images is not None else img_coll.size().getInfo()

    ee_data = eetools.label_gdf_with_temporal_image_collection_by_feature(
        gdf=roi_with_time,
        time_col_name="tmp_time",
        n_before=n_before,
        n_after=0,
        img_coll=img_coll,
        region_reducer=ee.Reducer.mean(),
        scale=_ANALYSIS_SCALE,
    )

    ee_data = ee_data.reset_index()
    ee_data["img_date"] = pd.to_datetime(ee_data["img_date"]).dt.tz_localize("UTC")

    if ndvi_method == "MODIS MYD13A1 16-Day Composite":
        assert len(ee_data.columns) == 3, f"Expected 3 columns, got {len(ee_data.columns)}"
        ee_data.columns = ["name", "img_date", "NDVI"]
    else:  # calculated
        ee_data["NDVI"] = (ee_data[_NIR_BAND] - ee_data[_RED_BAND]) / (ee_data[_NIR_BAND] + ee_data[_RED_BAND])
        idx_col = ee_data.columns[0]
        ee_data = ee_data[[idx_col, "img_date", "NDVI"]]
        ee_data.columns = ["name", "img_date", "NDVI"]

    # Current period is inclusive on both ends; the historical period excludes
    # its upper bound so the two periods do not share a row.
    cur_data = ee_data[(ee_data.img_date >= current_since) & (ee_data.img_date <= current_until)]
    historical_data = ee_data[(ee_data.img_date >= historical_since) & (ee_data.img_date < historical_until)]

    if cur_data.empty:
        return cast(
            AnyDataFrame,
            pd.DataFrame(columns=["min", "max", "mean", "NDVI", "img_date"]),
        )

    def get_grouping_key(dt_series, unit: str):
        """Map a datetime or datetime accessor to a grouping key for the given unit."""
        if unit == "month":
            return dt_series.month
        elif unit == "week":
            return dt_series.isocalendar().week
        elif unit == "modis_16_day":
            return (dt_series.dayofyear - 1) // 16
        else:  # day_of_year
            return dt_series.dayofyear

    # The historical grouping keys do not depend on the current row, so compute
    # them once instead of once per row inside ``calc_mean_range``.
    historical_keys = get_grouping_key(historical_data.img_date.dt, grouping_unit)

    def calc_mean_range(row):
        """Summarize the historical NDVI values that share ``row``'s grouping key."""
        current_key = get_grouping_key(row.img_date, grouping_unit)
        grouped_hist = historical_data[historical_keys == current_key]
        return pd.Series(
            {
                "min": grouped_hist["NDVI"].min(),
                "max": grouped_hist["NDVI"].max(),
                "mean": grouped_hist["NDVI"].mean(),
                "NDVI": row["NDVI"],
                "img_date": row.img_date,
            }
        )

    return cur_data.apply(calc_mean_range, axis=1).reset_index(drop=True)

determine_season_windows

determine_season_windows(client: EarthEngineClient, roi: AnyGeoDataFrame, time_range: Annotated[TimeRange, Field(description='Time range filter')]) -> AnyDataFrame

Determine wet/dry season windows from NDVI values over an ROI.

Computes standardized NDVI values using MODIS MCD43A4 daily NBAR data, then identifies seasonal transition points to classify time windows as "dry" or "wet" seasons.

Source code in ecoscope/platform/tasks/io/_earthengine.py
@register(tags=["io"])
def determine_season_windows(
    client: EarthEngineClient,
    roi: AnyGeoDataFrame,
    time_range: Annotated[TimeRange, Field(description="Time range filter")],
) -> AnyDataFrame:
    """Classify the given time range into "dry" and "wet" season windows.

    The ROI features are dissolved into a single geometry in EPSG:4326, the
    time range is cut into four consecutive sub-ranges, standardized NDVI
    values are fetched for each sub-range and concatenated, and the seasonal
    windows are derived from a two-way cut of those values.
    """
    import pandas as pd

    from ecoscope.analysis.seasons import (  # type: ignore[import-untyped]
        seasonal_windows,
        std_ndvi_vals,
        val_cuts,
    )

    # Collapse every ROI feature into one geometry.
    dissolved = roi.to_crs(4326).dissolve()  # type: ignore[operator]
    merged_roi = dissolved.iloc[0]["geometry"]

    # Five evenly spaced boundaries delimit four consecutive sub-ranges.
    boundaries = pd.date_range(
        start=time_range.since,
        end=time_range.until,
        periods=5,
        inclusive="both",
    )
    date_chunks = [boundary.isoformat() for boundary in boundaries]

    per_chunk_ndvi = []
    for chunk_start, chunk_end in zip(date_chunks[:-1], date_chunks[1:]):
        per_chunk_ndvi.append(
            std_ndvi_vals(
                img_coll=_MODIS_CALCULATED,
                nir_band=_NIR_BAND,
                red_band=_RED_BAND,
                aoi=merged_roi,
                start=chunk_start,
                end=chunk_end,
            )
        )
    ndvi_vals = pd.concat(per_chunk_ndvi)

    # Seasonal transition point, then the labelled windows around it.
    cuts = val_cuts(ndvi_vals, 2)
    windows = seasonal_windows(ndvi_vals, cuts, season_labels=["dry", "wet"])
    return cast(AnyDataFrame, windows)

download_roi

download_roi(url: Annotated[str, Field(description='The path to ROI gpkg file')], roi_column: Annotated[str | None, Field(description='The column name of the ROI name')] = 'name', roi_name: Annotated[str | None, Field(description='The ROI name')] = None, layer_name: Annotated[str | None, Field(description='The layer name')] = None) -> AnyGeoDataFrame

Download ROI from a URL.

Source code in ecoscope/platform/tasks/io/_downloader.py
@register(tags=["io"])
def download_roi(
    url: Annotated[str, Field(description="The path to ROI gpkg file")],
    roi_column: Annotated[str | None, Field(description="The column name of the ROI name")] = "name",
    roi_name: Annotated[str | None, Field(description="The ROI name")] = None,
    layer_name: Annotated[str | None, Field(description="The layer name")] = None,
) -> AnyGeoDataFrame:
    """Download an ROI file from a URL and load it as a GeoDataFrame in EPSG:4326.

    The file is downloaded into a private temporary directory that is removed
    once the data has been read into memory.

    When ``roi_column`` is given, that column is renamed to ``name`` and used
    as the index. When ``roi_name`` is given (and non-empty), only the rows
    whose index label equals ``roi_name`` are returned.

    Raises:
        KeyError: If ``roi_name`` is given but is not present in the index.
    """
    import os
    import tempfile

    import geopandas as gpd  # type: ignore[import-untyped]

    from ecoscope.io import download_file

    # ``NamedTemporaryFile(...).name`` drops the file object straight away, so the
    # name could be reused by another process and the download was never cleaned up.
    # A TemporaryDirectory gives a private location and removes it on exit.
    with tempfile.TemporaryDirectory() as tmp_dir:
        tmp_roi_path = os.path.join(tmp_dir, "roi.gpkg")
        download_file(
            url=url,
            path=tmp_roi_path,
            overwrite_existing=True,
        )
        roi = gpd.read_file(tmp_roi_path, layer=layer_name).to_crs(4326)

    if roi_column is not None:
        roi = roi.rename(columns={roi_column: "name"})
        roi.set_index("name", inplace=True)

    if roi_name:
        # A list selector always yields a (Geo)DataFrame; a scalar label would
        # collapse a unique match into a Series, contradicting the return type.
        roi = roi.loc[[roi_name]]

    return cast(
        AnyGeoDataFrame,
        roi,
    )

get_analysis_field_from_event_details

get_analysis_field_from_event_details(combined_params: CombinedEventsAndDetailsParams) -> str
Source code in ecoscope/platform/tasks/io/_earthranger.py
@register()
def get_analysis_field_from_event_details(
    combined_params: CombinedEventsAndDetailsParams,
) -> str:
    """Return the ``analysis_field`` attribute of ``combined_params``."""
    analysis_field = combined_params.analysis_field
    return analysis_field

get_analysis_field_label_from_event_details

get_analysis_field_label_from_event_details(combined_params: CombinedEventsAndDetailsParams) -> str
Source code in ecoscope/platform/tasks/io/_earthranger.py
@register()
def get_analysis_field_label_from_event_details(
    combined_params: CombinedEventsAndDetailsParams,
) -> str:
    """Return the ``analysis_field_label`` attribute of ``combined_params``."""
    field_label = combined_params.analysis_field_label
    return field_label

get_analysis_field_unit_from_event_details

get_analysis_field_unit_from_event_details(combined_params: CombinedEventsAndDetailsParams) -> str
Source code in ecoscope/platform/tasks/io/_earthranger.py
@register()
def get_analysis_field_unit_from_event_details(
    combined_params: CombinedEventsAndDetailsParams,
) -> str:
    """Return the ``analysis_field_unit`` attribute of ``combined_params``."""
    field_unit = combined_params.analysis_field_unit
    return field_unit

get_category_field_from_event_details

get_category_field_from_event_details(combined_params: CombinedEventsAndDetailsParams) -> str | None
Source code in ecoscope/platform/tasks/io/_earthranger.py
@register()
def get_category_field_from_event_details(
    combined_params: CombinedEventsAndDetailsParams,
) -> str | None:
    """Return the ``category_field`` attribute of ``combined_params`` (may be ``None``)."""
    category_field = combined_params.category_field
    return category_field

get_category_field_label_from_event_details

get_category_field_label_from_event_details(combined_params: CombinedEventsAndDetailsParams) -> str | None
Source code in ecoscope/platform/tasks/io/_earthranger.py
@register()
def get_category_field_label_from_event_details(
    combined_params: CombinedEventsAndDetailsParams,
) -> str | None:
    """Return the ``category_field_label`` attribute of ``combined_params`` (may be ``None``)."""
    category_label = combined_params.category_field_label
    return category_label

get_choices_from_v2_event_type

get_choices_from_v2_event_type(client: EarthRangerClient, event_type: SingleEventTypeAnnotation, choice_field: Annotated[str | SkipJsonSchema[None], Field(description='The choice field to lookup values from')]) -> dict[str, str]
Source code in ecoscope/platform/tasks/io/_earthranger.py
@register(tags=["io"])
def get_choices_from_v2_event_type(
    client: EarthRangerClient,
    event_type: SingleEventTypeAnnotation,
    choice_field: Annotated[
        str | SkipJsonSchema[None],
        Field(description="The choice field to lookup values from"),
    ],
) -> dict[str, str]:
    """Look up the choices of ``choice_field`` on the given v2 event type.

    Returns:
        The mapping returned by the client, or an empty dict when the client
        raises ``ERClientNotFound``.
    """
    from ecoscope.io.earthranger import ERClientNotFound

    try:
        return client.get_choices_from_v2_event_type(event_type, choice_field)
    except ERClientNotFound:
        # Best-effort lookup: a "not found" answer yields no choices instead of an error.
        return {}

get_event_type_display_names_from_events

get_event_type_display_names_from_events(client: EarthRangerClient, events_gdf: EventGDF, append_category_names: AppendCategorySelectionAnnotation = 'duplicates') -> EventsWithDisplayNamesGDF
Source code in ecoscope/platform/tasks/io/_earthranger.py
@register(tags=["io"])
def get_event_type_display_names_from_events(
    client: EarthRangerClient,
    events_gdf: EventGDF,
    append_category_names: AppendCategorySelectionAnnotation = "duplicates",
) -> EventsWithDisplayNamesGDF:
    """Delegate to the client to attach event type display names to ``events_gdf``.

    ``append_category_names`` is forwarded to the client unchanged.
    """
    with_display_names = client.get_event_type_display_names_from_events(
        events_gdf=events_gdf,
        append_category_names=append_category_names,
    )
    return cast(EventsWithDisplayNamesGDF, with_display_names)

get_event_type_from_event_details

get_event_type_from_event_details(combined_params: CombinedEventsAndDetailsParams) -> str | None
Source code in ecoscope/platform/tasks/io/_earthranger.py
@register()
def get_event_type_from_event_details(
    combined_params: CombinedEventsAndDetailsParams,
) -> str | None:
    """Return the ``event_type`` attribute of ``combined_params`` (may be ``None``)."""
    event_type = combined_params.event_type
    return event_type

get_events

get_events(client: EarthRangerClient, time_range: TimeRangeAnnotation, event_types: EventTypesAnnotation, event_columns: EventColumnsAnnotation = None, include_null_geometry: IncludeNullGeometryAnnotation = True, raise_on_empty: RaiseOnEmptyAnnotation = True, include_details: IncludeDetailsAnnotation = False, include_updates: IncludeUpdatesAnnotation = False, include_related_events: IncludeRelatedEventsAnnotation = False, include_display_values: IncludeDisplayValuesAnnotation = False, force_point_geometry: ForcePointGeometryAnnotation = True) -> EventGDF | EventsWithDisplayNamesGDF | EmptyDataFrame

Get events.

Source code in ecoscope/platform/tasks/io/_earthranger.py
@register(tags=["io"])
def get_events(
    client: EarthRangerClient,
    time_range: TimeRangeAnnotation,
    event_types: EventTypesAnnotation,
    event_columns: EventColumnsAnnotation = None,
    include_null_geometry: IncludeNullGeometryAnnotation = True,
    raise_on_empty: RaiseOnEmptyAnnotation = True,
    include_details: IncludeDetailsAnnotation = False,
    include_updates: IncludeUpdatesAnnotation = False,
    include_related_events: IncludeRelatedEventsAnnotation = False,
    include_display_values: IncludeDisplayValuesAnnotation = False,
    force_point_geometry: ForcePointGeometryAnnotation = True,
) -> EventGDF | EventsWithDisplayNamesGDF | EmptyDataFrame:
    """Fetch events from the client for a time range and a set of event types.

    The values in ``event_types`` are first resolved to event type ids. When
    values were supplied but none resolve, the client is not queried and the
    result is an empty dataframe.

    Raises:
        ValueError: If the result is empty and ``raise_on_empty`` is true.
    """
    resolved_ids: list[str] = []
    nothing_resolved = False
    # Values that resolve to no id are tracked apart from empty input: the
    # former skips the client query, the latter still queries with an empty id list.
    if len(event_types) > 0:
        known_types = pd.DataFrame(client.get_event_types())
        is_requested = known_types["value"].isin(event_types)
        resolved_ids = known_types[is_requested]["id"].to_list()
        nothing_resolved = not resolved_ids

    if nothing_resolved:
        events_df = pd.DataFrame()
    else:
        events_df = client.get_events(
            since=time_range.since.isoformat(),
            until=time_range.until.isoformat(),
            event_type=resolved_ids,
            drop_null_geometry=not include_null_geometry,
            include_details=include_details,
            include_updates=include_updates,
            include_related_events=include_related_events,
            force_point_geometry=force_point_geometry,
        )

    if events_df.empty:
        if raise_on_empty:
            raise ValueError("No data returned from EarthRanger for get_events")
        # An empty result is handed back untouched.
        return cast(
            EventGDF | EventsWithDisplayNamesGDF,
            events_df,
        )

    events_df = events_df.reset_index()

    if event_columns is not None:
        events_df = events_df[event_columns]  # type: ignore[assignment]

    if include_display_values:
        events_df = client.get_event_type_display_names_from_events(
            events_df,
            append_category_names="duplicates",
        )

    return cast(
        EventGDF | EventsWithDisplayNamesGDF,
        events_df,
    )

get_events_from_combined_params

get_events_from_combined_params(combined_params: CombinedEventsAndDetailsParams) -> EventGDF | EventsWithDisplayNamesGDF | EmptyDataFrame
Source code in ecoscope/platform/tasks/io/_earthranger.py
@register(tags=["io"])
def get_events_from_combined_params(
    combined_params: CombinedEventsAndDetailsParams,
) -> EventGDF | EventsWithDisplayNamesGDF | EmptyDataFrame:
    """Run the validated ``get_events`` task with the kwargs from ``combined_params.get_events_params()``."""
    validated_task = task(get_events).validate()
    call_kwargs = combined_params.get_events_params()
    return validated_task.call(**call_kwargs)

get_events_from_smart

get_events_from_smart(client: SmartClient, time_range: Annotated[TimeRange, Field(description='Time range filter')], ca_uuid: Annotated[str, Field(description='Conservation Area UUID', title='Conservation Area UUID')], language_uuid: Annotated[str, Field(description='Language UUID', title='Language UUID')]) -> EventGDF | EmptyDataFrame

Get events.

Source code in ecoscope/platform/tasks/io/_smart.py
@register(tags=["io"])
def get_events_from_smart(
    client: SmartClient,
    time_range: Annotated[TimeRange, Field(description="Time range filter")],
    ca_uuid: Annotated[str, Field(description="Conservation Area UUID", title="Conservation Area UUID")],
    language_uuid: Annotated[str, Field(description="Language UUID", title="Language UUID")],
) -> EventGDF | EmptyDataFrame:
    """Fetch events from the SMART client for a time range, conservation area and language.

    The bounds of ``time_range`` are passed to the client as ISO-8601 strings.
    """
    smart_events = client.get_events(
        start=time_range.since.isoformat(),
        end=time_range.until.isoformat(),
        ca_uuid=ca_uuid,
        language_uuid=language_uuid,
    )
    return cast(EventGDF, smart_events)

get_fields_from_event_type_schema

get_fields_from_event_type_schema(client: EarthRangerClient, event_type: SingleEventTypeAnnotation) -> dict[str, str]
Source code in ecoscope/platform/tasks/io/_earthranger.py
@register(tags=["io"])
def get_fields_from_event_type_schema(
    client: EarthRangerClient,
    event_type: SingleEventTypeAnnotation,
) -> dict[str, str]:
    """Return what the client reports as the schema fields of ``event_type``."""
    return cast(dict[str, str], client.get_fields_from_event_type_schema(event_type))

get_patrol_events

get_patrol_events(client: EarthRangerClient, time_range: TimeRangeAnnotation, patrol_types: PatrolTypesAnnotation, event_types: EventTypesAnnotation, status: PatrolStatusAnnotation = None, include_null_geometry: IncludeNullGeometryAnnotation = True, truncate_to_time_range: TruncateToTimeRangeAnnotation = True, raise_on_empty: RaiseOnEmptyAnnotation = True, sub_page_size: SubPageSizeAnnotation = 100, include_display_values: IncludeDisplayValuesAnnotation = False, patrols_overlap_daterange: PatrolsOverlapDateRangeAnnotation = True) -> EventGDF | EventsWithDisplayNamesGDF | EmptyDataFrame

Get events from patrols.

Source code in ecoscope/platform/tasks/io/_earthranger.py
@register(tags=["io"])
def get_patrol_events(
    client: EarthRangerClient,
    time_range: TimeRangeAnnotation,
    patrol_types: PatrolTypesAnnotation,
    event_types: EventTypesAnnotation,
    status: PatrolStatusAnnotation = None,
    include_null_geometry: IncludeNullGeometryAnnotation = True,
    truncate_to_time_range: TruncateToTimeRangeAnnotation = True,
    raise_on_empty: RaiseOnEmptyAnnotation = True,
    sub_page_size: SubPageSizeAnnotation = 100,
    include_display_values: IncludeDisplayValuesAnnotation = False,
    patrols_overlap_daterange: PatrolsOverlapDateRangeAnnotation = True,
) -> EventGDF | EventsWithDisplayNamesGDF | EmptyDataFrame:
    """Fetch the events attached to patrols from the client.

    ``status`` defaults to ``["done"]`` when not given. With
    ``truncate_to_time_range`` the result is restricted to events whose
    ``time`` lies within ``time_range`` (inclusive on both ends).

    Raises:
        ValueError: If the client returns no events and ``raise_on_empty`` is true.
    """
    patrol_statuses = ["done"] if status is None else status

    events = client.get_patrol_events(
        since=time_range.since.isoformat(),
        until=time_range.until.isoformat(),
        patrol_type_value=patrol_types,
        event_type=event_types,
        status=patrol_statuses,
        drop_null_geometry=not include_null_geometry,
        sub_page_size=sub_page_size,
        patrols_overlap_daterange=patrols_overlap_daterange,
    )

    if events.empty:
        if raise_on_empty:
            raise ValueError("No data returned from EarthRanger for get_patrol_events")
        return cast(EventGDF | EventsWithDisplayNamesGDF, events)

    if truncate_to_time_range:
        # NOTE(review): the emptiness check above runs before this filter, so a
        # result emptied by truncation does not raise -- confirm this is intended.
        not_before_start = events.time >= time_range.since
        not_after_end = events.time <= time_range.until
        events = events.loc[not_before_start & not_after_end]  # type: ignore[assignment]

    if include_display_values and not events.empty:
        events = client.get_event_type_display_names_from_events(events, append_category_names="duplicates")

    return cast(EventGDF | EventsWithDisplayNamesGDF, events)

get_patrol_events_from_combined_params

get_patrol_events_from_combined_params(combined_params: CombinedPatrolAndEventsParams) -> EventGDF | EventsWithDisplayNamesGDF | EmptyDataFrame
Source code in ecoscope/platform/tasks/io/_earthranger.py
@register(tags=["io"])
def get_patrol_events_from_combined_params(
    combined_params: CombinedPatrolAndEventsParams,
) -> EventGDF | EventsWithDisplayNamesGDF | EmptyDataFrame:
    """Run the validated ``get_patrol_events`` task with kwargs from ``combined_params.get_patrol_events_params()``."""
    validated_task = task(get_patrol_events).validate()
    call_kwargs = combined_params.get_patrol_events_params()
    return validated_task.call(**call_kwargs)

get_patrol_observations

get_patrol_observations(client: EarthRangerClient, time_range: TimeRangeAnnotation, patrol_types: PatrolTypesAnnotation, status: PatrolStatusAnnotation = None, include_patrol_details: IncludePatrolDetailsAnnotation = True, raise_on_empty: RaiseOnEmptyAnnotation = True, sub_page_size: SubPageSizeAnnotation = 100, patrols_overlap_daterange: PatrolsOverlapDateRangeAnnotation = True) -> PatrolObservationsGDF | EmptyDataFrame

Get observations for a patrol type from EarthRanger.

Source code in ecoscope/platform/tasks/io/_earthranger.py
@register(tags=["io"])
def get_patrol_observations(
    client: EarthRangerClient,
    time_range: TimeRangeAnnotation,
    patrol_types: PatrolTypesAnnotation,
    status: PatrolStatusAnnotation = None,
    include_patrol_details: IncludePatrolDetailsAnnotation = True,
    raise_on_empty: RaiseOnEmptyAnnotation = True,
    sub_page_size: SubPageSizeAnnotation = 100,
    patrols_overlap_daterange: PatrolsOverlapDateRangeAnnotation = True,
) -> PatrolObservationsGDF | EmptyDataFrame:
    """Fetch patrol observations filtered by time range, patrol type and status.

    When ``_make_warehouse_client_from_env`` yields a client, the query goes
    through it and the resulting table is converted with
    ``GeoDataFrame.from_arrow``; otherwise ``client`` is queried directly and a
    ``Relocations`` result is unwrapped to its ``gdf``. ``status`` defaults to
    ``["done"]`` when not given.

    Raises:
        ValueError: If the result is empty and ``raise_on_empty`` is true.
    """
    from ecoscope.relocations import Relocations

    patrol_statuses = ["done"] if status is None else status

    warehouse_client = _make_warehouse_client_from_env(
        er_site_url=client.server,
        er_api_token=SecretStr(client.token) if client.token else None,
    )

    # Both backends take the same filter arguments.
    query_kwargs = {
        "since": time_range.since.isoformat(),
        "until": time_range.until.isoformat(),
        "patrol_type_value": patrol_types,
        "status": patrol_statuses,
        "include_patrol_details": include_patrol_details,
        "sub_page_size": sub_page_size,
        "patrols_overlap_daterange": patrols_overlap_daterange,
    }

    if warehouse_client:
        import geopandas as gpd  # type: ignore[import-untyped]

        table = warehouse_client.get_patrol_observations_with_patrol_filter(**query_kwargs)
        patrol_obs_relocs = gpd.GeoDataFrame.from_arrow(table)
    else:
        patrol_obs_relocs = client.get_patrol_observations_with_patrol_filter(**query_kwargs)
        if isinstance(patrol_obs_relocs, Relocations):
            patrol_obs_relocs = patrol_obs_relocs.gdf

    if raise_on_empty and patrol_obs_relocs.empty:
        raise ValueError("No data returned from EarthRanger for get_patrol_observations_with_patrol_filter")

    return cast(PatrolObservationsGDF, patrol_obs_relocs)

get_patrol_observations_from_combined_params

get_patrol_observations_from_combined_params(combined_params: CombinedPatrolAndEventsParams) -> PatrolObservationsGDF | EmptyDataFrame
Source code in ecoscope/platform/tasks/io/_earthranger.py
@register(tags=["io"])
def get_patrol_observations_from_combined_params(
    combined_params: CombinedPatrolAndEventsParams,
) -> PatrolObservationsGDF | EmptyDataFrame:
    """Run the validated ``get_patrol_observations`` task with kwargs from ``combined_params.get_patrol_observations_params()``."""
    validated_task = task(get_patrol_observations).validate()
    call_kwargs = combined_params.get_patrol_observations_params()
    return validated_task.call(**call_kwargs)

get_patrol_observations_from_patrols_df

get_patrol_observations_from_patrols_df(client: EarthRangerClient, patrols_df: PatrolsDF, include_patrol_details: IncludePatrolDetailsAnnotation = True, raise_on_empty: RaiseOnEmptyAnnotation = True, sub_page_size: SubPageSizeAnnotation = 100) -> PatrolObservationsGDF | EmptyDataFrame

Get observations for the patrols in `patrols_df` from EarthRanger.

Source code in ecoscope/platform/tasks/io/_earthranger.py
@register(tags=["io"])
def get_patrol_observations_from_patrols_df(
    client: EarthRangerClient,
    patrols_df: PatrolsDF,
    include_patrol_details: IncludePatrolDetailsAnnotation = True,
    raise_on_empty: RaiseOnEmptyAnnotation = True,
    sub_page_size: SubPageSizeAnnotation = 100,
) -> PatrolObservationsGDF | EmptyDataFrame:
    """Get observations for the patrols in ``patrols_df`` from EarthRanger.

    When ``_make_warehouse_client_from_env`` yields a client, observations are
    fetched through it as an Arrow table and converted to a GeoDataFrame.
    Otherwise the EarthRanger ``client`` is queried directly and a
    ``Relocations`` result is unwrapped to its underlying ``gdf``.

    Args:
        client: EarthRanger client; its ``server`` and ``token`` are also used
            to try to build the warehouse client.
        patrols_df: Patrols whose observations should be fetched.
        include_patrol_details: Forwarded to the observations query.
        raise_on_empty: If true, raise when the result is empty.
        sub_page_size: Forwarded to the observations query.

    Returns:
        The observations frame, cast to ``PatrolObservationsGDF``.

    Raises:
        ValueError: If ``raise_on_empty`` is true and no observations were returned.
    """
    from ecoscope.relocations import Relocations

    if warehouse_client := _make_warehouse_client_from_env(
        er_site_url=client.server,
        er_api_token=SecretStr(client.token) if client.token else None,
    ):
        import geopandas as gpd  # type: ignore[import-untyped]

        table = warehouse_client.get_patrol_observations(
            patrols_df=patrols_df,
            include_patrol_details=include_patrol_details,
            sub_page_size=sub_page_size,
        )
        patrol_obs_relocs = gpd.GeoDataFrame.from_arrow(table)
    else:
        patrol_obs_relocs = client.get_patrol_observations(
            patrols_df=patrols_df,
            include_patrol_details=include_patrol_details,
            sub_page_size=sub_page_size,
        )
        if isinstance(patrol_obs_relocs, Relocations):
            patrol_obs_relocs = patrol_obs_relocs.gdf

    if raise_on_empty and patrol_obs_relocs.empty:
        # Name this task in the message; it previously pointed at the
        # unrelated `get_patrol_observations_with_patrol_filter` code path.
        raise ValueError("No data returned from EarthRanger for get_patrol_observations_from_patrols_df")

    return cast(PatrolObservationsGDF, patrol_obs_relocs)

get_patrol_observations_from_patrols_df_and_combined_params

get_patrol_observations_from_patrols_df_and_combined_params(patrols_df: PatrolsDF, combined_params: CombinedPatrolAndEventsParams) -> PatrolObservationsGDF | EmptyDataFrame
Source code in ecoscope/platform/tasks/io/_earthranger.py
@register(tags=["io"])
def get_patrol_observations_from_patrols_df_and_combined_params(
    patrols_df: PatrolsDF,
    combined_params: CombinedPatrolAndEventsParams,
) -> PatrolObservationsGDF | EmptyDataFrame:
    """Run ``get_patrol_observations_from_patrols_df`` using ``combined_params``.

    ``patrols_df`` is passed explicitly; the remaining keyword arguments come
    from ``combined_params.get_patrol_observations_from_patrols_df_params()``.
    """
    validated_task = task(get_patrol_observations_from_patrols_df).validate()
    extra_kwargs = combined_params.get_patrol_observations_from_patrols_df_params()
    return validated_task.call(patrols_df=patrols_df, **extra_kwargs)

get_patrol_observations_from_smart

get_patrol_observations_from_smart(client: SmartClient, time_range: Annotated[TimeRange, Field(description='Time range filter')], ca_uuid: Annotated[str, Field(description='Conservation Area UUID', title='Conservation Area UUID')], language_uuid: Annotated[str, Field(description='Language UUID', title='Language UUID')], patrol_mandate: Annotated[str | SkipJsonSchema[None], AdvancedField(default=None, description='Patrol Mandate', title='Patrol Mandate')] = None, patrol_transport: Annotated[str | SkipJsonSchema[None], AdvancedField(default=None, description='Patrol Transport', title='Patrol Transport')] = None) -> PatrolObservationsGDF | EmptyDataFrame

Get observations for a patrol type from Smart.

Source code in ecoscope/platform/tasks/io/_smart.py
@register(tags=["io"])
def get_patrol_observations_from_smart(
    client: SmartClient,
    time_range: Annotated[TimeRange, Field(description="Time range filter")],
    ca_uuid: Annotated[str, Field(description="Conservation Area UUID", title="Conservation Area UUID")],
    language_uuid: Annotated[str, Field(description="Language UUID", title="Language UUID")],
    patrol_mandate: Annotated[
        str | SkipJsonSchema[None],
        AdvancedField(default=None, description="Patrol Mandate", title="Patrol Mandate"),
    ] = None,
    patrol_transport: Annotated[
        str | SkipJsonSchema[None],
        AdvancedField(default=None, description="Patrol Transport", title="Patrol Transport"),
    ] = None,
) -> PatrolObservationsGDF | EmptyDataFrame:
    """Fetch patrol observations from SMART for the given time range.

    The bounds of ``time_range`` are sent as ISO-8601 strings. If the client
    hands back a ``Relocations`` object, its ``gdf`` is returned instead.
    """
    from ecoscope.relocations import Relocations

    smart_result = client.get_patrol_observations(
        start=time_range.since.isoformat(),
        end=time_range.until.isoformat(),
        ca_uuid=ca_uuid,
        language_uuid=language_uuid,
        patrol_mandate=patrol_mandate,
        patrol_transport=patrol_transport,
    )
    # Unwrap the Relocations container so callers always get the plain frame.
    observations = smart_result.gdf if isinstance(smart_result, Relocations) else smart_result

    return cast(PatrolObservationsGDF, observations)

get_patrols

get_patrols(client: EarthRangerClient, time_range: TimeRangeAnnotation, patrol_types: PatrolTypesAnnotation, status: PatrolStatusAnnotation = None, raise_on_empty: RaiseOnEmptyAnnotation = True, sub_page_size: SubPageSizeAnnotation = 100, patrols_overlap_daterange: PatrolsOverlapDateRangeAnnotation = True) -> PatrolsDF | EmptyDataFrame
Source code in ecoscope/platform/tasks/io/_earthranger.py
@register(tags=["io"])
def get_patrols(
    client: EarthRangerClient,
    time_range: TimeRangeAnnotation,
    patrol_types: PatrolTypesAnnotation,
    status: PatrolStatusAnnotation = None,
    raise_on_empty: RaiseOnEmptyAnnotation = True,
    sub_page_size: SubPageSizeAnnotation = 100,
    patrols_overlap_daterange: PatrolsOverlapDateRangeAnnotation = True,
) -> PatrolsDF | EmptyDataFrame:
    """Fetch patrols from EarthRanger for a time range and set of patrol types.

    A ``status`` of ``None`` is treated as ``["done"]``.

    Raises:
        ValueError: If ``raise_on_empty`` is true and no patrols were returned.
    """
    effective_status = ["done"] if status is None else status

    patrols_result = client.get_patrols(
        since=time_range.since.isoformat(),
        until=time_range.until.isoformat(),
        patrol_type_value=patrol_types,
        status=effective_status,
        sub_page_size=sub_page_size,
        patrols_overlap_daterange=patrols_overlap_daterange,
    )

    if raise_on_empty:
        if patrols_result.empty:
            raise ValueError("No data returned from EarthRanger for get_patrols")

    return cast(PatrolsDF, patrols_result)

get_patrols_from_combined_params

get_patrols_from_combined_params(combined_params: CombinedPatrolAndEventsParams) -> PatrolsDF | EmptyDataFrame
Source code in ecoscope/platform/tasks/io/_earthranger.py
@register(tags=["io"])
def get_patrols_from_combined_params(
    combined_params: CombinedPatrolAndEventsParams,
) -> PatrolsDF | EmptyDataFrame:
    """Run ``get_patrols`` with arguments taken from ``combined_params``.

    The keyword arguments are produced by ``combined_params.get_patrols_params()``
    and forwarded through the validating task wrapper.
    """
    validated_task = task(get_patrols).validate()
    call_kwargs = combined_params.get_patrols_params()
    return validated_task.call(**call_kwargs)

get_spatial_features_group

get_spatial_features_group(client: EarthRangerClient, spatial_features_group_name: Annotated[str, Field(description='The name of the group to fetch')]) -> RegionsGDF | EmptyDataFrame
Source code in ecoscope/platform/tasks/io/_earthranger.py
@register(tags=["io"])
def get_spatial_features_group(
    client: EarthRangerClient,
    spatial_features_group_name: Annotated[str, Field(description="The name of the group to fetch")],
) -> RegionsGDF | EmptyDataFrame:
    """Fetch a spatial features group by name and return its features.

    Every returned row gets a ``metadata`` entry holding the group's ``id``
    and its ``name`` (stored under ``display_name``).
    """
    raw_group = client.get_spatial_features_group(
        spatial_features_group_name=spatial_features_group_name,
        spatial_features_group_id=None,
        with_group_data=True,
    )
    group = SpatialFeaturesGroup(**raw_group)  # type: ignore[arg-type]

    features_gdf = group.features
    # One shared metadata dict, repeated once per feature row.
    group_metadata = {"id": group.id, "display_name": group.name}
    features_gdf["metadata"] = [group_metadata] * len(features_gdf)  # type: ignore[assignment]
    return cast(RegionsGDF, features_gdf)

get_subjectgroup_observations

get_subjectgroup_observations(client: EarthRangerClient, subject_group_name: Annotated[str, Field(description='⚠️ The use of a group with mixed subtypes could lead to unexpected results')], time_range: Annotated[TimeRange, Field(description='Time range filter')], raise_on_empty: Annotated[bool, AdvancedField(default=True, description='Whether or not to abort the workflow if no data is returned from EarthRanger')] = True, include_details: Annotated[bool, AdvancedField(default=False, title='Include Observation Details', description='Whether or not to include observation details')] = False, include_subjectsource_details: Annotated[bool, AdvancedField(default=False, title='Include Subject Source Details', description='Whether or not to include subject source details')] = False, filter: Annotated[ExclusionFilter, AdvancedField(default=clean, description='Filter observations based on exclusion flags.')] = 'clean') -> SubjectGroupObservationsGDF | EmptyDataFrame

Get observations for a subject group from EarthRanger.

Source code in ecoscope/platform/tasks/io/_earthranger.py
@register(tags=["io"])
def get_subjectgroup_observations(
    client: EarthRangerClient,
    subject_group_name: Annotated[
        str,
        Field(description="⚠️ The use of a group with mixed subtypes could lead to unexpected results"),
    ],
    time_range: Annotated[TimeRange, Field(description="Time range filter")],
    raise_on_empty: Annotated[
        bool,
        AdvancedField(
            default=True,
            description="Whether or not to abort the workflow if no data is returned from EarthRanger",
        ),
    ] = True,
    include_details: Annotated[
        bool,
        AdvancedField(
            default=False,
            title="Include Observation Details",
            description="Whether or not to include observation details",
        ),
    ] = False,
    include_subjectsource_details: Annotated[
        bool,
        AdvancedField(
            default=False,
            title="Include Subject Source Details",
            description="Whether or not to include subject source details",
        ),
    ] = False,
    filter: Annotated[
        ExclusionFilter,
        AdvancedField(
            default="clean",
            description="Filter observations based on exclusion flags.",
        ),
    ] = "clean",
) -> SubjectGroupObservationsGDF | EmptyDataFrame:
    """Fetch the observations of an EarthRanger subject group.

    The ``filter`` name is translated to its integer code through
    ``_EXCLUSION_FILTER_TO_INT``. Subject details and inactive subjects are
    always requested. If ``_make_warehouse_client_from_env`` yields a client,
    the data is read through it as an Arrow table; otherwise the EarthRanger
    ``client`` is queried and a ``Relocations`` result is unwrapped to its
    ``gdf``.

    Raises:
        ValueError: If ``raise_on_empty`` is true and no observations were returned.
    """
    from ecoscope.relocations import Relocations

    exclusion_code = _EXCLUSION_FILTER_TO_INT[filter]

    warehouse_client = _make_warehouse_client_from_env(
        er_site_url=client.server,
        er_api_token=SecretStr(client.token) if client.token else None,
    )
    if warehouse_client:
        import geopandas as gpd  # type: ignore[import-untyped]

        arrow_table = warehouse_client.get_subjectgroup_observations(
            subject_group_name=subject_group_name,
            include_subject_details=True,
            include_inactive=True,
            include_details=include_details,
            include_subjectsource_details=include_subjectsource_details,
            since=time_range.since.isoformat(),
            until=time_range.until.isoformat(),
            filter=exclusion_code,
        )
        observations = gpd.GeoDataFrame.from_arrow(arrow_table)
    else:
        observations = client.get_subjectgroup_observations(
            subject_group_name=subject_group_name,
            include_subject_details=True,
            include_inactive=True,
            include_details=include_details,
            include_subjectsource_details=include_subjectsource_details,
            since=time_range.since.isoformat(),
            until=time_range.until.isoformat(),
            filter=exclusion_code,
        )
        # The REST client may wrap the frame in a Relocations object.
        if isinstance(observations, Relocations):
            observations = observations.gdf

    if raise_on_empty:
        if observations.empty:
            raise ValueError("No data returned from EarthRanger for get_subjectgroup_observations")

    return cast(SubjectGroupObservationsGDF, observations)

load_spatial_features_group

load_spatial_features_group(config: Annotated[SpatialFeaturesConfig, Field(title='Spatial Feature Data Source')]) -> RegionsGDF | EmptyDataFrame

Load spatial feature group from local file, remote URL, or EarthRanger.

Supports geoparquet and geopackage files. All geometries are converted to CRS 4326 and filtered to polygons only.

Parameters:

Name Type Description Default
config Annotated[SpatialFeaturesConfig, Field(title='Spatial Feature Data Source')]

Configuration specifying the source type and parameters.

required

Returns:

Type Description
RegionsGDF | EmptyDataFrame

RegionsGDF with columns: pk, name, short_name, feature_type, geometry, metadata.

RegionsGDF | EmptyDataFrame

Returns EmptyDataFrame if no polygons found.

Raises:

Type Description
ValueError

If file format is invalid or required columns are missing.

Source code in ecoscope/platform/tasks/io/_spatial_features.py
@register(tags=["io"])
def load_spatial_features_group(
    config: Annotated[
        SpatialFeaturesConfig,
        Field(title="Spatial Feature Data Source"),
    ],
) -> RegionsGDF | EmptyDataFrame:
    """
    Load spatial feature group from local file, remote URL, or EarthRanger.

    File-based sources (local or remote) are delegated to
    ``_load_spatial_regions_from_file``; EarthRanger sources are fetched with
    ``get_spatial_features_group`` and filtered to polygon geometries.

    Args:
        config: Configuration specifying the source type and parameters.

    Returns:
        The loaded regions. For file-based sources the result is whatever
        ``_load_spatial_regions_from_file`` returns.

    Raises:
        ValueError: If an EarthRanger group contains no Polygon or MultiPolygon
            geometries, or if a remote download produced no file. Further
            validation errors may be raised by ``_validate_regions`` and
            ``_load_spatial_regions_from_file``.
    """

    if isinstance(config, EarthRangerSpatialFeatures):
        client = EarthRangerConnection.client_from_named_connection(config.data_source)
        regions_gdf = get_spatial_features_group(
            client=client,
            spatial_features_group_name=config.spatial_features_group_name,
        )

        # Filter to polygon geometries only
        polygon_mask = regions_gdf.geometry.geom_type.isin({"Polygon", "MultiPolygon"})
        regions_gdf = regions_gdf[polygon_mask].reset_index(drop=True)

        if regions_gdf.empty:
            raise ValueError("No Polygon or MultiPolygon geometries found")

        _validate_regions(regions_gdf)
        return cast(RegionsGDF, regions_gdf)

    elif isinstance(config, RemoteFileSpatialFeatures):
        import tempfile

        from ecoscope.io import download_file  # type: ignore[import-untyped]

        # Derive display_name from URL filename
        display_name = config.url.split("/")[-1].rsplit(".", 1)[0].replace("-", " ").replace("_", " ").title()

        # Download into a temporary directory that is removed as soon as the
        # file has been loaded (or loading fails), so repeated runs do not
        # leave downloaded files behind.
        # NOTE(review): assumes _load_spatial_regions_from_file reads the file
        # fully into memory before returning - confirm.
        with tempfile.TemporaryDirectory() as tmp_dir:
            download_file(config.url, tmp_dir)

            # Find the downloaded file
            downloaded = os.listdir(tmp_dir)
            if not downloaded:
                raise ValueError(f"No file downloaded from {config.url}")
            temp_path = os.path.join(tmp_dir, downloaded[0])

            return _load_spatial_regions_from_file(
                file_path=temp_path,
                layer=config.layer,
                name_column=config.name_column,
                display_name=display_name,
            )

    else:  # LocalFileSpatialFeatures
        # Derive display_name from filename
        display_name = os.path.basename(config.file_path).rsplit(".", 1)[0].replace("-", " ").replace("_", " ").title()

        return _load_spatial_regions_from_file(
            file_path=config.file_path,
            layer=config.layer,
            name_column=config.name_column,
            display_name=display_name,
        )

persist_df

persist_df(df: Annotated[AnyDataFrame, Field(description='Dataframe to persist')], root_path: Annotated[str, Field(description='Root path to persist text to')], filename: Annotated[str | None, Field(description='            Optional filename to persist text to within the `root_path`.\n            If not provided, a filename will be generated based on a hash of the df content.\n            ')] = None, filetype: Annotated[FileType, Field(description='The output format')] = 'csv') -> Annotated[str, Field(description='Path to persisted data')]

Persist dataframe to a file or cloud storage object.

Source code in ecoscope/platform/tasks/io/_persist.py
@register()
def persist_df(
    df: Annotated[AnyDataFrame, Field(description="Dataframe to persist")],
    root_path: Annotated[str, Field(description="Root path to persist text to")],
    filename: Annotated[
        str | None,
        Field(
            description="""\
            Optional filename to persist text to within the `root_path`.
            If not provided, a filename will be generated based on a hash of the df content.
            """,
        ),
    ] = None,
    filetype: Annotated[FileType, Field(description="The output format")] = "csv",
) -> Annotated[str, Field(description="Path to persisted data")]:
    """Serialize a dataframe in the requested format and persist it under ``root_path``.

    Text formats (``csv``, ``geojson``, ``json``) are written via
    ``_persist_text``; binary formats (``gpkg``, ``geoparquet``, ``parquet``)
    via ``_persist_bytes``. Both parquet flavours are stored with a
    ``.parquet`` extension. When ``filename`` is falsy, a 7-character name is
    derived from a SHA-256 digest of the dataframe.

    Raises:
        ValueError: If ``filetype`` is not one of the supported formats.
    """
    import geopandas as gpd  # type: ignore[import-untyped]
    import pandas as pd

    if not filename:
        # No explicit name: derive a short one from the frame's contents.
        try:
            # bytes() accepts both ndarray and ExtensionArray hash results.
            digest_source = bytes(pd.util.hash_pandas_object(df).values)
        except (TypeError, ValueError):
            # Unhashable cell values: fall back to the shape plus the first rows.
            digest_source = f"{df.shape}{df.head(5).to_dict()}".encode()
        filename = hashlib.sha256(digest_source).hexdigest()[:7]

    if filetype == "csv":
        text_sink = io.StringIO()
        df.to_csv(text_sink)
        return _persist_text(text_sink.getvalue(), root_path, f"{filename}.{filetype}")

    if filetype == "gpkg":
        byte_sink = io.BytesIO()
        gpd.GeoDataFrame(df).to_file(byte_sink, driver="GPKG")
        return _persist_bytes(byte_sink.getvalue(), root_path, f"{filename}.{filetype}")

    if filetype == "geoparquet":
        byte_sink = io.BytesIO()
        gpd.GeoDataFrame(df).to_parquet(byte_sink, index=False)
        return _persist_bytes(byte_sink.getvalue(), root_path, f"{filename}.parquet")

    if filetype == "parquet":
        byte_sink = io.BytesIO()
        # Frames that carry a geometry column go through the GeoDataFrame writer.
        contains_geometry = any(isinstance(df[col].dtype, gpd.array.GeometryDtype) for col in df.columns)
        writer_frame = gpd.GeoDataFrame(df) if contains_geometry else df
        writer_frame.to_parquet(byte_sink, index=False)
        return _persist_bytes(byte_sink.getvalue(), root_path, f"{filename}.parquet")

    if filetype == "geojson":
        geojson_text = gpd.GeoDataFrame(df).to_json()
        return _persist_text(geojson_text, root_path, f"{filename}.{filetype}")

    if filetype == "json":
        return _persist_text(df.to_json(), root_path, f"{filename}.{filetype}")

    raise ValueError(f"Unsupported file type: {filetype}")

persist_json

persist_json(data: Annotated[dict[str, Any] | BaseModel, Field(description='JSON-serializable dict or pydantic model to persist')], root_path: Annotated[str, Field(description='Root path to persist data to')], filename: Annotated[str | None, Field(description="            Optional filename (within `root_path`). If not provided, a filename will\n            be generated from a hash of the serialized JSON content. The `.json`\n            extension is appended automatically when one isn't already present.\n            ", exclude=True)] = None, filename_suffix: Annotated[str | None, Field(description='If present, appended to the filename stem before the extension.', exclude=True)] = None) -> Annotated[str, Field(description='Path to persisted JSON')]

Serialize JSON-shaped data and persist to a file or cloud storage object.

Source code in ecoscope/platform/tasks/io/_persist.py
@register()
def persist_json(
    data: Annotated[
        dict[str, Any] | BaseModel,
        Field(description="JSON-serializable dict or pydantic model to persist"),
    ],
    root_path: Annotated[str, Field(description="Root path to persist data to")],
    filename: Annotated[
        str | None,
        Field(
            description="""\
            Optional filename (within `root_path`). If not provided, a filename will
            be generated from a hash of the serialized JSON content. The `.json`
            extension is appended automatically when one isn't already present.
            """,
            exclude=True,
        ),
    ] = None,
    filename_suffix: Annotated[
        str | None,
        Field(
            description="If present, appended to the filename stem before the extension.",
            exclude=True,
        ),
    ] = None,
) -> Annotated[str, Field(description="Path to persisted JSON")]:
    """Serialize ``data`` to JSON text and persist it under ``root_path``.

    Pydantic models are dumped with ``model_dump_json``; anything else goes
    through ``json.dumps``. Without a ``filename`` the name is the first seven
    hex characters of the payload's SHA-256 digest plus ``.json``; a
    ``filename`` lacking an extension gets ``.json`` appended.
    """
    payload = data.model_dump_json() if isinstance(data, BaseModel) else json.dumps(data)

    if not filename:
        digest = hashlib.sha256(payload.encode()).hexdigest()
        filename = digest[:7] + ".json"
    elif not Path(filename).suffix:
        filename = f"{filename}.json"

    if filename_suffix:
        # Insert the suffix between the stem and the extension.
        name_parts = Path(filename)
        stem, extension = name_parts.stem, name_parts.suffix
        filename = f"{stem}_{filename_suffix}{extension}"

    return _persist_text(payload, root_path, filename)

persist_text

persist_text(text: Annotated[str, Field(description='Text to persist')], root_path: Annotated[str, Field(description='Root path to persist text to')], filename: Annotated[str | None, Field(description='            Optional filename to persist text to within the `root_path`.\n            If not provided, a filename will be generated based on a hash of the text content.\n            ', exclude=True)] = None, filename_suffix: Annotated[str | None, Field(description='            If present, will be appended to the filename as filename_suffix.html\n            ', exclude=True)] = None) -> Annotated[str, Field(description='Path to persisted text')]

Persist text to a file or cloud storage object.

Source code in ecoscope/platform/tasks/io/_persist.py
@register()
def persist_text(
    text: Annotated[str, Field(description="Text to persist")],
    root_path: Annotated[str, Field(description="Root path to persist text to")],
    filename: Annotated[
        str | None,
        Field(
            description="""\
            Optional filename to persist text to within the `root_path`.
            If not provided, a filename will be generated based on a hash of the text content.
            """,
            exclude=True,
        ),
    ] = None,
    filename_suffix: Annotated[
        str | None,
        Field(
            description="""\
            If present, will be appended to the filename as filename_suffix.html
            """,
            exclude=True,
        ),
    ] = None,
) -> Annotated[str, Field(description="Path to persisted text")]:
    """Write ``text`` under ``root_path`` and return what ``_persist_text`` returns.

    Without a ``filename`` the name is the first seven hex characters of the
    text's SHA-256 digest plus ``.html``. A ``filename_suffix`` is inserted
    between the filename's stem and its extension.
    """
    if not filename:
        # No explicit name: derive one from the content itself.
        digest = hashlib.sha256(text.encode()).hexdigest()
        filename = digest[:7] + ".html"

    if filename_suffix:
        name_parts = Path(filename)
        stem, extension = name_parts.stem, name_parts.suffix
        filename = f"{stem}_{filename_suffix}{extension}"

    return _persist_text(text, root_path, filename)

set_er_connection

set_er_connection(data_source: Annotated[EarthRangerConnection, DataSourceField]) -> str
Source code in ecoscope/platform/tasks/io/_set_connection.py
@register()
def set_er_connection(
    data_source: Annotated[
        EarthRangerConnection,
        DataSourceField,
    ],
) -> str:
    """Return the ``name`` of the selected EarthRanger connection."""
    connection_name = data_source.name
    return connection_name

set_event_details_params

set_event_details_params(client: str, time_range: TimeRangeAnnotation, event_type: SingleEventTypeAnnotation, analysis_field: AnalysisFieldAnnotation, analysis_field_label: AnalysisFieldLabelAnnotation, analysis_field_unit: AnalysisFieldUnitAnnotation, event_columns: EventColumnsAnnotation = DefaultEventColumns, category_field: CategoryFieldAnnotation = '', category_field_label: CategoryFieldLabelAnnotation = '', include_null_geometry: IncludeNullGeometryAnnotation = True, raise_on_empty: RaiseOnEmptyAnnotation = True, include_details: IncludeDetailsAnnotation = True, include_updates: IncludeUpdatesAnnotation = False, include_related_events: IncludeRelatedEventsAnnotation = False, include_display_values: IncludeDisplayValuesAnnotation = False) -> Annotated[CombinedEventsAndDetailsParams, Field(description='Passthrough selected events settings for use in downstream tasks')]
Source code in ecoscope/platform/tasks/io/_earthranger.py
@register()
def set_event_details_params(
    client: str,
    time_range: TimeRangeAnnotation,
    event_type: SingleEventTypeAnnotation,
    analysis_field: AnalysisFieldAnnotation,
    analysis_field_label: AnalysisFieldLabelAnnotation,
    analysis_field_unit: AnalysisFieldUnitAnnotation,
    event_columns: EventColumnsAnnotation = DefaultEventColumns,
    category_field: CategoryFieldAnnotation = "",
    category_field_label: CategoryFieldLabelAnnotation = "",
    include_null_geometry: IncludeNullGeometryAnnotation = True,
    raise_on_empty: RaiseOnEmptyAnnotation = True,
    include_details: IncludeDetailsAnnotation = True,
    include_updates: IncludeUpdatesAnnotation = False,
    include_related_events: IncludeRelatedEventsAnnotation = False,
    include_display_values: IncludeDisplayValuesAnnotation = False,
) -> Annotated[
    CombinedEventsAndDetailsParams,
    Field(description="Passthrough selected events settings for use in downstream tasks"),
]:
    """Bundle the event-detail settings into a ``CombinedEventsAndDetailsParams``.

    Every argument is passed through unchanged under the same field name.
    """
    # Keyword order follows the function signature.
    bundled_params = CombinedEventsAndDetailsParams(
        client=client,
        time_range=time_range,
        event_type=event_type,
        analysis_field=analysis_field,
        analysis_field_label=analysis_field_label,
        analysis_field_unit=analysis_field_unit,
        event_columns=event_columns,
        category_field=category_field,
        category_field_label=category_field_label,
        include_null_geometry=include_null_geometry,
        raise_on_empty=raise_on_empty,
        include_details=include_details,
        include_updates=include_updates,
        include_related_events=include_related_events,
        include_display_values=include_display_values,
    )
    return bundled_params

set_gee_connection

set_gee_connection(data_source: Annotated[GoogleEarthEngineConnection, DataSourceField]) -> str
Source code in ecoscope/platform/tasks/io/_set_connection.py
@register()
def set_gee_connection(
    data_source: Annotated[
        GoogleEarthEngineConnection,
        DataSourceField,
    ],
) -> str:
    """Return the ``name`` of the selected Google Earth Engine connection."""
    connection_name = data_source.name
    return connection_name

set_patrol_status

set_patrol_status(status: PatrolStatusAnnotation = None) -> PatrolStatusAnnotation
Source code in ecoscope/platform/tasks/io/_earthranger.py
@register()
def set_patrol_status(
    status: PatrolStatusAnnotation = None,
) -> PatrolStatusAnnotation:
    """Return ``status`` unchanged, or ``["done"]`` when it is ``None``."""
    return ["done"] if status is None else status

set_patrol_types

set_patrol_types(patrol_types: Annotated[list[str], Field(description='Specify the patrol type(s) to analyze (optional). Leave empty to analyze all patrol types.')]) -> Annotated[list[str], Field(description='Passthrough selected patrol types for use in downstream EarthRanger queries')]
Source code in ecoscope/platform/tasks/io/_earthranger.py
@register()
def set_patrol_types(
    patrol_types: Annotated[
        list[str],
        Field(description="Specify the patrol type(s) to analyze (optional). Leave empty to analyze all patrol types."),
    ],
) -> Annotated[
    list[str],
    Field(description="Passthrough selected patrol types for use in downstream EarthRanger queries"),
]:
    """Return the given patrol types unchanged (the same list object)."""
    selected_types = patrol_types
    return selected_types

set_patrols_and_patrol_events_params

set_patrols_and_patrol_events_params(client: str, time_range: TimeRangeAnnotation, patrol_types: PatrolTypesAnnotation, event_types: EventTypesAnnotation, status: PatrolStatusAnnotation = None, include_patrol_details: IncludePatrolDetailsAnnotation = True, raise_on_empty: RaiseOnEmptyAnnotation = True, include_null_geometry: IncludeNullGeometryAnnotation = True, truncate_to_time_range: TruncateToTimeRangeAnnotation = True, sub_page_size: SubPageSizeAnnotation = 100, patrols_overlap_daterange: PatrolsOverlapDateRangeAnnotation = True) -> Annotated[CombinedPatrolAndEventsParams, Field(description='Passthrough selected patrol and event types for use in downstream EarthRanger queries')]
Source code in ecoscope/platform/tasks/io/_earthranger.py
@register()
def set_patrols_and_patrol_events_params(
    client: str,
    time_range: TimeRangeAnnotation,
    patrol_types: PatrolTypesAnnotation,
    event_types: EventTypesAnnotation,
    status: PatrolStatusAnnotation = None,
    include_patrol_details: IncludePatrolDetailsAnnotation = True,
    raise_on_empty: RaiseOnEmptyAnnotation = True,
    include_null_geometry: IncludeNullGeometryAnnotation = True,
    truncate_to_time_range: TruncateToTimeRangeAnnotation = True,
    sub_page_size: SubPageSizeAnnotation = 100,
    patrols_overlap_daterange: PatrolsOverlapDateRangeAnnotation = True,
) -> Annotated[
    CombinedPatrolAndEventsParams,
    Field(description="Passthrough selected patrol and event types for use in downstream EarthRanger queries"),
]:
    """Bundle the patrol and patrol-event settings into a ``CombinedPatrolAndEventsParams``.

    Every argument is passed through unchanged under the same field name;
    ``status`` is forwarded as given, including ``None``.
    """
    bundled_params = CombinedPatrolAndEventsParams(
        client=client,
        time_range=time_range,
        patrol_types=patrol_types,
        event_types=event_types,
        status=status,
        include_patrol_details=include_patrol_details,
        raise_on_empty=raise_on_empty,
        include_null_geometry=include_null_geometry,
        truncate_to_time_range=truncate_to_time_range,
        sub_page_size=sub_page_size,
        patrols_overlap_daterange=patrols_overlap_daterange,
    )
    return bundled_params

set_smart_connection

set_smart_connection(data_source: Annotated[SMARTConnection, DataSourceField]) -> str
Source code in ecoscope/platform/tasks/io/_set_connection.py
@register()
def set_smart_connection(
    data_source: Annotated[
        SMARTConnection,
        DataSourceField,
    ],
) -> str:
    """Return the ``name`` of the selected SMART connection."""
    connection_name = data_source.name
    return connection_name

unpack_events_from_patrols_df

unpack_events_from_patrols_df(patrols_df: PatrolsDF, event_types: EventTypesAnnotation, time_range: TimeRangeAnnotation, include_null_geometry: IncludeNullGeometryAnnotation = True, truncate_to_time_range: TruncateToTimeRangeAnnotation = True, raise_on_empty: RaiseOnEmptyAnnotation = True) -> EventGDF | EmptyDataFrame
Source code in ecoscope/platform/tasks/io/_earthranger.py
@register()
def unpack_events_from_patrols_df(
    patrols_df: PatrolsDF,
    event_types: EventTypesAnnotation,
    time_range: TimeRangeAnnotation,
    include_null_geometry: IncludeNullGeometryAnnotation = True,
    truncate_to_time_range: TruncateToTimeRangeAnnotation = True,
    raise_on_empty: RaiseOnEmptyAnnotation = True,
) -> EventGDF | EmptyDataFrame:
    """Extract the events of the given types from ``patrols_df``.

    The extraction itself is done by the ``ecoscope.io.earthranger_utils``
    function of the same name. Null geometries are dropped unless
    ``include_null_geometry`` is true. With ``truncate_to_time_range`` set,
    only events whose ``time`` lies within ``time_range`` (bounds inclusive)
    are kept.

    Raises:
        ValueError: If ``raise_on_empty`` is true and the extraction yields no events.
    """
    # Aliased so the helper is not confused with this task of the same name.
    from ecoscope.io.earthranger_utils import (
        unpack_events_from_patrols_df as _unpack_patrol_events,
    )

    events = _unpack_patrol_events(
        patrols_df=patrols_df,
        event_type=event_types,
        drop_null_geometry=not include_null_geometry,
    )

    # NOTE(review): emptiness is checked before truncation, so a result that
    # only becomes empty after truncation does not raise - confirm this is intended.
    if raise_on_empty:
        if events.empty:
            raise ValueError("No event data in provided patrols_df")

    if not truncate_to_time_range or events.empty:
        return cast(EventGDF, events)

    within_range = (events.time >= time_range.since) & (events.time <= time_range.until)
    events = events.loc[within_range]  # type: ignore[assignment]

    return cast(EventGDF, events)

unpack_events_from_patrols_df_and_combined_params

unpack_events_from_patrols_df_and_combined_params(patrols_df: PatrolsDF, combined_params: CombinedPatrolAndEventsParams) -> EventGDF | EmptyDataFrame
Source code in ecoscope/platform/tasks/io/_earthranger.py
@register()
def unpack_events_from_patrols_df_and_combined_params(
    patrols_df: PatrolsDF,
    combined_params: CombinedPatrolAndEventsParams,
) -> EventGDF | EmptyDataFrame:
    """Run ``unpack_events_from_patrols_df`` using ``combined_params``.

    ``patrols_df`` is passed explicitly; the remaining keyword arguments come
    from ``combined_params.unpack_patrol_events_params()``.
    """
    validated_task = task(unpack_events_from_patrols_df).validate()
    extra_kwargs = combined_params.unpack_patrol_events_params()
    return validated_task.call(patrols_df=patrols_df, **extra_kwargs)