Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 130 additions & 0 deletions python/lib/sift_client/_internal/util/hdf5.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
from __future__ import annotations

from pathlib import Path

import h5py
import numpy as np

from sift_client.sift_types.channel import ChannelDataType
from sift_client.sift_types.data_import import Hdf5DataColumn, Hdf5ImportConfig, TimeFormat

# Common HDF5 attribute names used to detect channel metadata.
_NAME_ATTRS = ["Name", "name", "Title", "title", "Sensor", "sensor", "Channel", "channel"]
_UNIT_ATTRS = ["Unit", "unit", "Units", "units"]
_DESCRIPTION_ATTRS = ["Description", "description"]

# Mapping from numpy scalar types to Sift channel data types. Integer widths
# narrower than 32 bits are widened to the closest Sift type of the same
# signedness.
_NUMPY_TO_SIFT: dict[type, ChannelDataType] = {
    np.bool_: ChannelDataType.BOOL,
    np.int8: ChannelDataType.INT_32,
    np.int16: ChannelDataType.INT_32,
    np.int32: ChannelDataType.INT_32,
    np.int64: ChannelDataType.INT_64,
    np.uint8: ChannelDataType.UINT_32,
    np.uint16: ChannelDataType.UINT_32,
    np.uint32: ChannelDataType.UINT_32,
    np.uint64: ChannelDataType.UINT_64,
    np.float32: ChannelDataType.FLOAT,
    np.float64: ChannelDataType.DOUBLE,
    # datetime64 values are integer ticks under the hood.
    np.datetime64: ChannelDataType.INT_64,
    # NOTE(review): complex dtypes map to their component float width — confirm
    # how the imaginary part is handled by the importer.
    np.complex64: ChannelDataType.FLOAT,
    np.complex128: ChannelDataType.DOUBLE,
    np.str_: ChannelDataType.STRING,
    # HDF5/TDMS fixed-length strings are stored as np.bytes_; use STRING, not
    # BYTES (np.void below handles truly opaque binary data).
    np.bytes_: ChannelDataType.STRING,
    # Numpy uses object dtype for variable-length strings; TDMS/HDF5 files
    # cannot produce non-string object arrays.
    np.object_: ChannelDataType.STRING,
    np.void: ChannelDataType.BYTES,
}


def _detect_attr(dataset: h5py.Dataset, candidates: list[str], default: str = "") -> str:
    """Return the first truthy HDF5 attribute value among *candidates*.

    Args:
        dataset: Dataset whose ``attrs`` mapping is searched.
        candidates: Attribute names to try, in priority order.
        default: Value returned when no candidate has a truthy value.

    Returns:
        The first truthy attribute value converted to ``str``, else *default*.
    """
    # Single lookup per candidate with an early return; the original built a
    # full list and called attrs.get() twice for every name.
    for attr in candidates:
        value = dataset.attrs.get(attr)
        if value:
            # Attribute values may be numpy scalars or bytes; normalize to str.
            return str(value)
    return default


def _numpy_to_sift_type(dtype: np.dtype) -> ChannelDataType:
    """Translate a numpy dtype into the matching Sift ``ChannelDataType``.

    Raises:
        ValueError: If the dtype has no entry in ``_NUMPY_TO_SIFT``.
    """
    try:
        return _NUMPY_TO_SIFT[dtype.type]
    except KeyError:
        raise ValueError(f"Unsupported numpy dtype: {dtype}") from None


