Source code for welltest_pta.parser

"""
welltest_pta.parser
===================
Robust multi-format ASCII reader for well-test gauge data.

Handles arbitrary delimiters (tab, semicolon, comma, pipe, whitespace),
mixed encodings, comma-decimal European format, and various date formats
(DD/MM/YYYY HH:MM:SS, MM/DD/YY HH:MM, etc.).

Refactored from the original ``EnhancedWellTestParser`` (Harkat 2025) into
a stateless function plus a stateful class for advanced use cases.
"""

from __future__ import annotations

import logging
import re
from pathlib import Path
from typing import Any, Optional

import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)


# ─────────────────────────────────────────────────────────────────────────────
# Module-level parsing constants
# ─────────────────────────────────────────────────────────────────────────────

P_RANGE_DEFAULT = (0.0, 30000.0)
T_RANGE_DEFAULT = (-50.0, 500.0)
YEAR_RANGE_DEFAULT = (1980, 2035)

P_KEYWORDS = [
    "bhp", "press", "pression", "psia", "psig",
    "p-avg", "gauge", "pressure", "pres",
]
T_KEYWORDS = [
    "bht", "temp", "degc", "degf", "t-avg",
    "temperature", "deg", "°c", "°f",
]
DT_KEYWORDS = [
    "date", "time", "temps", "hh:mm", "hh/mm", "clock",
    "yyyy", "timestamp", "datetime", "dd/mm", "mm/dd", "dd-", "yy",
]
DELTA_KEYWORDS = ["delta", "elapsed", "cumul"]
UNIT_INDICATORS = [
    "psia", "psig", "degc", "degf", "°c", "°f",
    "hh:mm", "hh/mm", "mm/dd", "dd/mm",
    "mpa", "kpa", "bar", "atm",
]
META_PATTERNS = [
    r"well\s*name", r"gauge\s*(serial|model|manufacturer|s/n)",
    r"client", r"field", r"rig\s*name", r"date\s*of\s*last",
    r"pressure\s*units", r"temperature\s*units", r"type\s*de\s*test",
    r"d[ée]but", r"fin\s*des", r"intervalle",
    r"point\s*de\s*lecture", r"c[ôo]te",
    r"gauge\s*spe[sc]ialist", r"=====",
]


# ─────────────────────────────────────────────────────────────────────────────
# Public top-level function
# ─────────────────────────────────────────────────────────────────────────────

