Source code for welltest_pta.events

r"""
welltest_pta.events
===================
First-class :class:`Event` objects + the :class:`EventCollection` container.

After event detection (or manual splitting) every drawdown / buildup is
exposed as a self-contained ``Event`` carrying its slice of the gauge
data plus all PTA methods relevant to that single event:

==================  ============================================================
Method              Returns
==================  ============================================================
``.print()``        Pretty-printed summary to stdout
``.summary()``      Dict of statistics (duration, ΔP, slopes, …)
``.export(...)``    CSV / Excel / JSON of this event's slice
``.plot()``         Pressure vs time (single-event view)
``.plot_loglog()``  Bourdet log–log diagnostic
``.plot_horner()``  Horner plot (buildups only)
``.plot_mdh()``     MDH semi-log plot (buildups only)
``.bourdet(...)``   Returns (dt, derivative) numpy arrays
``.horner()``       Dict from :func:`horner_extrapolation`
``.mdh()``          Dict from :func:`mdh_extrapolation`
``.flow_regimes()`` List of identified flow regimes
``.reservoir_params(...)``  k, kh, S, C from this event alone
==================  ============================================================
"""

from __future__ import annotations

import logging
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any, Iterable, Iterator, Optional

import numpy as np
import pandas as pd

from welltest_pta.analysis.bourdet import bourdet_derivative
from welltest_pta.analysis.flow_regimes import identify_flow_regimes
from welltest_pta.analysis.horner import horner_extrapolation
from welltest_pta.analysis.mdh import mdh_extrapolation
from welltest_pta.analysis.reservoir import reservoir_parameters

if TYPE_CHECKING:
    import matplotlib.pyplot as plt

logger = logging.getLogger(__name__)


EVENT_COLOURS = {
    "buildup": "#2ca02c",
    "drawdown": "#d62728",
    "non_pta": "#cccccc",
}


# ─────────────────────────────────────────────────────────────────────────────
# Event
# ─────────────────────────────────────────────────────────────────────────────

