import logging
import os
import uuid
import numpy as np
import pandas as pd
import plotly.graph_objs as go
import shapely
from plotly.subplots import make_subplots
from sklearn.neighbors import KernelDensity
from ecoscope.io.utils import extract_voltage
logger = logging.getLogger(__name__)
[docs]class EcoPlotData:
def __init__(self, grouped, x_col="x", y_col="y", groupby_style=None, **style):
self.grouped = grouped
self.x_col = x_col
self.y_col = y_col
self.groupby_style = {} if groupby_style is None else groupby_style
self.style = style
# Plotting Defaults
self.style["mode"] = self.style.get("mode", "lines+markers")
[docs]def ecoplot(
data,
title="",
out_path=None,
subplot_height=100,
subplot_width=700,
vertical_spacing=0.001,
annotate_name_pos=(0.01, 0.99),
y_title_2=None,
layout_kwargs=None,
**make_subplots_kwargs,
):
groups = sorted(list(set.union(*[set(datum.grouped.groups.keys()) for datum in data])))
datum_1 = data[0]
datum_2 = None
for datum in data[1:]:
if datum.y_col != datum_1.y_col:
datum_2 = datum
break
n = len(groups)
fig_height = n * subplot_height + 2 * subplot_height
fig = make_subplots(
**{
**dict(
rows=n,
cols=1,
shared_xaxes="all",
vertical_spacing=vertical_spacing,
x_title=data[0].x_col,
y_title=data[0].y_col,
row_heights=list(np.repeat(subplot_height, n)),
column_widths=[subplot_width],
specs=[[{"secondary_y": datum_2 is not None}]] * len(groups),
),
**make_subplots_kwargs,
}
)
for i, name in enumerate(groups, 1):
for datum in data:
try:
df = datum.grouped.get_group(name)
except KeyError:
continue
timeseries = go.Scatter(
x=df[datum.x_col],
y=df[datum.y_col],
name=name,
**{**datum.style, **datum.groupby_style.get(name, {})},
)
fig.add_trace(
timeseries,
row=i,
col=1,
secondary_y=datum.y_col is not datum_1.y_col,
)
fig.update_xaxes(tickformat="%b-%Y")
fig.layout.annotations[1]["font"]["color"] = datum_1.style.get("line", {}).get("color", "black")
if datum_2 is not None:
fig.layout.annotations = list(fig.layout.annotations) + [
go.layout.Annotation(
{
"font": {
"size": 16,
"color": datum_2.style.get("line", {}).get("color", "black"),
},
"showarrow": False,
"text": y_title_2 or datum_2.y_col,
"textangle": -90,
"x": 1,
"xanchor": "right",
"xref": "paper",
"xshift": +40,
"y": 0.5,
"yanchor": "middle",
"yref": "paper",
},
)
]
fig["layout"].update(
title={
"text": title,
"xanchor": "center",
"x": 0.5,
},
height=fig_height,
)
fig.update_layout(**{**dict(showlegend=False, autosize=False), **(layout_kwargs or {})})
if annotate_name_pos is not None:
for i, name in enumerate(groups, 1):
fig.add_annotation(
text=name,
showarrow=False,
xref="x domain",
yref="y domain",
x=annotate_name_pos[0],
y=annotate_name_pos[1],
row=i,
col=1,
)
if out_path is not None:
fig.write_image(out_path, height=n * subplot_height)
return fig
[docs]def add_seasons(fig, season_df):
fig = make_subplots(figure=fig, specs=[[{"secondary_y": True}]])
fig.add_trace(
go.Scatter(
x=pd.concat([season_df.start, season_df.end]).sort_values(),
y=season_df.season.repeat(2),
fill="tozeroy",
fillcolor="rgba(0,0,255,0.1)",
mode="none",
),
secondary_y=True,
)
fig.update_yaxes(categoryorder="array", categoryarray=["dry", "wet"])
return fig
[docs]def collar_event_timeline(relocations, collar_events):
fig = go.FigureWidget()
ys = [0]
if not collar_events.empty:
times = collar_events["time"].to_list()
times.append(relocations["fixtime"][-1])
xs = [[times[i]] * 3 + [times[i + 1]] for i in range(len(collar_events))]
ys = [[0, i + 1, 0, 0] for i in range(len(collar_events))]
colors = collar_events["colors"]
for x, y, color in zip(xs, ys, colors):
fig.add_trace(go.Scatter(x=x, y=y, line_color=color))
fig.update_layout(
annotations=[
go.layout.Annotation(x=row.time, y=i, text=f"{row.event_type}<br>{row.time.date()}")
for i, (_, row) in enumerate(collar_events.iterrows(), 1)
]
)
x = relocations.fixtime
y = np.full(len(x), np.max(ys) / 10)
fig.add_trace(go.Scatter(x=x, y=y, line_color="rgb(0,0,255)", mode="markers", marker_size=1))
fig.update_layout(
margin_l=0,
margin_r=0,
margin_t=0,
margin_b=15,
yaxis_visible=False,
showlegend=False,
)
return fig
[docs]def mcp(relocations):
relocations = relocations.to_crs(relocations.estimate_utm_crs())
areas = []
times = []
total = shapely.geometry.GeometryCollection()
for time, obs in relocations.groupby(pd.Grouper(key="fixtime", freq="1D"), as_index=False):
if obs.size:
total = total.union(obs.geometry.unary_union).convex_hull
areas.append(total.area)
times.append(time)
areas = np.array(areas)
times = np.array(times)
times[0] = relocations["fixtime"].iat[0]
times[-1] = relocations["fixtime"].iat[-1]
fig = go.FigureWidget()
fig.add_trace(go.Scatter(x=times, y=areas / (1000**2)))
fig.update_layout(
margin_b=15,
margin_l=50,
margin_r=10,
margin_t=25,
title="MCP Asymptote",
yaxis_title="MCP Area (km^2)",
showlegend=False,
)
return fig
[docs]def nsd(relocations):
relocations = relocations.to_crs(relocations.estimate_utm_crs())
times = relocations["fixtime"]
distances = relocations.distance(relocations.geometry[0]) ** 2
fig = go.FigureWidget()
fig.add_trace(go.Scatter(x=times, y=distances / (1000**2)))
fig.update_layout(
margin_b=15,
margin_l=50,
margin_r=10,
margin_t=25,
title="Net Square Displacement (NSD)",
yaxis_title="NSD (km^2)",
showlegend=False,
)
return fig
[docs]def speed(trajectory):
times = np.column_stack(
[
trajectory["segment_start"],
trajectory["segment_start"],
trajectory["segment_end"],
trajectory["segment_end"],
]
).flatten()
speeds = np.column_stack(
[
np.zeros(len(trajectory)),
trajectory["speed_kmhr"],
trajectory["speed_kmhr"],
np.zeros(len(trajectory)),
]
).flatten()
fig = go.FigureWidget()
fig.add_trace(go.Scatter(x=times, y=speeds))
fig.update_layout(
margin_b=15,
margin_l=50,
margin_r=10,
margin_t=25,
title="Speed",
yaxis_title="Speed (km/h)",
showlegend=False,
)
return fig
[docs]def plot_collar_voltage(
relocations,
start_time,
extract_fn=extract_voltage,
output_folder=None,
layout_kwargs=None,
hline_kwargs=None,
):
# @TODO Complete black-box re-write
assigned_range = (
relocations["extra__subjectsource__assigned_range"]
.apply(pd.Series)
.add_prefix("extra.extra.subjectsource__assigned_range.")
)
relocations = relocations.merge(assigned_range, right_index=True, left_index=True)
groups = relocations.groupby(by=["extra__subject__id", "extra__subjectsource__id"])
for group, dataframe in groups:
subject_name = relocations.loc[relocations["extra__subject__id"] == group[0]]["extra__subject__name"].unique()[
0
]
dataframe["extra__subjectsource__assigned_range__upper"] = pd.to_datetime(
dataframe["extra__subjectsource__assigned_range"].str["upper"],
errors="coerce",
)
subjectsource_upperbound = dataframe["extra__subjectsource__assigned_range__upper"].unique()
is_source_active = subjectsource_upperbound >= start_time or pd.isna(subjectsource_upperbound)[0]
if is_source_active:
logger.info(subject_name)
dataframe = dataframe.sort_values(by=["fixtime"])
dataframe["voltage"] = np.array(dataframe.apply(extract_fn, axis=1), dtype=np.float64)
time = dataframe[dataframe.fixtime >= start_time].fixtime.tolist()
voltage = dataframe[dataframe.fixtime >= start_time].voltage.tolist()
# Calculate the historic voltage
hist_voltage = dataframe[dataframe.fixtime <= start_time].voltage.tolist()
if hist_voltage:
volt_upper, volt_lower = np.nanpercentile(hist_voltage, [97.5, 2.5])
hist_voltage_mean = np.nanmean(hist_voltage)
else:
volt_upper, volt_lower = np.nan, np.nan
hist_voltage_mean = None
volt_diff = volt_upper - volt_lower
volt_upper = np.full((len(time)), volt_upper, dtype=np.float32)
volt_lower = np.full((len(time)), volt_lower, dtype=np.float32)
if np.all(volt_diff == 0):
# jitter = np.random.random_sample((len(volt_upper,)))
volt_upper = volt_upper + 0.025 * max(volt_upper)
volt_lower = volt_lower - 0.025 * max(volt_lower)
if not any(hist_voltage or voltage):
continue
try:
lower_y = min(np.nanmin(np.array(hist_voltage)), np.nanmin(np.array(voltage)))
upper_y = max(np.nanmax(np.array(hist_voltage)), np.nanmax(np.array(voltage)))
except ValueError:
lower_y = min(hist_voltage or voltage)
upper_y = max(hist_voltage or voltage)
finally:
lower_y = lower_y - 0.1 * lower_y
upper_y = upper_y + 0.1 * upper_y
if not len(voltage):
continue
if not layout_kwargs:
layout = go.Layout(
xaxis={"title": "Time"},
yaxis={"title": "Collar Voltage"},
margin={"l": 40, "b": 40, "t": 50, "r": 50},
hovermode="closest",
)
else:
layout = go.Layout(**layout_kwargs)
# Add the current voltage
trace = go.Scatter(
x=time,
y=voltage,
fill=None,
showlegend=True,
mode="lines",
line={
"width": 1,
"shape": "spline",
},
line_color="rgb(0,0,246)",
marker={
"colorscale": "Viridis",
"color": voltage,
"colorbar": dict(title="Colorbar"),
"cmax": np.max(voltage),
"cmin": np.min(voltage),
},
name=subject_name,
)
# Add the historical lower HPD value
trace_lower = go.Scatter(
x=time,
y=volt_lower,
fill=None,
line_color="rgba(255,255,255,0)",
mode="lines",
showlegend=False,
)
# Add the historical max HPD value
trace_upper = go.Scatter(
x=time,
y=volt_upper,
fill="tonexty", # fill area between trace0 and trace1
mode="lines",
fillcolor="rgba(0,176,246,0.2)",
line_color="rgba(255,255,255,0)",
showlegend=True,
name="Historic 2.5% - 97.5%",
)
fig = go.Figure(layout=layout)
fig.add_trace(trace_lower)
fig.add_trace(trace_upper)
fig.add_trace(trace)
if hist_voltage_mean:
if not hline_kwargs:
fig.add_hline(
y=hist_voltage_mean,
line_dash="dot",
line_width=1.5,
line_color="Red",
annotation_text="Historic Mean",
annotation_position="right",
)
else:
fig.add_hline(**hline_kwargs)
fig.update_layout(yaxis=dict(range=[lower_y, upper_y]))
if output_folder:
fig.write_image(os.path.join(f"{output_folder}/_{group}_{str(uuid.uuid4())[:4]}.png"))
else:
fig.show()
[docs]def plot_seasonal_dist(ndvi_vals, cuts, bandwidth=0.05, output_file=None):
x = ndvi_vals.sort_values().to_numpy().reshape(-1, 1)
kde = KernelDensity(kernel="gaussian", bandwidth=bandwidth).fit(x)
dens = np.exp(kde.score_samples(x))
fig = go.Figure(
data=go.Scatter(
x=x.ravel(),
y=dens,
fill=None,
showlegend=False,
mode="lines",
line={
"width": 1,
"shape": "spline",
},
)
)
[
fig.add_vline(
x=i, line_width=3, line_dash="dash", line_color="red", annotation_text=" Cut Val: {:.2f}".format(i)
)
for i in cuts[1:-1]
]
fig.update_layout(xaxis_title="NDVI")
if output_file:
fig.write_image(output_file)
return fig