[docs] def parse( filepath: str | Path, p_range: tuple[float, float] = P_RANGE_DEFAULT, t_range: tuple[float, float] = T_RANGE_DEFAULT, ) -> pd.DataFrame: """ Parse a well-test ASCII gauge file into a clean DataFrame. Parameters ---------- filepath Path to the ASCII (.txt, .csv, .dat, .prn, ...) file. p_range Physical sanity bounds for pressure (psia). Values outside are NaN-ed. t_range Physical sanity bounds for temperature. Returns ------- DataFrame with at least ``timestamp`` and ``pressure`` columns, plus ``temperature``, ``delta_hours``, and QC flags when available. """ parser = WellTestParser(p_range=p_range, t_range=t_range) return parser.parse(filepath)
# ───────────────────────────────────────────────────────────────────────────── # Stateful parser class # ─────────────────────────────────────────────────────────────────────────────
[docs] class WellTestParser: """ Stateful parser that exposes detected metadata and column mapping after a successful ``.parse(...)`` call. """
[docs] def __init__( self, p_range: tuple[float, float] = P_RANGE_DEFAULT, t_range: tuple[float, float] = T_RANGE_DEFAULT, year_range: tuple[int, int] = YEAR_RANGE_DEFAULT, ) -> None: self.P_RANGE = p_range self.T_RANGE = t_range self.YEAR_RANGE = year_range self.metadata: dict[str, str] = {} self.mapping: dict[str, Any] = {} self._comma_decimal: bool = False self._dayfirst_hint: Optional[bool] = None
# ── Numeric and time-string helpers ────────────────────────────────── def _clean_numeric(self, series: pd.Series) -> pd.Series: def extract(val): if pd.isna(val) or str(val).strip() == "": return np.nan s = str(val).strip() if self._comma_decimal: s = re.sub(r"(\d),(\d)", r"\1.\2", s) try: m = re.search(r"[-+]?\d*\.?\d+(?:[eE][-+]?\d+)?", s) return float(m.group()) if m else np.nan except (ValueError, TypeError): return np.nan return series.apply(extract) @staticmethod def _normalize_time_string(val, is_time_col: bool = True): if pd.isna(val): return val s = str(val).strip() if not is_time_col: return s m = re.match(r"^(\d{1,2})/(\d{1,2})(/(\d{1,2}))?$", s) if m: hh, mm = int(m.group(1)), int(m.group(2)) if hh <= 23 and mm <= 59: return s.replace("/", ":") return s @staticmethod def _is_metadata_line(line: str) -> bool: stripped = line.strip() if not stripped: return True if re.match(r"^[=\-_\*#~]{3,}$", stripped): return True low = stripped.lower() return any(re.search(p, low) for p in META_PATTERNS) @staticmethod def _extract_metadata(lines: list[str]) -> dict[str, str]: meta: dict[str, str] = {} for line in lines: stripped = line.strip() if not stripped or re.match(r"^[=\-_\*#~]{3,}$", stripped): continue m = re.match(r"^(.+?)\s*[:=]\s*(.+)$", stripped) if m: meta[m.group(1).strip()] = m.group(2).strip() return meta @staticmethod def _detect_comma_decimal(lines: list[str], start: int) -> bool: cc = dc = checked = 0 for line in lines[start:start + 30]: s = line.strip() if not s: continue cc += len(re.findall(r"\d+,\d{1,5}(?!\d)", s)) dc += len(re.findall(r"\d+\.\d{1,5}(?!\d)", s)) checked += 1 if checked >= 10: break return cc > dc and cc > 3 @staticmethod def _detect_dayfirst_from_headers(header_line: str) -> Optional[bool]: low = header_line.lower() if re.search(r"dd[/\-]mm", low): return True if re.search(r"mm[/\-]dd", low): return False return None # ── Structure detection ────────────────────────────────────────────── def _detect_structure( self, filepath: Path, max_lines: int = 300 ) -> tuple[int, str, Optional[int], list[str]]: with open(filepath, "r", encoding="utf-8", errors="ignore") as f: lines = [line.rstrip("\n") for line in f][:max_lines] meta_lines: list[str] = [] data_start_zone = 0 for i, line in enumerate(lines): if self._is_metadata_line(line): meta_lines.append(line) data_start_zone = i + 1 else: break delimiters = ["\t", ";", ",", "|", r"\s+"] all_kw = P_KEYWORDS + T_KEYWORDS + DT_KEYWORDS best_score = -1 best_delim: Optional[str] = None best_header = data_start_zone best_units: Optional[int] = None search_start = max(0, data_start_zone - 3) search_end = min(len(lines), data_start_zone + 20) for delim in delimiters: for i in range(search_start, search_end): line = lines[i] stripped = line.strip() if not stripped or re.match(r"^[=\-_\*#~]{3,}$", stripped): continue parts = stripped.split() if delim == r"\s+" else stripped.split(delim) if len(parts) < 2: continue line_lower = stripped.lower() kw_matches = sum(1 for kw in all_kw if kw in line_lower) if kw_matches < 1: continue consistent, n_fields, data_lines_found = True, None, 0 for j in range(i + 1, min(i + 15, len(lines))): test = lines[j].strip() if not test: continue if any(u in test.lower() for u in UNIT_INDICATORS): continue if re.match(r"^[=\-_\*#~]{3,}$", test): continue tparts = test.split() if delim == r"\s+" else test.split(delim) if n_fields is None: n_fields = len(tparts) elif abs(len(tparts) - n_fields) > 1: consistent = False break data_lines_found += 1 if data_lines_found >= 5: break units_idx: Optional[int] = None if i + 1 < len(lines): next_low = lines[i + 1].lower().strip() unit_hits = sum(1 for u in UNIT_INDICATORS if u in next_low) if unit_hits >= 1 or re.search(r"(dd|mm|yy|hh)", next_low): if not re.findall(r"\d{4,}", next_low): units_idx = i + 1 score = ( kw_matches * 3 + (5 if units_idx else 0) + (15 if consistent else 0) + (3 if data_lines_found >= 3 else 0) ) if score > best_score: best_score = score best_delim = delim best_header = i best_units = units_idx if best_delim is None: best_delim, best_header, best_units = r"\s+", 0, None return best_header, best_delim, best_units, meta_lines # ── Column scoring & identification ────────────────────────────────── def _score_column( self, series: pd.Series, keywords: list[str], phys_range: tuple[float, float], ) -> float: clean = self._clean_numeric(series).dropna() if len(clean) == 0: return 0.0 range_frac = clean.between(*phys_range).mean() col_name = str(series.name).lower() if series.name else "" col_clean = re.sub(r"[\[\]\(\)\{\}]", " ", col_name) name_score = 0.0 for kw in keywords: if re.search(r"\b" + re.escape(kw) + r"\b", col_clean): name_score = 3.0 break if kw in col_clean: name_score = max(name_score, 1.5) return range_frac * (1.0 + name_score) def _identify_columns(self, df: pd.DataFrame) -> dict[str, Any]: dt_candidates: list[str] = [] delta_col: Optional[str] = None for col in df.columns: col_lower = str(col).lower() col_clean = re.sub(r"[\[\]\(\)\{\}]", " ", col_lower) if any(kw in col_clean for kw in DELTA_KEYWORDS): delta_col = col continue is_dt = any(kw in col_clean for kw in DT_KEYWORDS) if not is_dt: sample = df[col].dropna().head(5).astype(str) for val in sample: if re.search( r"\d{1,4}[/\-]\d{1,2}[/\-]\d{1,4}|\d{1,2}:\d{2}|" r"\d{1,2}/\d{2}(/\d{2})?|\w{3}-\d{2}", val, ): is_dt = True break if is_dt: dt_candidates.append(col) exclude = set(dt_candidates) if delta_col: exclude.add(delta_col) p_scores: dict[str, float] = {} t_scores: dict[str, float] = {} for col in df.columns: if col in exclude: continue p_scores[col] = self._score_column(df[col], P_KEYWORDS, self.P_RANGE) t_scores[col] = self._score_column(df[col], T_KEYWORDS, self.T_RANGE) p_col = ( max(p_scores, key=p_scores.get) if p_scores and max(p_scores.values()) > 0.3 else None ) t_col = ( max(t_scores, key=t_scores.get) if t_scores and max(t_scores.values()) > 0.3 else None ) if p_col and p_col == t_col: if p_scores.get(p_col, 0) >= t_scores.get(t_col, 0): t_col = None else: p_col = None return {"p": p_col, "t": t_col, "dt": dt_candidates, "delta": delta_col} # ── Datetime parsing with day/month-first auto-detection ───────────── def _parse_datetime( self, df: pd.DataFrame, dt_cols: list[str] ) -> Optional[pd.Series]: if not dt_cols: return None def looks_like_date(v: str) -> bool: m = re.match(r"^(\d{1,4})[/\-](\d{1,2})[/\-](\d{1,4})$", v.strip()) if not m: return False a, b = int(m.group(1)), int(m.group(2)) if len(m.group(1)) == 4 or len(m.group(3)) == 4: return True if b <= 12: return True if a <= 12 and b <= 31: return True return False for col in dt_cols: sample = df[col].dropna().head(5).astype(str).tolist() has_colons = any(":" in v for v in sample) date_votes = sum(1 for v in sample if looks_like_date(v)) is_pure_time = has_colons or (date_votes == 0) df[col] = df[col].apply( lambda v, _is_t=is_pure_time: self._normalize_time_string(v, is_time_col=_is_t) ) combined = df[dt_cols].astype(str).agg(" ".join, axis=1) combined = combined.str.replace(r"\s+", " ", regex=True).str.strip() if self._dayfirst_hint is None: dayfirst_options = [True, False] else: dayfirst_options = [self._dayfirst_hint, not self._dayfirst_hint] best_series, best_score = None, -1.0 for dayfirst in dayfirst_options: try: ts = pd.to_datetime( combined, dayfirst=dayfirst, errors="coerce", format="mixed" ) except Exception: continue valid_frac = ts.notna().mean() if valid_frac < 0.3: continue years = ts.dt.year year_frac = ( (years >= self.YEAR_RANGE[0]) & (years <= self.YEAR_RANGE[1]) ).mean() diffs = ts.diff().dt.total_seconds().dropna() mono_frac = (diffs >= 0).mean() if len(diffs) > 0 else 0 hint_bonus = 0.1 if dayfirst == self._dayfirst_hint else 0.0 score = ( valid_frac * 0.3 + year_frac * 0.3 + mono_frac * 0.3 + hint_bonus ) if score > best_score: best_score = score best_series = ts return best_series # ── QC flags ───────────────────────────────────────────────────────── def _add_qc_flags(self, df: pd.DataFrame) -> pd.DataFrame: qc = pd.DataFrame(index=df.index) qc["qc_pressure"] = "PASS" qc["qc_temperature"] = "PASS" qc["qc_timestamp"] = "PASS" if "pressure" in df.columns: p = df["pressure"] qc.loc[~p.between(*self.P_RANGE) | p.isna(), "qc_pressure"] = "OUT_OF_RANGE" if "temperature" in df.columns: t = df["temperature"] qc.loc[~t.between(*self.T_RANGE) | t.isna(), "qc_temperature"] = "OUT_OF_RANGE" if "timestamp" in df.columns: qc.loc[df["timestamp"].isna(), "qc_timestamp"] = "MISSING" dup = df["timestamp"].duplicated(keep="first") qc.loc[dup, "qc_timestamp"] = "DUPLICATE" if not df["timestamp"].dropna().empty: non_mono = df["timestamp"].diff().dt.total_seconds().fillna(0) < 0 qc.loc[non_mono, "qc_timestamp"] = "NON_MONOTONIC" return qc # ── PUBLIC API ───────────────────────────────────────────────────────
[docs] def parse(self, filepath: str | Path) -> pd.DataFrame: """Parse the ASCII file and return a clean DataFrame.""" filepath = Path(filepath) if not filepath.exists(): raise FileNotFoundError(f"File not found: {filepath}") with open(filepath, "r", encoding="utf-8", errors="ignore") as f: all_lines = [line.rstrip("\n") for line in f] header_row, delimiter, units_row, meta_lines = self._detect_structure(filepath) self.metadata = self._extract_metadata(meta_lines) data_start = header_row + 1 if units_row is not None: data_start = max(data_start, units_row + 1) self._comma_decimal = self._detect_comma_decimal(all_lines, data_start) self._dayfirst_hint = None if header_row < len(all_lines): self._dayfirst_hint = self._detect_dayfirst_from_headers(all_lines[header_row]) if self._dayfirst_hint is None and units_row is not None and units_row < len(all_lines): self._dayfirst_hint = self._detect_dayfirst_from_headers(all_lines[units_row]) skip_rows = list(range(0, header_row)) if units_row is not None: skip_rows.append(units_row) try: kwargs: dict = dict( sep=delimiter, skiprows=skip_rows, engine="python", on_bad_lines="skip", encoding="utf-8", encoding_errors="ignore", ) if self._comma_decimal: kwargs["dtype"] = str df = pd.read_csv(filepath, **kwargs) except Exception: df = pd.read_csv( filepath, sep=r"\s+", header=None, engine="python", on_bad_lines="skip", encoding="utf-8", encoding_errors="ignore", dtype=(str if self._comma_decimal else None), ) df.columns = [f"col_{i}" for i in range(df.shape[1])] df.columns = [str(c).strip() for c in df.columns] # Recover unnamed columns by re-reading the raw header unnamed_cols = [c for c in df.columns if c.startswith("Unnamed")] if unnamed_cols: df = self._recover_unnamed_columns(df, all_lines, header_row, units_row) df = df.dropna(how="all").dropna(axis=1, how="all") # Strip junk rows all_kw = P_KEYWORDS + T_KEYWORDS + DT_KEYWORDS def is_junk_row(row: pd.Series) -> bool: s = " ".join(str(v) for v in row.values if pd.notna(v)).lower() if re.match(r"^[=\-_\*#~\s]+$", s): return True if any(u in s for u in UNIT_INDICATORS): return True return False mask = df.apply(is_junk_row, axis=1) if mask.any(): df = df[~mask].reset_index(drop=True) col_roles = self._identify_columns(df) self.mapping = col_roles timestamp_series = ( self._parse_datetime(df, col_roles["dt"]) if col_roles["dt"] else None ) pressure_series = None if col_roles["p"]: pressure_series = self._clean_numeric(df[col_roles["p"]]) pressure_series = pressure_series.where(pressure_series.between(*self.P_RANGE)) temperature_series = None if col_roles["t"]: temperature_series = self._clean_numeric(df[col_roles["t"]]) temperature_series = temperature_series.where(temperature_series.between(*self.T_RANGE)) out = pd.DataFrame(index=df.index) if timestamp_series is not None: out["timestamp"] = timestamp_series if pressure_series is not None: out["pressure"] = pressure_series if temperature_series is not None: out["temperature"] = temperature_series if col_roles["delta"]: out["delta_hours"] = self._clean_numeric(df[col_roles["delta"]]) used = {col_roles["p"], col_roles["t"], col_roles["delta"]} used.update(col_roles["dt"]) used.discard(None) for c in df.columns: if c not in used: out[f"aux_{c}"] = df[c] qc = self._add_qc_flags(out) out = pd.concat([out, qc], axis=1) if "timestamp" in out.columns: out = out.sort_values("timestamp").reset_index(drop=True) else: out = out.reset_index(drop=True) return out
def _recover_unnamed_columns( self, df: pd.DataFrame, all_lines: list[str], header_row: int, units_row: Optional[int], ) -> pd.DataFrame: raw_header = all_lines[header_row] if header_row < len(all_lines) else "" real_names = [tok.strip() for tok in re.split(r"\t+|\s{2,}", raw_header) if tok.strip()] data_start_line = header_row + 1 if units_row is not None and units_row >= data_start_line: data_start_line = units_row + 1 all_kw = P_KEYWORDS + T_KEYWORDS + DT_KEYWORDS data_rows: list[list[str]] = [] for line in all_lines[data_start_line:]: stripped = line.strip() if not stripped: continue if re.match(r"^[=\-_\*#~]{3,}$", stripped): continue low = stripped.lower() if any(kw in low for kw in all_kw): continue if any(u in low for u in UNIT_INDICATORS): continue fields = stripped.split() if len(fields) >= 2: data_rows.append(fields) if not data_rows: return df n_data, n_names = len(data_rows[0]), len(real_names) if n_data > n_names: extra = n_data - n_names expanded = [real_names[0]] for ei in range(extra): expanded.append(f"_dt_part_{ei}") expanded.extend(real_names[1:]) real_names = expanded uniform: list[list] = [] for row in data_rows: if len(row) >= len(real_names): uniform.append(row[:len(real_names)]) else: uniform.append(row + [np.nan] * (len(real_names) - len(row))) return pd.DataFrame(uniform, columns=real_names[:len(uniform[0])])