"""
welltest_pta.analysis.flow_regimes
==================================
Automatic identification of flow regimes from the Bourdet derivative.
The log–log slope of the derivative is the diagnostic signature:
============================== ========== =========================================
Regime Slope Physical interpretation
============================== ========== =========================================
Wellbore storage (WBS) +1 Compressibility of fluid in tubing/annulus
Bilinear flow +¼ Finite-conductivity hydraulic fracture
Linear flow +½ Infinite-conductivity fracture / channel
Infinite-acting radial (IARF) 0 Cylindrical radial flow in homogeneous media
Spherical flow −½ Limited-entry / partially-penetrating well
Pseudo-steady (closed bndry) +1 All boundaries reached
Constant-pressure boundary Sharp drop Aquifer support / gas cap
============================== ========== =========================================
This module fits piece-wise log–log slopes to the derivative curve and
reports the dominant regime in each segment, along with goodness-of-fit
and the time-window covered.
"""
from __future__ import annotations
import numpy as np
# Reference slopes for each regime
REGIME_SLOPES = {
"wellbore_storage": 1.0,
"bilinear": 0.25,
"linear": 0.5,
"iarf": 0.0,
"spherical": -0.5,
"boundary_closed": 1.0,
"boundary_constant_p": -1.0, # rough — actual signature is steep drop
}
def _moving_slope(log_x: np.ndarray, log_y: np.ndarray, window: int) -> np.ndarray:
"""Centred moving slope of log_y vs log_x (least-squares per window)."""
n = len(log_x)
slopes = np.full(n, np.nan)
half = window // 2
for i in range(half, n - half):
x_w = log_x[i - half: i + half + 1]
y_w = log_y[i - half: i + half + 1]
if len(x_w) < 3:
continue
m, _ = np.polyfit(x_w, y_w, 1)
slopes[i] = m
return slopes
[docs]
def identify_flow_regimes(
dt: np.ndarray,
deriv: np.ndarray,
window_decades: float = 0.4,
slope_tol: float = 0.10,
min_segment_pts: int = 3,
) -> list[dict]:
"""
Segment the Bourdet derivative into flow-regime intervals.
Parameters
----------
dt
Elapsed time array (hr) — the output of :func:`bourdet_derivative`.
deriv
Bourdet derivative magnitude (psi).
window_decades
Width of the moving-window slope estimator, in log decades. The
window length in points is set so that it spans roughly this many
decades on the log axis.
slope_tol
Tolerance around the reference slope to call a match (e.g. 0.10
means slopes in [reference−0.10, reference+0.10]).
min_segment_pts
Minimum contiguous points to accept a regime segment.
Returns
-------
list of dicts, each with keys
``regime``, ``slope_mean``, ``r2``, ``dt_start``, ``dt_end``, ``n_pts``.
"""
dt = np.asarray(dt, dtype=float)
deriv = np.asarray(deriv, dtype=float)
if len(dt) < 5:
return []
mask = (dt > 0) & (deriv > 0) & np.isfinite(deriv)
dt = dt[mask]
deriv = deriv[mask]
if len(dt) < 5:
return []
log_x = np.log10(dt)
log_y = np.log10(deriv)
# Pick window length to cover ~window_decades on x
span = log_x[-1] - log_x[0]
if span <= 0:
return []
pts_per_decade = len(log_x) / span
window = max(5, int(window_decades * pts_per_decade))
if window % 2 == 0:
window += 1
if window >= len(log_x):
window = max(5, len(log_x) // 3)
if window % 2 == 0:
window += 1
slopes = _moving_slope(log_x, log_y, window)
# Classify each point by closest reference slope (within tolerance)
classifications = np.full(len(slopes), "unknown", dtype=object)
for i, s in enumerate(slopes):
if not np.isfinite(s):
continue
# Pick the closest regime within tolerance
best, best_diff = None, np.inf
for name, ref in REGIME_SLOPES.items():
d = abs(s - ref)
if d < best_diff and d <= slope_tol:
best, best_diff = name, d
if best is None:
classifications[i] = "transition"
else:
classifications[i] = best
# Run-length encode + filter short segments
segments: list[dict] = []
if len(classifications) == 0:
return segments
cur, cur_start = classifications[0], 0
for i in range(1, len(classifications)):
if classifications[i] != cur:
seg = _build_segment(
cur, cur_start, i, dt, slopes, log_x, log_y, min_segment_pts
)
if seg is not None:
segments.append(seg)
cur, cur_start = classifications[i], i
seg = _build_segment(
cur, cur_start, len(classifications), dt, slopes, log_x, log_y, min_segment_pts
)
if seg is not None:
segments.append(seg)
return segments
def _build_segment(
name: str,
s: int,
e: int,
dt: np.ndarray,
slopes: np.ndarray,
log_x: np.ndarray,
log_y: np.ndarray,
min_segment_pts: int,
) -> dict | None:
if e - s < min_segment_pts or name in ("unknown", "transition"):
return None
seg_slopes = slopes[s:e]
seg_slopes = seg_slopes[np.isfinite(seg_slopes)]
if len(seg_slopes) == 0:
return None
# R² of full log-log linear fit on the segment
x_seg = log_x[s:e]
y_seg = log_y[s:e]
if len(x_seg) >= 3:
m_lin, b_lin = np.polyfit(x_seg, y_seg, 1)
y_pred = m_lin * x_seg + b_lin
ss_res = float(np.sum((y_seg - y_pred) ** 2))
ss_tot = float(np.sum((y_seg - y_seg.mean()) ** 2))
r2 = 1.0 - ss_res / ss_tot if ss_tot > 0 else 0.0
else:
r2 = np.nan
return {
"regime": name,
"slope_mean": float(np.mean(seg_slopes)),
"r2": float(r2),
"dt_start": float(dt[s]),
"dt_end": float(dt[e - 1]),
"n_pts": int(e - s),
}