r"""
welltest_pta.welltest
=====================
Top-level :class:`WellTest` orchestrator — the user-facing entry point.
Workflow (matches the original V8.1 pipeline + analytics):
>>> from welltest_pta import WellTest
>>>
>>> # 1) Load + auto-detect (with optional CV scores)
>>> wt = WellTest.from_file("DST.txt", cross_validate=True)
>>>
>>> # 2) Inspect detected events
>>> wt.events.print()
>>> wt.events["BU-2"].print()
>>>
>>> # 3) Manual override (if CV score was low)
>>> wt.split_manual([
... ("DD", "2025-01-15 10:00", "2025-01-15 12:30"),
... ("BU", "2025-01-15 12:30", "2025-01-15 18:00"),
... ])
>>>
>>> # 4) Per-event analysis
>>> bu = wt.events["BU-2"]
>>> bu.plot_loglog()
>>> params = bu.reservoir_params(q=850, mu=0.45, B=1.18, h=18,
... phi=0.12, ct=1.2e-5, rw=0.108)
>>>
>>> # 5) Multi-event deconvolution
>>> from welltest_pta import deconvolve
>>> recon = deconvolve(wt.events, default_q=850)
>>> recon.plot()
>>>
>>> # 6) Composite report
>>> wt.plot_composite(out_path="report.pdf")
>>> wt.export_all("output_dir/")
"""
from __future__ import annotations
import logging
from datetime import datetime
from pathlib import Path
from typing import Any, Optional, Union
import numpy as np
import pandas as pd
from welltest_pta.detection.detector import (
EventDetector,
EventDetectorConfig,
)
from welltest_pta.events import Event, EventCollection
from welltest_pta.parser import WellTestParser
from welltest_pta.validation.cross_validation import (
DetectorCVResult,
cross_validate_detector,
)
logger = logging.getLogger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# WellTest
# ─────────────────────────────────────────────────────────────────────────────
[docs]
class WellTest:
"""
Top-level handle for one well-test interpretation.
A ``WellTest`` keeps three things in lock-step:
* ``self.df`` — the parsed + annotated gauge DataFrame
* ``self.events`` — the :class:`EventCollection` derived from ``self.df``
* ``self.metadata`` — file-level metadata + detector / CV results
Re-running event detection (via :meth:`detect` or :meth:`split_manual`)
rebuilds ``self.events`` and updates ``self.df["event"]`` in place.
"""
# ──────── construction ────────
[docs]
def __init__(
self,
df_raw: pd.DataFrame,
metadata: Optional[dict[str, Any]] = None,
):
if "timestamp" not in df_raw.columns or "pressure" not in df_raw.columns:
raise ValueError("Input DataFrame must contain 'timestamp' and 'pressure'.")
self._df_raw = df_raw.copy()
self.df: Optional[pd.DataFrame] = None # set by detect()
self.events = EventCollection()
self.metadata: dict[str, Any] = metadata or {}
self.detector: Optional[EventDetector] = None
self.cv_result: Optional[DetectorCVResult] = None
# ──────── factory: from file ────────
[docs]
@classmethod
def from_file(
cls,
filepath: str | Path,
cfg: Optional[EventDetectorConfig] = None,
cross_validate: bool = False,
cv_n_bootstrap: int = 8,
cv_print: bool = True,
auto_detect: bool = True,
) -> "WellTest":
"""
Parse an ASCII gauge file, run the V8.1 detector, optionally
cross-validate, and return a populated :class:`WellTest`.
"""
path = Path(filepath)
logger.info("Parsing %s", path.name)
parser = WellTestParser()
df_raw = parser.parse(path)
if df_raw.empty:
raise RuntimeError(f"Parser returned empty DataFrame for {path}")
logger.info("Parsed %d rows (cols: %s)", len(df_raw), list(df_raw.columns))
wt = cls(df_raw, metadata={
"filepath": str(path),
"filename": path.name,
"parser_metadata": parser.metadata,
"parser_mapping": parser.mapping,
"loaded_at": datetime.now().isoformat(timespec="seconds"),
})
if auto_detect:
wt.detect(cfg=cfg)
if cross_validate:
wt.cv_result = cross_validate_detector(
wt._df_raw,
cfg=cfg,
n_bootstrap=cv_n_bootstrap,
print_report=cv_print,
)
return wt
[docs]
@classmethod
def from_dataframe(
cls,
df: pd.DataFrame,
cfg: Optional[EventDetectorConfig] = None,
auto_detect: bool = True,
) -> "WellTest":
"""Build from an already-parsed DataFrame."""
wt = cls(df, metadata={"source": "dataframe",
"loaded_at": datetime.now().isoformat(timespec="seconds")})
if auto_detect:
wt.detect(cfg=cfg)
return wt
# ──────── detection ────────
[docs]
def detect(self, cfg: Optional[EventDetectorConfig] = None) -> "WellTest":
"""Run (or re-run) automatic V8.1 event detection."""
self.detector = EventDetector(cfg=cfg or EventDetectorConfig())
self.df = self.detector.detect(self._df_raw)
self.events = EventCollection.from_annotated_dataframe(
self.df, p_reservoir=self.detector._p_res
)
self.metadata["p_reservoir"] = self.detector._p_res
self.metadata["noise_floor"] = self.detector._noise_floor
return self
[docs]
def cross_validate(
self,
cfg: Optional[EventDetectorConfig] = None,
n_bootstrap: int = 8,
print_report: bool = True,
) -> DetectorCVResult:
"""Run the bootstrap + sensitivity + Jaccard CV on the current data."""
self.cv_result = cross_validate_detector(
self._df_raw,
cfg=cfg,
n_bootstrap=n_bootstrap,
print_report=print_report,
)
return self.cv_result
# ──────── manual splitting ────────
[docs]
def split_manual(
self,
spec: list[tuple[str, Any, Any]],
keep_existing_classifications: bool = False,
) -> "WellTest":
"""
Override the auto-detected events with a manual list.
Parameters
----------
spec
List of ``(type, t_start, t_end)`` tuples where:
* ``type`` is one of ``"DD"``, ``"BU"``, ``"drawdown"``, ``"buildup"``
* ``t_start``, ``t_end`` are timestamps (``str`` or ``pd.Timestamp``)
Example::
wt.split_manual([
("DD", "2025-01-15 10:00:00", "2025-01-15 12:30:00"),
("BU", "2025-01-15 12:30:00", "2025-01-15 18:00:00"),
])
keep_existing_classifications
If True, gaps between manual events keep their auto labels.
If False (default), they are reset to ``non_pta``.
Notes
-----
Calling this method requires that :meth:`detect` has already been
run at least once, since it relies on the ``p_smooth`` / ``elapsed_hr``
columns built in Phase 0.
"""
if self.df is None:
raise RuntimeError("Run .detect() at least once before manual splitting.")
type_map = {
"DD": "drawdown",
"BU": "buildup",
"drawdown": "drawdown",
"buildup": "buildup",
}
df = self.df
new_labels = (
df["event"].copy() if keep_existing_classifications
else pd.Series("non_pta", index=df.index, name="event")
)
ts = df["timestamp"]
for entry in spec:
if len(entry) != 3:
raise ValueError(f"Bad spec entry: {entry!r} — expected (type, t0, t1).")
etype, t0, t1 = entry
if etype not in type_map:
raise ValueError(f"Unknown type {etype!r}; use DD/BU/drawdown/buildup.")
t0 = pd.Timestamp(t0)
t1 = pd.Timestamp(t1)
if t1 <= t0:
raise ValueError(f"t_end ({t1}) must be > t_start ({t0}).")
mask = (ts >= t0) & (ts <= t1)
if not mask.any():
logger.warning("Manual split (%s, %s, %s) matched zero rows.", etype, t0, t1)
continue
new_labels.loc[mask] = type_map[etype]
df["event"] = new_labels
self.df = df
# Rebuild EventCollection
self.events = EventCollection.from_annotated_dataframe(
self.df, p_reservoir=self.metadata.get("p_reservoir")
)
logger.info("Manual splitting applied: %s", self.events)
return self
# ──────── shortcuts ────────
@property
def drawdowns(self) -> EventCollection:
return self.events.drawdowns
@property
def buildups(self) -> EventCollection:
return self.events.buildups
@property
def p_reservoir(self) -> Optional[float]:
return self.metadata.get("p_reservoir")
# ──────── summary / printing ────────
[docs]
def summary(self) -> dict[str, Any]:
"""Return a dict with file-, detector-, and event-level info."""
out = {
"filename": self.metadata.get("filename"),
"n_samples": len(self._df_raw),
"p_reservoir_psi": self.metadata.get("p_reservoir"),
"noise_floor_psi": self.metadata.get("noise_floor"),
"n_events": len(self.events),
"n_drawdowns": len(self.drawdowns),
"n_buildups": len(self.buildups),
}
if self.cv_result is not None:
out["cv_score"] = round(self.cv_result.overall_score, 1)
out["cv_grade"] = self.cv_result.grade
return out
[docs]
def print_summary(self) -> None:
"""Print the high-level summary + the event catalogue."""
s = self.summary()
sep = "═" * 72
print(f"\n{sep}\n WELL TEST SUMMARY\n{sep}")
print(f" File: {s['filename']}")
print(f" Samples: {s['n_samples']}")
print(f" P_reservoir: {s['p_reservoir_psi']:.2f} psi" if s['p_reservoir_psi'] else "")
print(f" Noise floor: {s['noise_floor_psi']:.2f} psi" if s['noise_floor_psi'] else "")
print(f" Events: {s['n_events']} ({s['n_drawdowns']} DD, {s['n_buildups']} BU)")
if "cv_score" in s:
print(f" CV score: {s['cv_score']:.1f} / 100 ({s['cv_grade']})")
print(sep)
self.events.print()
print()
# ──────── plotting ────────
[docs]
def plot_composite(
self,
out_path: Optional[str | Path] = None,
figsize: tuple[float, float] = (14, 10),
):
"""Composite 4-panel report (uses welltest_pta.visualization.composite)."""
from welltest_pta.visualization.composite import plot_composite_report
return plot_composite_report(self, out_path=out_path, figsize=figsize)
[docs]
def plot_overview(
self,
ax=None,
show_events: bool = True,
show_p_res: bool = True,
):
"""Single-panel pressure-vs-time overview with event shading."""
from welltest_pta.visualization.composite import plot_overview
return plot_overview(self, ax=ax, show_events=show_events,
show_p_res=show_p_res)
# ──────── export ────────
[docs]
def export_all(
self,
out_dir: str | Path,
prefix: str = "welltest",
per_event: bool = True,
catalogue_format: str = "csv",
) -> dict[str, Path]:
"""
Export everything to a directory:
* ``{prefix}_full_data.csv`` — the full annotated DataFrame
* ``{prefix}_catalogue.csv`` — one row per event
* ``{prefix}_metadata.json`` — file metadata + detector info
* ``{prefix}_events/{id}.csv`` — one CSV per event (if per_event)
Returns a dict of ``label → Path``.
"""
import json
out = Path(out_dir)
out.mkdir(parents=True, exist_ok=True)
paths: dict[str, Path] = {}
# 1. Full annotated data
full = out / f"{prefix}_full_data.csv"
self.df.to_csv(full, index=False)
paths["full_data"] = full
# 2. Catalogue
cat_ext = {"csv": "csv", "excel": "xlsx", "json": "json"}.get(catalogue_format, "csv")
cat = out / f"{prefix}_catalogue.{cat_ext}"
self.events.export(cat, format=catalogue_format)
paths["catalogue"] = cat
# 3. Metadata JSON
meta_path = out / f"{prefix}_metadata.json"
meta_serialisable = {}
for k, v in self.metadata.items():
try:
json.dumps(v, default=str)
meta_serialisable[k] = v
except TypeError:
meta_serialisable[k] = str(v)
if self.cv_result is not None:
meta_serialisable["cv_score"] = self.cv_result.overall_score
meta_serialisable["cv_grade"] = self.cv_result.grade
with open(meta_path, "w") as f:
json.dump(meta_serialisable, f, indent=2, default=str)
paths["metadata"] = meta_path
# 4. Per-event CSVs
if per_event:
ev_dir = out / f"{prefix}_events"
ev_dir.mkdir(exist_ok=True)
for ev in self.events:
p = ev_dir / f"{ev.event_id}.csv"
ev.export(p, format="csv")
paths[ev.event_id] = p
logger.info("Exported all → %s", out)
return paths
def __repr__(self) -> str:
return (
f"WellTest(file={self.metadata.get('filename')!r}, "
f"n={len(self._df_raw)}, events={len(self.events)})"
)