Skip to content

Analysis Tasks

ecoscope.platform.tasks.analysis

Classes

TimeDensityReturnGDFSchema

Bases: JsonSerializableDataFrameModel

Attributes
area_sqkm class-attribute instance-attribute
area_sqkm: Series[float] = Field()
geometry class-attribute instance-attribute
geometry: Series[Any] = Field()
percentile class-attribute instance-attribute
percentile: Series[float] = Field()

Functions:

aggregate_over_rows

aggregate_over_rows(df: AnyDataFrame, agg_ops: Annotated[AggOperations, Field(description='The parameters that define how to calculate summary statistics.')], output_column: Annotated[str, Field(description='The output column name.')], columns: Annotated[list[str], Field(description='The list of columns.')]) -> AnyDataFrame
Source code in ecoscope/platform/tasks/analysis/_summary.py
@register()
def aggregate_over_rows(
    df: AnyDataFrame,
    agg_ops: Annotated[
        AggOperations,
        Field(description="The parameters that define how to calculate summary statistics."),
    ],
    output_column: Annotated[str, Field(description="The output column name.")],
    columns: Annotated[list[str], Field(description="The list of columns.")],
) -> AnyDataFrame:
    """
    Aggregate the selected columns across each row and store the result in ``output_column``.

    ``agg_ops`` is applied along ``axis=1`` to ``df[columns]`` and the outcome is
    assigned to ``df[output_column]``. The input ``df`` is modified in place and is
    also the object that is returned.
    """
    selected = df[columns]
    # The aggregator is handed to pandas wrapped in a single-element list.
    row_wise = selected.agg(func=[agg_ops], axis=1)
    df[output_column] = row_wise
    return cast(AnyDataFrame, df)

apply_arithmetic_operation

apply_arithmetic_operation(a: Annotated[float | int, Field(description='The first number')], b: Annotated[float | int, Field(description='The second number')], operation: Annotated[Operations, Field(description='The arithmetic operation to apply')]) -> Annotated[float | int, Field(description='The result of the arithmetic operation')]
Source code in ecoscope/platform/tasks/analysis/_aggregation.py
@register()
def apply_arithmetic_operation(
    a: Annotated[float | int, Field(description="The first number")],
    b: Annotated[float | int, Field(description="The second number")],
    operation: Annotated[Operations, Field(description="The arithmetic operation to apply")],
) -> Annotated[float | int, Field(description="The result of the arithmetic operation")]:
    """
    Apply the selected arithmetic operation to ``a`` and ``b``.

    The callable is looked up in ``operations`` by ``operation`` and invoked with
    ``a`` as the first argument and ``b`` as the second.
    """
    fn = operations[operation]
    return fn(a, b)  # type: ignore[operator]

apply_arithmetic_operation_over_rows

apply_arithmetic_operation_over_rows(df: AnyDataFrame, column_a: Annotated[str, Field(description='The first column name')], column_b: Annotated[str, Field(description='The second column name')], output_column: Annotated[str, Field(description='The output column name')], operation: Annotated[Operations, Field(description='The arithmetic operation to apply')]) -> AnyDataFrame
Source code in ecoscope/platform/tasks/analysis/_aggregation.py
@register()
def apply_arithmetic_operation_over_rows(
    df: AnyDataFrame,
    column_a: Annotated[str, Field(description="The first column name")],
    column_b: Annotated[str, Field(description="The second column name")],
    output_column: Annotated[str, Field(description="The output column name")],
    operation: Annotated[Operations, Field(description="The arithmetic operation to apply")],
) -> AnyDataFrame:
    """
    Combine two columns with the selected arithmetic operation.

    The callable found in ``operations`` under ``operation`` is called with
    ``df[column_a]`` and ``df[column_b]``; its result is written to
    ``df[output_column]``. The input ``df`` is modified in place and returned.
    """
    fn = operations[operation]
    left = df[column_a]
    right = df[column_b]
    df[output_column] = fn(left, right)  # type: ignore[operator]
    return cast(AnyDataFrame, df)

calculate_elliptical_time_density

