Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions SU2_PY/SU2/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@
from .config import Config
from .state import State_Factory as State
from .historyMap import history_header_map as historyOutFields
from .fwh import FWHData
206 changes: 206 additions & 0 deletions SU2_PY/SU2/io/fwh.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
#!/usr/bin/env python3
# \file fwh.py
# \brief Reader for SU2 FWH binary surface data files (fwh_bin.dat).
#
# The binary format written by the SU2 FWH surface writer:
# Header : 3 × uint32 (little-endian)
# [0] start_time_us — simulation time at first stored timestep (microseconds)
# [1] n_timesteps — number of timesteps in this file
# [2] n_points — number of surface points
# Data : n_timesteps × n_points × float32 (little-endian), row-major
# pressure[t, i] — static pressure at surface point i, timestep t (Pa)
#
# Usage:
# from SU2.io.fwh import FWHData
# fwh = FWHData.from_file("fwh_bin.dat", time_step=0.0015)
# print(fwh.pressure.shape) # (n_timesteps, n_points)
# print(fwh.time) # physical time array in seconds
#
# \author Riddhi (GSoC 2026 candidate)
# \version 8.4.0 "Harrier"

import struct
import numpy as np
from dataclasses import dataclass, field
Comment thread Fixed
from pathlib import Path
from typing import Optional


@dataclass
class FWHData:
"""Container for FWH binary surface pressure data read from fwh_bin.dat.

Attributes
----------
pressure : np.ndarray, shape (n_timesteps, n_points), float32
Static pressure at each FWH surface point for each timestep (Pa).
time : np.ndarray, shape (n_timesteps,), float64
Physical simulation time corresponding to each timestep (seconds).
n_timesteps : int
Number of timesteps stored in the file.
n_points : int
Number of surface quadrature points on the FWH surface.
start_time_us : int
Raw start-time tag from file header (microseconds, uint32).
source_file : str
Path to the file this data was read from.
"""

pressure: np.ndarray
time: np.ndarray
n_timesteps: int
n_points: int
start_time_us: int
source_file: str = ""

# ------------------------------------------------------------------
# Construction
# ------------------------------------------------------------------

@classmethod
def from_file(cls, path: str, time_step: float = 0.0) -> "FWHData":
"""Read a SU2 FWH binary file.

Parameters
----------
path : str or Path
Path to fwh_bin.dat (or equivalent).
time_step : float, optional
CFD time step in seconds (TIME_STEP in the .cfg file).
Used to build the physical time array. If 0.0, the time
array is integer timestep indices cast to float.

Returns
-------
FWHData
"""
path = Path(path)
if not path.exists():
raise FileNotFoundError(f"FWH binary file not found: {path}")

with open(path, "rb") as fh:
raw = fh.read()

# --- header ---------------------------------------------------
if len(raw) < 12:
raise ValueError(
f"File too small to contain a valid FWH header: {len(raw)} bytes"
)

start_time_us, n_t, n_pts = struct.unpack_from("<3I", raw, 0)
expected_data = n_t * n_pts * 4 # float32
actual_data = len(raw) - 12

if actual_data != expected_data:
raise ValueError(
f"FWH binary size mismatch: header says {n_t}×{n_pts} "
f"({expected_data} bytes) but file has {actual_data} data bytes. "
f"File may be truncated or use a different layout."
)

# --- data block -----------------------------------------------
pressure = np.frombuffer(raw, dtype="<f4", offset=12).reshape(n_t, n_pts).copy()

# --- time axis ------------------------------------------------
if time_step > 0.0:
t0 = start_time_us * 1e-6 # convert μs → s
time = t0 + np.arange(n_t) * time_step
else:
time = np.arange(n_t, dtype=np.float64)

return cls(
pressure=pressure,
time=time,
n_timesteps=int(n_t),
n_points=int(n_pts),
start_time_us=int(start_time_us),
source_file=str(path),
)

# ------------------------------------------------------------------
# Derived quantities for FWH post-processing
# ------------------------------------------------------------------

