Source code for welltest_pta.analysis.deconvolution

r"""
welltest_pta.analysis.deconvolution
====================================
Multi-rate pressure-transient deconvolution.

The convolution problem for a variable-rate well test is

.. math::
    p_i - p_{wf}(t) = \int_0^t q(\tau)\, p_u'(t-\tau)\, d\tau

where :math:`p_u(t)` is the unit-rate constant-rate drawdown response and
:math:`p_u'(t) = dp_u/dt`. Deconvolution recovers :math:`p_u(t)` from the
measured :math:`p_{wf}(t)` and known rate history :math:`q(t)`. The result
is a **single equivalent buildup** that effectively merges all buildups
(and drawdowns) in the test, dramatically extending the radius of
investigation and revealing late-time boundaries that no individual
buildup is long enough to see.

Algorithm — von Schroeter–Hollaender–Gringarten (vSH04)
-------------------------------------------------------
Direct deconvolution is ill-posed (Hadamard-unstable to noise). vSH04
solve the regularised non-linear problem in the **encoded variable**

.. math::
    z(\sigma) \;=\; \ln\!\left[t\, \frac{dp_u}{dt}\right]
              \;=\; \ln\!\left[\frac{dp_u}{d\ln t}\right],
    \qquad \sigma = \ln t

so that :math:`p_u'(t) = e^{z(\ln t)}/t \ge 0` automatically (positivity
of the derivative enforced by construction). The objective is

.. math::
    J(z, p_i) \;=\; \|y - C(q,z) - p_i\|^2
                  \,+\, \nu \,\|D\,z\|^2

with :math:`D` a second-difference (curvature) operator and :math:`\nu`
the regularisation weight. The user specifies only :math:`\nu`
(typically :math:`10^{-3}\!-\!10^{-1}`); both :math:`z` and
:math:`p_i` are recovered.

References
----------
- von Schroeter, T., Hollaender, F., & Gringarten, A. C. (2004).
  *Deconvolution of well-test data as a nonlinear total least-squares
  problem.* SPE Journal **9** (4), 375–390.
- Levitan, M. M. (2005). *Practical application of pressure/rate
  deconvolution to analysis of real well tests.* SPE 84290.
"""

from __future__ import annotations

import logging
from dataclasses import dataclass, field
from typing import Optional, TYPE_CHECKING

import numpy as np
import pandas as pd
from scipy.optimize import least_squares

if TYPE_CHECKING:
    import matplotlib.pyplot as plt

logger = logging.getLogger(__name__)


# ─────────────────────────────────────────────────────────────────────────────
# Result container
# ─────────────────────────────────────────────────────────────────────────────