calculate_elliptical_time_density(trajectory_gdf: TrajectoryAnnotation, auto_scale_or_custom_cell_size: AutoScaleOrCustomAnnotation = None, crs: CrsAnnotation = 'EPSG:3857', nodata_value: NoDataAnnotation = 'nan', band_count: BandCountAnnotation = 1, max_speed_factor: MaxSpeedFactorAnnotation = 1.05, expansion_factor: ExpansionFactorAnnotation = 1.3, percentiles: EtdPercentileAnnotation = None) -> DataFrame[TimeDensityReturnGDFSchema]
Source code in ecoscope/platform/tasks/analysis/_time_density.py
@register()
def calculate_elliptical_time_density(
    trajectory_gdf: TrajectoryAnnotation,
    auto_scale_or_custom_cell_size: AutoScaleOrCustomAnnotation = None,
    crs: CrsAnnotation = "EPSG:3857",
    nodata_value: NoDataAnnotation = "nan",
    band_count: BandCountAnnotation = 1,
    # time density
    max_speed_factor: MaxSpeedFactorAnnotation = 1.05,
    expansion_factor: ExpansionFactorAnnotation = 1.3,
    percentiles: EtdPercentileAnnotation = None,
) -> DataFrame[TimeDensityReturnGDFSchema]:
    """
    Compute elliptical time density (ETD) percentile areas for a trajectory.

    A raster profile is built from the pixel size, ``crs``, ``nodata_value`` and
    ``band_count``; ``calculate_etd_range`` is run on the trajectory sorted by
    ``segment_start``; the resulting raster is passed to ``get_percentile_area``.

    Args:
        trajectory_gdf: Trajectory segments. Must provide the ``segment_start`` and
            ``speed_kmhr`` columns. The caller's frame is left untouched.
        auto_scale_or_custom_cell_size: A ``CustomGridCellSize`` supplies the pixel
            size directly; anything else (including ``None``) derives it with
            ``grid_size_from_geographic_extent(..., scale_factor=500)``.
        crs: CRS given to the raster profile.
        nodata_value: No-data value given to the raster profile.
        band_count: Band count given to the raster profile.
        max_speed_factor: Multiplier applied to the maximum ``speed_kmhr`` to obtain
            the ``max_speed_kmhr`` argument of ``calculate_etd_range``.
        expansion_factor: Forwarded unchanged to ``calculate_etd_range``.
        percentiles: Percentile levels; duplicates are removed and the values sorted.
            ``None`` selects ``[50, 60, 70, 80, 90, 99.999]``.

    Returns:
        The output of ``get_percentile_area`` without its ``subject_id`` column and
        with an added ``area_sqkm`` column, or an empty frame with the columns
        ``percentile``, ``geometry`` and ``area_sqkm`` when no raster data was
        generated.

    Raises:
        ValueError: If ``percentiles`` is provided but empty.
    """
    import geopandas as gpd  # type: ignore[import-untyped]
    import pandas as pd  # type: ignore[import-untyped]

    from ecoscope.analysis.percentile import (
        get_percentile_area,
    )
    from ecoscope.analysis.UD import (
        calculate_etd_range,
        grid_size_from_geographic_extent,
    )
    from ecoscope.io.raster import RasterProfile
    from ecoscope.trajectory import Trajectory

    if percentiles is not None and len(percentiles) == 0:
        raise ValueError("Percentile values, if provided, cannot be empty.")
    percentiles = (
        sorted(set(percentiles))  # type: ignore[assignment]
        if percentiles is not None
        else [50.0, 60.0, 70.0, 80.0, 90.0, 99.999]
    )

    if auto_scale_or_custom_cell_size is None:
        auto_scale_or_custom_cell_size = AutoScaleGridCellSize()

    if isinstance(auto_scale_or_custom_cell_size, CustomGridCellSize):
        pixel_size = auto_scale_or_custom_cell_size.grid_cell_size
    else:
        pixel_size = grid_size_from_geographic_extent(trajectory_gdf, scale_factor=500)

    raster_profile = RasterProfile(
        pixel_size=pixel_size,  # type: ignore[arg-type]
        crs=crs,
        nodata_value=nodata_value,  # type: ignore[arg-type]
        band_count=band_count,
    )

    # Sort into a new frame rather than in place so the caller's input (which may be
    # shared with other tasks) is not reordered as a side effect.
    sorted_trajectory = trajectory_gdf.sort_values("segment_start")

    raster_data = calculate_etd_range(
        trajectory=Trajectory(gdf=sorted_trajectory),
        max_speed_kmhr=max_speed_factor * sorted_trajectory["speed_kmhr"].max(),
        raster_profile=raster_profile,
        expansion_factor=expansion_factor,
    )

    if raster_data is None or raster_data.data is None or raster_data.data.size == 0:
        logger.warning("No raster data was generated.")
        # Empty, correctly-typed placeholder so downstream consumers still see the
        # expected columns.
        empty_result = pd.DataFrame(
            {
                "percentile": pd.Series(dtype="float64"),
                "geometry": gpd.GeoSeries(dtype="geometry"),
                "area_sqkm": pd.Series(dtype="float64"),
            }
        )
        return cast(DataFrame[TimeDensityReturnGDFSchema], empty_result)

    result = get_percentile_area(
        percentile_levels=percentiles,  # type: ignore[arg-type]
        raster_data=raster_data,
    )
    result.drop(columns="subject_id", inplace=True)
    # NOTE(review): dividing by 1e6 yields km^2 only if the geometry CRS uses metres
    # (presumably the raster CRS, default EPSG:3857) - confirm.
    result["area_sqkm"] = result.area / 1000000.0

    return cast(DataFrame[TimeDensityReturnGDFSchema], result)