def pressure_perturbation(self, p_ref: Optional[float] = None) -> np.ndarray:
"""Return p' = p - p_mean (or p - p_ref if supplied).

Parameters
----------
p_ref : float, optional
Reference (free-stream) pressure in Pa. If None, the
time-mean pressure at each point is used.

Returns
-------
np.ndarray, shape (n_timesteps, n_points)
"""
if p_ref is not None:
return self.pressure - p_ref
return self.pressure - self.pressure.mean(axis=0, keepdims=True)

def time_mean_pressure(self) -> np.ndarray:
"""Time-averaged pressure at each surface point (Pa).

Returns
-------
np.ndarray, shape (n_points,)
"""
return self.pressure.mean(axis=0)

def pressure_rms(self) -> np.ndarray:
"""RMS of pressure fluctuation p' at each surface point (Pa).

Returns
-------
np.ndarray, shape (n_points,)
"""
return self.pressure_perturbation().std(axis=0)

# ------------------------------------------------------------------
# I/O helpers
# ------------------------------------------------------------------

def to_dict(self) -> dict:
"""Return a plain dict suitable for JSON serialisation or DataFrame construction."""
return {
"source_file": self.source_file,
"n_timesteps": self.n_timesteps,
"n_points": self.n_points,
"start_time_us": self.start_time_us,
"time": self.time.tolist(),
"pressure_mean": self.time_mean_pressure().tolist(),
"pressure_rms": self.pressure_rms().tolist(),
}

def to_dataframe(self):
"""Return a pandas DataFrame with columns [time, point_0, point_1, ...].

Requires pandas. Each column ``point_i`` contains the pressure
time-series at surface point i.

Returns
-------
pandas.DataFrame
"""
try:
import pandas as pd
except ImportError as exc:
raise ImportError(
"pandas is required for FWHData.to_dataframe(). "
"Install it with: pip install pandas"
) from exc

cols = {"time": self.time}
cols.update({f"point_{i}": self.pressure[:, i] for i in range(self.n_points)})
return pd.DataFrame(cols)

# ------------------------------------------------------------------
# Dunder
# ------------------------------------------------------------------

def __repr__(self) -> str:
return (
f"FWHData(n_timesteps={self.n_timesteps}, n_points={self.n_points}, "
f"t=[{self.time[0]:.4f}, {self.time[-1]:.4f}] s, "
f"p=[{self.pressure.min():.1f}, {self.pressure.max():.1f}] Pa)"
)
Empty file added SU2_PY/tests/__init__.py
Empty file.
132 changes: 132 additions & 0 deletions SU2_PY/tests/test_fwh_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
#!/usr/bin/env python3
"""Tests for SU2.io.fwh — FWH binary surface data reader.

Run with:
cd /home/riddhi/SU2
python3 -m pytest SU2_PY/tests/test_fwh_reader.py -v
"""

import struct
import sys
from pathlib import Path

import numpy as np
import pytest

sys.path.insert(0, str(Path(__file__).parent.parent))
from SU2.io.fwh import FWHData

REPO_ROOT = Path(__file__).parent.parent.parent
REFERENCE_FILE = REPO_ROOT / "TestCases/unsteady/square_cylinder/fwh_bin.dat"


def make_synthetic_fwh(path, n_t=10, n_pts=5, start_us=1000):
rng = np.random.default_rng(42)
pressure = (101325.0 + rng.standard_normal((n_t, n_pts)) * 100).astype("<f4")
with open(path, "wb") as fh:
fh.write(struct.pack("<3I", start_us, n_t, n_pts))
fh.write(pressure.tobytes())
return pressure


class TestFWHDataSynthetic:
def test_roundtrip_shape(self, tmp_path):
make_synthetic_fwh(tmp_path / "fwh_bin.dat", n_t=10, n_pts=5)
fwh = FWHData.from_file(tmp_path / "fwh_bin.dat", time_step=0.0015)
assert fwh.pressure.shape == (10, 5)
assert fwh.n_timesteps == 10
assert fwh.n_points == 5

def test_roundtrip_values(self, tmp_path):
p = make_synthetic_fwh(tmp_path / "fwh_bin.dat", n_t=10, n_pts=5)
fwh = FWHData.from_file(tmp_path / "fwh_bin.dat", time_step=0.0015)
np.testing.assert_allclose(fwh.pressure, p, rtol=1e-6)

