Skip to content

Transformation Tasks

ecoscope.platform.tasks.transformation

Classes

BoundingBox

Bases: BaseModel

Attributes
max_x class-attribute instance-attribute
max_x: Annotated[float, AdvancedField(default=180.0, title='Max Longitude')] = 180.0
max_y class-attribute instance-attribute
max_y: Annotated[float, AdvancedField(default=90.0, title='Max Latitude')] = 90.0
min_x class-attribute instance-attribute
min_x: Annotated[float, AdvancedField(default=-180.0, title='Min Longitude')] = -180.0
min_y class-attribute instance-attribute
min_y: Annotated[float, AdvancedField(default=-90.0, title='Min Latitude')] = -90.0

Coordinate

Bases: BaseModel

Attributes
x instance-attribute
x: Annotated[float, Field(title='Longitude', description='Example 37.30906')]
y instance-attribute
y: Annotated[float, Field(title='Latitude', description='Example -0.15293')]

RenameColumn

Bases: BaseModel

Attributes
new_name instance-attribute
new_name: str
original_name instance-attribute
original_name: str

Functions:

add_spatial_index

add_spatial_index(gdf: Annotated[TrajectoryGDF | EventGDF | EventsWithDisplayNamesGDF, Field(description='The dataframe to add the spatial index to.')], groupers: Annotated[AllGrouper | UserDefinedGroupers, Field(description='            A list of groupers which may contain SpatialGroupers. If SpatialGroupers are present,\n            additional indexes will be added to the `gdf` by taking a spatial join of each region\n            in the SpatialGrouper, and adding the joined region name\n            If no SpatialGroupers are present, this task will return the input `gdf` unchanged.\n            This parameter is excluded from the generated RJSF because it should only be set\n            programmatically in the `spec.yaml` file.\n            Note also that the type of this parameter is `AllGrouper | UserDefinedGroupers` to allow\n            passing a list of any type of Grouper from upstream tasks in the DAG; any elements of\n            the list which are not SpatialGrouper will simply be ignored here.\n            ', exclude=True)]) -> AnyGeoDataFrame
Source code in ecoscope/platform/tasks/transformation/_indexing.py
@register()
def add_spatial_index(
    gdf: Annotated[
        TrajectoryGDF | EventGDF | EventsWithDisplayNamesGDF,
        Field(description="The dataframe to add the spatial index to."),
    ],
    groupers: Annotated[
        AllGrouper | UserDefinedGroupers,
        Field(
            description="""\
            A list of groupers which may contain SpatialGroupers. If SpatialGroupers are present,
            additional indexes will be added to the `gdf` by taking a spatial join of each region
            in the SpatialGrouper, and adding the joined region name
            If no SpatialGroupers are present, this task will return the input `gdf` unchanged.
            This parameter is excluded from the generated RJSF because it should only be set
            programmatically in the `spec.yaml` file.
            Note also that the type of this parameter is `AllGrouper | UserDefinedGroupers` to allow
            passing a list of any type of Grouper from upstream tasks in the DAG; any elements of
            the list which are not SpatialGrouper will simply be ignored here.
            """,
            exclude=True,
        ),
    ],
) -> AnyGeoDataFrame:
    """
    Append one index level per resolved SpatialGrouper, holding the region each row falls in.

    For every SpatialGrouper in `groupers` that is resolved and carries `spatial_regions`,
    the regions are validated as a RegionsGDF and keyed by region name. When the name is
    an empty string, the `pk` is used instead. The regions are then joined to `gdf`:

    - if `gdf` has both `segment_start` and `segment_end` columns, it is wrapped in an
      `ecoscope.trajectory.Trajectory` and classified via `apply_spatial_classification`;
    - otherwise an inner `geopandas.sjoin` with the `intersects` predicate is used, so
      rows that intersect no region are dropped.

    The joined region column is then appended to the index under the grouper's `index_name`.

    Returns:
        `gdf` with the extra index level(s). It is unchanged when `groupers` is an
        AllGrouper or contains no resolved SpatialGrouper with regions.
    """
    import geopandas as gpd  # type: ignore[import-untyped]
    import numpy as np

    from ecoscope.trajectory import Trajectory

    # An AllGrouper carries no spatial information: there is nothing to index on.
    if isinstance(groupers, AllGrouper):
        return cast(AnyGeoDataFrame, gdf)

    for grouper in groupers:
        if not isinstance(grouper, SpatialGrouper):
            continue
        if not grouper.is_resolved or grouper.spatial_regions is None:
            continue

        # `spatial_regions` is typed as AnyGeoDataFrame, but the column handling below
        # (pk / name / short_name / feature_type / metadata) relies on the RegionsGDF schema.
        TypeAdapter(RegionsGDF).validate_python(grouper.spatial_regions)

        regions = grouper.spatial_regions.copy()
        # Prefer the display name as the index value; empty names fall back to the pk.
        has_display_name = regions["name"] != ""
        regions[grouper.index_name] = np.where(has_display_name, regions["name"], regions["pk"])
        # Keep only the geometry, with the new key moved into the index, so the join
        # contributes exactly one extra column to the result.
        regions = (
            regions.reset_index(drop=True)
            .set_index(grouper.index_name)
            .drop(columns=["pk", "name", "short_name", "feature_type", "metadata"])
        )

        is_trajectory = "segment_start" in gdf.columns and "segment_end" in gdf.columns
        if is_trajectory:
            joined = Trajectory(gdf=gdf).apply_spatial_classification(
                spatial_regions=regions, output_column_name=grouper.index_name
            )
        else:
            # Drop "index" / "level_0" columns left behind by earlier reset_index calls so
            # they cannot collide with the reset_index performed during the join.
            # NOTE(review): this was originally described as an `overlay` quirk, but the
            # call here is `sjoin`; confirm the workaround is still required.
            gdf = gdf.drop(columns=["index", "level_0"], errors="ignore")
            joined = gpd.sjoin(gdf, regions, how="inner", predicate="intersects")

        gdf = joined.set_index(grouper.index_name, append=True)

    return cast(AnyGeoDataFrame, gdf)

add_temporal_index

add_temporal_index(df: Annotated[AnyDataFrame, Field(description='The dataframe to add the temporal index to.')], time_col: Annotated[str, Field(description='The name of the column containing time data.')], groupers: Annotated[AllGrouper | UserDefinedGroupers, Field(description='            A list of groupers which may contain TemporalGroupers. If TemporalGroupers are present,\n            additional indexes will be added to the `df` by formatting the `time_col` according to\n            the `index_name` attribute of each TemporalGrouper. If no TemporalGroupers are present,\n            this task will return the input `df` unchanged. This parameter is excluded from the\n            generated RJSF because it should only be set programmatically in the `spec.yaml` file.\n            Note also that the type of this parameter is `AllGrouper | UserDefinedGroupers` to allow\n            passing a list of any type of Grouper from upstream tasks in the DAG; any elements of\n            the list which are not TemporalGroupers will simply be ignored here.\n            ', exclude=True)], cast_to_datetime: Annotated[bool, AdvancedField(default=True, description='Whether to attempt casting `time_col` to datetime.')] = True, format: Annotated[str, AdvancedField(default='mixed', description='            If `cast_to_datetime=True`, the format to pass to `pd.to_datetime`\n            when attempting to cast `time_col` to datetime. Defaults to "mixed".\n            ')] = 'mixed') -> AnyDataFrame
Source code in ecoscope/platform/tasks/transformation/_indexing.py
@register()
def add_temporal_index(
    df: Annotated[AnyDataFrame, Field(description="The dataframe to add the temporal index to.")],
    time_col: Annotated[str, Field(description="The name of the column containing time data.")],
    groupers: Annotated[
        AllGrouper | UserDefinedGroupers,
        Field(
            description="""\
            A list of groupers which may contain TemporalGroupers. If TemporalGroupers are present,
            additional indexes will be added to the `df` by formatting the `time_col` according to
            the `index_name` attribute of each TemporalGrouper. If no TemporalGroupers are present,
            this task will return the input `df` unchanged. This parameter is excluded from the
            generated RJSF because it should only be set programmatically in the `spec.yaml` file.
            Note also that the type of this parameter is `AllGrouper | UserDefinedGroupers` to allow
            passing a list of any type of Grouper from upstream tasks in the DAG; any elements of
            the list which are not TemporalGroupers will simply be ignored here.
            """,
            exclude=True,
        ),
    ],
    cast_to_datetime: Annotated[
        bool,
        AdvancedField(
            default=True,
            description="Whether to attempt casting `time_col` to datetime.",
        ),
    ] = True,
    format: Annotated[
        str,
        AdvancedField(
            default="mixed",
            description="""\
            If `cast_to_datetime=True`, the format to pass to `pd.to_datetime`
            when attempting to cast `time_col` to datetime. Defaults to "mixed".
            """,
        ),
    ] = "mixed",
) -> AnyDataFrame:
    """
    Append one index level per TemporalGrouper, derived from `time_col`.

    If `cast_to_datetime` is true, `time_col` is first replaced in place with
    `pd.to_datetime(df[time_col], format=format)`. Then, for each TemporalGrouper in
    `groupers`, a column named after the grouper's `index_name` is filled with `time_col`
    rendered through `strftime(grouper.temporal_index.directive)`. That column is then
    appended to the index. Groupers of any other type are ignored.

    Note: the incoming `df` is mutated (the datetime cast, and the first new column)
    before `set_index` produces a new frame.

    Returns:
        The dataframe with the extra index level(s). When `groupers` is an AllGrouper or
        holds no TemporalGroupers, only the optional datetime cast is applied.
    """
    import pandas as pd

    if cast_to_datetime:
        df[time_col] = pd.to_datetime(df[time_col], format=format)

    # An AllGrouper carries no temporal information: nothing more to do.
    if isinstance(groupers, AllGrouper):
        return cast(AnyDataFrame, df)

    for grouper in (g for g in groupers if isinstance(g, TemporalGrouper)):
        level_name = grouper.index_name
        df[level_name] = df[time_col].dt.strftime(grouper.temporal_index.directive)
        df = df.set_index(level_name, append=True)  # type: ignore[assignment]

    return cast(AnyDataFrame, df)

apply_classification

apply_classification(df: Annotated[AnyDataFrame, Field(description='The dataframe to classify.', exclude=True)], input_column_name: Annotated[str, Field(description='The dataframe column to classify.')], output_column_name: Annotated[str | SkipJsonSchema[None], Field(description='The dataframe column that will contain the classification values.')] = None, label_options: Annotated[DefaultLabels | CustomLabels, AdvancedField(default=None, description='Optional specification or formatting of classification values.')] = DefaultLabels(), classification_options: Annotated[ClassificationArgs, Field(description='Classification scheme and its arguments.')] = SharedArgs()) -> AnyDataFrame