calculate_feature_density

calculate_feature_density(geodataframe: Annotated[AnyGeoDataFrame, Field(description='The feature data to count or sum per grid cell.', exclude=True)], meshgrid: Annotated[AnyGeoDataFrame, Field(description='The grid cells used to aggregate the feature data.', exclude=True)], geometry_type: Annotated[Literal['point', 'line'], Field(description='The geometry type of the provided geodataframe')], sum_column: Annotated[str | SkipJsonSchema[None], Field(description='Sum values in this column per grid cell, rather than counting rows')] = None) -> AnyGeoDataFrame

Count features or sum column values per grid cell.

Source code in ecoscope/platform/tasks/analysis/_calculate_feature_density.py
@register()
def calculate_feature_density(
    geodataframe: Annotated[
        AnyGeoDataFrame,
        Field(description="The feature data to count or sum per grid cell.", exclude=True),
    ],
    meshgrid: Annotated[
        AnyGeoDataFrame,
        Field(description="The grid cells used to aggregate the feature data.", exclude=True),
    ],
    geometry_type: Annotated[
        Literal["point", "line"],
        Field(description="The geometry type of the provided geodataframe"),
    ],
    sum_column: Annotated[
        str | SkipJsonSchema[None],
        Field(description="Sum values in this column per grid cell, rather than counting rows"),
    ] = None,
) -> AnyGeoDataFrame:
    """
    Count features or sum column values per grid cell.

    Thin wrapper that forwards its arguments to
    ``ecoscope.analysis.feature_density.calculate_feature_density``, passing
    ``geodataframe`` as ``selection`` and ``meshgrid`` as ``grid``.
    """
    # Aliased so the library function is not confused with this task of the same name.
    from ecoscope.analysis.feature_density import (
        calculate_feature_density as _feature_density,
    )

    return _feature_density(
        selection=geodataframe,
        grid=meshgrid,
        geometry_type=geometry_type,
        sum_column=sum_column,
    )

calculate_linear_time_density

calculate_linear_time_density(trajectory_gdf: TrajectoryAnnotation, meshgrid: MeshGridAnnotation, percentiles: LtdPercentileAnnotation = None) -> AnyGeoDataFrame
Source code in ecoscope/platform/tasks/analysis/_time_density.py
@register()
def calculate_linear_time_density(
    trajectory_gdf: TrajectoryAnnotation,
    meshgrid: MeshGridAnnotation,
    percentiles: LtdPercentileAnnotation = None,
) -> AnyGeoDataFrame:
    """
    Compute linear time density over ``meshgrid`` and classify it into percentiles.

    ``calculate_ltd`` is run on the trajectory and grid, then ``classify_percentile``
    is applied to the ``density`` column of its output. ``percentiles`` is
    de-duplicated and sorted; ``None`` selects ``[50, 60, 70, 80, 90, 100]``.
    A ``ValueError`` is raised when ``percentiles`` is provided but empty.
    """
    from ecoscope import Trajectory
    from ecoscope.analysis.classifier import (
        classify_percentile,
    )
    from ecoscope.analysis.linear_time_density import (
        calculate_ltd,
    )

    if percentiles is None:
        levels = [50.0, 60.0, 70.0, 80.0, 90.0, 100.0]
    elif len(percentiles) == 0:
        raise ValueError("Percentile values, if provided, cannot be empty.")
    else:
        levels = sorted(set(percentiles))

    trajectory = Trajectory(trajectory_gdf)
    density_grid = calculate_ltd(traj=trajectory, grid=meshgrid)
    classified = classify_percentile(
        df=density_grid,
        percentile_levels=levels,  # type: ignore[arg-type]
        input_column_name="density",
    )
    return cast(AnyGeoDataFrame, classified)