def detect_hdf5_config(file_path: str | Path) -> Hdf5ImportConfig:
    """Detect an HDF5 import config by inspecting the file's datasets.

    Traverses the HDF5 file and produces (time dataset, value dataset) pairs.
    For compound datasets with multiple fields, the first field is assumed to
    be time and remaining fields become value channels. For simple datasets,
    a root-level ``time`` dataset is used if present.

    Args:
        file_path: Path to the HDF5 file to inspect.

    Returns:
        An ``Hdf5ImportConfig`` with one ``Hdf5DataColumn`` per detected
        value channel; ``asset_name`` is left empty for the caller to fill.

    Raises:
        ValueError: If a dataset uses a numpy dtype with no Sift mapping.
    """
    path = Path(file_path)

    with h5py.File(path, "r") as h5file:
        columns: list[Hdf5DataColumn] = []
        seen_names: set[str] = set()
        has_root_time = "time" in h5file

        def _visit(dataset_name: str, obj: object) -> None:
            if not isinstance(obj, h5py.Dataset):
                return

            # Skip root "time" dataset — it's used as the time source, not a value channel.
            if dataset_name == "time" and obj.parent == h5file:
                return

            n_fields = len(obj.dtype.names) if obj.dtype.names else 0

            if n_fields > 1:
                # Compound type: first field is time, remaining are value channels.
                # HDF5 attributes apply to the whole dataset, not to individual
                # fields, so a detected name alone would be identical for every
                # field — append the field name to keep sibling channels distinct.
                base_name = _detect_attr(obj, _NAME_ATTRS, dataset_name)
                for value_index in range(1, n_fields):
                    value_field = obj.dtype.names[value_index]
                    channel_name = f"{base_name}.{value_field}"
                    if channel_name in seen_names:
                        # Same field name reachable twice (e.g. datasets in
                        # different groups sharing a Name attr): disambiguate.
                        channel_name = f"{channel_name}.{value_index}"

                    columns.append(
                        Hdf5DataColumn(
                            name=channel_name,
                            data_type=_numpy_to_sift_type(obj.dtype[value_index]),
                            units=_detect_attr(obj, _UNIT_ATTRS),
                            description=_detect_attr(obj, _DESCRIPTION_ATTRS),
                            time_dataset=dataset_name,
                            value_dataset=dataset_name,
                            time_index=0,
                            value_index=0,
                            time_field=obj.dtype.names[0],
                            value_field=value_field,
                        )
                    )
                    seen_names.add(channel_name)

            elif n_fields in (0, 1):
                # Single column. Use root "time" as time dataset if available.
                channel_name = _detect_attr(obj, _NAME_ATTRS, dataset_name)
                if channel_name in seen_names:
                    channel_name = f"{channel_name}.{dataset_name}"

                columns.append(
                    Hdf5DataColumn(
                        name=channel_name,
                        data_type=_numpy_to_sift_type(obj.dtype),
                        units=_detect_attr(obj, _UNIT_ATTRS),
                        description=_detect_attr(obj, _DESCRIPTION_ATTRS),
                        time_dataset="time" if has_root_time else "",
                        value_dataset=dataset_name,
                        time_index=0,
                        value_index=0,
                    )
                )
                seen_names.add(channel_name)

        h5file.visititems(_visit)

    return Hdf5ImportConfig(
        asset_name="",
        time_format=TimeFormat.ABSOLUTE_UNIX_NANOSECONDS,
        data=columns,
    )
132 changes: 132 additions & 0 deletions python/lib/sift_client/_tests/_internal/test_hdf5.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
"""Tests for detect_hdf5_config."""

import h5py
import numpy as np
import pytest

from sift_client._internal.util.hdf5 import detect_hdf5_config
from sift_client.sift_types.channel import ChannelDataType


@pytest.fixture
def create_hdf5_file(tmp_path):
    """Provide a factory that populates an HDF5 file and returns its path."""
    target = tmp_path / "test.h5"

    def _factory(populate):
        with h5py.File(target, "w") as handle:
            populate(handle)
        return target

    return _factory