Classifies a dataframe column using specified classification scheme.

Parameters:

Name Type Description Default
df DataFrame

The input data.

required
input_column_name str

The dataframe column to classify.

required
output_column_name str

The dataframe column that will contain the classification. Defaults to "<input_column_name>_classified".

None
labels list[str]

labels of bins, use bin edges if labels==None.

required
label_options DefaultLabels | CustomLabels

Optional specification or formatting of classification values.

DefaultLabels()
classification_options Annotated[ClassificationArgs, Field(description='Classification scheme and its arguments.')]

Classification scheme and its arguments. See below:

Applicable to equal_interval, natural_breaks, quantile, max_breaks & fisher_jenks: k (int): The number of classes required

Applicable only to natural_breaks: initial (int): The number of initial solutions generated with different centroids. The best of initial results are returned.

Applicable only to max_breaks: mindiff (float): The minimum difference between class breaks.

Applicable only to std_mean: multiples (numpy.array): The multiples of the standard deviation to add/subtract from the sample mean to define the bins. anchor (bool): Anchor upper bound of one class to the sample mean.

For more information, see https://pysal.org/mapclassify/api.html

SharedArgs()

Returns:

Type Description
AnyDataFrame

The input dataframe with a classification column appended.

Source code in ecoscope/platform/tasks/transformation/_classification.py
@register()
def apply_classification(
    df: Annotated[
        AnyDataFrame,
        Field(description="The dataframe to classify.", exclude=True),
    ],
    input_column_name: Annotated[str, Field(description="The dataframe column to classify.")],
    output_column_name: Annotated[
        str | SkipJsonSchema[None],
        Field(description="The dataframe column that will contain the classification values."),
    ] = None,
    label_options: Annotated[
        DefaultLabels | CustomLabels,
        AdvancedField(
            default=None,
            description="Optional specification or formatting of classification values.",
        ),
    ] = DefaultLabels(),
    classification_options: Annotated[
        ClassificationArgs,
        Field(description="Classification scheme and its arguments."),
    ] = SharedArgs(),
) -> AnyDataFrame:
    """
    Classifies a dataframe column using specified classification scheme.

    Thin wrapper around `ecoscope.analysis.classifier.apply_classification`.

    Args:
        df (pd.DataFrame): The input data.
        input_column_name (str): The dataframe column to classify.
        output_column_name (str | None): The dataframe column that will contain the classification.
            Defaults to "<input_column_name>_classified"
        label_options (DefaultLabels | CustomLabels): Optional specification or formatting of
            classification values. Its non-None fields are forwarded as keyword arguments.
        classification_options (ClassificationArgs):
            Classification scheme and its arguments. `scheme` is passed explicitly, and the
            model's non-None fields are forwarded as keyword arguments.
            See below:

            Applicable to equal_interval, natural_breaks, quantile, max_breaks & fisher_jenks:
                k (int): The number of classes required

            Applicable only to natural_breaks:
                initial (int): The number of initial solutions generated with different centroids.
                    The best of initial results are returned.

            Applicable only to max_breaks:
                mindiff (float): The minimum difference between class breaks.

            Applicable only to std_mean:
                multiples (numpy.array): The multiples of the standard deviation to add/subtract
                    from the sample mean to define the bins.
                anchor (bool): Anchor upper bound of one class to the sample mean.

            For more information, see https://pysal.org/mapclassify/api.html

    Returns:
        The input dataframe with a classification column appended.
    """
    from ecoscope.analysis.classifier import (
        apply_classification,
    )

    return apply_classification(  # type: ignore[return-value]
        df,
        input_column_name=input_column_name,
        output_column_name=output_column_name,
        scheme=classification_options.scheme,
        **classification_options.model_dump(exclude_none=True),  # type: ignore[union-attr]
        **label_options.model_dump(exclude_none=True),
    )

apply_color_map

apply_color_map(df: Annotated[AnyDataFrame, Field(description='The dataframe to apply the color map to.', exclude=True)], input_column_name: Annotated[str, Field(description='The name of the column with categorical values.')], colormap: Annotated[str | SkipJsonSchema[dict[ColorValue, HexColor]] | SkipJsonSchema[list[HexColor]], Field(description='A named matplotlib colormap.')] = 'viridis', output_column_name: Annotated[str | SkipJsonSchema[None], Field(description='The dataframe column that will contain the color values.')] = None) -> AnyDataFrame

Adds a color column to the dataframe based on the categorical values in the specified column.

Args: df (pd.DataFrame): The input dataframe. input_column_name (str): The name of the column with categorical values. colormap (str | dict | list): Either a named mpl.colormap, a list of string hex values, or a dict mapping column values to hex colors. output_column_name (str): The dataframe column that will contain the color values. Defaults to "<input_column_name>_colormap".

Returns: pd.DataFrame: The dataframe with an additional color column.

Source code in ecoscope/platform/tasks/transformation/_classification.py
@register()
def apply_color_map(
    df: Annotated[
        AnyDataFrame,
        Field(description="The dataframe to apply the color map to.", exclude=True),
    ],
    input_column_name: Annotated[str, Field(description="The name of the column with categorical values.")],
    colormap: Annotated[
        str | SkipJsonSchema[dict[ColorValue, HexColor]] | SkipJsonSchema[list[HexColor]],
        Field(description="A named matplotlib colormap."),
    ] = "viridis",
    output_column_name: Annotated[
        str | SkipJsonSchema[None],
        Field(description="The dataframe column that will contain the color values."),
    ] = None,
) -> AnyDataFrame:
    """
    Adds a color column to the dataframe based on the categorical values in the specified column.

    Thin wrapper around `ecoscope.analysis.classifier.apply_color_map`; `colormap` is
    passed through as its `cmap` argument.

    Args:
        df (pd.DataFrame): The input dataframe.
        input_column_name (str): The name of the column with categorical values.
        colormap (str | dict | list): Either a named matplotlib colormap, a list of hex
            color strings, or a dict mapping column values to hex colors. The list and dict
            forms are hidden from the generated JSON schema via `SkipJsonSchema`.
        output_column_name (str | None): The dataframe column that will contain the color values.
            Defaults to "<input_column_name>_colormap"

    Returns:
        pd.DataFrame: The dataframe with an additional color column.
    """
    from ecoscope.analysis.classifier import (
        apply_color_map,
    )

    return apply_color_map(  # type: ignore[return-value]
        dataframe=df,
        input_column_name=input_column_name,
        cmap=colormap,
        output_column_name=output_column_name,
    )

apply_reloc_coord_filter

apply_reloc_coord_filter(df: AnyGeoDataFrame, bounding_box: Annotated[BoundingBox | SkipJsonSchema[None], AdvancedField(default=BoundingBox(), description='Filter events to inside these bounding coordinates.')] = None, filter_point_coords: Annotated[list[Coordinate] | SkipJsonSchema[None], AdvancedField(default=[], title='Filter Exact Point Coordinates', description='By adding a filter, the workflow will not include events recorded at the specified coordinates.')] = None, roi_gdf: Annotated[AnyGeoDataFrame | SkipJsonSchema[None], AdvancedField(default=None, description='The ROI geopandas dataframe, in EPSG: 4326, indexed by ROI name')] = None, roi_name: Annotated[str | SkipJsonSchema[None], AdvancedField(default=None, description='The ROI name')] = None, reset_index: Annotated[bool | SkipJsonSchema[None], AdvancedField(default=True, description='Reset index after filtering')] = True) -> AnyGeoDataFrame
Source code in ecoscope/platform/tasks/transformation/_filtering.py
@register()
def apply_reloc_coord_filter(
    df: AnyGeoDataFrame,
    bounding_box: Annotated[
        BoundingBox | SkipJsonSchema[None],
        AdvancedField(
            default=BoundingBox(),
            description="Filter events to inside these bounding coordinates.",
        ),
    ] = None,
    filter_point_coords: Annotated[
        list[Coordinate] | SkipJsonSchema[None],
        AdvancedField(
            default=[],
            title="Filter Exact Point Coordinates",
            description=(
                "By adding a filter, the workflow will not include events recorded at the specified coordinates."
            ),
        ),
    ] = None,
    roi_gdf: Annotated[
        AnyGeoDataFrame | SkipJsonSchema[None],
        AdvancedField(
            default=None,
            description="The ROI geopandas dataframe, in EPSG: 4326, indexed by ROI name",
        ),
    ] = None,
    roi_name: Annotated[
        str | SkipJsonSchema[None],
        AdvancedField(default=None, description="The ROI name"),
    ] = None,
    reset_index: Annotated[
        bool | SkipJsonSchema[None],
        AdvancedField(default=True, description="Reset index after filtering"),
    ] = True,
) -> AnyGeoDataFrame:
    """
    Filter rows of `df` by bounding box, excluded exact coordinates, and an optional ROI.

    A row is kept when its geometry is:
      - null (passed through unchanged), or
      - a Point strictly inside `bounding_box` (points on the edge are dropped) whose
        (x, y) does not exactly equal any entry of `filter_point_coords`, or
      - a non-Point geometry that intersects `bounding_box`.

    If both `roi_gdf` and `roi_name` are given, the remaining rows are further restricted
    to those intersecting `roi_gdf.loc[roi_name, "geometry"]`.

    Args:
        df: Input geodataframe with a `geometry` column.
        bounding_box: Bounds to keep; `None` means a default `BoundingBox()`.
        filter_point_coords: Exact point coordinates to exclude; `None` means none.
        roi_gdf: Regions of interest, indexed by ROI name.
        roi_name: Label in `roi_gdf` whose geometry kept rows must intersect.
        reset_index: If true, the result gets a fresh index (`reset_index(drop=True)`).

    Returns:
        The filtered geodataframe.
    """
    import shapely  # type: ignore[import-untyped]

    if filter_point_coords is None:
        filter_point_coords = []
    if bounding_box is None:
        bounding_box = BoundingBox()

    # TODO: move it to ecoscope core
    # Excluded points are matched on exact (x, y) values. Don't test
    # `geometry in GeoSeries(...)`: `in` on a pandas Series checks the *index*,
    # not the values, so such a test never matches any point.
    excluded_xy = {(coord.x, coord.y) for coord in filter_point_coords}
    # Loop-invariant: build the bounding polygon once rather than per row.
    bbox_polygon = shapely.geometry.box(
        bounding_box.min_x,
        bounding_box.min_y,
        bounding_box.max_x,
        bounding_box.max_y,
    )

    def envelope_reloc_filter(geometry) -> bool:
        # We want to 'pass-through' null geometry here
        if geometry is None:
            return True

        # For non-Point geometries (e.g. Polygon, MultiPolygon), keep rows whose
        # geometry intersects the bounding box. The exact-coord filter only
        # makes sense for point relocations, so it doesn't apply here.
        if geometry.geom_type != "Point":
            return geometry.intersects(bbox_polygon)

        # Strict inequalities: points lying exactly on the box edge are excluded.
        return (
            bounding_box.min_x < geometry.x < bounding_box.max_x
            and bounding_box.min_y < geometry.y < bounding_box.max_y
            and (geometry.x, geometry.y) not in excluded_xy
        )

    filtered_df = df.loc[df["geometry"].apply(envelope_reloc_filter), :]  # type: ignore[index,assignment]

    if roi_gdf is not None and roi_name is not None:
        roi = roi_gdf.loc[roi_name, "geometry"]
        filtered_df = filtered_df.loc[filtered_df.intersects(roi), :]  # type: ignore[operator,index,assignment]

    if reset_index:
        filtered_df = filtered_df.reset_index(drop=True)
    return cast(AnyGeoDataFrame, filtered_df)