create_meshgrid

create_meshgrid(aoi: AoiAnnotation, auto_scale_or_custom_cell_size: AutoScaleOrCustomAnnotation = None, crs: CrsAnnotation = 'EPSG:3857', intersecting_only: IntersectingOnlyAnnotation = False) -> AnyGeoDataFrame

Create a grid from the provided area of interest.

Source code in ecoscope/platform/tasks/analysis/_create_meshgrid.py
@register()
def create_meshgrid(
    aoi: AoiAnnotation,
    auto_scale_or_custom_cell_size: AutoScaleOrCustomAnnotation = None,
    crs: CrsAnnotation = "EPSG:3857",
    intersecting_only: IntersectingOnlyAnnotation = False,
) -> AnyGeoDataFrame:
    """
    Create a grid from the provided area of interest.

    Args:
        aoi: The area of interest; its total bounds define the gridded box.
        auto_scale_or_custom_cell_size: A ``CustomGridCellSize`` supplies the cell
            size directly; anything else (including ``None``) derives it with
            ``grid_size_from_geographic_extent(aoi)``.
        crs: Output CRS of the grid. It is also the CRS in which the extent is
            measured when a custom cell size is checked.
        intersecting_only: Forwarded as ``return_intersecting_only`` to
            ``ecoscope.base.utils.create_meshgrid``.

    Returns:
        A GeoDataFrame whose geometry is the generated grid.

    Raises:
        ValueError: If a custom cell size would produce more cells than the
            memory-derived limit allows.
    """
    import os

    import geopandas as gpd  # type: ignore[import-untyped]
    from shapely.geometry import box

    from ecoscope.analysis.UD import (
        grid_size_from_geographic_extent,
    )

    # Aliased so the library function is not confused with this task of the same name.
    from ecoscope.base.utils import create_meshgrid as build_meshgrid

    if auto_scale_or_custom_cell_size is None:
        auto_scale_or_custom_cell_size = AutoScaleGridCellSize()

    if isinstance(auto_scale_or_custom_cell_size, CustomGridCellSize):
        cell_size = auto_scale_or_custom_cell_size.grid_cell_size

        # Approximate the number of grid cells we'll generate
        # and error if it's above the acceptable threshold.
        # NOTE(review): the default 32e10 bytes is 320 GB - confirm this is the
        # intended fallback.
        container_memory = int(os.getenv("ECOSCOPE_WORKFLOWS_CONTAINER_MEMORY", 32e10))
        # Roughly, 75% of container mem / 10 columns of traj data / 8 bytes per dataframe 'cell'
        max_cell_count = container_memory * 0.75 / 80

        # total_bounds gives the overall bounding box directly, without first
        # unioning every geometry through the deprecated ``unary_union``.
        min_x, min_y, max_x, max_y = aoi.to_crs(crs).total_bounds  # type: ignore[operator]

        num_cells_y = (max_y - min_y) / cell_size
        num_cells_x = (max_x - min_x) / cell_size

        if num_cells_y * num_cells_x > max_cell_count:
            raise ValueError("Custom grid cell size is too small for the extent of the area of interest")
    else:
        cell_size = grid_size_from_geographic_extent(aoi)

    result = build_meshgrid(
        box(*aoi.total_bounds),
        in_crs=aoi.crs,
        out_crs=crs,
        xlen=cell_size,  # type: ignore[arg-type]
        ylen=cell_size,  # type: ignore[arg-type]
        return_intersecting_only=intersecting_only,
    )

    return gpd.GeoDataFrame(geometry=result)

