"""
welltest_pta.detection.detector
================================
Automatic PTA event detection (drawdown / buildup / non-PTA).
Algorithm — V8.1 "Spike-Boundary + Tail-Trim"
---------------------------------------------
Phase 0 Hampel-filter despike → Savitzky–Golay smoothing → noise-floor σ̂
Phase 1 Reservoir-pressure plateau detection
Phase 2 RIH / POOH edge masking
Phase 3 Spike-boundary + turning-point detection (with validation)
Phase 4 Zone classification using net-ΔP signed logic
Phase 5 Pause absorption → same-type merge → edge trimming
→ V8.1 post-plateau tail trim (H→I→J spike before POOH)
Adapted from the original ``WellTestEventDetector V8.1`` (Harkat 2025)
into a stateless ``detect_events()`` function plus the stateful
``EventDetector`` class for advanced use cases.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import Optional
import numpy as np
import pandas as pd
from scipy.signal import savgol_filter
logger = logging.getLogger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# Config dataclass
# ─────────────────────────────────────────────────────────────────────────────
[docs]
@dataclass
class EventDetectorConfig:
"""Configuration for the V8.1 event detector.
All fields are public; pass them as keyword arguments at construction
time (e.g. ``EventDetectorConfig(sg_window=31, hampel_sigma=4.0)``).
"""
# ── Smoothing (Savitzky–Golay) ─────────────────────────────────────
sg_window: int = 0 # 0 = auto
sg_polyorder: int = 3
sg_min_window: int = 15
sg_max_window: int = 301
# ── Robustness (Hampel filter) ─────────────────────────────────────
hampel_window: int = 0 # 0 = auto
hampel_sigma: float = 3.0
# ── Reservoir pressure detection ───────────────────────────────────
p_res_override: float = 0.0
p_res_stable_pct: float = 20.0
p_res_min_pts: int = 30
# ── Spike-boundary detection ───────────────────────────────────────
spike_percentile: float = 95.0
spike_min_gap_pts: int = 0 # 0 = auto
# ── Zone classification ────────────────────────────────────────────
min_zone_pts: int = 10
min_pta_dp_psi: float = 15.0
min_pta_duration_hr: float = 0.10
# ── Same-type merge ────────────────────────────────────────────────
merge_gap_max_pts: int = 0 # 0 = auto
# ── V8.1 tail trim (long-buildup post-plateau spike removal) ───────
tail_trim_enabled: bool = True
tail_trim_min_dur_hr: float = 4.0
tail_trim_min_plateau_frac: float = 0.40
tail_trim_dev_n_sigma: float = 8.0
tail_trim_min_tail_dur_hr: float = 0.30
# ─────────────────────────────────────────────────────────────────────────────
# Stateless wrapper
# ─────────────────────────────────────────────────────────────────────────────
[docs]
def detect_events(
df: pd.DataFrame,
cfg: Optional[EventDetectorConfig] = None,
) -> tuple[pd.DataFrame, "EventDetector"]:
"""
Detect drawdown / buildup events on a parsed gauge DataFrame.
Parameters
----------
df
Output of :func:`welltest_pta.parser.parse`. Must contain
``timestamp`` and ``pressure``.
cfg
Detector configuration. Default values cover most DST cases.
Returns
-------
annotated_df
Copy of ``df`` with extra columns ``p_smooth``, ``dpdt``,
``elapsed_hr``, ``event``, ``p_reservoir``.
detector
The fitted ``EventDetector`` (exposes ``_p_res``, ``_noise_floor``).
"""
detector = EventDetector(cfg=cfg)
annotated = detector.detect(df)
return annotated, detector
# ─────────────────────────────────────────────────────────────────────────────
# Detector
# ─────────────────────────────────────────────────────────────────────────────
[docs]
class EventDetector:
"""V8.1 robust event detector — see module docstring for the pipeline."""
[docs]
def __init__(self, cfg: Optional[EventDetectorConfig] = None) -> None:
self.cfg = cfg or EventDetectorConfig()
self._sg_window = 21
self._p_res = 0.0
self._spike_thr = 100.0
self._noise_floor = 1.0 # estimated noise σ̂ (psi)
# ── small helpers ────────────────────────────────────────────────────
@staticmethod
def _odd(w: int, mn: int = 5) -> int:
w = max(w, mn)
return w + 1 if w % 2 == 0 else w
@staticmethod
def _hampel_filter(
series: np.ndarray, window: int, n_sigmas: float = 3.0
) -> np.ndarray:
"""Vectorised Hampel filter using pandas rolling median + MAD."""
n = len(series)
if n == 0:
return series
window = max(3, window)
if window % 2 == 0:
window += 1
s = pd.Series(series)
med = s.rolling(window, center=True, min_periods=1).median()
mad = (s - med).abs().rolling(window, center=True, min_periods=1).median()
std_mad = 1.4826 * mad
outliers = (s - med).abs() > n_sigmas * std_mad
out = s.where(~outliers, med).to_numpy()
return out
@staticmethod
def _contiguous(mask: np.ndarray) -> list[tuple[int, int]]:
regions, in_r, start = [], False, 0
for i, m in enumerate(mask):
if m and not in_r:
start, in_r = i, True
elif not m and in_r:
regions.append((start, i))
in_r = False
if in_r:
regions.append((start, len(mask)))
return regions
@staticmethod
def _rle(labels: pd.Series) -> list[tuple[int, int, str]]:
groups, cur, start = [], labels.iloc[0], 0
for i in range(1, len(labels)):
if labels.iloc[i] != cur:
groups.append((start, i, cur))
cur, start = labels.iloc[i], i
groups.append((start, len(labels), cur))
return groups
# ── PHASE 0: smoothing & despike ────────────────────────────────────
def _compute_derivatives(self, df: pd.DataFrame) -> pd.DataFrame:
cfg = self.cfg
out = df.copy()
t0 = df["timestamp"].iloc[0]
hours = (df["timestamp"] - t0).dt.total_seconds().values / 3600.0
out["elapsed_hr"] = hours
n = len(df)
diffs = np.diff(hours)
mdt = max(np.median(diffs) if len(diffs) > 0 else 1e-3, 1e-6)
p_raw = df["pressure"].interpolate(limit_direction="both").values
# Hampel despike
if cfg.hampel_window > 0:
hw = cfg.hampel_window
else:
hw = max(int(180 / (mdt * 3600)), int(n * 0.005), 5)
hw = min(hw, 51)
logger.info(" Despike window: %d pts (Hampel filter)", hw)
p_clean = self._hampel_filter(p_raw, hw, cfg.hampel_sigma)
# SG smoothing
if cfg.sg_window > 0:
self._sg_window = cfg.sg_window
else:
w = max(int(n * 0.005), max(int(3.0 / 60 / mdt), 15), cfg.sg_min_window)
w = min(w, cfg.sg_max_window, n - 1)
self._sg_window = self._odd(w)
wf = self._odd(min(self._sg_window, n - 1), cfg.sg_polyorder + 2)
out["p_smooth"] = savgol_filter(p_clean, wf, cfg.sg_polyorder)
out["dpdt"] = savgol_filter(p_clean, wf, cfg.sg_polyorder, deriv=1) / mdt
# Optional temperature smoothing
if "temperature" in df.columns and df["temperature"].notna().sum() > wf:
tv = df["temperature"].interpolate(limit_direction="both").values
out["T_smooth"] = savgol_filter(tv, wf, cfg.sg_polyorder)
out["dTdt"] = savgol_filter(tv, wf, cfg.sg_polyorder, deriv=1) / mdt
else:
out["dTdt"] = np.nan
# Noise floor estimate (75th percentile of |residuals|)
residuals = p_raw - out["p_smooth"]
noise_est = np.percentile(np.abs(residuals), 75)
self._noise_floor = max(noise_est, 0.5)
logger.info(" SG window=%d, Noise Floor ≈ %.2f psi", wf, self._noise_floor)
return out
# ── PHASE 1: P_res ───────────────────────────────────────────────────
def _detect_pres(self, df: pd.DataFrame) -> float:
cfg = self.cfg
if cfg.p_res_override > 0:
return cfg.p_res_override
p = df["p_smooth"].values
dpdt = np.abs(df["dpdt"].values)
dpf = dpdt[np.isfinite(dpdt)]
if len(dpf) < 10:
return float(np.nanmax(p))
thr = max(np.percentile(dpf, cfg.p_res_stable_pct), 0.5)
stable = dpdt < thr
if stable.sum() < cfg.p_res_min_pts:
return float(np.percentile(p, 95))
regions = self._contiguous(stable)
valid = [(s, e) for s, e in regions if (e - s) >= cfg.p_res_min_pts]
if not valid:
return float(np.percentile(p[stable], 95))
best_m, best = -np.inf, None
for s, e in valid:
m = np.mean(p[s:e])
if m > best_m:
best_m, best = m, (s, e)
logger.info(" P_res = %.1f psi (plateau [%d-%d])", best_m, best[0], best[1])
return float(best_m)
# ── PHASE 2: edge masking ────────────────────────────────────────────
def _mask_edges(
self, df: pd.DataFrame, P_res: float
) -> tuple[np.ndarray, int, int]:
p = df["p_smooth"].values
n = len(p)
cfg = self.cfg
approach = P_res * (0.70 if P_res < 100 else 0.85)
confirm = max(cfg.p_res_min_pts // 3, 5)
# PTA start
pta_start = 0
for i in range(n):
if p[i] >= approach:
if np.mean(p[i:min(i + confirm, n)]) >= approach * 0.95:
pta_start = max(0, i - confirm)
break
# PTA end
pta_end = n - 1
last_at = -1
for i in range(n - 1, -1, -1):
if p[i] >= approach:
last_at = i
break
if last_at > pta_start:
ext = min(confirm * 3, n - 1 - last_at)
pta_end = last_at + ext
crash = P_res * 0.50
for j in range(last_at, min(pta_end + 1, n)):
if p[j] < crash:
pta_end = max(last_at, j - 1)
break
pta_end = min(pta_end, n - 1)
mask = np.zeros(n, dtype=bool)
mask[pta_start:pta_end + 1] = True
logger.info(" PTA window: [%d-%d], trimmed=%d pts", pta_start, pta_end, n - mask.sum())
return mask, pta_start, pta_end
# ── PHASE 3: boundaries ──────────────────────────────────────────────
def _detect_spike_boundaries(
self, df: pd.DataFrame, pta_start: int, pta_end: int
) -> list[int]:
cfg = self.cfg
dpdt = df["dpdt"].values
p = df["p_smooth"].values
n = len(df)
# 1. dp/dt spikes
pta_dpdt = np.abs(dpdt[pta_start:pta_end + 1])
pta_finite = pta_dpdt[np.isfinite(pta_dpdt)]
spike_bounds: list[int] = []
if len(pta_finite) > 10:
self._spike_thr = max(
np.percentile(pta_finite, cfg.spike_percentile),
5.0,
self._noise_floor * 5.0,
)
abs_dpdt = np.abs(dpdt)
is_spike = np.zeros(n, dtype=bool)
is_spike[pta_start:pta_end + 1] = (
abs_dpdt[pta_start:pta_end + 1] > self._spike_thr
)
min_gap = cfg.spike_min_gap_pts
if min_gap == 0:
min_gap = max(int(n * 0.005), 10)
in_spike = False
last_bnd = pta_start
for i in range(pta_start, pta_end + 1):
if is_spike[i] and not in_spike:
if i - last_bnd >= min_gap:
spike_bounds.append(i)
last_bnd = i
in_spike = True
elif not is_spike[i] and in_spike:
in_spike = False
logger.info(" Initial spike boundaries: %d", len(spike_bounds))
# 2. Pressure turning points
tp_bounds: list[int] = []
pta_p = p[pta_start:pta_end + 1]
pta_n = len(pta_p)
if pta_n > 50:
tp_win = self._odd(max(int(pta_n * 0.05), 15))
tp_win = min(tp_win, pta_n - 1)
try:
p_heavy = savgol_filter(pta_p, tp_win, 2)
except Exception:
p_heavy = pta_p
dp_heavy = np.diff(p_heavy)
sign_changes = np.where(np.diff(np.sign(dp_heavy)))[0]
p_range = np.ptp(pta_p)
min_prominence = max(p_range * 0.03, self._noise_floor * 10.0, 10.0)
for sc in sign_changes:
tp_idx = sc + pta_start
if tp_idx <= pta_start + 20 or tp_idx >= pta_end - 20:
continue
local_p = p[tp_idx]
window_size = max(50, int(pta_n * 0.05))
left_p = p[max(0, tp_idx - window_size):tp_idx]
right_p = p[tp_idx + 1:min(n, tp_idx + window_size)]
if len(left_p) < 5 or len(right_p) < 5:
continue
if (np.min(left_p) > local_p + min_prominence
and np.min(right_p) > local_p + min_prominence):
tp_bounds.append(tp_idx)
elif (np.max(left_p) < local_p - min_prominence
and np.max(right_p) < local_p - min_prominence):
tp_bounds.append(tp_idx)
# 3. Validate (sustained change across ±check_window)
all_bounds = sorted(set(spike_bounds + tp_bounds))
validated: list[int] = []
check_window = max(int(0.01 * len(df)), 10)
for b in all_bounds:
pre_start = max(pta_start, b - check_window)
post_end = min(pta_end, b + check_window)
p_before = np.median(p[pre_start:b])
p_after = np.median(p[b:post_end])
if abs(p_before - p_after) > (self._noise_floor * 5.0):
validated.append(b)
logger.info(" Final boundaries: %d (validated)", len(validated))
return validated
# ── PHASE 4: zone classification ────────────────────────────────────
def _classify_zones(
self,
df: pd.DataFrame,
boundaries: list[int],
pta_start: int,
pta_end: int,
valid_mask: np.ndarray,
) -> pd.Series:
cfg = self.cfg
p = df["p_smooth"].values
hours = df["elapsed_hr"].values
n = len(df)
labels = pd.Series("non_pta", index=df.index, name="event")
zone_edges = sorted(set([pta_start] + boundaries + [pta_end + 1]))
zones = [(zs, ze) for zs, ze in zip(zone_edges[:-1], zone_edges[1:]) if ze - zs >= 2]
logger.info(" %d zones to classify", len(zones))
for zs, ze in zones:
ze_safe = min(ze - 1, n - 1)
p_start = np.median(p[zs:min(zs + 5, ze)])
p_end = np.median(p[max(zs, ze - 5):ze])
net_dp = p_end - p_start
dur = hours[ze_safe] - hours[zs]
abs_dp = abs(net_dp)
if not valid_mask[zs]:
label = "non_pta"
elif (abs_dp < max(cfg.min_pta_dp_psi, self._noise_floor * 5.0)
or dur < cfg.min_pta_duration_hr):
label = "pause"
elif net_dp < 0:
label = "drawdown"
else:
label = "buildup"
labels.iloc[zs:ze] = label
return labels
# ── PHASE 5: cleanup ─────────────────────────────────────────────────
def _absorb_pauses(self, labels: pd.Series, df: pd.DataFrame) -> pd.Series:
refined = labels.copy()
p = df["p_smooth"].values
changed = True
while changed:
changed = False
groups = self._rle(refined)
for i, (s, e, l) in enumerate(groups):
if l != "pause":
continue
left = groups[i - 1][2] if i > 0 else "non_pta"
right = groups[i + 1][2] if i < len(groups) - 1 else "non_pta"
if left == right and left in ("drawdown", "buildup"):
refined.iloc[s:e] = left
changed = True
break
elif left in ("drawdown", "buildup") and right in ("drawdown", "buildup"):
net_dp = p[min(e - 1, len(p) - 1)] - p[s]
refined.iloc[s:e] = "drawdown" if net_dp < 0 else "buildup"
changed = True
break
elif left in ("drawdown", "buildup"):
refined.iloc[s:e] = left
changed = True
break
elif right in ("drawdown", "buildup"):
refined.iloc[s:e] = right
changed = True
break
else:
refined.iloc[s:e] = "non_pta"
changed = True
break
return refined.replace("pause", "non_pta")
def _merge_same_type(self, labels: pd.Series, df: pd.DataFrame) -> pd.Series:
p = df["p_smooth"].values
hours = df["elapsed_hr"].values
changed = True
while changed:
changed = False
groups = self._rle(labels)
if len(groups) < 3:
break
for i in range(len(groups) - 1):
s1, e1, l1 = groups[i]
s2, e2, l2 = groups[i + 1]
if l1 == l2 and l1 in ("drawdown", "buildup", "non_pta"):
labels.iloc[s1:e2] = l1
changed = True
break
if changed:
continue
for i in range(len(groups) - 2):
s1, e1, l1 = groups[i]
s2, e2, l2 = groups[i + 1]
s3, e3, l3 = groups[i + 2]
if not (l1 in ("drawdown", "buildup") and l1 == l3 and l2 != l1):
continue
es2 = min(e2 - 1, len(p) - 1)
mid_dp = abs(p[es2] - p[s2])
mid_dur = hours[es2] - hours[s2]
total_dp = abs(p[min(e3 - 1, len(p) - 1)] - p[s1])
if (mid_dp < max(total_dp * 0.2, self._noise_floor * 10.0)
and mid_dur < 0.5):
labels.iloc[s1:e3] = l1
changed = True
break
return labels
def _absorb_tiny_gaps(self, labels: pd.Series, df: pd.DataFrame) -> pd.Series:
refined = labels.copy()
groups = self._rle(labels)
for i, (s, e, l) in enumerate(groups):
if l != "non_pta" or i == 0 or i == len(groups) - 1:
continue
pts = e - s
left, right = groups[i - 1][2], groups[i + 1][2]
if left not in ("drawdown", "buildup") or right not in ("drawdown", "buildup"):
continue
thr = max(20, int(len(df) * 0.004))
if pts > thr:
continue
if left == right:
refined.iloc[s:e] = left
elif pts < 10:
refined.iloc[s:e] = left
return refined
def _trim_buildup_tails(self, labels: pd.Series, df: pd.DataFrame) -> pd.Series:
"""V8.1 — trim post-plateau tail spikes (H→I→J before POOH)."""
cfg = self.cfg
if not cfg.tail_trim_enabled:
return labels
refined = labels.copy()
p = df["p_smooth"].values
hours = df["elapsed_hr"].values
n = len(p)
plat_tol = max(self._noise_floor * 4.0, 3.0)
tail_thr_psi = max(self._noise_floor * cfg.tail_trim_dev_n_sigma, 5.0)
for s, e, l in self._rle(refined):
if l != "buildup":
continue
zone_n = e - s
es = min(e - 1, n - 1)
zone_dur = hours[es] - hours[s]
if zone_dur < cfg.tail_trim_min_dur_hr or zone_n < 200:
continue
zone_p = p[s:e]
if np.ptp(zone_p) < plat_tol * 4.0:
continue
n_bins = max(60, int(np.sqrt(zone_n)))
hist, edges = np.histogram(zone_p, bins=n_bins)
mode_idx = int(np.argmax(hist))
p_plateau = float((edges[mode_idx] + edges[mode_idx + 1]) / 2.0)
on_plateau = np.abs(zone_p - p_plateau) < plat_tol
late_half_cov = float(np.mean(on_plateau[zone_n // 2:]))
if late_half_cov < 0.30:
continue
idx_on = np.where(on_plateau)[0]
if idx_on.size == 0:
continue
last_pl_loc = int(idx_on.max())
last_pl_global = s + last_pl_loc + 1
plateau_frac = (last_pl_global - s) / zone_n
if plateau_frac < cfg.tail_trim_min_plateau_frac:
continue
tail_n = e - last_pl_global
tail_dur = (hours[es] - hours[last_pl_global]
if last_pl_global < e else 0.0)
if tail_n < 30 or tail_dur < cfg.tail_trim_min_tail_dur_hr:
continue
tail_p = zone_p[last_pl_loc + 1:]
tail_dev = float(np.max(np.abs(tail_p - p_plateau)))
if tail_dev <= tail_thr_psi:
continue
logger.info(
" ✂ Tail trim BU [%d-%d]: plateau=%.1f psi tail_dev=%.1f psi "
"tail_dur=%.2f hr late_cov=%d%% plateau_frac=%d%% "
"→ relabel [%d-%d] as non_pta",
s, e, p_plateau, tail_dev, tail_dur,
int(late_half_cov * 100), int(plateau_frac * 100),
last_pl_global, e,
)
refined.iloc[last_pl_global:e] = "non_pta"
return refined
def _validate(self, labels: pd.Series, df: pd.DataFrame) -> pd.Series:
cfg = self.cfg
refined = labels.copy()
p = df["p_smooth"].values
hours = df["elapsed_hr"].values
for s, e, l in self._rle(refined):
if l not in ("drawdown", "buildup"):
continue
es = min(e - 1, len(p) - 1)
dur = hours[es] - hours[s]
dp = p[es] - p[s]
if (dur < cfg.min_pta_duration_hr
or abs(dp) < max(cfg.min_pta_dp_psi, self._noise_floor * 5.0)):
refined.iloc[s:e] = "non_pta"
continue
if l == "drawdown" and dp > cfg.min_pta_dp_psi:
refined.iloc[s:e] = "buildup"
elif l == "buildup" and dp < -cfg.min_pta_dp_psi:
refined.iloc[s:e] = "drawdown"
return refined
def _trim_edges(self, labels: pd.Series, df: pd.DataFrame) -> pd.Series:
refined = labels.copy()
p = df["p_smooth"].values
changed = True
while changed:
changed = False
groups = self._rle(refined)
pta = [(i, s, e, l) for i, (s, e, l) in enumerate(groups)
if l in ("drawdown", "buildup")]
if not pta:
break
li, ls, le, ll = pta[-1]
if ll == "drawdown":
after = all(groups[j][2] == "non_pta" for j in range(li + 1, len(groups)))
if after or li == len(groups) - 1:
refined.iloc[ls:le] = "non_pta"
changed = True
continue
fi, fs, fe, fl = pta[0]
if fl == "buildup" and len(pta) >= 2 and pta[1][3] == "buildup":
if p[fs] > self._p_res * 0.80:
refined.iloc[fs:fe] = "non_pta"
changed = True
return refined
# ── PUBLIC API ───────────────────────────────────────────────────────
[docs]
def detect(self, df: pd.DataFrame) -> pd.DataFrame:
"""Run the full V8.1 detection pipeline and return annotated DataFrame."""
required = {"timestamp", "pressure"}
missing = required - set(df.columns)
if missing:
raise ValueError(f"Missing columns: {missing}")
df_work = df.dropna(subset=["timestamp", "pressure"]).copy()
df_work = df_work.sort_values("timestamp").reset_index(drop=True)
n = len(df_work)
if n < 20:
raise ValueError(f"Too few data points ({n})")
logger.info("─" * 60)
logger.info("PTA Event Detection V8.1 on %d samples", n)
logger.info("─" * 60)
df_work = self._compute_derivatives(df_work)
self._p_res = self._detect_pres(df_work)
valid_mask, pta_start, pta_end = self._mask_edges(df_work, self._p_res)
boundaries = self._detect_spike_boundaries(df_work, pta_start, pta_end)
labels = self._classify_zones(df_work, boundaries, pta_start, pta_end, valid_mask)
labels = self._absorb_pauses(labels, df_work)
labels = self._merge_same_type(labels, df_work)
labels = self._absorb_tiny_gaps(labels, df_work)
labels = self._merge_same_type(labels, df_work)
labels = self._validate(labels, df_work)
labels = self._trim_edges(labels, df_work)
labels = self._trim_buildup_tails(labels, df_work)
labels = self._merge_same_type(labels, df_work)
labels = self._absorb_tiny_gaps(labels, df_work)
labels = self._merge_same_type(labels, df_work)
df_work["event"] = labels
df_work["p_reservoir"] = self._p_res
# Summary
groups = self._rle(labels)
pta_g = [(s, e, l) for s, e, l in groups if l in ("drawdown", "buildup")]
logger.info("Detected P_res = %.1f psi", self._p_res)
logger.info("Noise floor = %.2f psi", self._noise_floor)
logger.info("Found %d PTA periods (%d segments).", len(pta_g), len(groups))
return df_work