assign_subject_colors

assign_subject_colors(df: AnyDataFrame, subject_id_column: Annotated[str, Field(description="Column containing subject identifiers (e.g., 'groupby_col', 'subject__id')")] = 'subject__id', additional_column: Annotated[str, Field(description="Column containing subject additional data as JSON (e.g., 'subject__additional')")] = 'subject__additional', output_column: Annotated[str, Field(description='Name of the output column for assigned colors')] = 'subject_color', fallback_strategy: Annotated[Literal['default_color', 'palette'], Field(description="Strategy for subjects with missing or duplicate rgb values: 'default_color' keeps original rgb (even duplicates) and uses default_color for missing; 'palette' assigns palette colors to both duplicates and missing")] = 'default_color', default_color: Annotated[str, AdvancedField(description="Hex color for subjects without rgb (used when fallback_strategy='default_color')", default='#FFFF00')] = '#FFFF00', default_palette: Annotated[str, AdvancedField(description="Color palette for fallback colors (used when fallback_strategy='palette')", default='tab20')] = 'tab20') -> AnyDataFrame

Assign colors to subjects based on rgb field from subject__additional JSON.

Strategy: 1. Parse rgb from subject__additional JSON field 2. Identify subjects with unique vs duplicate rgb values 3. Based on fallback_strategy: - 'default_color': Keep original rgb (even duplicates), use default_color for missing - 'palette': Only unique rgb kept, duplicates and missing get palette colors 4. Return dataframe with new color column

Parameters:

Name Type Description Default
df AnyDataFrame

Input dataframe with subject observations

required
subject_id_column Annotated[str, Field(description="Column containing subject identifiers (e.g., 'groupby_col', 'subject__id')")]

Column name containing subject identifiers

'subject__id'
additional_column Annotated[str, Field(description="Column containing subject additional data as JSON (e.g., 'subject__additional')")]

Column name containing JSON with rgb data

'subject__additional'
output_column Annotated[str, Field(description='Name of the output column for assigned colors')]

Name for the output color column

'subject_color'
fallback_strategy Annotated[Literal['default_color', 'palette'], Field(description="Strategy for subjects with missing or duplicate rgb values: 'default_color' keeps original rgb (even duplicates) and uses default_color for missing; 'palette' assigns palette colors to both duplicates and missing")]

Strategy for handling duplicates and missing rgb values

'default_color'
default_color Annotated[str, AdvancedField(description="Hex color for subjects without rgb (used when fallback_strategy='default_color')", default='#FFFF00')]

Hex color string for missing rgb (when fallback_strategy='default_color')

'#FFFF00'
default_palette Annotated[str, AdvancedField(description="Color palette for fallback colors (used when fallback_strategy='palette')", default='tab20')]

Matplotlib palette name for fallback colors (when fallback_strategy='palette')

'tab20'

Returns:

Type Description
AnyDataFrame

DataFrame with added color column

Source code in ecoscope/platform/tasks/transformation/_subjects.py
@register()
def assign_subject_colors(
    df: AnyDataFrame,
    subject_id_column: Annotated[
        str,
        Field(description="Column containing subject identifiers (e.g., 'groupby_col', 'subject__id')"),
    ] = "subject__id",
    additional_column: Annotated[
        str,
        Field(description="Column containing subject additional data as JSON (e.g., 'subject__additional')"),
    ] = "subject__additional",
    output_column: Annotated[
        str,
        Field(description="Name of the output column for assigned colors"),
    ] = "subject_color",
    fallback_strategy: Annotated[
        Literal["default_color", "palette"],
        Field(
            description="Strategy for subjects with missing or duplicate rgb values: "
            "'default_color' keeps original rgb (even duplicates) and uses default_color for missing; "
            "'palette' assigns palette colors to both duplicates and missing"
        ),
    ] = "default_color",
    default_color: Annotated[
        str,
        AdvancedField(
            description="Hex color for subjects without rgb (used when fallback_strategy='default_color')",
            default="#FFFF00",
        ),
    ] = "#FFFF00",
    default_palette: Annotated[
        str,
        AdvancedField(
            description="Color palette for fallback colors (used when fallback_strategy='palette')",
            default="tab20",
        ),
    ] = "tab20",
) -> AnyDataFrame:
    """
    Assign colors to subjects based on rgb field from subject__additional JSON.

    Strategy:
    1. Parse rgb from subject__additional JSON field (per subject, the first row whose
       payload yields a non-empty "rgb" wins)
    2. Identify subjects with unique vs duplicate rgb values (compared as raw strings)
    3. Based on fallback_strategy:
       - 'default_color': Keep original rgb (even duplicates), use default_color for missing
       - 'palette': Only unique rgb kept, duplicates and missing get palette colors
    4. Return dataframe with new color column

    Output colors are (r, g, b, a) tuples of ints in 0-255. Rows with a null subject id
    get (0, 0, 0, 0) and never consume a palette slot. The output column is written onto
    `df` in place, and the same object is returned.

    Args:
        df: Input dataframe with subject observations
        subject_id_column: Column name containing subject identifiers
        additional_column: Column name containing JSON (string or dict) with rgb data
        output_column: Name for the output color column
        fallback_strategy: Strategy for handling duplicates and missing rgb values
        default_color: Hex color string for missing rgb (when fallback_strategy='default_color')
        default_palette: Matplotlib palette name for fallback colors (when fallback_strategy='palette')

    Returns:
        DataFrame with added color column

    Raises:
        ValueError: If `subject_id_column` is not a column of `df`, or `default_color`
            cannot be parsed as '#RRGGBB' hex.
    """
    if subject_id_column not in df.columns:
        raise ValueError(f"Subject ID column '{subject_id_column}' not found in dataframe")

    # Define NAN_COLOR to match apply_color_map behavior
    NAN_COLOR = (0, 0, 0, 0)

    # Step 1: Parse rgb values from subject__additional.
    # Single pass over the rows instead of re-filtering the whole frame once per subject
    # (which was O(rows * subjects)); rows are visited in order, so the first usable
    # rgb per subject still wins.
    subject_rgb_map = {}

    if additional_column in df.columns:
        for subject_id, additional_data in zip(df[subject_id_column], df[additional_column]):
            # Null subject ids never get a color; already-resolved subjects need no more rows.
            if pd.isna(subject_id) or subject_id in subject_rgb_map:
                continue
            # Only JSON strings and dicts can carry rgb. Type-check first: pd.notna on a
            # list-like payload returns an array and cannot be used in a boolean test.
            if not isinstance(additional_data, (str, dict)):
                continue
            try:
                additional_dict = json.loads(additional_data) if isinstance(additional_data, str) else additional_data
                rgb_value = additional_dict.get("rgb")
            except (json.JSONDecodeError, AttributeError) as e:
                # AttributeError: the JSON decoded to something other than an object.
                logger.warning(f"Failed to parse additional data for subject {subject_id}: {e}")
                continue
            if rgb_value:
                subject_rgb_map[subject_id] = rgb_value
    else:
        logger.warning(f"Column '{additional_column}' not found in dataframe. All subjects will use fallback colors.")

    # Step 2: Identify duplicate rgb values
    rgb_counts = pd.Series(subject_rgb_map).value_counts()
    duplicate_rgb_values = set(rgb_counts[rgb_counts > 1].index)

    # Helper functions
    def hex_to_rgba(hex_color: str) -> tuple[float, float, float, float]:
        hex_color = hex_color.lstrip("#")
        r, g, b = (
            int(hex_color[0:2], 16),
            int(hex_color[2:4], 16),
            int(hex_color[4:6], 16),
        )
        return (r / 255.0, g / 255.0, b / 255.0, 1.0)

    def parse_rgb_str(rgb_str: str) -> tuple[float, float, float, float] | None:
        # Expects "r, g, b" with integer channels; returns None if that shape doesn't parse.
        try:
            r, g, b = [int(x.strip()) for x in rgb_str.split(",")]
            return (r / 255.0, g / 255.0, b / 255.0, 1.0)
        except (ValueError, AttributeError):
            return None

    default_color_rgba = hex_to_rgba(default_color)

    # Step 3: Get palette colors (only if needed)
    palette_colors = None
    if fallback_strategy == "palette":
        try:
            cmap = plt.get_cmap(default_palette)
            palette_colors = [cmap(i) for i in range(cmap.N)]
        except ValueError:
            logger.warning(f"Palette '{default_palette}' not found, falling back to 'tab20'")
            cmap = plt.get_cmap("tab20")
            palette_colors = [cmap(i) for i in range(cmap.N)]

    # Step 4: Build final color mapping
    final_color_map = {}
    subjects_needing_fallback = []

    for subject_id in df[subject_id_column].unique():
        # Null subjects are always rendered with NAN_COLOR (see apply_color below),
        # so they must not take a palette slot away from a real subject.
        if pd.isna(subject_id):
            continue

        rgb_str = subject_rgb_map.get(subject_id)

        if rgb_str is None:
            subjects_needing_fallback.append(subject_id)
            continue

        is_duplicate = rgb_str in duplicate_rgb_values
        if is_duplicate and fallback_strategy == "palette":
            subjects_needing_fallback.append(subject_id)
            continue

        parsed = parse_rgb_str(rgb_str)
        if parsed:
            final_color_map[subject_id] = parsed
        else:
            logger.warning(f"Failed to parse rgb '{rgb_str}' for subject {subject_id}.")
            subjects_needing_fallback.append(subject_id)

    # Assign fallback colors (palette colors cycle when subjects outnumber them)
    for idx, subject in enumerate(subjects_needing_fallback):
        if fallback_strategy == "palette" and palette_colors:
            final_color_map[subject] = palette_colors[idx % len(palette_colors)]
        else:
            final_color_map[subject] = default_color_rgba

    # Step 5: Apply color mapping to dataframe, using NAN_COLOR for null subjects
    def apply_color(subject_id):
        if pd.isna(subject_id):
            return NAN_COLOR
        color = final_color_map.get(subject_id)
        if color is None:
            return NAN_COLOR
        # Convert from 0-1 range to 0-255 range, keeping as tuple
        return tuple(round(chan * 255) for chan in color)

    df[output_column] = df[subject_id_column].apply(apply_color)
    return cast(AnyDataFrame, df)

