Source code for ecoscope.analysis.ecograph

from math import ceil, floor

import geopandas as gpd
import igraph
import networkx as nx
import numpy as np
import pandas as pd
import rasterio
import rasterio.features
import sklearn.base
from affine import Affine
from shapely.geometry import shape
from skimage.draw import line

import ecoscope


class Ecograph:
    """
    A class that analyzes movement tracking data using Network Theory.

    Parameters
    ----------
    trajectory : ecoscope.base.Trajectory
        Trajectory dataframe
    resolution : float
        Pixel size, in meters
    radius : int
        Radius to compute Collective Influence (Default: 2)
    cutoff : int
        Cutoff to compute an approximation of the betweenness index if the standard
        algorithm is too slow. Can be useful for very large graphs (Default: None)
    tortuosity_length : int
        The number of steps used to compute the two tortuosity metrics (Default: 3 steps)
    """

    def __init__(self, trajectory, resolution=15, radius=2, cutoff=None, tortuosity_length=3):
        self.graphs = {}
        self.trajectory = trajectory
        self.resolution = ceil(resolution)
        self.utm_crs = trajectory.estimate_utm_crs()
        self.trajectory.to_crs(self.utm_crs, inplace=True)
        self.features = [
            "dot_product",
            "speed",
            "step_length",
            "sin_time",
            "cos_time",
            "weight",
            "degree",
            "betweenness",
            "collective_influence",
            "tortuosity_1",
            "tortuosity_2",
        ]

        geom = self.trajectory["geometry"]
        eastings = np.array([geom.iloc[i].coords.xy[0] for i in range(len(geom))]).flatten()
        northings = np.array([geom.iloc[i].coords.xy[1] for i in range(len(geom))]).flatten()

        # Pad the bounding box by one pixel, then snap its extent to a multiple of
        # the resolution so the grid aligns exactly with the raster transform.
        self.xmin = floor(np.min(eastings)) - self.resolution
        self.ymin = floor(np.min(northings)) - self.resolution
        self.xmax = ceil(np.max(eastings)) + self.resolution
        self.ymax = ceil(np.max(northings)) + self.resolution
        self.xmax += self.resolution - ((self.xmax - self.xmin) % self.resolution)
        self.ymax += self.resolution - ((self.ymax - self.ymin) % self.resolution)

        self.transform = Affine(self.resolution, 0.00, self.xmin, 0.00, -self.resolution, self.ymax)
        self.inverse_transform = ~self.transform
        # Convention used throughout: `row` indexes pixels along x and `col` along y,
        # so feature arrays are shaped (n_cols, n_rows) and indexed [col][row].
        self.n_rows = int((self.xmax - self.xmin) // self.resolution)
        self.n_cols = int((self.ymax - self.ymin) // self.resolution)

        def compute(df):
            subject_name = df.name
            print(f"Computing EcoGraph for subject {subject_name}")
            G = self._get_ecograph(self, df, subject_name, radius, cutoff, tortuosity_length)
            self.graphs[subject_name] = G

        self.trajectory.groupby("groupby_col").progress_apply(compute)

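    # Example (a minimal sketch, not part of the module): building an EcoGraph
    # from tracking data. `relocs` is assumed to be an ecoscope.base.Relocations
    # object created elsewhere; all variable names here are illustrative only.
    #
    #     import ecoscope
    #
    #     traj = ecoscope.base.Trajectory.from_relocations(relocs)
    #     eco = Ecograph(traj, resolution=15, radius=2, cutoff=None, tortuosity_length=3)
    #     eco.to_csv("ecograph_node_features.csv")
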
    def to_csv(self, output_path):
        """
        Saves the features of all nodes in a CSV file.

        Parameters
        ----------
        output_path : str, Pathlike
            Output path for the CSV file
        """
        features_id = ["individual_name", "grid_id"] + self.features
        df = {feat_id: [] for feat_id in features_id}
        for individual_name, G in self.graphs.items():
            for node in G.nodes():
                df["individual_name"].append(individual_name)
                df["grid_id"].append(node)
                for feature in self.features:
                    df[feature].append(G.nodes[node][feature])
        pd.DataFrame.from_dict(df).to_csv(output_path, index=False)

    def to_geotiff(self, feature, output_path, individual="all", interpolation=None, transform=None):
        """
        Saves a specific node feature as a GeoTIFF.

        Parameters
        ----------
        feature : str
            Feature of interest
        output_path : str, Pathlike
            Output path for the GeoTIFF file
        individual : str
            The individual for which to output the node feature (Default: "all")
        interpolation : str or None
            Whether to interpolate the feature for each step in the trajectory (Default: None).
            If provided, has to be one of these four types of interpolation: "mean", "median",
            "max" or "min"
        transform : sklearn.base.TransformerMixin or None
            A feature transform method (Default: None)
        """
        if feature in self.features:
            if individual == "all":
                feature_ndarray = self._get_feature_mosaic(self, feature, interpolation)
            elif individual in self.graphs.keys():
                feature_ndarray = self._get_feature_map(self, feature, individual, interpolation)
            else:
                raise IndividualNameError("This individual is not in the dataset")
        else:
            raise FeatureNameError("This feature was not computed by EcoGraph")

        if isinstance(transform, sklearn.base.TransformerMixin):
            # Apply the transform to non-NaN pixels only
            nan_mask = ~np.isnan(feature_ndarray)
            feature_ndarray[nan_mask] = transform.fit_transform(
                feature_ndarray[nan_mask].reshape(-1, 1)
            ).reshape(feature_ndarray[nan_mask].shape)

        raster_profile = ecoscope.io.raster.RasterProfile(
            pixel_size=self.resolution,
            crs=self.utm_crs,
            nodata_value=np.nan,
            band_count=1,
        )
        raster_profile.raster_extent = ecoscope.io.raster.RasterExtent(
            x_min=self.xmin, x_max=self.xmax, y_min=self.ymin, y_max=self.ymax
        )
        ecoscope.io.raster.RasterPy.write(
            ndarray=feature_ndarray,
            fp=output_path,
            **raster_profile,
        )

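    # Example (a sketch): export a betweenness surface rescaled to [0, 1] with a
    # scikit-learn transformer. The subject name is a placeholder.
    #
    #     from sklearn.preprocessing import MinMaxScaler
    #
    #     eco.to_geotiff(
    #         "betweenness",
    #         "betweenness.tif",
    #         individual="subject_1",
    #         interpolation="mean",
    #         transform=MinMaxScaler(),
    #     )
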
    @staticmethod
    def _get_ecograph(self, trajectory_gdf, individual_name, radius, cutoff, tortuosity_length):
        G = nx.Graph()
        geom = trajectory_gdf["geometry"]
        for i in range(len(geom) - (tortuosity_length - 1)):
            step_attributes = trajectory_gdf.iloc[i]
            lines = [list(geom.iloc[i + j].coords) for j in range(tortuosity_length)]
            p1, p2, p3, p4 = lines[0][0], lines[0][1], lines[1][1], lines[1][0]

            # Map the step endpoints from UTM coordinates to grid (pixel) indices
            pixel1, pixel2 = (
                self.inverse_transform * p1,
                self.inverse_transform * p2,
            )
            row1, row2 = floor(pixel1[0]), floor(pixel2[0])
            col1, col2 = ceil(pixel1[1]), ceil(pixel2[1])

            # Encode the time of day on the unit circle so that times just before
            # and just after midnight map to nearby values
            t = step_attributes["segment_start"]
            seconds_in_day = 24 * 60 * 60
            seconds_past_midnight = (t.hour * 3600) + (t.minute * 60) + t.second + (t.microsecond / 1000000.0)
            time_diff = pd.to_datetime(
                trajectory_gdf.iloc[i + (tortuosity_length - 1)]["segment_end"]
            ) - pd.to_datetime(t)
            time_delta = time_diff.total_seconds() / 3600.0
            tortuosity_1, tortuosity_2 = self._get_tortuosities(self, lines, time_delta)

            attributes = {
                "dot_product": self._get_dot_product(p1, p2, p3, p4),
                "speed": step_attributes["speed_kmhr"],
                "step_length": step_attributes["dist_meters"],
                "sin_time": np.sin(2 * np.pi * seconds_past_midnight / seconds_in_day),
                "cos_time": np.cos(2 * np.pi * seconds_past_midnight / seconds_in_day),
                "tortuosity_1": tortuosity_1,
                "tortuosity_2": tortuosity_2,
            }

            if G.has_node((row1, col1)):
                self._update_node(G, (row1, col1), attributes)
            else:
                self._initialize_node(G, (row1, col1), attributes)
            if not G.has_node((row2, col2)):
                self._initialize_node(G, (row2, col2), attributes, empty=True)
            if (row1, col1) != (row2, col2):
                G.add_edge((row1, col1), (row2, col2))

        # Aggregate the per-step attribute lists into a single mean value per node
        for node in G.nodes():
            for key in attributes.keys():
                if len(G.nodes[node][key]) != 0:
                    G.nodes[node][key] = np.nanmean(G.nodes[node][key])
                else:
                    G.nodes[node][key] = np.nan
        self._compute_network_metrics(self, G, radius, cutoff)
        return G

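    # The sin_time/cos_time pair above places the time of day on the unit
    # circle, so 23:59 and 00:01 map to nearly identical points. A standalone
    # check (a sketch):
    #
    #     import numpy as np
    #     import pandas as pd
    #
    #     def encode_time_of_day(t):
    #         seconds = t.hour * 3600 + t.minute * 60 + t.second + t.microsecond / 1e6
    #         angle = 2 * np.pi * seconds / 86400
    #         return np.sin(angle), np.cos(angle)
    #
    #     encode_time_of_day(pd.Timestamp("2023-01-01 23:59:00"))  # (-0.0044, 1.0000)
    #     encode_time_of_day(pd.Timestamp("2023-01-02 00:01:00"))  # (+0.0044, 1.0000)
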
    @staticmethod
    def _update_node(G, node_id, attributes):
        G.add_node(node_id)
        G.nodes[node_id]["weight"] += 1
        for key, value in attributes.items():
            G.nodes[node_id][key].append(value)

    @staticmethod
    def _get_day_night_value(day_night_value):
        if day_night_value == "day":
            return 0
        elif day_night_value == "night":
            return 1

    @staticmethod
    def _initialize_node(G, node_id, attributes, empty=False):
        G.add_node(node_id)
        G.nodes[node_id]["weight"] = 1
        for key, value in attributes.items():
            if empty:
                G.nodes[node_id][key] = []
            else:
                G.nodes[node_id][key] = [value]

    @staticmethod
    def _get_dot_product(x, y, z, w):
        # Only defined when the end of the first step (y) coincides with the
        # start of the second step (w); otherwise the turn angle is meaningless
        if (floor(y[0]) == floor(w[0])) and (floor(y[1]) == floor(w[1])):
            v = [y[0] - x[0], y[1] - x[1]]
            w = [z[0] - y[0], z[1] - y[1]]
            angle = np.arctan2(w[1], w[0]) - np.arctan2(v[1], v[0])
            # Normalize the turn angle to (-pi, pi]
            while angle <= -np.pi:
                angle = angle + 2 * np.pi
            while angle > np.pi:
                angle = angle - 2 * np.pi
            return np.cos(angle)
        else:
            return np.nan

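    # Quick check (a sketch): a straight continuation gives cos(0) = 1, and a
    # 90-degree turn gives cos(pi/2) ~ 0. Points are (x, y) tuples laid out as
    # the method expects.
    #
    #     Ecograph._get_dot_product((0, 0), (1, 0), (2, 0), (1, 0))  # 1.0
    #     Ecograph._get_dot_product((0, 0), (1, 0), (1, 1), (1, 0))  # ~0.0
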
    @staticmethod
    def _get_tortuosities(self, lines, time_delta):
        # Straight-line distance between the first and last points of the window
        point1, point2 = lines[0][0], lines[len(lines) - 1][1]
        beeline_dist = np.sqrt((point2[0] - point1[0]) ** 2 + (point2[1] - point1[1]) ** 2)

        total_length = 0
        for i in range(len(lines) - 1):
            point1, point2, point3 = lines[i][0], lines[i][1], lines[i + 1][0]
            if (floor(point2[0]) == floor(point3[0])) and (floor(point2[1]) == floor(point3[1])):
                total_length += ((point2[0] - point1[0]) ** 2 + (point2[1] - point1[1]) ** 2) ** 0.5
            else:
                # The steps are not contiguous; only the second metric is defined
                if beeline_dist != 0:
                    return np.nan, np.log(time_delta / (beeline_dist**2))
                else:
                    return np.nan, np.nan
        point1, point2 = lines[len(lines) - 1][0], lines[len(lines) - 1][1]
        total_length += ((point2[0] - point1[0]) ** 2 + (point2[1] - point1[1]) ** 2) ** 0.5

        if (total_length != 0) and (beeline_dist != 0):
            return (beeline_dist / total_length), np.log(time_delta / (beeline_dist**2))
        elif (total_length == 0) and (beeline_dist > 0):
            return np.nan, np.log(time_delta / (beeline_dist**2))
        elif (total_length > 0) and (beeline_dist == 0):
            return 0, np.nan
        else:
            return np.nan, np.nan

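    # Worked example (a sketch): a two-step window (0,0) -> (3,4) -> (6,0).
    # The beeline distance is 6 and the path length is 5 + 5 = 10, so the
    # straightness index tortuosity_1 is 0.6; tortuosity_2 is log(2 / 6**2).
    # The unused `self` slot is filled with None here.
    #
    #     lines = [[(0.0, 0.0), (3.0, 4.0)], [(3.0, 4.0), (6.0, 0.0)]]
    #     Ecograph._get_tortuosities(None, lines, time_delta=2.0)
    #     # -> (0.6, -2.8903...)
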
    @staticmethod
    def _compute_network_metrics(self, G, radius, cutoff):
        self._compute_degree(G)
        self._compute_betweenness(G, cutoff)
        self._compute_collective_influence(self, G, radius)

    @staticmethod
    def _compute_degree(G):
        for node in G.nodes():
            G.nodes[node]["degree"] = G.degree[node]

    @staticmethod
    def _compute_collective_influence(self, G, radius):
        for node in G.nodes():
            G.nodes[node]["collective_influence"] = self._get_collective_influence(G, node, radius)

    @staticmethod
    def _get_collective_influence(G, start, radius):
        # Collective influence of `start` within its ego graph of the given radius:
        # sum (degree - 1) over the ego graph, remove the start node's own
        # contribution, and scale by the start node's degree
        sub_g = nx.generators.ego_graph(G, start, radius=radius)
        collective_influence = 0
        for n in sub_g.nodes():
            collective_influence += G.degree[n] - 1
        collective_influence -= G.degree[start]
        return (G.degree[start]) * collective_influence

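    # Toy example (a sketch): on the path graph 0-1-2-3-4, the ego graph of
    # node 2 at radius 1 is {1, 2, 3} with degrees 2, 2, 2. The sum of
    # (degree - 1) is 3; subtracting deg(2) = 2 leaves 1, and multiplying by
    # deg(2) = 2 gives a collective influence of 2.
    #
    #     import networkx as nx
    #
    #     H = nx.path_graph(5)
    #     Ecograph._get_collective_influence(H, 2, radius=1)  # 2
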
    @staticmethod
    def _compute_betweenness(G, cutoff):
        # igraph's betweenness supports a path-length cutoff, which yields an
        # approximate index much faster on very large graphs
        g = igraph.Graph.from_networkx(G)
        btw_idx = g.betweenness(cutoff=cutoff)
        for v in g.vs:
            node = v["_nx_name"]
            G.nodes[node]["betweenness"] = btw_idx[v.index]

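    # The cutoff bounds the length of the shortest paths considered, trading
    # exactness for speed on large graphs. A standalone igraph sketch:
    #
    #     import igraph
    #
    #     g = igraph.Graph.Ring(100)
    #     exact = g.betweenness()            # full shortest-path betweenness
    #     approx = g.betweenness(cutoff=10)  # only paths of length <= 10
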
    @staticmethod
    def _get_feature_mosaic(self, feature, interpolation=None):
        features = []
        for individual in self.graphs.keys():
            features.append(self._get_feature_map(self, feature, individual, interpolation))
        mosaic = np.full((self.n_cols, self.n_rows), np.nan)
        for i in range(self.n_cols):
            for j in range(self.n_rows):
                values = []
                for feature_map in features:
                    grid_val = feature_map[i][j]
                    if not np.isnan(grid_val):
                        values.append(grid_val)
                if len(values) >= 2:
                    mosaic[i][j] = np.mean(values)
                elif len(values) == 1:
                    mosaic[i][j] = values[0]
        return mosaic

    @staticmethod
    def _get_feature_map(self, feature, individual, interpolation):
        if interpolation is not None:
            return self._get_interpolated_feature_map(self, feature, individual, interpolation)
        else:
            return self._get_regular_feature_map(self, feature, individual)

    @staticmethod
    def _get_regular_feature_map(self, feature, individual):
        feature_ndarray = np.full((self.n_cols, self.n_rows), np.nan)
        for node in self.graphs[individual].nodes():
            feature_ndarray[node[1]][node[0]] = (self.graphs[individual]).nodes[node][feature]
        return feature_ndarray

    @staticmethod
    def _get_interpolated_feature_map(self, feature, individual, interpolation):
        feature_ndarray = self._get_regular_feature_map(self, feature, individual)
        individual_trajectory = self.trajectory[self.trajectory["groupby_col"] == individual]
        geom = individual_trajectory["geometry"]

        idxs_dict = {}
        for i in range(len(geom)):
            line1 = list(geom.iloc[i].coords)
            p1, p2 = line1[0], line1[1]
            pixel1, pixel2 = self.inverse_transform * p1, self.inverse_transform * p2
            row1, row2 = floor(pixel1[0]), floor(pixel2[0])
            col1, col2 = ceil(pixel1[1]), ceil(pixel2[1])

            # Rasterize the step and collect, for every empty pixel it crosses,
            # the value at the step's start pixel
            rr, cc = line(col1, row1, col2, row2)
            for j in range(len(rr)):
                if np.isnan(feature_ndarray[rr[j], cc[j]]):
                    if (rr[j], cc[j]) in idxs_dict:
                        idxs_dict[(rr[j], cc[j])].append(feature_ndarray[rr[0], cc[0]])
                    else:
                        idxs_dict[(rr[j], cc[j])] = [feature_ndarray[rr[0], cc[0]]]

        for key, value in idxs_dict.items():
            if interpolation == "max":
                feature_ndarray[key[0], key[1]] = np.max(value)
            elif interpolation == "mean":
                feature_ndarray[key[0], key[1]] = np.mean(value)
            elif interpolation == "median":
                feature_ndarray[key[0], key[1]] = np.median(value)
            elif interpolation == "min":
                feature_ndarray[key[0], key[1]] = np.min(value)
            else:
                raise InterpolationError("Interpolation type not supported by EcoGraph")
        return feature_ndarray

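    # skimage.draw.line rasterizes a segment with Bresenham's algorithm and
    # returns the row and column indices of every pixel it crosses. A
    # standalone sketch:
    #
    #     from skimage.draw import line
    #
    #     rr, cc = line(1, 1, 5, 5)
    #     # rr -> array([1, 2, 3, 4, 5]); cc -> array([1, 2, 3, 4, 5])
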
class InterpolationError(Exception):
    pass


class IndividualNameError(Exception):
    pass


class FeatureNameError(Exception):
    pass

def get_feature_gdf(input_path):
    """
    Convert a GeoTIFF feature map into a GeoDataFrame.

    Parameters
    ----------
    input_path : str, Pathlike
        Input path for the GeoTIFF file
    """
    shapes = []
    with rasterio.open(input_path) as src:
        crs = src.crs.to_wkt()
        data_array = src.read(1).astype(np.float32)
        data_array[data_array == src.nodata] = np.nan
        shapes.extend(rasterio.features.shapes(data_array, transform=src.transform))

    data = {"value": [], "geometry": []}
    for geom, value in shapes:
        if not np.isnan(value):
            data["geometry"].append(shape(geom))
            data["value"].append(value)
    df = pd.DataFrame.from_dict(data)
    return gpd.GeoDataFrame(df, geometry=df.geometry, crs=crs)
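
# Example usage (a sketch): polygonize a feature surface written by
# Ecograph.to_geotiff and plot it with geopandas. The file name is a
# placeholder.
#
#     gdf = get_feature_gdf("betweenness.tif")
#     gdf.plot(column="value", legend=True)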