class TestDetectHdf5Config:
    """Behavioral tests for detect_hdf5_config."""

    def test_compound_dataset(self, create_hdf5_file):
        """Compound type: first field is time, remaining fields become value channels."""
        dtype = np.dtype([("timestamp_ns", "<i8"), ("voltage", "<f8"), ("current", "<f4")])

        path = create_hdf5_file(
            lambda f: f.create_dataset("sensors", shape=(10,), dtype=dtype)
        )
        config = detect_hdf5_config(path)

        assert len(config.data) == 2
        first, second = config.data

        assert first.time_field == "timestamp_ns"
        assert first.value_field == "voltage"
        assert first.data_type == ChannelDataType.DOUBLE
        assert first.time_dataset == "sensors"
        assert first.value_dataset == "sensors"

        assert second.time_field == "timestamp_ns"
        assert second.value_field == "current"
        assert second.data_type == ChannelDataType.FLOAT

    def test_single_column_with_root_time(self, create_hdf5_file):
        """Single-column datasets use root 'time' as time source when present."""

        def populate(f):
            f.create_dataset("time", data=np.arange(100, dtype="<i8"))
            f.create_dataset("voltage", data=np.random.rand(100).astype("<f8"))
            f.create_dataset("current", data=np.random.rand(100).astype("<f4"))

        config = detect_hdf5_config(create_hdf5_file(populate))

        assert len(config.data) == 2
        assert all(col.time_dataset == "time" for col in config.data)
        assert all(col.time_field is None for col in config.data)
        assert all(col.value_field is None for col in config.data)

    def test_single_column_without_root_time(self, create_hdf5_file):
        """Without root 'time', time_dataset is empty string."""
        path = create_hdf5_file(
            lambda f: f.create_dataset("voltage", data=np.random.rand(10).astype("<f8"))
        )
        config = detect_hdf5_config(path)

        assert len(config.data) == 1
        (column,) = config.data
        assert column.time_dataset == ""
        assert column.name == "voltage"

    def test_root_time_skipped_as_value_channel(self, create_hdf5_file):
        """The root 'time' dataset must not appear as a value channel."""

        def populate(f):
            f.create_dataset("time", data=np.arange(10, dtype="<i8"))
            f.create_dataset("voltage", data=np.random.rand(10).astype("<f8"))

        config = detect_hdf5_config(create_hdf5_file(populate))

        names = [col.name for col in config.data]
        assert "time" not in names
        assert "voltage" in names

    def test_duplicate_name_deduplication(self, create_hdf5_file):
        """Duplicate channel names get a .{dataset_name} suffix."""

        def populate(f):
            f.create_dataset("time", data=np.arange(10, dtype="<i8"))
            for group in ("group1", "group2"):
                sensor = f.create_dataset(
                    f"{group}/sensor", data=np.random.rand(10).astype("<f8")
                )
                sensor.attrs["Name"] = "pressure"

        config = detect_hdf5_config(create_hdf5_file(populate))

        names = [col.name for col in config.data]
        assert len(names) == 2
        assert len(set(names)) == 2  # all unique
        assert "pressure" in names

    def test_attribute_detection(self, create_hdf5_file):
        """Channel name, units, and description are read from HDF5 attributes."""

        def populate(f):
            f.create_dataset("time", data=np.arange(5, dtype="<i8"))
            dataset = f.create_dataset("raw_voltage", data=np.random.rand(5).astype("<f8"))
            dataset.attrs["Name"] = "voltage"
            dataset.attrs["Units"] = "V"
            dataset.attrs["Description"] = "Supply voltage"

        config = detect_hdf5_config(create_hdf5_file(populate))

        assert len(config.data) == 1
        (column,) = config.data
        assert column.name == "voltage"
        assert column.units == "V"
        assert column.description == "Supply voltage"

    def test_unsupported_dtype_raises(self, create_hdf5_file):
        """Unsupported numpy dtypes raise ValueError rather than silently dropping data."""

        def populate(f):
            f.create_dataset("time", data=np.arange(5, dtype="<i8"))
            f.create_dataset("data", data=np.zeros(5, dtype=np.float16))

        with pytest.raises(ValueError, match="Unsupported numpy dtype"):
            detect_hdf5_config(create_hdf5_file(populate))