assign_value

assign_value(df: AnyDataFrame, column_name: Annotated[str, Field(description='The column name to map.')], value: Annotated[str | int | float | bool | SkipJsonSchema[None], Field(description='The column value.')], noop_if_column_exists: Annotated[bool, Field(description='If set to true and column_name exists on df, do nothing', default=False)] = False) -> AnyDataFrame
Source code in ecoscope/platform/tasks/transformation/_mapping.py
@register()
def assign_value(
    df: AnyDataFrame,
    column_name: Annotated[str, Field(description="The column name to map.")],
    value: Annotated[
        str | int | float | bool | SkipJsonSchema[None],
        Field(description="The column value."),
    ],
    noop_if_column_exists: Annotated[
        bool,
        Field(
            description="If set to true and column_name exists on df, do nothing",
            default=False,
        ),
    ] = False,
) -> AnyDataFrame:
    """
    Set every row of ``column_name`` to the scalar ``value``.

    Args:
        df (AnyDataFrame): The DataFrame to modify in place.
        column_name (str): Name of the column to write.
        value (str | int | float | bool | None): Value broadcast to every row.
        noop_if_column_exists (bool): When True and ``column_name`` is already a
            column of ``df``, ``df`` is returned untouched.

    Returns:
        AnyDataFrame: The same DataFrame object, with the column written unless skipped.
    """
    # Guard clause: honour the request to leave an existing column alone.
    if noop_if_column_exists and column_name in df.columns:
        return cast(AnyDataFrame, df)

    df[column_name] = value
    return cast(AnyDataFrame, df)

classify_is_night

classify_is_night(relocations: Annotated[AnyDataFrame, Field(description='The dataframe to classify.', exclude=True)]) -> AnyDataFrame

Classifies if segments occur at night in a trajectory dataframe

Parameters:

Name Type Description Default
relocations AnyDataFrame

The input data.

required

Returns:

Type Description
AnyDataFrame

The input dataframe with an is_night column appended.

Source code in ecoscope/platform/tasks/transformation/_classification.py
@register()
def classify_is_night(
    relocations: Annotated[
        AnyDataFrame,
        Field(description="The dataframe to classify.", exclude=True),
    ],
) -> AnyDataFrame:
    """
    Classifies if segments occur at night in a trajectory dataframe.

    Args:
        relocations (AnyDataFrame): The input data. Its ``geometry`` and ``fixtime``
            values are passed to ``ecoscope.analysis.astronomy.is_night``. The frame
            is modified in place.

    Returns:
        AnyDataFrame: The input dataframe with an ``is_night`` column appended.
    """
    from astropy.utils import iers  # type: ignore[import-untyped]

    from ecoscope.analysis.astronomy import is_night

    # Don't download IERS Earth Orientation tables.
    # Bundled IERS data is accurate to ~few-ms, well below
    # the minute-scale precision needed here.
    with iers.conf.set_temp("auto_download", False):
        relocations["is_night"] = is_night(relocations.geometry, relocations.fixtime)

    return cast(AnyDataFrame, relocations)

classify_seasons

classify_seasons(trajectory: AnyDataFrame, season_windows: AnyDataFrame) -> AnyDataFrame
Source code in ecoscope/platform/tasks/transformation/_classification.py
@register()
def classify_seasons(
    trajectory: AnyDataFrame,
    season_windows: AnyDataFrame,
) -> AnyDataFrame:
    """
    Label each trajectory segment with the season its ``segment_start`` falls in.

    Args:
        trajectory (AnyDataFrame): Frame with a ``segment_start`` column. A
            ``season`` column is written to it in place.
        season_windows (AnyDataFrame): One row per window with ``start``, ``end``
            and ``season`` columns. Each window becomes a ``pd.Interval`` with the
            pandas default closure (right-closed, ``(start, end]``).

    Returns:
        AnyDataFrame: ``trajectory`` with a string ``season`` column. A segment
        outside every window maps to NaN, which ``astype(str)`` renders as ``"nan"``.
    """
    import pandas as pd

    # One pd.Interval per window row, in row order.
    windows = season_windows.apply(lambda row: pd.Interval(row["start"], row["end"]), axis=1).tolist()
    season_by_window: Dict[pd.Interval, str] = {
        window: label for window, label in zip(windows, season_windows.season)
    }

    bins = pd.IntervalIndex(list(season_by_window.keys()))
    binned = pd.cut(trajectory["segment_start"], bins=bins)
    trajectory["season"] = binned.map(season_by_window).astype(str)
    return cast(AnyDataFrame, trajectory)

convert_column_values_to_numeric

convert_column_values_to_numeric(df: AnyDataFrame, columns: Annotated[list[str], Field(description='The columns to convert.')]) -> AnyDataFrame

Casts the values of the listed columns to numbers. Values that cannot be cast will be converted to NaN.

Parameters:

Name Type Description Default
df AnyDataFrame

The input DataFrame.

required
columns list[str]

List of columns to cast.

required

Returns:

Name Type Description
AnyDataFrame AnyDataFrame

The modified DataFrame.

Source code in ecoscope/platform/tasks/transformation/_conversion.py
@register()
def convert_column_values_to_numeric(
    df: AnyDataFrame,
    columns: Annotated[list[str], Field(description="The columns to convert.")],
) -> AnyDataFrame:
    """
    Coerce each listed column to a numeric dtype with ``pd.to_numeric``.

    Entries that cannot be parsed as numbers become NaN (``errors="coerce"``).

    Args:
        df (AnyDataFrame): The DataFrame to update in place.
        columns (list[str]): Names of the columns to coerce.

    Returns:
        AnyDataFrame: The same DataFrame with the listed columns replaced.
    """
    for target in columns:
        coerced = pd.to_numeric(df[target], errors="coerce")
        df[target] = coerced

    return cast(AnyDataFrame, df)

convert_column_values_to_string

convert_column_values_to_string(df: AnyDataFrame, columns: Annotated[list[str], Field(description='The columns to convert.')]) -> AnyDataFrame

Casts the values of the listed columns to type string. None and NaN values will also be converted to strings.

Parameters:

Name Type Description Default
df AnyDataFrame

The input DataFrame.

required
columns list[str]

List of columns to cast to string.

required

Returns:

Name Type Description
AnyDataFrame AnyDataFrame

The modified DataFrame.

Source code in ecoscope/platform/tasks/transformation/_conversion.py
@register()
def convert_column_values_to_string(
    df: AnyDataFrame,
    columns: Annotated[list[str], Field(description="The columns to convert.")],
) -> AnyDataFrame:
    """
    Replace each listed column with its ``astype(str)`` representation.

    Missing values are stringified as well rather than preserved as nulls.

    Args:
        df (AnyDataFrame): The DataFrame to update in place.
        columns (list[str]): Names of the columns to stringify.

    Returns:
        AnyDataFrame: The same DataFrame with the listed columns replaced.
    """
    for target in columns:
        as_text = df[target].astype(str)
        df[target] = as_text

    return cast(AnyDataFrame, df)

convert_crs

convert_crs(df: AnyGeoDataFrame, crs: CrsAnnotation = 'EPSG:4326') -> AnyGeoDataFrame

Re-project a GeoDataFrame's geometries to the given CRS.

Parameters:

Name Type Description Default
df AnyGeoDataFrame

Input GeoDataFrame. Must have CRS metadata set.

required
crs CrsAnnotation

Target CRS authority code (e.g. "EPSG:4326").

'EPSG:4326'

Returns:

Type Description
AnyGeoDataFrame

GeoDataFrame with geometries re-projected to crs.

Raises:

Type Description
ValueError

If the input GeoDataFrame has no CRS metadata.

Source code in ecoscope/platform/tasks/transformation/_crs.py
@register()
def convert_crs(
    df: AnyGeoDataFrame,
    crs: CrsAnnotation = "EPSG:4326",
) -> AnyGeoDataFrame:
    """
    Reproject ``df`` into the coordinate reference system ``crs``.

    Args:
        df: GeoDataFrame to reproject. Its CRS metadata must already be set,
            otherwise the source system is unknown.
        crs: Target CRS authority code, e.g. ``"EPSG:4326"`` (the default).

    Returns:
        The result of ``df.to_crs(crs)``.

    Raises:
        ValueError: When ``df`` carries no CRS metadata.
    """
    source_crs = df.crs  # type: ignore[attr-defined]
    if source_crs is None:
        raise ValueError(
            "GeoDataFrame has no CRS information. "
            f"Cannot safely convert to {crs} without knowing the source CRS. "
            "Please ensure the source data includes CRS metadata."
        )

    reprojected = df.to_crs(crs)  # type: ignore[operator]
    return cast(AnyGeoDataFrame, reprojected)

convert_values_to_timezone

convert_values_to_timezone(df: AnyDataFrame, timezone: Annotated[str | tzinfo | TimezoneInfo, Field()], columns: Annotated[list[str], Field(description='The columns to convert.')], auto_detect: Annotated[bool, Field(description='Auto-detect all timezone-aware datetime columns to convert, ignoring the columns list.', exclude=True)] = False) -> AnyDataFrame

Converts the listed columns in the df to the timezone provided. NOTE: Timezone-naive timestamps are ignored. Args: df (AnyDataFrame): The input DataFrame. timezone (str | datetime.tzinfo | TimezoneInfo): The timezone to convert to. columns (list[str]): List of columns to convert. auto_detect (bool): If True, auto-detect all timezone-aware datetime columns, ignoring the columns list.

Returns:

Name Type Description
AnyDataFrame AnyDataFrame

The modified DataFrame.