[docs] @dataclass class DeconvolutionResult: """Output of :func:`deconvolve`.""" t: np.ndarray # response time grid (hr), log-spaced pu: np.ndarray # unit-rate cumulative response (psi per unit q) dpu_dlnt: np.ndarray # log-derivative dp_u/d(ln t) (psi per unit q) z: np.ndarray # solved encoded variable p_initial: float # solved (or fixed) initial pressure (psi) nu: float # regularisation used converged: bool # solver convergence flag iterations: int # solver iterations residual_norm: float # ||r||_2 (psi) fit_pressure: np.ndarray # reconstructed p(t_obs) (psi) obs_pressure: np.ndarray # observed p(t_obs) (psi) obs_time: np.ndarray # observation time grid (hr from t=0) rate_history: pd.DataFrame = field(repr=False) # input rate steps metadata: dict = field(default_factory=dict, repr=False) # ──────── methods ────────
[docs] def to_dataframe(self) -> pd.DataFrame: """Long-form DataFrame of the recovered response.""" return pd.DataFrame({ "t_hr": self.t, "pu_psi_per_unit_q": self.pu, "dpu_dlnt_psi_per_unit_q": self.dpu_dlnt, })
[docs] def export(self, path: str, format: str = "csv") -> None: """Save the response to CSV / Excel / JSON.""" df = self.to_dataframe() path_lower = str(path).lower() if format == "csv" or path_lower.endswith(".csv"): df.to_csv(path, index=False) elif format == "excel" or path_lower.endswith((".xlsx", ".xls")): df.to_excel(path, index=False) elif format == "json" or path_lower.endswith(".json"): df.to_json(path, orient="records", date_format="iso") else: raise ValueError(f"Unsupported format: {format}") logger.info("Deconvolution exported → %s", path)
[docs] def plot(self, ax: "plt.Axes | None" = None, show_obs_fit: bool = False): """Log-log plot of the recovered :math:`p_u` and its derivative.""" import matplotlib.pyplot as plt if ax is None: fig, ax = plt.subplots(figsize=(9, 6)) else: fig = ax.figure ax.loglog(self.t, np.abs(self.pu), "o-", ms=3, lw=1.0, color="#1f77b4", label=r"$p_u$") ax.loglog(self.t, np.abs(self.dpu_dlnt), "s-", ms=3, lw=1.0, color="#d62728", label=r"$dp_u/d\ln t$") ax.set_xlabel(r"$\Delta t$ (hr)") ax.set_ylabel(r"Unit-rate response (psi per unit $q$)") ax.set_title("Deconvolved Unit-Rate Response (vSH04)") ax.grid(True, which="both", alpha=0.3) ax.legend() return fig
# ───────────────────────────────────────────────────────────────────────────── # Internals # ───────────────────────────────────────────────────────────────────────────── def _build_log_grid(t_min: float, t_max: float, n: int) -> np.ndarray: """Geometric grid in time (log-spaced).""" if t_min <= 0 or t_max <= t_min: raise ValueError("t_min must be > 0 and t_max > t_min") return np.geomspace(t_min, t_max, n) def _pu_from_z(z: np.ndarray, sigma: np.ndarray) -> tuple[np.ndarray, np.ndarray]: r""" Given encoded variable :math:`z = \ln(dp_u/d\ln t)` on a log grid :math:`\sigma = \ln t`, return :math:`p_u(t)` and :math:`dp_u/d\ln t`. .. math:: p_u(t_n) = \int_{-\infty}^{\sigma_n} e^{z(\sigma)}\, d\sigma \approx \sum_{i \le n} e^{z_i}\, \Delta\sigma_i """ dpdlnt = np.exp(z) dsigma = np.diff(sigma, prepend=sigma[0] - (sigma[1] - sigma[0])) pu = np.cumsum(dpdlnt * dsigma) return pu, dpdlnt def _convolve_with_rates( t_obs: np.ndarray, rate_steps_t: np.ndarray, rate_steps_dq: np.ndarray, t_resp: np.ndarray, pu: np.ndarray, ) -> np.ndarray: r""" Compute :math:`\Delta p(t_j) = \sum_k \Delta q_k \cdot p_u(t_j - \tau_k)` using piecewise-linear interpolation on the log-spaced response grid. Parameters ---------- t_obs Observation times (hr) from the start of the test (t=0). rate_steps_t Times at which rate changes occur (hr from t=0). rate_steps_dq Rate jumps :math:`\Delta q_k = q_k - q_{k-1}` at each step. t_resp, pu Unit-rate response on a log-spaced grid (must start at t_resp[0] > 0). """ delta_p = np.zeros_like(t_obs, dtype=float) log_resp = np.log(t_resp) log_pu = np.log(np.maximum(pu, 1e-30)) # safe log for tau_k, dq_k in zip(rate_steps_t, rate_steps_dq): if dq_k == 0: continue dt_k = t_obs - tau_k active = dt_k > 0 if not active.any(): continue dt_active = dt_k[active] # Log-linear interpolation in time, but use linear pu directly log_dt = np.log(np.clip(dt_active, t_resp[0], t_resp[-1])) # numpy.interp handles outside-range as flat extrapolation pu_interp = np.interp(log_dt, log_resp, pu) delta_p[active] += dq_k * pu_interp return delta_p def _second_difference_operator(n: int) -> np.ndarray: r"""Build the (n-2) × n centred second-difference matrix on a uniform grid.""" D = np.zeros((n - 2, n)) for i in range(n - 2): D[i, i] = 1.0 D[i, i + 1] = -2.0 D[i, i + 2] = 1.0 return D # ───────────────────────────────────────────────────────────────────────────── # Rate-history helpers # ───────────────────────────────────────────────────────────────────────────── def _rate_history_from_events( events, default_q: Optional[float] = None, ) -> pd.DataFrame: """ Build a rate-step DataFrame from a list of Event objects. Each event must have ``rate`` set (drawdowns get q, buildups get 0). If ``default_q`` is supplied, drawdowns without explicit rate use it. """ rows = [] last_q = 0.0 for ev in events: q = ev.rate if q is None: if ev.event_type == "buildup": q = 0.0 else: if default_q is None: raise ValueError( f"Event {ev.event_id} ({ev.event_type}) has no rate; " f"set ev.rate or pass default_q." ) q = default_q dq = q - last_q if dq != 0.0: rows.append({"t_hr": float(ev.elapsed_start_hr), "q": float(q), "dq": float(dq)}) last_q = q if not rows: # Fallback: single step at t=0 with q=0 rows.append({"t_hr": 0.0, "q": 0.0, "dq": 0.0}) return pd.DataFrame(rows) # ───────────────────────────────────────────────────────────────────────────── # Main entry point # ─────────────────────────────────────────────────────────────────────────────
[docs] def deconvolve( events, rate_history: pd.DataFrame | dict | None = None, default_q: Optional[float] = None, nu: float = 1e-2, n_response_nodes: int = 60, t_response_min: float = 1e-3, t_response_max: Optional[float] = None, p_initial: Optional[float] = None, fit_p_initial: bool = True, max_iter: int = 200, verbose: bool = False, ) -> DeconvolutionResult: r""" Recover the unit-rate constant-rate response from multi-rate data. Implements the encoded vSH04 formulation. Suitable for merging two or more buildups/drawdowns into a single equivalent buildup whose log–log derivative is the diagnostic "master plot" of the entire test. Parameters ---------- events Iterable of :class:`Event` (or ``EventCollection``). Both drawdowns and buildups are used. Each event must carry rate information (``ev.rate`` in STB/D — or set ``default_q``). rate_history Optional pre-built rate history. DataFrame with columns ``t_hr``, ``q`` (rate at each step). If ``None``, derived from events. default_q Fallback flow rate (STB/D) for drawdowns without an explicit ``ev.rate``. Buildups always get q=0. nu Regularisation weight for curvature of :math:`z`. Larger ⇒ smoother response (more bias). Smaller ⇒ noisier (more variance). Typical :math:`10^{-3}` (low noise) to :math:`10^{-1}` (high noise). n_response_nodes Number of log-spaced nodes on the recovered response grid. t_response_min, t_response_max Time range (hr) of the recovered response. ``t_response_max`` defaults to the maximum observation time. p_initial Initial reservoir pressure (psi). If ``None`` and ``fit_p_initial=True``, it is solved as an additional unknown. fit_p_initial Whether to recover :math:`p_i` simultaneously with :math:`z`. max_iter Maximum solver iterations (Levenberg–Marquardt). verbose Print solver progress. Returns ------- :class:`DeconvolutionResult` Notes ----- For best results: include both a long buildup and the immediately preceding drawdown; clean buildup tails (V8.1 detector does this); use consistent rate units throughout (typically STB/D). """ # ── 1. Assemble (t, p) observation arrays from all PTA events ── t_obs_list, p_obs_list = [], [] for ev in events: df_ev = ev.data if df_ev is None or df_ev.empty: continue t_obs_list.append(df_ev["elapsed_hr"].to_numpy()) p_obs_list.append(df_ev["p_smooth"].to_numpy()) if not t_obs_list: raise ValueError("No event data available for deconvolution.") t_obs = np.concatenate(t_obs_list) p_obs = np.concatenate(p_obs_list) order = np.argsort(t_obs) t_obs = t_obs[order] p_obs = p_obs[order] # ── 2. Build rate history (step changes Δq_k at τ_k) ── if rate_history is None: rate_history = _rate_history_from_events(events, default_q=default_q) elif isinstance(rate_history, dict): rate_history = pd.DataFrame(rate_history) if "dq" not in rate_history.columns: # build dq from q q_arr = rate_history["q"].to_numpy() rate_history = rate_history.copy() rate_history["dq"] = np.concatenate([[q_arr[0]], np.diff(q_arr)]) rate_t = rate_history["t_hr"].to_numpy() rate_dq = rate_history["dq"].to_numpy() # ── 3. Build log-spaced response grid ── t_max = t_response_max if t_response_max is not None else float(np.max(t_obs)) t_resp = _build_log_grid(t_response_min, t_max, n_response_nodes) sigma = np.log(t_resp) # ── 4. Initial guess for z (constant — flat radial response) ── # Choose a level so that p_u at t_max is roughly p_obs - p_i normalized p_i_guess = float(p_initial) if p_initial is not None else float(np.percentile(p_obs, 95)) typical_dp = max(p_i_guess - float(np.min(p_obs)), 50.0) typical_q = float(np.max(np.abs(rate_dq))) if (np.abs(rate_dq) > 0).any() else 1.0 pu_guess = typical_dp / max(typical_q, 1e-6) # p_u(t_max) ≈ pu_guess → exp(z) * (sigma_max - sigma_min) ≈ pu_guess z0_const = np.log(max(pu_guess / (sigma[-1] - sigma[0] + 1e-9), 1e-6)) z0 = np.full(n_response_nodes, z0_const) # ── 5. Build regularisation matrix ── D = _second_difference_operator(n_response_nodes) sqrt_nu = np.sqrt(nu) # ── 6. Residual function ── def residuals(params: np.ndarray) -> np.ndarray: if fit_p_initial and p_initial is None: z = params[:-1] p_i = params[-1] else: z = params p_i = p_i_guess if p_initial is None else float(p_initial) pu, _ = _pu_from_z(z, sigma) delta_p_pred = _convolve_with_rates(t_obs, rate_t, rate_dq, t_resp, pu) # In a buildup: p_obs grows, model: p_i + delta_p (delta_p < 0 if drawing down) # Convention: rates are positive when producing → delta_p contributions are NEGATIVE (drawdown) # We want residual = obs - model: p_model = p_i - delta_p_pred # drawdown reduces pressure r_data = p_obs - p_model r_reg = sqrt_nu * D @ z return np.concatenate([r_data, r_reg]) # ── 7. Initial parameter vector ── if fit_p_initial and p_initial is None: x0 = np.concatenate([z0, [p_i_guess]]) else: x0 = z0.copy() # ── 8. Solve ── result = least_squares( residuals, x0=x0, method="lm", max_nfev=max_iter * (len(x0) + 1), xtol=1e-9, ftol=1e-9, gtol=1e-9, verbose=2 if verbose else 0, ) # ── 9. Unpack solution ── if fit_p_initial and p_initial is None: z_sol = result.x[:-1] p_i_sol = float(result.x[-1]) else: z_sol = result.x p_i_sol = p_i_guess if p_initial is None else float(p_initial) pu_sol, dpu_dlnt_sol = _pu_from_z(z_sol, sigma) delta_p_pred = _convolve_with_rates(t_obs, rate_t, rate_dq, t_resp, pu_sol) p_fit = p_i_sol - delta_p_pred res_norm = float(np.linalg.norm(p_obs - p_fit)) return DeconvolutionResult( t=t_resp, pu=pu_sol, dpu_dlnt=dpu_dlnt_sol, z=z_sol, p_initial=p_i_sol, nu=nu, converged=bool(result.success), iterations=int(result.nfev), residual_norm=res_norm, fit_pressure=p_fit, obs_pressure=p_obs, obs_time=t_obs, rate_history=rate_history, metadata={ "n_response_nodes": n_response_nodes, "t_response_min": t_response_min, "t_response_max": t_max, "fit_p_initial": fit_p_initial, }, )