dataframe_column_first_unique

dataframe_column_first_unique(df: AnyDataFrame, column_name: ColumnName) -> Annotated[int, Field(description='The first unique value in the column')]
Source code in ecoscope/platform/tasks/analysis/_aggregation.py
@register()
def dataframe_column_first_unique(
    df: AnyDataFrame,
    column_name: ColumnName,
) -> Annotated[int, Field(description="The first unique value in the column")]:
    """
    Return the first entry of the column's unique values.

    Indexing position 0 raises if the column has no values.
    """
    unique_values = df[column_name].unique()
    return unique_values[0]

dataframe_column_max

dataframe_column_max(df: AnyDataFrame, column_name: ColumnName) -> Annotated[float, Field(description='The max of the column')]
Source code in ecoscope/platform/tasks/analysis/_aggregation.py
@register()
def dataframe_column_max(
    df: AnyDataFrame,
    column_name: ColumnName,
) -> Annotated[float, Field(description="The max of the column")]:
    """
    Return the maximum of ``df[column_name]``.
    """
    column = df[column_name]
    return column.max()

dataframe_column_mean

dataframe_column_mean(df: AnyDataFrame, column_name: ColumnName) -> Annotated[float, Field(description='The mean of the column')]
Source code in ecoscope/platform/tasks/analysis/_aggregation.py
@register()
def dataframe_column_mean(
    df: AnyDataFrame,
    column_name: ColumnName,
) -> Annotated[float, Field(description="The mean of the column")]:
    """
    Return the mean of ``df[column_name]``.
    """
    column = df[column_name]
    return column.mean()

dataframe_column_min

dataframe_column_min(df: AnyDataFrame, column_name: ColumnName) -> Annotated[float, Field(description='The min of the column')]
Source code in ecoscope/platform/tasks/analysis/_aggregation.py
@register()
def dataframe_column_min(
    df: AnyDataFrame,
    column_name: ColumnName,
) -> Annotated[float, Field(description="The min of the column")]:
    """
    Return the minimum of ``df[column_name]``.
    """
    column = df[column_name]
    return column.min()

dataframe_column_nunique

dataframe_column_nunique(df: AnyDataFrame, column_name: ColumnName) -> Annotated[int, Field(description='The number of unique values in the column')]
Source code in ecoscope/platform/tasks/analysis/_aggregation.py
@register()
def dataframe_column_nunique(
    df: AnyDataFrame,
    column_name: ColumnName,
) -> Annotated[int, Field(description="The number of unique values in the column")]:
    """
    Return the number of unique values in ``df[column_name]``, as reported by ``nunique()``.
    """
    column = df[column_name]
    return column.nunique()

dataframe_column_percentile

dataframe_column_percentile(df: AnyDataFrame, column_name: ColumnName, percentile: float) -> Annotated[int, Field(description='The percentile to calculate (e.g., 50 for median, 90 for 90th percentile).')]
Source code in ecoscope/platform/tasks/analysis/_aggregation.py
@register()
def dataframe_column_percentile(
    df: AnyDataFrame,
    column_name: ColumnName,
    percentile: Annotated[
        float,
        Field(description="The percentile to calculate (e.g., 50 for median, 90 for 90th percentile)."),
    ],
) -> Annotated[
    float,
    Field(description="The value of the column at the requested percentile"),
]:
    """
    Return the requested percentile of ``df[column_name]``.

    The column is converted to a list and passed to ``np.nanpercentile``, so NaN
    entries are ignored. The result is a floating-point value, which is why the
    return annotation is ``float`` rather than ``int``.
    """
    values = df[column_name].to_list()
    return np.nanpercentile(values, percentile)

dataframe_column_sum

dataframe_column_sum(df: AnyDataFrame, column_name: ColumnName) -> Annotated[float, Field(description='The sum of the column')]
Source code in ecoscope/platform/tasks/analysis/_aggregation.py
@register()
def dataframe_column_sum(
    df: AnyDataFrame,
    column_name: ColumnName,
) -> Annotated[float, Field(description="The sum of the column")]:
    """
    Return the sum of ``df[column_name]``.
    """
    column = df[column_name]
    return column.sum()

dataframe_count