Source code in ecoscope/platform/tasks/transformation/_conversion.py
@register()
def convert_values_to_timezone(
    df: AnyDataFrame,
    timezone: Annotated[str | datetime.tzinfo | TimezoneInfo, Field()],
    columns: Annotated[list[str], Field(description="The columns to convert.")],
    auto_detect: Annotated[
        bool,
        Field(
            description="Auto-detect all timezone-aware datetime columns to convert, ignoring the columns list.",
            exclude=True,
        ),
    ] = False,
) -> AnyDataFrame:
    """
    Converts the listed columns in the df to the timezone provided.

    NOTE: Timezone-naive columns, and listed columns that are not present in
    ``df``, are skipped without error.

    Args:
        df (AnyDataFrame): The input DataFrame. Converted columns are replaced in place.
        timezone (str | datetime.tzinfo | TimezoneInfo): The timezone to convert to.
            For a ``TimezoneInfo``, its ``utc_offset`` attribute is used.
        columns (list[str]): List of columns to convert. Must be empty when
            ``auto_detect`` is True.
        auto_detect (bool): If True, convert every timezone-aware datetime column
            of ``df`` instead of an explicit ``columns`` list.

    Returns:
        AnyDataFrame: The modified DataFrame. Converted columns have nanosecond resolution.

    Raises:
        ValueError: If ``auto_detect`` is True and ``columns`` is non-empty.
    """
    if isinstance(timezone, TimezoneInfo):
        timezone = timezone.utc_offset
    if auto_detect and columns:
        raise ValueError("Only one of auto_detect and columns may be passed.")
    if auto_detect:
        columns = [col for col in df.columns if isinstance(df[col].dtype, pd.DatetimeTZDtype)]
    for col in columns:
        # tz_convert only applies to tz-aware columns; naive or absent columns are skipped.
        if col in df and isinstance(df[col].dtype, pd.DatetimeTZDtype):
            df[col] = df[col].dt.tz_convert(timezone).dt.as_unit("ns")

    return cast(AnyDataFrame, df)

drop_nan_values_by_column

drop_nan_values_by_column(df: AnyDataFrame, column_name: Annotated[str, Field(description='The column to check')]) -> AnyDataFrame
Source code in ecoscope/platform/tasks/transformation/_filtering.py
@register()
def drop_nan_values_by_column(
    df: AnyDataFrame,
    column_name: Annotated[str, Field(description="The column to check")],
) -> AnyDataFrame:
    """
    Drop the rows of ``df`` whose ``column_name`` value is missing.

    ``Series.notna`` is used instead of ``np.isnan``. ``np.isnan`` raises
    ``TypeError`` on object-dtype columns (e.g. strings mixed with ``None``),
    whereas ``notna`` recognises missing values for any dtype. For float columns
    the result is unchanged.

    Args:
        df (AnyDataFrame): The input DataFrame.
        column_name (str): Column whose missing values cause a row to be dropped.

    Returns:
        AnyDataFrame: The rows of ``df`` where ``column_name`` is not missing,
        with the original index kept.
    """
    return cast(AnyDataFrame, df[df[column_name].notna()])

drop_null_geometry

drop_null_geometry(gdf: AnyGeoDataFrame) -> AnyGeoDataFrame
Source code in ecoscope/platform/tasks/transformation/_filtering.py
@register()
def drop_null_geometry(
    gdf: AnyGeoDataFrame,
) -> AnyGeoDataFrame:
    """
    Keep only rows whose geometry is present and non-empty.

    Args:
        gdf (AnyGeoDataFrame): The input GeoDataFrame.

    Returns:
        AnyGeoDataFrame: The rows of ``gdf`` whose geometry is neither null nor empty.
    """
    geoms = gdf.geometry
    # Equivalent to (~isna) & (~is_empty) by De Morgan's law.
    keep = ~(geoms.isna() | geoms.is_empty)
    return cast(AnyGeoDataFrame, gdf.loc[keep])

explode

explode(df: AnyDataFrame, column_name: Annotated[str, Field(description='The column name to explode.')], ignore_index: Annotated[bool, Field(description='Whether to ignore the index.')]) -> AnyDataFrame
Source code in ecoscope/platform/tasks/transformation/_exploding.py
@register()
def explode(
    df: AnyDataFrame,
    column_name: Annotated[str, Field(description="The column name to explode.")],
    ignore_index: Annotated[bool, Field(description="Whether to ignore the index.")],
) -> AnyDataFrame:
    """
    Turn each list-like entry of ``column_name`` into its own row.

    Args:
        df (AnyDataFrame): The input DataFrame.
        column_name (str): Column holding list-like values to expand.
        ignore_index (bool): Forwarded to ``DataFrame.explode``; when True the
            result gets a fresh ``0..n-1`` index.

    Returns:
        AnyDataFrame: The exploded DataFrame.
    """
    exploded = df.explode(column_name, ignore_index=ignore_index)
    return cast(AnyDataFrame, exploded)

extract_column_as_type

extract_column_as_type(df: Annotated[AnyDataFrame, Field(description='The dataframe.', exclude=True)], column_name: Annotated[str, Field(description='The column name to extract the value from.')], output_type: Annotated[FieldType, Field(description='The output type of the extracted value.')], output_column_name: Annotated[str, Field(description="The output column name to store the extracted value. If it's a pandas series, then the output_column_name will be the column prefix.")]) -> AnyDataFrame
Source code in ecoscope/platform/tasks/transformation/_extract.py
@register()
def extract_column_as_type(
    df: Annotated[
        AnyDataFrame,
        Field(
            description="The dataframe.",
            exclude=True,
        ),
    ],
    column_name: Annotated[str, Field(description="The column name to extract the value from.")],
    output_type: Annotated[FieldType, Field(description="The output type of the extracted value.")],
    output_column_name: Annotated[
        str,
        Field(
            description=(
                "The output column name to store the extracted value."
                " If it's a pandas series, then the output_column_name will be the column prefix."
            )
        ),
    ],
) -> AnyDataFrame:
    """
    Apply ``extract_value_as_type`` to every value of ``column_name``.

    Args:
        df (AnyDataFrame): The input DataFrame.
        column_name (str): Column whose values are converted.
        output_type (FieldType): Target type passed to ``extract_value_as_type``.
        output_column_name (str): Destination column name. For
            ``FieldType.SERIES`` it is used as a prefix for the expanded columns.

    Returns:
        AnyDataFrame: For non-SERIES types, ``df`` itself with the new column.
        For SERIES, a new frame from merging ``df`` with the prefixed expanded
        columns on the index.
    """
    extracted = df[column_name].apply(lambda raw: extract_value_as_type(raw, output_type))

    if output_type != FieldType.SERIES:
        df[output_column_name] = extracted
        return cast(AnyDataFrame, df)

    # Series-valued results expand into several columns; prefix them, then join on the index.
    prefixed = extracted.add_prefix(output_column_name)
    return cast(AnyDataFrame, df.merge(prefixed, left_index=True, right_index=True))

extract_spatial_grouper_feature_group_names

extract_spatial_grouper_feature_group_names(groupers: AllGrouper | UserDefinedGroupers) -> list[FeatureGroupId]

If there are spatial groupers, extract and return feature group names

Source code in ecoscope/platform/tasks/transformation/_indexing.py
@register()
def extract_spatial_grouper_feature_group_names(
    groupers: AllGrouper | UserDefinedGroupers,
) -> list[FeatureGroupId]:
    """
    Collect ``spatial_index_name`` from every SpatialGrouper in ``groupers``.

    An ``AllGrouper`` contains no spatial groupers, so it yields an empty list.
    Non-spatial groupers in a list are ignored.
    """
    if isinstance(groupers, AllGrouper):
        return []

    spatial_groupers = (item for item in groupers if isinstance(item, SpatialGrouper))
    return [item.spatial_index_name for item in spatial_groupers]

extract_value_from_json_column

extract_value_from_json_column(df: Annotated[AnyDataFrame, Field(description='The dataframe.', exclude=True)], column_name: Annotated[str, Field(description='The json column name to extract the value from.')], field_name_options: Annotated[list[str], Field(description='A list of field name options to extract the value from. The first field name that is found will be used.')], output_type: Annotated[FieldType, Field(description='The output type of the extracted value.')], output_column_name: Annotated[str, Field(description="The output column name to store the extracted value. If it's a pandas series, then the output_column_name will be the column prefix.")]) -> AnyDataFrame
Source code in ecoscope/platform/tasks/transformation/_extract.py
@register()
def extract_value_from_json_column(
    df: Annotated[
        AnyDataFrame,
        Field(
            description="The dataframe.",
            exclude=True,
        ),
    ],
    column_name: Annotated[str, Field(description="The json column name to extract the value from.")],
    field_name_options: Annotated[
        list[str],
        Field(
            description=(
                "A list of field name options to extract the value from."
                " The first field name that is found will be used."
            )
        ),
    ],
    output_type: Annotated[FieldType, Field(description="The output type of the extracted value.")],
    output_column_name: Annotated[
        str,
        Field(
            description=(
                "The output column name to store the extracted value."
                " If it's a pandas series, then the output_column_name will be the column prefix."
            )
        ),
    ],
) -> AnyDataFrame:
    """
    Pull the first non-null value among ``field_name_options`` out of a JSON column.

    Each cell of ``column_name`` may be a mapping or a JSON string. Missing cells
    (None, NaN, ``pd.NA``), empty values and JSON ``null`` are treated as an empty
    mapping, so they produce ``None`` instead of raising.

    Args:
        df (AnyDataFrame): The input DataFrame.
        column_name (str): Column holding mappings or JSON strings.
        field_name_options (list[str]): Keys tried in order. The first key with a
            non-null value wins.
        output_type (FieldType): Target type passed to ``extract_value_as_type``.
        output_column_name (str): Destination column name. For
            ``FieldType.SERIES`` it is used as a prefix for the expanded columns.

    Returns:
        AnyDataFrame: For non-SERIES types, ``df`` itself with the new column.
        For SERIES, a new frame from merging ``df`` with the prefixed expanded
        columns on the index.
    """
    import math

    import pandas as pd

    def _load_mapping(raw):
        """Turn one cell of ``column_name`` into an object supporting ``.get``."""
        # NaN is truthy and bool(pd.NA) raises, so ``raw or {}`` alone cannot
        # handle missing cells. Check for them explicitly first.
        if raw is None or raw is pd.NA or (isinstance(raw, float) and math.isnan(raw)):
            return {}
        mapping = raw or {}
        if isinstance(mapping, str):
            mapping = json.loads(mapping)
        # A JSON document of ``null`` decodes to None.
        return {} if mapping is None else mapping

    def extract_value_from_row(row):
        additional = _load_mapping(row[column_name])

        value = None
        for field in field_name_options:
            value = additional.get(field, None)
            if value is not None:
                break

        if value is None:
            return None

        return extract_value_as_type(value, output_type)

    output = df.apply(extract_value_from_row, axis=1)

    if output_type == FieldType.SERIES:
        output_df = output.add_prefix(output_column_name)
        result_df = df.merge(output_df, right_index=True, left_index=True)
    else:
        df[output_column_name] = output
        result_df = df

    return cast(
        AnyDataFrame,
        result_df,
    )