14 changes: 9 additions & 5 deletions python/lib/sift_client/resources/data_imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from sift_client._internal.low_level_wrappers.data_imports import DataImportsLowLevelClient
from sift_client._internal.util.executor import run_sync_function
from sift_client._internal.util.file import extract_parquet_footer, upload_file
from sift_client._internal.util.hdf5 import detect_hdf5_config
from sift_client.resources._base import ResourceBase
from sift_client.sift_types.asset import Asset
from sift_client.sift_types.channel import ChannelDataType
Expand Down Expand Up @@ -61,8 +62,8 @@ async def import_from_path(
completion before proceeding.

When ``config`` is omitted the file format is auto-detected via
``detect_config`` (CSV and Parquet only). For other formats
(TDMS and HDF5), ``config`` must be provided.
``detect_config`` (CSV, Parquet, and HDF5). For other formats
(TDMS), ``config`` must be provided.
When ``asset`` is provided it overrides the config value;
otherwise the config's ``asset_name`` is used.
If neither ``run`` nor ``run_name`` is provided (and none is
Expand Down Expand Up @@ -198,9 +199,9 @@ async def detect_config(
is inferred from the file extension when ``data_type`` is not
provided.

Only CSV and Parquet files are currently supported for auto-detection.
For other formats (TDMS, HDF5), create the config manually
using ``TdmsImportConfig`` or ``Hdf5ImportConfig``.
CSV, Parquet, and HDF5 files are supported for auto-detection.
For other formats (TDMS), create the config manually
using ``TdmsImportConfig``.

For CSV files, the server scans the first two rows for an optional
JSON metadata row. Row 1 is checked first; row 2 is checked only
Expand Down Expand Up @@ -243,6 +244,9 @@ async def detect_config(

data_type_key = _resolve_data_type_key(path.suffix.lower(), data_type)

if data_type_key == DataTypeKey.HDF5:
return await run_sync_function(lambda: detect_hdf5_config(path))

is_parquet = data_type_key in (
DataTypeKey.PARQUET_FLATDATASET,
DataTypeKey.PARQUET_SINGLE_CHANNEL_PER_ROW,
Expand Down
10 changes: 5 additions & 5 deletions python/lib/sift_client/resources/sync_stubs/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -653,9 +653,9 @@ class DataImportAPI:
is inferred from the file extension when ``data_type`` is not
provided.

Only CSV and Parquet files are currently supported for auto-detection.
For other formats (TDMS, HDF5), create the config manually
using ``TdmsImportConfig`` or ``Hdf5ImportConfig``.
CSV, Parquet, and HDF5 files are supported for auto-detection.
For other formats (TDMS), create the config manually
using ``TdmsImportConfig``.

For CSV files, the server scans the first two rows for an optional
JSON metadata row. Row 1 is checked first; row 2 is checked only
Expand Down Expand Up @@ -733,8 +733,8 @@ class DataImportAPI:
completion before proceeding.

When ``config`` is omitted the file format is auto-detected via
``detect_config`` (CSV and Parquet only). For other formats
(TDMS and HDF5), ``config`` must be provided.
``detect_config`` (CSV, Parquet, and HDF5). For other formats
(TDMS), ``config`` must be provided.
When ``asset`` is provided it overrides the config value;
otherwise the config's ``asset_name`` is used.
If neither ``run`` nor ``run_name`` is provided (and none is
Expand Down
12 changes: 12 additions & 0 deletions python/lib/sift_client/sift_types/data_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,18 @@ class Hdf5ImportConfig(ImportConfigBase):
time_format: TimeFormat
relative_start_time: datetime | None = None

def __getitem__(self, name: str) -> Hdf5DataColumn:
    """Return the data column whose channel name equals *name*.

    Example::

        config["temperature"].data_type = ChannelDataType.FLOAT

    Raises:
        KeyError: If no data column has that name.
    """
    found = next((column for column in self.data if column.name == name), None)
    if found is None:
        raise KeyError(f"No data column named '{name}'")
    return found

@model_validator(mode="after")
def _check_relative_start_time(self) -> Hdf5ImportConfig:
if self.time_format.name.startswith("RELATIVE_") and self.relative_start_time is None:
Expand Down
8 changes: 6 additions & 2 deletions python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ file-imports = [
]
hdf5 = [
'h5py~=3.11',
'polars~=1.8',
'polars~=1.8', # only used by sift_py; remove once sift_py is fully deprecated
]
openssl = [
'cffi~=1.14',
Expand Down Expand Up @@ -219,7 +219,7 @@ openssl = ["pyOpenSSL<24.0.0", "types-pyOpenSSL<24.0.0", "cffi~=1.14"]
tdms = ["npTDMS~=1.9"]
rosbags = ["rosbags~=0.0"]
sift-stream = ["sift-stream-bindings==0.2.2"]
hdf5 = ["h5py~=3.11", "polars~=1.8"]
hdf5 = ["h5py~=3.11", "polars~=1.8"] # polars is only used by sift_py; remove once sift_py is fully deprecated
data-review = ["pyarrow>=17.0.0"]

[tool.sift.extras.combine]
Expand Down Expand Up @@ -282,6 +282,10 @@ exclude = [

# No official typing stubs for Python gRPC libraries yet.
# https://github.com/grpc/grpc/issues/29041
[[tool.mypy.overrides]]
module = "h5py"
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "grpc_testing"
ignore_missing_imports = true
Expand Down
Loading