def test_time_axis_with_dt(self, tmp_path):
make_synthetic_fwh(tmp_path / "fwh_bin.dat", n_t=10, n_pts=5, start_us=500000)
fwh = FWHData.from_file(tmp_path / "fwh_bin.dat", time_step=0.0015)
assert fwh.time[0] == pytest.approx(0.5, rel=1e-6)
assert fwh.time[-1] == pytest.approx(0.5 + 9 * 0.0015, rel=1e-6)
assert len(fwh.time) == 10

def test_time_axis_no_dt(self, tmp_path):
make_synthetic_fwh(tmp_path / "fwh_bin.dat", n_t=8, n_pts=3)
fwh = FWHData.from_file(tmp_path / "fwh_bin.dat")
np.testing.assert_array_equal(fwh.time, np.arange(8, dtype=float))

def test_pressure_perturbation_zero_mean(self, tmp_path):
make_synthetic_fwh(tmp_path / "fwh_bin.dat", n_t=10, n_pts=5)
fwh = FWHData.from_file(tmp_path / "fwh_bin.dat")
p_prime = fwh.pressure_perturbation()
np.testing.assert_allclose(
p_prime.mean(axis=0), 0.0, atol=0.01
) # float32 quantisation

def test_pressure_perturbation_with_ref(self, tmp_path):
make_synthetic_fwh(tmp_path / "fwh_bin.dat", n_t=10, n_pts=5)
fwh = FWHData.from_file(tmp_path / "fwh_bin.dat")
p_prime = fwh.pressure_perturbation(p_ref=101325.0)
np.testing.assert_allclose(p_prime, fwh.pressure - 101325.0, rtol=1e-6)

def test_file_not_found(self):
with pytest.raises(FileNotFoundError, match="not found"):
FWHData.from_file("/nonexistent/path/fwh_bin.dat")

def test_truncated_file_raises(self, tmp_path):
p = tmp_path / "fwh_bin.dat"
p.write_bytes(struct.pack("<3I", 0, 10, 5) + b"\x00" * 100)
with pytest.raises(ValueError, match="size mismatch"):
FWHData.from_file(p)

def test_repr(self, tmp_path):
make_synthetic_fwh(tmp_path / "fwh_bin.dat")
fwh = FWHData.from_file(tmp_path / "fwh_bin.dat", time_step=0.0015)
r = repr(fwh)
assert "FWHData" in r
assert "n_timesteps=10" in r

def test_to_dict_keys(self, tmp_path):
make_synthetic_fwh(tmp_path / "fwh_bin.dat")
fwh = FWHData.from_file(tmp_path / "fwh_bin.dat")
d = fwh.to_dict()
for key in ("n_timesteps", "n_points", "time", "pressure_mean", "pressure_rms"):
assert key in d

def test_time_mean_pressure_shape(self, tmp_path):
make_synthetic_fwh(tmp_path / "fwh_bin.dat", n_t=10, n_pts=5)
fwh = FWHData.from_file(tmp_path / "fwh_bin.dat")
assert fwh.time_mean_pressure().shape == (5,)

def test_pressure_rms_shape(self, tmp_path):
make_synthetic_fwh(tmp_path / "fwh_bin.dat", n_t=10, n_pts=5)
fwh = FWHData.from_file(tmp_path / "fwh_bin.dat")
assert fwh.pressure_rms().shape == (5,)


@pytest.mark.skipif(
not REFERENCE_FILE.exists(),
reason="Reference fwh_bin.dat not found — run from repo root",
)
class TestFWHDataReference:
@pytest.fixture(scope="class")
def fwh(self):
return FWHData.from_file(REFERENCE_FILE, time_step=0.0015)

def test_shape(self, fwh):
assert fwh.n_timesteps == 50
assert fwh.n_points == 200
assert fwh.pressure.shape == (50, 200)

def test_pressure_range(self, fwh):
assert fwh.pressure.min() > 800.0
assert fwh.pressure.max() < 1200.0

def test_unsteady_variation(self, fwh):
assert fwh.pressure.std(axis=0).max() > 10.0

def test_time_step_spacing(self, fwh):
diffs = np.diff(fwh.time)
np.testing.assert_allclose(diffs, 0.0015, rtol=1e-6)

def test_pressure_rms_max(self, fwh):
assert fwh.pressure_rms().max() == pytest.approx(74.22, abs=0.5)
Loading