fill_na

fill_na(df: AnyDataFrame, value: Annotated[str | int | float | bool | SkipJsonSchema[None], Field(description='The value to fill.')], columns: Annotated[list[str] | SkipJsonSchema[None], Field(description='Provided columns will have nan values filled.')] = None) -> AnyDataFrame

Fill NA values with the input value.

Parameters:

Name Type Description Default
df AnyDataFrame

The input DataFrame.

required
value str | int | float | bool | None

The value to fill NaN with.

required
columns list[str]

If provided, fill these columns only.

None

Returns:

Name Type Description
AnyDataFrame AnyDataFrame

The updated DataFrame.

Source code in ecoscope/platform/tasks/transformation/_mapping.py
@register()
def fill_na(
    df: AnyDataFrame,
    value: Annotated[
        str | int | float | bool | SkipJsonSchema[None],
        Field(description="The value to fill."),
    ],
    columns: Annotated[
        list[str] | SkipJsonSchema[None],
        Field(description="Provided columns will have nan values filled."),
    ] = None,
) -> AnyDataFrame:
    """
    Fill NA values with the input value.

    Args:
        df (AnyDataFrame): The input DataFrame. It is not modified; ``fillna``
            returns a new frame.
        value (str | int | float | bool | None): The value to fill NaN with.
        columns (list[str] | None): If provided, only these columns are filled.
            Otherwise every column is.

    Returns:
        AnyDataFrame: The filled DataFrame.
    """
    # A per-column dict restricts fillna to the requested columns.
    fill_spec = value if columns is None else dict.fromkeys(columns, value)
    return cast(AnyDataFrame, df.fillna(fill_spec))

filter_by_geometry_type

filter_by_geometry_type(df: AnyGeoDataFrame, geometry_types: Annotated[list[str], Field(description="Shapely geometry type names to keep (e.g. ['Point'], ['Polygon', 'MultiPolygon']).")]) -> AnyGeoDataFrame

Filter a GeoDataFrame to rows whose geometry type is in geometry_types.

Parameters:

Name Type Description Default
df AnyGeoDataFrame

Input GeoDataFrame.

required
geometry_types Annotated[list[str], Field(description="Shapely geometry type names to keep (e.g. ['Point'], ['Polygon', 'MultiPolygon']).")]

List of shapely geom_type names to retain.

required

Returns:

Type Description
AnyGeoDataFrame

GeoDataFrame containing only rows whose geometry's geom_type is in geometry_types.

Source code in ecoscope/platform/tasks/transformation/_filter_by_geometry_type.py
@register()
def filter_by_geometry_type(
    df: AnyGeoDataFrame,
    geometry_types: Annotated[
        list[str],
        Field(
            description="Shapely geometry type names to keep (e.g. ['Point'], ['Polygon', 'MultiPolygon']).",
        ),
    ],
) -> AnyGeoDataFrame:
    """
    Keep only the rows whose geometry ``geom_type`` appears in ``geometry_types``.

    Args:
        df: Input GeoDataFrame.
        geometry_types: Shapely ``geom_type`` names to retain, e.g. ``["Point"]``.

    Returns:
        The subset of ``df`` with a matching geometry type, with the original index kept.
    """
    keep_mask = df.geometry.geom_type.isin(geometry_types)
    return cast(AnyGeoDataFrame, df[keep_mask])

filter_df

filter_df(df: Annotated[AnyDataFrame, Field(description='The dataframe.', exclude=True)], column_name: Annotated[str, Field(description='The column name to filter on.')], op: Annotated[ComparisonOperator, Field(description='The comparison operator')], value: Annotated[str, Field(description='The comparison operand')], reset_index: Annotated[bool, Field(description='If reset index, default is False')] = False) -> AnyDataFrame
Source code in ecoscope/platform/tasks/transformation/_filter.py
@register()
def filter_df(
    df: Annotated[
        AnyDataFrame,
        Field(
            description="The dataframe.",
            exclude=True,
        ),
    ],
    column_name: Annotated[str, Field(description="The column name to filter on.")],
    op: Annotated[ComparisonOperator, Field(description="The comparison operator")],
    value: Annotated[str, Field(description="The comparison operand")],
    reset_index: Annotated[bool, Field(description="If reset index, default is False")] = False,
) -> AnyDataFrame:
    match op:
        case ComparisonOperator.EQUAL:
            result_df = df[df[column_name] == value]
        case ComparisonOperator.GE:
            result_df = df[df[column_name] >= value]
        case ComparisonOperator.GT:
            result_df = df[df[column_name] > value]
        case ComparisonOperator.LE:
            result_df = df[df[column_name] <= value]
        case ComparisonOperator.LT:
            result_df = df[df[column_name] < value]
        case ComparisonOperator.NE:
            result_df = df[df[column_name] != value]

    if reset_index:
        result_df = result_df.reset_index()

    return cast(AnyDataFrame, result_df)

lookup_string_var

lookup_string_var(var: Annotated[str, Field(...)], value_map: Annotated[dict[str, str], Field(default={}, description='A dictionary of values.')], raise_if_not_found: Annotated[bool, Field(description='Whether or not to raise if var is not in value_map.')] = True) -> str

Look up var in value_map and return the string mapped by var. If raise_if_not_found is true, raises KeyError if var is not in value_map. If raise_if_not_found is false, var is passed through unchanged.

Parameters:

Name Type Description Default
var str

The input var.

required
value_map dict[str, str]

The map to lookup var in.

required
raise_if_not_found bool

Whether or not to raise in the event var is not found.

True

Returns:

Name Type Description
str str

The mapped value, or var.

Raises: KeyError: If var is not found in value_map.

Source code in ecoscope/platform/tasks/transformation/_mapping.py
@register()
def lookup_string_var(
    var: Annotated[str, Field(...)],
    value_map: Annotated[dict[str, str], Field(default={}, description="A dictionary of values.")],
    raise_if_not_found: Annotated[
        bool, Field(description="Whether or not to raise if var is not in value_map.")
    ] = True,
) -> str:
    """
    Translate ``var`` through ``value_map``.

    Args:
        var (str): The key to look up.
        value_map (dict[str, str]): Mapping from input keys to output strings.
        raise_if_not_found (bool): If True, an unknown ``var`` is an error.
            If False, an unknown ``var`` is returned unchanged.

    Returns:
        str: ``value_map[var]`` when present, otherwise ``var`` (lenient mode only).

    Raises:
        KeyError: If ``var`` is missing from ``value_map`` and ``raise_if_not_found`` is True.
    """
    if not raise_if_not_found:
        # Lenient mode: fall back to the input itself.
        return value_map.get(var, var)
    return value_map[var]

map_columns

map_columns(df: AnyDataFrame, drop_columns: Annotated[list[str] | SkipJsonSchema[None], AdvancedField(default=[], description='List of columns to drop.')] = None, retain_columns: Annotated[list[str] | SkipJsonSchema[None], AdvancedField(default=[], description='List of columns to retain with the order specified by the list.\n                        Keep all the columns if the list is empty.')] = None, rename_columns: Annotated[list[RenameColumn] | SkipJsonSchema[dict[str, str]] | SkipJsonSchema[None], AdvancedField(default={}, description='Dictionary of columns to rename.')] = None, raise_if_not_found: Annotated[bool, Field(description='Whether or not to raise if var is not in value_map.')] = True) -> AnyDataFrame

Maps and transforms the columns of a DataFrame based on the provided parameters. The order of the operations is as follows: drop columns, retain/reorder columns, and rename columns.

Parameters:

Name Type Description Default
df AnyDataFrame

The input DataFrame to be transformed.

required
drop_columns list[str]

List of columns to drop from the DataFrame.

None
retain_columns list[str]

List of columns to retain. The order of columns will be preserved.

None
rename_columns list[RenameColumn] | dict[str, str]

Dictionary of columns to rename.

None
raise_if_not_found bool

Whether or not to raise in the event a column is not found.

True

Returns:

Name Type Description
AnyDataFrame AnyDataFrame

The transformed DataFrame.

Raises:

Type Description
KeyError

If any of the columns specified are not found in the DataFrame.

Source code in ecoscope/platform/tasks/transformation/_mapping.py
@register()
def map_columns(
    df: AnyDataFrame,
    drop_columns: Annotated[
        list[str] | SkipJsonSchema[None],
        AdvancedField(default=[], description="List of columns to drop."),
    ] = None,
    retain_columns: Annotated[
        list[str] | SkipJsonSchema[None],
        AdvancedField(
            default=[],
            description="""List of columns to retain with the order specified by the list.
                        Keep all the columns if the list is empty.""",
        ),
    ] = None,
    rename_columns: Annotated[
        list[RenameColumn] | SkipJsonSchema[dict[str, str]] | SkipJsonSchema[None],
        AdvancedField(default={}, description="Dictionary of columns to rename."),
    ] = None,
    raise_if_not_found: Annotated[
        bool, Field(description="Whether or not to raise if a column is not found.")
    ] = True,
) -> AnyDataFrame:
    """
    Maps and transforms the columns of a DataFrame based on the provided parameters. The order of the operations is as
    follows: drop columns, retain/reorder columns, and rename columns.

    Args:
        df (AnyDataFrame): The input DataFrame to be transformed.
        drop_columns (list[str]): List of columns to drop from the DataFrame.
        retain_columns (list[str]): List of columns to retain. The order of columns will be preserved.
            If ``raise_if_not_found`` is False, requested columns that do not exist are added filled with NaN
            (``DataFrame.reindex`` semantics).
        rename_columns (list[RenameColumn] | dict[str, str]): Columns to rename, either as
            ``RenameColumn`` items or as an ``{original_name: new_name}`` dictionary.
        raise_if_not_found (bool): Whether or not to raise in the event a column is not found.

    Returns:
        AnyDataFrame: The transformed DataFrame.

    Raises:
        KeyError: If ``raise_if_not_found`` is True and any of the columns specified are not found in the
            DataFrame. The message lists the missing columns.
    """
    if drop_columns:
        if "geometry" in drop_columns:
            logger.warning("'geometry' found in drop_columns, which may affect spatial operations.")
        df = df.drop(
            columns=drop_columns,
            errors="raise" if raise_if_not_found else "ignore",
        )

    if retain_columns:
        missing_retain = [col for col in retain_columns if col not in df.columns]
        if raise_if_not_found and missing_retain:
            raise KeyError(
                f"Columns {retain_columns} not all found in DataFrame. Missing columns: {missing_retain}"
            )
        df = df.reindex(columns=retain_columns)  # type: ignore[assignment]

    if rename_columns:
        if isinstance(rename_columns, list):
            rename_columns = {item.original_name: item.new_name for item in rename_columns}

        if "geometry" in rename_columns:
            logger.warning("'geometry' found in rename_columns, which may affect spatial operations.")
        missing_rename = [col for col in rename_columns if col not in df.columns]
        if raise_if_not_found and missing_rename:
            raise KeyError(
                f"Columns {list(rename_columns.keys())} not all found in DataFrame. Existing columns: {df.columns}"
                f". Missing columns: {missing_rename}"
            )
        df = df.rename(columns=rename_columns)  # type: ignore[assignment]

    return cast(AnyDataFrame, df)