dataframe_count(df: AnyDataFrame) -> Annotated[int, Field(description='The number of rows in the DataFrame')]
Source code in ecoscope/platform/tasks/analysis/_aggregation.py
@register()
def dataframe_count(
    df: AnyDataFrame,
) -> Annotated[int, Field(description="The number of rows in the DataFrame")]:
    """
    Return ``len(df)``, i.e. the number of rows in the DataFrame.
    """
    row_count = len(df)
    return row_count

get_night_day_ratio

get_night_day_ratio(df: AnyGeoDataFrame) -> Annotated[float, Field(description='Night/Day ratio')]
Source code in ecoscope/platform/tasks/analysis/_aggregation.py
@register()
def get_night_day_ratio(
    df: AnyGeoDataFrame,
) -> Annotated[float, Field(description="Night/Day ratio")]:
    """
    Return the night/day ratio computed by ``astronomy.get_nightday_ratio`` for ``df``.

    The astropy IERS ``auto_download`` setting is temporarily set to ``False`` for
    the duration of the call.
    """
    from astropy.utils import iers  # type: ignore[import-untyped]

    from ecoscope.analysis import astronomy

    # See classify_is_night for rationale on disabling auto-download.
    with iers.conf.set_temp("auto_download", False):
        ratio = astronomy.get_nightday_ratio(df)
    return ratio

summarize_df

summarize_df(df: AnyDataFrame, summary_params: Annotated[list[SummaryParam], Field(description='The parameters that define how to calculate summary statistics.')], groupby_cols: Annotated[list[str] | SkipJsonSchema[None], Field(default=None, description='The columns to group by. If None, the summary is calculated for the entire DataFrame.')] = None, reset_index: Annotated[bool | SkipJsonSchema[None], AdvancedField(default=False, description='Whether to reset the dataframe index after summarizing.')] = False) -> Annotated[AnyDataFrame, Field(description='Summary Table')]
Source code in ecoscope/platform/tasks/analysis/_summary.py
@register()
def summarize_df(
    df: AnyDataFrame,
    summary_params: Annotated[
        list[SummaryParam],
        Field(description="The parameters that define how to calculate summary statistics."),
    ],
    groupby_cols: Annotated[
        list[str] | SkipJsonSchema[None],
        Field(
            default=None,
            description="The columns to group by. If None, the summary is calculated for the entire DataFrame.",
        ),
    ] = None,
    reset_index: Annotated[
        bool | SkipJsonSchema[None],
        AdvancedField(
            default=False,
            description="Whether to reset the dataframe index after summarizing.",
        ),
    ] = False,
) -> Annotated[AnyDataFrame, Field(description="Summary Table")]:
    """
    Build a summary table with one column per entry in ``summary_params``.

    Each parameter yields one value, stored under its ``display_name``:
    the ``night_day_ratio`` aggregator calls ``get_night_day_ratio`` on the frame,
    any other aggregator is applied to ``df[param.column]`` via ``agg``. The value
    is unit-converted with ``with_unit`` when both units are set, and rounded when
    ``decimal_places`` is truthy.

    With ``groupby_cols`` the summary is computed per group; otherwise a single-row
    frame is produced for the whole DataFrame. ``reset_index`` moves the index into
    regular columns.
    """

    def _aggregate(frame, param):
        # Pick the raw statistic for this parameter.
        if param.aggregator == "night_day_ratio":
            value = get_night_day_ratio(frame)
        else:
            value = frame[param.column].agg(param.aggregator)

        # Optional unit conversion, only when both units are given.
        if param.original_unit and param.new_unit:
            value = with_unit(value, param.original_unit, param.new_unit).value

        # NOTE(review): a decimal_places of 0 is falsy, so no rounding happens in
        # that case - confirm this is intended.
        if param.decimal_places:
            value = round(value, param.decimal_places)

        return value

    def _summary_row(frame):
        return pd.Series({param.display_name: _aggregate(frame, param) for param in summary_params})

    if groupby_cols:
        grouped = df.groupby(groupby_cols)
        result_df = grouped.apply(_summary_row, include_groups=False)  # type: ignore[call-overload]
    else:
        row = _summary_row(df)
        result_df = pd.DataFrame([row], columns=row.index)

    if reset_index:
        result_df.reset_index(drop=False, inplace=True)
    return cast(AnyDataFrame, result_df)