[docs] @dataclass class Event: """A single PTA event (drawdown or buildup) with its data slice and methods.""" event_id: str # e.g. "DD-1", "BU-2" event_type: str # "drawdown" or "buildup" idx_start: int # row index in the parent annotated DataFrame idx_end: int # exclusive t_start: pd.Timestamp t_end: pd.Timestamp elapsed_start_hr: float # hours from t=0 of the test elapsed_end_hr: float data: pd.DataFrame = field(repr=False) # slice of parent DataFrame for this event rate: Optional[float] = None # flow rate during this event (STB/D) preceding_dd_dur_hr: Optional[float] = None parent_p_reservoir: Optional[float] = None metadata: dict[str, Any] = field(default_factory=dict, repr=False) # ──────── basic stats ──────── @property def duration_hr(self) -> float: return float(self.elapsed_end_hr - self.elapsed_start_hr) @property def p_initial(self) -> float: return float(self.data["p_smooth"].iloc[0]) @property def p_final(self) -> float: return float(self.data["p_smooth"].iloc[-1]) @property def delta_p(self) -> float: return self.p_final - self.p_initial @property def rate_psi_per_hr(self) -> float: return self.delta_p / self.duration_hr if self.duration_hr > 0 else 0.0
[docs] def summary(self) -> dict[str, Any]: """Return a summary dictionary of this event's statistics.""" return { "event_id": self.event_id, "type": self.event_type, "idx_start": self.idx_start, "idx_end": self.idx_end, "t_start": self.t_start, "t_end": self.t_end, "duration_hr": round(self.duration_hr, 4), "p_initial": round(self.p_initial, 2), "p_final": round(self.p_final, 2), "delta_p": round(self.delta_p, 2), "rate_psi_hr": round(self.rate_psi_per_hr, 2), "p_max": round(float(self.data["p_smooth"].max()), 2), "p_min": round(float(self.data["p_smooth"].min()), 2), "preceding_dd_dur_hr": ( round(self.preceding_dd_dur_hr, 4) if self.preceding_dd_dur_hr is not None else None ), "rate_stb_d": self.rate, "n_points": len(self.data), }
[docs] def print(self) -> None: """Pretty-print the event summary.""" s = self.summary() sep = "─" * 60 print(f"\n{sep}") print(f" Event {s['event_id']} ({s['type']})") print(sep) print(f" Time range: {s['t_start']}{s['t_end']}") print(f" Duration: {s['duration_hr']:.4f} hr") print(f" P_initial: {s['p_initial']:.2f} psi") print(f" P_final: {s['p_final']:.2f} psi") print(f" ΔP: {s['delta_p']:+.2f} psi") print(f" Slope: {s['rate_psi_hr']:.2f} psi/hr") print(f" Samples: {s['n_points']}") if s.get("preceding_dd_dur_hr") is not None: print(f" tp (prev DD): {s['preceding_dd_dur_hr']:.4f} hr") if self.rate is not None: print(f" Flow rate q: {self.rate:.2f} STB/D") print(sep)
# ──────── export ────────
[docs] def export(self, path: str | Path, format: str = "auto") -> None: """Save this event's data slice to CSV / Excel / JSON.""" path = Path(path) if format == "auto": ext = path.suffix.lower() if ext == ".csv": format = "csv" elif ext in (".xlsx", ".xls"): format = "excel" elif ext == ".json": format = "json" else: format = "csv" if format == "csv": self.data.to_csv(path, index=False) elif format == "excel": self.data.to_excel(path, index=False) elif format == "json": self.data.to_json(path, orient="records", date_format="iso") else: raise ValueError(f"Unsupported format: {format}") logger.info("Event %s exported → %s", self.event_id, path)
# ──────── analysis methods ────────
[docs] def bourdet(self, L: float = 0.2) -> tuple[np.ndarray, np.ndarray]: """Return (Δt, Bourdet derivative) for this event.""" seg_p = self.data["p_smooth"].to_numpy() seg_hr = self.data["elapsed_hr"].to_numpy() dt = seg_hr - seg_hr[0] dt[0] = 1e-6 dp = seg_p - seg_p[0] return bourdet_derivative(dt, dp, L=L)
[docs] def horner( self, fit_start_frac: float = 0.30, fit_end_frac: float = 0.85, ) -> dict[str, float]: """Horner P\\* extrapolation. Buildup only.""" if self.event_type != "buildup": raise ValueError("Horner analysis applies to buildups only.") if self.preceding_dd_dur_hr is None or self.preceding_dd_dur_hr <= 0: raise ValueError("Cannot run Horner: preceding_dd_dur_hr unknown or non-positive.") return horner_extrapolation( self.data["p_smooth"].to_numpy(), self.data["elapsed_hr"].to_numpy(), tp_hr=self.preceding_dd_dur_hr, fit_start_frac=fit_start_frac, fit_end_frac=fit_end_frac, )
[docs] def mdh( self, fit_start_frac: float = 0.30, fit_end_frac: float = 0.85, ) -> dict[str, float]: """MDH semi-log analysis. Buildup only.""" if self.event_type != "buildup": raise ValueError("MDH analysis applies to buildups only.") return mdh_extrapolation( self.data["p_smooth"].to_numpy(), self.data["elapsed_hr"].to_numpy(), fit_start_frac=fit_start_frac, fit_end_frac=fit_end_frac, )
[docs] def flow_regimes( self, L: float = 0.2, slope_tol: float = 0.10, ) -> list[dict[str, Any]]: """Auto-identify flow regimes from the Bourdet derivative.""" dt, deriv = self.bourdet(L=L) return identify_flow_regimes(dt, deriv, slope_tol=slope_tol)
[docs] def reservoir_params( self, q: float, mu: float, B: float, h: float, phi: float, ct: float, rw: float, method: str = "horner", ) -> dict[str, float]: """ Compute :math:`k`, :math:`kh`, skin :math:`S` (and :math:`C` if early-storage slope detected) for this buildup. Parameters ---------- q, mu, B, h, phi, ct, rw Fluid / reservoir / well properties (see :func:`welltest_pta.analysis.reservoir.reservoir_parameters`). method ``"horner"`` (default) or ``"mdh"``. """ if self.event_type != "buildup": raise ValueError("Reservoir parameters from semilog analysis require a buildup.") if method == "horner": res = self.horner() slope = res["slope_m"] # p_1hr from Horner line: p_1hr = m * log10((tp+1)/1) + p_star tp = self.preceding_dd_dur_hr or 0.0 p_1hr = slope * np.log10((tp + 1.0) / 1.0) + res["p_star"] elif method == "mdh": res = self.mdh() slope = res["slope_m"] p_1hr = res["intercept_p1hr"] else: raise ValueError("method must be 'horner' or 'mdh'") # p_wf at instant of shut-in = pressure at start of buildup p_wf = self.p_initial # Try to estimate early-time storage slope (psi/hr) from first 5% of data early_slope = None try: n_early = max(5, int(len(self.data) * 0.05)) t_early = self.data["elapsed_hr"].iloc[:n_early].to_numpy() p_early = self.data["p_smooth"].iloc[:n_early].to_numpy() dt_early = t_early - t_early[0] dp_early = p_early - p_early[0] if len(dt_early) >= 3 and dt_early[-1] > 0: A = np.vstack([dt_early, np.ones_like(dt_early)]).T m_early, _ = np.linalg.lstsq(A, dp_early, rcond=None)[0] if m_early > 0: early_slope = float(m_early) except Exception: pass return reservoir_parameters( slope_m=slope, p_1hr=p_1hr, p_wf_at_shut_in=p_wf, q=q, mu=mu, B=B, h=h, phi=phi, ct=ct, rw=rw, early_storage_slope=early_slope, )
# ──────── plotting ────────
[docs] def plot(self, ax: "plt.Axes | None" = None, raw: bool = True): """Pressure vs time for this single event.""" import matplotlib.pyplot as plt if ax is None: fig, ax = plt.subplots(figsize=(9, 5)) else: fig = ax.figure if raw and "pressure" in self.data.columns: ax.plot(self.data["timestamp"], self.data["pressure"], color="#b0b0b0", lw=0.4, alpha=0.6, label="Raw") ax.plot(self.data["timestamp"], self.data["p_smooth"], color=EVENT_COLOURS.get(self.event_type, "#1f77b4"), lw=1.2, label=f"{self.event_id} ({self.event_type})") ax.set_xlabel("Time") ax.set_ylabel("Pressure (psi)") ax.set_title(f"{self.event_id}{self.event_type}", fontweight="bold") ax.legend() ax.grid(True, alpha=0.3) return fig
[docs] def plot_loglog(self, ax: "plt.Axes | None" = None, L: float = 0.2): """Log–log diagnostic plot (|ΔP| and Bourdet derivative).""" import matplotlib.pyplot as plt if ax is None: fig, ax = plt.subplots(figsize=(9, 6)) else: fig = ax.figure seg_p = self.data["p_smooth"].to_numpy() seg_hr = self.data["elapsed_hr"].to_numpy() dt = seg_hr - seg_hr[0] dt[0] = 1e-6 dp = seg_p - seg_p[0] ax.loglog(dt, np.abs(dp), "o-", ms=2, lw=0.8, color="#1f77b4", label=r"$|\Delta P|$") dt_d, deriv_d = bourdet_derivative(dt, dp, L=L) if len(dt_d) > 3: ax.loglog(dt_d, deriv_d, "s-", ms=2, lw=0.8, color="#d62728", label="Bourdet derivative") ax.set_xlabel(r"$\Delta t$ (hr)") ax.set_ylabel(r"$|\Delta P|$, $dP/d(\ln t)$ (psi)") ax.set_title(f"Log–Log Diagnostic — {self.event_id}", fontweight="bold") ax.grid(True, which="both", alpha=0.3) ax.legend(loc="lower right") return fig
[docs] def plot_horner(self, ax: "plt.Axes | None" = None): """Horner plot (buildups only).""" if self.event_type != "buildup": raise ValueError("Horner plot requires a buildup.") if self.preceding_dd_dur_hr is None or self.preceding_dd_dur_hr <= 0: raise ValueError("preceding_dd_dur_hr is not set.") import matplotlib.pyplot as plt if ax is None: fig, ax = plt.subplots(figsize=(9, 5)) else: fig = ax.figure seg_p = self.data["p_smooth"].to_numpy() seg_hr = self.data["elapsed_hr"].to_numpy() dt = seg_hr - seg_hr[0] dt = np.where(dt <= 0, 1e-6, dt) tp = self.preceding_dd_dur_hr horner_x = (tp + dt) / dt ax.semilogx(horner_x, seg_p, "o-", ms=2, lw=0.8, color="#1f77b4", label=self.event_id) # IARF fit annotation try: res = self.horner() ax.axhline(res["p_star"], color="#006400", ls="--", lw=1.5, label=fr"$P^*$ = {res['p_star']:.1f} psi ($R^2$ = {res['r2']:.4f})") except Exception: pass ax.set_xlabel(r"Horner time $(t_p + \Delta t)/\Delta t$") ax.set_ylabel(r"$p_{ws}$ (psi)") ax.invert_xaxis() ax.set_title(f"Horner Plot — {self.event_id} (tp = {tp:.2f} hr)", fontweight="bold") ax.grid(True, which="both", alpha=0.3) ax.legend() return fig
[docs] def plot_mdh(self, ax: "plt.Axes | None" = None): """MDH semi-log plot (buildups only).""" if self.event_type != "buildup": raise ValueError("MDH plot requires a buildup.") import matplotlib.pyplot as plt if ax is None: fig, ax = plt.subplots(figsize=(9, 5)) else: fig = ax.figure seg_p = self.data["p_smooth"].to_numpy() seg_hr = self.data["elapsed_hr"].to_numpy() dt = seg_hr - seg_hr[0] dt = np.where(dt <= 0, 1e-6, dt) ax.semilogx(dt, seg_p, "o-", ms=2, lw=0.8, color="#1f77b4", label=self.event_id) try: res = self.mdh() ax.axhline(res["intercept_p1hr"], color="#006400", ls="--", lw=1.5, label=fr"$p_{{1hr}}$ = {res['intercept_p1hr']:.1f} psi") except Exception: pass ax.set_xlabel(r"$\Delta t$ (hr)") ax.set_ylabel(r"$p_{ws}$ (psi)") ax.set_title(f"MDH Plot — {self.event_id}", fontweight="bold") ax.grid(True, which="both", alpha=0.3) ax.legend() return fig
def __repr__(self) -> str: return ( f"Event(id={self.event_id!r}, type={self.event_type!r}, " f"duration_hr={self.duration_hr:.3f}, " f"ΔP={self.delta_p:+.1f} psi, n={len(self.data)})" )
# ───────────────────────────────────────────────────────────────────────────── # EventCollection # ─────────────────────────────────────────────────────────────────────────────
[docs] class EventCollection: """ An ordered, indexable container of :class:`Event` objects. Behaves like a list with extra convenience methods: >>> wt.events # EventCollection >>> wt.events[0] # first event >>> wt.events["BU-2"] # by event_id >>> for e in wt.events: ... >>> len(wt.events) """
[docs] def __init__(self, events: Iterable[Event] = ()): self._events: list[Event] = list(events)
# ── container protocol ── def __len__(self) -> int: return len(self._events) def __iter__(self) -> Iterator[Event]: return iter(self._events) def __getitem__(self, key) -> Event | "EventCollection": if isinstance(key, int): return self._events[key] if isinstance(key, slice): return EventCollection(self._events[key]) if isinstance(key, str): for e in self._events: if e.event_id == key: return e raise KeyError(f"No event with id {key!r}") raise TypeError(f"Bad key type: {type(key)}") def __repr__(self) -> str: n_dd = sum(1 for e in self._events if e.event_type == "drawdown") n_bu = sum(1 for e in self._events if e.event_type == "buildup") return f"EventCollection({len(self._events)} events: {n_dd} DD, {n_bu} BU)" # ── filters ── @property def drawdowns(self) -> "EventCollection": return EventCollection(e for e in self._events if e.event_type == "drawdown") @property def buildups(self) -> "EventCollection": return EventCollection(e for e in self._events if e.event_type == "buildup") @property def longest_buildup(self) -> Optional[Event]: bus = list(self.buildups) return max(bus, key=lambda e: e.duration_hr) if bus else None # ── tabular ──
[docs] def to_dataframe(self) -> pd.DataFrame: return pd.DataFrame([e.summary() for e in self._events])
[docs] def print(self) -> None: df = self.to_dataframe() if df.empty: print("(no events)") return cols = ["event_id", "type", "duration_hr", "p_initial", "p_final", "delta_p", "rate_psi_hr", "n_points"] print(df[cols].to_string(index=False, float_format="%.2f"))
[docs] def export(self, path: str | Path, format: str = "auto") -> None: """Export the catalogue (one row per event).""" path = Path(path) df = self.to_dataframe() if format == "auto": ext = path.suffix.lower() if ext == ".csv": format = "csv" elif ext in (".xlsx", ".xls"): format = "excel" elif ext == ".json": format = "json" else: format = "csv" if format == "csv": df.to_csv(path, index=False) elif format == "excel": df.to_excel(path, index=False) elif format == "json": df.to_json(path, orient="records", date_format="iso") else: raise ValueError(f"Unsupported format: {format}") logger.info("Event catalogue exported → %s", path)
# ── factories ──
[docs] @classmethod def from_annotated_dataframe( cls, df: pd.DataFrame, p_reservoir: Optional[float] = None, ) -> "EventCollection": """ Build an :class:`EventCollection` from a DataFrame annotated by the :class:`EventDetector` (i.e. with an ``event`` column). """ if "event" not in df.columns: raise ValueError("DataFrame missing 'event' column — run the detector first.") if "p_smooth" not in df.columns or "elapsed_hr" not in df.columns: raise ValueError("DataFrame missing 'p_smooth' / 'elapsed_hr' — run the detector.") labels = df["event"] groups, cur, start = [], labels.iloc[0], 0 for i in range(1, len(labels)): if labels.iloc[i] != cur: groups.append((start, i, cur)) cur, start = labels.iloc[i], i groups.append((start, len(labels), cur)) events: list[Event] = [] dd_n = bu_n = 0 prev_dd_dur: Optional[float] = None for s, e, lbl in groups: if lbl not in ("drawdown", "buildup"): continue if lbl == "drawdown": dd_n += 1 eid = f"DD-{dd_n}" else: bu_n += 1 eid = f"BU-{bu_n}" sub = df.iloc[s:e].reset_index(drop=True).copy() ev = Event( event_id=eid, event_type=lbl, idx_start=int(s), idx_end=int(e), t_start=df["timestamp"].iloc[s], t_end=df["timestamp"].iloc[min(e - 1, len(df) - 1)], elapsed_start_hr=float(df["elapsed_hr"].iloc[s]), elapsed_end_hr=float(df["elapsed_hr"].iloc[min(e - 1, len(df) - 1)]), data=sub, preceding_dd_dur_hr=prev_dd_dur if lbl == "buildup" else None, parent_p_reservoir=p_reservoir, ) events.append(ev) if lbl == "drawdown": prev_dd_dur = ev.duration_hr return cls(events)