map_values

map_values(df: AnyDataFrame, column_name: Annotated[str, Field(description='The column name to map.')], value_map: Annotated[dict[str, str], Field(default={}, description='A dictionary of values to map.')], missing_values: Annotated[Literal['preserve', 'remove', 'replace'], Field(default='remove', description="How to handle values that aren't in value_map.")], replacement: Annotated[str | SkipJsonSchema[None], Field(default=None, description='The replacement for values not in value_map.')] = None) -> AnyDataFrame
Source code in ecoscope/platform/tasks/transformation/_mapping.py
@register()
def map_values(
    df: AnyDataFrame,
    column_name: Annotated[str, Field(description="The column name to map.")],
    value_map: Annotated[dict[str, str], Field(default={}, description="A dictionary of values to map.")],
    missing_values: Annotated[
        Literal["preserve", "remove", "replace"],
        Field(
            default="remove",
            description="How to handle values that aren't in value_map.",
        ),
    ],
    replacement: Annotated[
        str | SkipJsonSchema[None],
        Field(default=None, description="The replacement for values not in value_map."),
    ] = None,
) -> AnyDataFrame:
    """
    Translate the values of ``column_name`` through ``value_map`` in place.

    Args:
        df (AnyDataFrame): The DataFrame to update.
        column_name (str): Column whose values are mapped.
        value_map (dict[str, str]): Old value -> new value.
        missing_values (str): What to do with values absent from ``value_map``:
            ``"preserve"`` keeps the original value, ``"remove"`` leaves NaN, and
            ``"replace"`` substitutes ``replacement``.
        replacement (str | None): Required when ``missing_values`` is ``"replace"``.

    Returns:
        AnyDataFrame: The same DataFrame with the column rewritten.

    Raises:
        ValueError: If ``replacement`` is missing for ``"replace"`` or
            ``missing_values`` is not a recognised option.
    """
    if missing_values == "preserve":
        original = df[column_name]
        df[column_name] = original.map(value_map).fillna(original)
    elif missing_values == "remove":
        df[column_name] = df[column_name].map(value_map)
    elif missing_values == "replace":
        if replacement is None:
            raise ValueError("replacement param must be provided if missing_values is 'replace'")
        df[column_name] = df[column_name].map(value_map).fillna(replacement)
    else:
        raise ValueError("Invalid selection for missing_values")

    return cast(AnyDataFrame, df)

map_values_with_unit

map_values_with_unit(df: AnyDataFrame, input_column_name: Annotated[str, Field(description='The column name to map.')], output_column_name: Annotated[str, Field(description='The new column name.')], original_unit: Annotated[Unit | SkipJsonSchema[None], Field(description='The original unit of measurement.')] = None, new_unit: Annotated[Unit | SkipJsonSchema[None], Field(description='The unit to convert to.')] = None, decimal_places: Annotated[int, AdvancedField(default=1, description='The number of decimal places to display.')] = 1) -> AnyDataFrame
Source code in ecoscope/platform/tasks/transformation/_mapping.py
@register()
def map_values_with_unit(
    df: AnyDataFrame,
    input_column_name: Annotated[str, Field(description="The column name to map.")],
    output_column_name: Annotated[str, Field(description="The new column name.")],
    original_unit: Annotated[
        Unit | SkipJsonSchema[None],
        Field(description="The original unit of measurement."),
    ] = None,
    new_unit: Annotated[
        Unit | SkipJsonSchema[None],
        Field(description="The unit to convert to."),
    ] = None,
    decimal_places: Annotated[
        int,
        AdvancedField(default=1, description="The number of decimal places to display."),
    ] = 1,
) -> AnyDataFrame:
    """
    Write a formatted, unit-suffixed string version of ``input_column_name``.

    With no ``new_unit``, or one equal to ``original_unit``, values are only
    formatted. Otherwise they are converted with ``with_unit``. Linear
    conversions use a single probe of ``1.0`` as a scale factor; other
    conversions (e.g. dB) are computed per value.

    Args:
        df (AnyDataFrame): The DataFrame to update in place.
        input_column_name (str): Column holding the numeric source values.
        output_column_name (str): Column that receives the formatted strings.
        original_unit (Unit | None): Unit of the source values.
        new_unit (Unit | None): Unit to convert to, if any.
        decimal_places (int): Digits after the decimal point in the output.

    Returns:
        AnyDataFrame: The same DataFrame with ``output_column_name`` written.
    """
    needs_conversion = not (new_unit is None or original_unit == new_unit)

    if needs_conversion and not is_linear_unit_conversion(original_unit, new_unit):
        # Non-linear units (ie. dB) cannot be scaled by a constant factor,
        # so each value goes through with_unit individually.
        def _format_single(raw_value):
            converted = with_unit(raw_value, original_unit=original_unit, new_unit=new_unit)
            return f"{converted.value:.{decimal_places}f} {converted.unit or ''}".strip()

        df[output_column_name] = df[input_column_name].apply(_format_single)
        return df

    if needs_conversion:
        # Multiplicative conversion: convert 1.0 once to get the factor and target unit.
        probe = with_unit(1.0, original_unit=original_unit, new_unit=new_unit)
        unit_label = f" {probe.unit}".rstrip() if probe.unit else ""
        numbers = df[input_column_name].to_numpy() * probe.value
    else:
        # Nothing to convert: label with the original (or absent) unit.
        unit_label = f" {original_unit}".rstrip() if original_unit else ""
        numbers = df[input_column_name].to_numpy()

    df[output_column_name] = [f"{number:.{decimal_places}f}{unit_label}" for number in numbers]
    return df

normalize_json_column

normalize_json_column(df: AnyDataFrame, column: Annotated[str, Field(description='The column name.')], skip_if_not_exists: Annotated[bool, AdvancedField(description='Skip if the column does not exist.', default=True)] = True, sort_columns: Annotated[bool, AdvancedField(description='Sort new columns alphabetically.', default=True)] = True) -> AnyDataFrame
Source code in ecoscope/platform/tasks/transformation/_normalize.py
@register()
def normalize_json_column(
    df: AnyDataFrame,
    column: Annotated[str, Field(description="The column name.")],
    skip_if_not_exists: Annotated[
        bool,
        AdvancedField(description="Skip if the column does not exist.", default=True),
    ] = True,
    sort_columns: Annotated[
        bool,
        AdvancedField(description="Sort new columns alphabetically.", default=True),
    ] = True,
) -> AnyDataFrame:
    """
    Normalize ``column`` with ``ecoscope.io.earthranger_utils.normalize_column``.

    The helper's return value is ignored, so any change to ``df`` presumably
    happens in place inside that helper (verify against its implementation).

    Args:
        df (AnyDataFrame): The input DataFrame.
        column (str): Column to normalize.
        skip_if_not_exists (bool): If True and ``column`` is absent, log a warning
            and return ``df`` unchanged instead of calling the helper.
        sort_columns (bool): Forwarded to the helper.

    Returns:
        AnyDataFrame: ``df``.
    """
    import ecoscope

    if not skip_if_not_exists or column in df.columns:
        ecoscope.io.earthranger_utils.normalize_column(df, column, sort_columns)
    else:
        logger.warning("Column '%s' does not exist in DataFrame. Skipping normalization.", column)

    return cast(AnyDataFrame, df)

normalize_numeric_column

normalize_numeric_column(df: AnyDataFrame, column: Annotated[str, Field(description='The column to normalize, values must be numeric.')], output_column_name: Annotated[str | None, Field(description='If provided, normalized values will be added as a new column.')]) -> AnyDataFrame
Source code in ecoscope/platform/tasks/transformation/_normalize.py
@register()
def normalize_numeric_column(
    df: AnyDataFrame,
    column: Annotated[str, Field(description="The column to normalize, values must be numeric.")],
    output_column_name: Annotated[
        str | None,
        Field(description="If provided, normalized values will be added as a new column."),
    ] = None,
) -> AnyDataFrame:
    """
    Standardize a numeric column to z-scores: ``(value - mean) / std``.

    ``std`` is pandas' sample standard deviation (ddof=1). A constant column, or
    a single-row frame, therefore yields NaN for every normalized value.

    Args:
        df: The input DataFrame. It is modified in place.
        column: The column to normalize. Its dtype must be numeric.
        output_column_name: If provided (non-empty), write the result to this
            column. Otherwise overwrite ``column``. Defaults to None.

    Returns:
        The same ``df`` object, with the normalized values written into it.

    Raises:
        ValueError: If ``column`` does not have a numeric dtype.
    """
    from pandas.api.types import is_numeric_dtype

    values = df[column]
    if not is_numeric_dtype(values):
        raise ValueError(f"Provided column {column} must contain only numeric values")

    normalized_values = (values - values.mean()) / values.std()
    # An empty string is treated the same as None: the source column is overwritten.
    df[output_column_name if output_column_name else column] = normalized_values

    return cast(
        AnyDataFrame,
        df,
    )

reorder_columns

reorder_columns(df: AnyDataFrame, columns: Annotated[list[str], Field(description='Provided column names will be first in the dataframe.')]) -> AnyDataFrame

Move the provided columns to the front of the dataframe, in the order given; all other columns follow in their original order.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `df` | `AnyDataFrame` | The input DataFrame. | *required* |
| `columns` | `list[str]` | Provided column names will be first in the dataframe. | *required* |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `AnyDataFrame` | `AnyDataFrame` | The updated DataFrame. |

Source code in ecoscope/platform/tasks/transformation/_mapping.py
@register()
def reorder_columns(
    df: AnyDataFrame,
    columns: Annotated[
        list[str],
        Field(description="Provided column names will be first in the dataframe."),
    ],
) -> AnyDataFrame:
    """
    Move the provided columns to the front of the dataframe, in the given order.

    All remaining columns follow, keeping their original relative order. If a
    name is repeated in ``columns``, only its first occurrence counts.

    Args:
        df (AnyDataFrame): The input DataFrame.
        columns (list[str]): Provided column names will be first in the dataframe.

    Returns:
        AnyDataFrame: The updated DataFrame.

    Raises:
        ValueError: If any name in ``columns`` is not a column of ``df``.
    """
    # Validate explicitly: an `assert` is stripped under `python -O`, and then
    # `reindex` would silently add all-NaN columns for the missing names.
    missing = [col for col in columns if col not in df.columns]
    if missing:
        raise ValueError(f"Columns not found in dataframe: {missing}")

    # Order-preserving de-duplication, so a repeated name cannot produce a
    # duplicated output column.
    leading = list(dict.fromkeys(columns))
    leading_set = set(leading)
    reordered = leading + [col for col in df.columns if col not in leading_set]

    df = df.reindex(columns=reordered)

    return cast(AnyDataFrame, df)

resolve_spatial_feature_groups_for_spatial_groupers

resolve_spatial_feature_groups_for_spatial_groupers(groupers: AllGrouper | UserDefinedGroupers, spatial_feature_groups: list[RegionsGDF | EmptyDataFrame] | RegionsGDF | EmptyDataFrame) -> AllGrouper | UserDefinedGroupers

Resolves the spatial feature groups referenced by any SpatialGroupers in `groupers`, keeping only polygon geometries; other groupers are left unchanged.

Source code in ecoscope/platform/tasks/transformation/_indexing.py
@register()
def resolve_spatial_feature_groups_for_spatial_groupers(
    groupers: AllGrouper | UserDefinedGroupers,
    spatial_feature_groups: list[RegionsGDF | EmptyDataFrame] | RegionsGDF | EmptyDataFrame,
) -> AllGrouper | UserDefinedGroupers:
    """
    Attach polygon regions to every SpatialGrouper in ``groupers`` that has a match.

    Each non-empty feature group is keyed by ``display_name``, read from the
    first row of its ``metadata`` column. A SpatialGrouper whose
    ``spatial_index_name`` matches a key is resolved in place with that group's
    Polygon/MultiPolygon rows only. Other groupers are untouched.

    Returns ``groupers`` unchanged when it is an ``AllGrouper`` or when
    ``spatial_feature_groups`` is None.

    Raises:
        ValueError: If a matched feature group contains no polygon geometries.
    """
    if isinstance(groupers, AllGrouper) or spatial_feature_groups is None:
        return groupers

    feature_groups = (
        spatial_feature_groups if isinstance(spatial_feature_groups, list) else [spatial_feature_groups]
    )

    regions_by_name: dict[str, RegionsGDF | EmptyDataFrame] = {}
    for group in feature_groups:
        if group.empty:
            continue
        regions_by_name[group["metadata"].iloc[0]["display_name"]] = group

    # Lines and points are not meaningful as spatial groupers, so keep polygons only.
    polygon_types = ["Polygon", "MultiPolygon"]
    for grouper in (g for g in groupers if isinstance(g, SpatialGrouper)):
        regions = regions_by_name.get(grouper.spatial_index_name)
        if regions is None:
            continue
        polygons = regions[regions.geometry.geom_type.isin(polygon_types)]
        if polygons.empty:  # type: ignore[union-attr]
            raise ValueError(
                f"There are no polygons in Feature Group {grouper.display_name},"
                " you must select a feature collection that contains at least 1 polygon."
            )
        grouper.resolve(spatial_regions=polygons)  # type: ignore[arg-type]

    return groupers

sort_values

sort_values(df: AnyDataFrame, column_name: Annotated[str, Field(description='The column name to sort values by.')], ascending: Annotated[bool, Field(description='Sort ascending if true')] = True, na_position: Annotated[Literal['first', 'last'], AdvancedField(description='Where to place NaN values in the sort', default='last')] = 'last') -> AnyDataFrame
Source code in ecoscope/platform/tasks/transformation/_sorting.py
@register()
def sort_values(
    df: AnyDataFrame,
    column_name: Annotated[str, Field(description="The column name to sort values by.")],
    ascending: Annotated[bool, Field(description="Sort ascending if true")] = True,
    na_position: Annotated[
        Literal["first", "last"],
        AdvancedField(description="Where to place NaN values in the sort", default="last"),
    ] = "last",
) -> AnyDataFrame:
    """
    Return a copy of ``df`` sorted by the values in ``column_name``.

    Args:
        df: The input DataFrame. It is not modified.
        column_name: Column whose values determine the order.
        ascending: Sort ascending when True, descending when False.
        na_position: Place NaN values ``"first"`` or ``"last"``.

    Returns:
        The sorted DataFrame.
    """
    ordered = df.sort_values(
        by=column_name,
        na_position=na_position,
        ascending=ascending,
    )
    return cast(AnyDataFrame, ordered)

strip_prefix_from_column_names

strip_prefix_from_column_names(df: AnyDataFrame, prefix: Annotated[str, Field(description='The prefix to remove.')]) -> AnyDataFrame

Strip the provided prefix from column names that have it.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `df` | `AnyDataFrame` | The input DataFrame. | *required* |
| `prefix` | `str` | The prefix to remove from column names in this dataframe. | *required* |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `AnyDataFrame` | `AnyDataFrame` | The updated DataFrame. |

Source code in ecoscope/platform/tasks/transformation/_mapping.py
@register()
def strip_prefix_from_column_names(
    df: AnyDataFrame,
    prefix: Annotated[
        str,
        Field(description="The prefix to remove."),
    ],
) -> AnyDataFrame:
    """
    Strip the provided prefix from column names that have it.

    Only string column labels that start with ``prefix`` are renamed. For
    example, with prefix ``"extra__"``, ``"extra__name"`` becomes ``"name"``.
    Non-string labels (e.g. integer labels produced by a transpose) are left
    as-is instead of raising ``AttributeError``.

    Args:
        df (AnyDataFrame): The input DataFrame.
        prefix (str): The prefix to remove from column names in this dataframe.

    Returns:
        AnyDataFrame: The updated DataFrame.
    """
    mapping = {
        col: col.removeprefix(prefix)
        for col in df.columns
        if isinstance(col, str) and col.startswith(prefix)
    }
    df = df.rename(columns=mapping)  # type: ignore[assignment]
    return cast(AnyDataFrame, df)

title_case_columns_by_prefix

title_case_columns_by_prefix(df: AnyDataFrame, prefix: Annotated[str, Field(description='Column names prefixed with this value will be converted to title case.')]) -> AnyDataFrame

Convert the column names beginning with the provided prefix to title case.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `df` | `AnyDataFrame` | The input DataFrame. | *required* |
| `prefix` | `str` | Column names prefixed with this value will be converted to title case. | *required* |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `AnyDataFrame` | `AnyDataFrame` | The updated DataFrame. |

Source code in ecoscope/platform/tasks/transformation/_mapping.py
@register()
def title_case_columns_by_prefix(
    df: AnyDataFrame,
    prefix: Annotated[
        str,
        Field(description="Column names prefixed with this value will be converted to title case."),
    ],
) -> AnyDataFrame:
    """
    Convert the column names beginning with the provided prefix to title case.

    For each matching name, the prefix is removed, underscores become spaces,
    and the result is title-cased. For example, with prefix ``"extra__"``,
    ``"extra__event_type"`` becomes ``"Event Type"``. Non-matching columns and
    non-string column labels are left as-is instead of raising ``AttributeError``.

    Args:
        df (AnyDataFrame): The input DataFrame.
        prefix (str): Column names prefixed with this value will be converted to title case.

    Returns:
        AnyDataFrame: The updated DataFrame.
    """
    mapping = {
        col: col.removeprefix(prefix).replace("_", " ").title()
        for col in df.columns
        if isinstance(col, str) and col.startswith(prefix)
    }
    df = df.rename(columns=mapping)  # type: ignore[assignment]

    return cast(AnyDataFrame, df)

transpose

transpose(df: AnyDataFrame, transposed_column_name: Annotated[str | SkipJsonSchema[None], Field(description='If provided, the transposed index will be a column with this name')] = None) -> AnyDataFrame
Source code in ecoscope/platform/tasks/transformation/_transpose.py
@register()
def transpose(
    df: AnyDataFrame,
    transposed_column_name: Annotated[
        str | SkipJsonSchema[None],
        Field(description="If provided, the transposed index will be a column with this name"),
    ] = None,
) -> AnyDataFrame:
    """
    Transpose ``df`` so that its rows become columns and vice versa.

    Args:
        df: The input DataFrame.
        transposed_column_name: If truthy, the transposed index (the original
            column labels) is moved into a regular column with this name.

    Returns:
        The transposed DataFrame. When ``df`` had a RangeIndex, the resulting
        integer column labels are converted to their string form.
    """
    from pandas import RangeIndex

    result = df.transpose()

    if transposed_column_name:
        # Pull the former column labels out of the index into a named column.
        result = result.reset_index(names=transposed_column_name)

    had_default_index = isinstance(df.index, RangeIndex)
    if had_default_index:
        # NOTE(review): presumably string labels are required downstream; confirm.
        int_to_str = {label: str(label) for label in result.columns if isinstance(label, int)}
        result = result.rename(columns=int_to_str)

    return cast(AnyDataFrame, result)

with_unit

with_unit(value: Annotated[float, Field(description='The original value.')], original_unit: Annotated[Unit | SkipJsonSchema[None], Field(description='The original unit of measurement.')] = None, new_unit: Annotated[Unit | SkipJsonSchema[None], Field(description='The unit to convert to.')] = None) -> Annotated[Quantity, Field(description='The value with an optional unit.')]
Source code in ecoscope/platform/tasks/transformation/_unit.py
@register()
def with_unit(
    value: Annotated[float, Field(description="The original value.")],
    original_unit: Annotated[
        Unit | SkipJsonSchema[None],
        Field(description="The original unit of measurement."),
    ] = None,
    new_unit: Annotated[
        Unit | SkipJsonSchema[None],
        Field(description="The unit to convert to."),
    ] = None,
) -> Annotated[Quantity, Field(description="The value with an optional unit.")]:
    """
    Wrap ``value`` in a ``Quantity``, optionally converting it between units.

    - No ``original_unit``: the value is returned without a unit (``new_unit`` is ignored).
    - Only ``original_unit``: the value is tagged with ``original_unit``, unconverted.
    - Both units: the value is converted with astropy and tagged with ``new_unit``.
    """
    if original_unit and new_unit:
        # astropy is imported lazily, only on the path that actually converts.
        import astropy.units as u  # type: ignore[import-untyped]

        converted = (value * u.Unit(original_unit.value)).to(u.Unit(new_unit.value))
        return Quantity(value=converted.value, unit=new_unit)

    if original_unit:
        return Quantity(value=value, unit=original_unit)

    return Quantity(value=value)