From 04b9999d5b0be958a619cb9de86704973fcb9c22 Mon Sep 17 00:00:00 2001
From: Francesc Alted
Date: Wed, 3 Jun 2026 08:55:47 +0200
Subject: [PATCH 01/12] Make DSL kernels first-class CTable columns

@blosc2.dsl_kernel-decorated functions can now back both virtual computed
columns (add_computed_column) and stored generated columns
(add_generated_column), survive save/open round-trips, and be referenced
inside where() predicates.

API:
- add_computed_column(name, kernel, inputs=[...], dtype=None) binds one
  stored scalar column per kernel parameter; the callable form returning a
  blosc2.lazyudf(...) is also accepted.
- add_generated_column(..., values=kernel, inputs=[...]) adds a stored DSL
  column (new transformer_kind="dsl") with append/extend auto-fill,
  refresh_generated_column, and indexing.
- dtype is inferred by NumPy type promotion of the input column dtypes when
  omitted; pass dtype explicitly for type-changing kernels
  (comparisons/casts).

Internals:
- Factor the safe-exec DSL reconstruction into
  dsl_kernel.kernel_from_source() and route
  b2objects.decode_structured_lazyudf through it.
- Persist computed entries as kind:"dsl" + dsl_source (no expression);
  rebuild the kernel and LazyUDF on open.
- DSL computed columns materialize via LazyUDF.compute() on access: the
  miniexpr DSL path only supports full-array getitem, so reads/where()
  cannot slice lazily. Materializing also lets a DSL column join where() as
  a plain NDArray operand (chunked staged co-evaluation, not single-kernel
  fusion).
- Guard the LazyExpr-only sites (compact, info display, materialize,
  per-row access, sort, index reuse) and the three materialized eval paths
  (row/batch autofill, refresh). Pad length-1 DSL batches to 2 and slice
  back, since miniexpr rejects shape-(1,) inputs.

Add tests/ctable/test_ctable_dsl_columns.py (20 tests) covering values,
dtype inference, partial slicing, where() (incl. multi-chunk streaming),
persistence, compact, materialize, and generated-column
autofill/refresh/index.
--- src/blosc2/b2objects.py | 17 +- src/blosc2/ctable.py | 415 +++++++++++++++++++++++++++++++++------ src/blosc2/dsl_kernel.py | 50 +++++ 3 files changed, 403 insertions(+), 79 deletions(-) diff --git a/src/blosc2/b2objects.py b/src/blosc2/b2objects.py index 50c583e60..375a1c9f3 100644 --- a/src/blosc2/b2objects.py +++ b/src/blosc2/b2objects.py @@ -7,9 +7,7 @@ from __future__ import annotations -import builtins import inspect -import linecache import pathlib import textwrap from dataclasses import asdict @@ -18,7 +16,7 @@ import numpy as np import blosc2 -from blosc2.dsl_kernel import DSLKernel +from blosc2.dsl_kernel import DSLKernel, kernel_from_source _B2OBJECT_META_KEY = "b2o" _B2OBJECT_VERSION = 1 @@ -190,18 +188,7 @@ def decode_structured_lazyudf(payload, *, carrier_path=None): if not isinstance(kwargs, dict): raise TypeError("Structured LazyUDF payload requires a mapping 'kwargs'") - local_ns = {} - filename = f"<{name}>" - safe_globals = { - "__builtins__": {k: v for k, v in builtins.__dict__.items() if k != "__import__"}, - "np": np, - "blosc2": blosc2, - } - linecache.cache[filename] = (len(udf_source), None, udf_source.splitlines(True), filename) - exec(compile(udf_source, filename, "exec"), safe_globals, local_ns) - func = local_ns[name] - if not isinstance(func, DSLKernel): - func = DSLKernel(func) + func = kernel_from_source(udf_source, name) ordered_operands_payload = {f"o{n}": operands_payload[f"o{n}"] for n in range(len(operands_payload))} operands, missing_ops = decode_operand_mapping(ordered_operands_payload, base_path=carrier_path) if missing_ops: diff --git a/src/blosc2/ctable.py b/src/blosc2/ctable.py index 9bf226a33..9b364eb27 100644 --- a/src/blosc2/ctable.py +++ b/src/blosc2/ctable.py @@ -759,7 +759,7 @@ def __init__(self, table: CTable, col_name: str, mask=None): def _raw_col(self): cc = self._table._computed_cols.get(self._col_name) if cc is not None: - return cc["lazy"] + return self._table._build_computed_lazy(cc) return 
self._table._cols[self._col_name] @property @@ -4150,7 +4150,7 @@ def _normalize_scalar_value(value): def _physical_row_value(self, col_name: str, pos: int): cc = self._computed_cols.get(col_name) if cc is not None: - return self._normalize_scalar_value(np.asarray(cc["lazy"][pos]).ravel()[0]) + return self._normalize_scalar_value(np.asarray(self._build_computed_lazy(cc)[pos]).ravel()[0]) value = self._normalize_scalar_value(self._cols[col_name][pos]) spec = self._schema.columns_by_name[col_name].spec if isinstance(spec, timestamp): @@ -7769,13 +7769,18 @@ def _col_dtype(self, name: str) -> np.dtype | None: @staticmethod def _readable_computed_expr(cc: dict) -> str: - """Return the expression string with ``o0``, ``o1``, … replaced by - their actual column names, for human-readable display. + """Return a human-readable description of a computed column. - Example: ``"(o0 * o1)"`` with ``col_deps=["price", "qty"]`` - becomes ``"(price * qty)"``. + For expression columns the stored string has ``o0``, ``o1``, … replaced + by their actual column names (``"(o0 * o1)"`` with + ``col_deps=["price", "qty"]`` → ``"(price * qty)"``). For DSL columns a + ``kernel(dep0, dep1)`` call label is returned. 
""" col_deps = cc["col_deps"] + if cc.get("kind") == "dsl": + kernel = cc.get("kernel") + kname = getattr(kernel, "__name__", "dsl_kernel") + return f"{kname}({', '.join(col_deps)})" def _sub(m: re.Match) -> str: idx = int(m.group(1)) @@ -7789,8 +7794,9 @@ def _fetch_col_at_positions(self, name: str, positions: np.ndarray): if cc is not None: if len(positions) == 0: return np.array([], dtype=cc["dtype"]) + lazy = self._build_computed_lazy(cc) return np.array( - [np.asarray(cc["lazy"][int(p)]).ravel()[0] for p in positions], + [np.asarray(lazy[int(p)]).ravel()[0] for p in positions], dtype=cc["dtype"], ) self._ensure_generated_column_not_stale(name) @@ -7814,15 +7820,29 @@ def _schema_dict_with_computed(self) -> dict: d["create_summary_index"] = getattr(self, "_create_summary_index", True) d["summary_indexes_built"] = getattr(self, "_summary_indexes_built", False) if self._computed_cols: - d["computed_columns"] = [ - { - "name": name, - "expression": cc["expression"], - "col_deps": cc["col_deps"], - "dtype": str(cc["dtype"]), - } - for name, cc in self._computed_cols.items() - ] + computed = [] + for name, cc in self._computed_cols.items(): + if cc.get("kind") == "dsl": + computed.append( + { + "name": name, + "kind": "dsl", + "dsl_source": cc["dsl_source"], + "col_deps": cc["col_deps"], + "dtype": str(cc["dtype"]), + } + ) + else: + computed.append( + { + "name": name, + "kind": "expression", + "expression": cc["expression"], + "col_deps": cc["col_deps"], + "dtype": str(cc["dtype"]), + } + ) + d["computed_columns"] = computed if self._materialized_cols: materialized = [] for name, meta in self._materialized_cols.items(): @@ -7837,6 +7857,8 @@ def _schema_dict_with_computed(self) -> dict: } if "transformer" in meta: entry["transformer"] = meta["transformer"] + if meta.get("dsl_source") is not None: + entry["dsl_source"] = meta["dsl_source"] materialized.append(entry) d["materialized_columns"] = materialized return d @@ -8023,17 +8045,30 @@ def 
_load_computed_cols_from_schema(self, schema_dict: dict) -> None: """ for cc_meta in schema_dict.get("computed_columns", []): name = cc_meta["name"] - expression = cc_meta["expression"] col_deps = cc_meta["col_deps"] dtype = np.dtype(cc_meta["dtype"]) - operands = {f"o{i}": self._cols[dep] for i, dep in enumerate(col_deps)} - lazy = blosc2.lazyexpr(expression, operands) - self._computed_cols[name] = { - "expression": expression, - "col_deps": col_deps, - "lazy": lazy, - "dtype": dtype, - } + if cc_meta.get("kind") == "dsl": + from blosc2.dsl_kernel import kernel_from_source + + dsl_source = cc_meta["dsl_source"] + self._computed_cols[name] = { + "kind": "dsl", + "dsl_source": dsl_source, + "kernel": kernel_from_source(dsl_source), + "col_deps": col_deps, + "dtype": dtype, + } + else: + expression = cc_meta["expression"] + operands = {f"o{i}": self._cols[dep] for i, dep in enumerate(col_deps)} + lazy = blosc2.lazyexpr(expression, operands) + self._computed_cols[name] = { + "kind": "expression", + "expression": expression, + "col_deps": col_deps, + "lazy": lazy, + "dtype": dtype, + } self.col_names.append(name) self._col_widths[name] = max(len(name), 15) @@ -8050,6 +8085,8 @@ def _load_materialized_cols_from_schema(self, schema_dict: dict) -> None: } if "transformer" in meta: loaded["transformer"] = dict(meta["transformer"]) + if meta.get("dsl_source") is not None: + loaded["dsl_source"] = meta["dsl_source"] self._materialized_cols[meta["name"]] = loaded def _require_computed_column(self, name: str) -> dict: @@ -8075,6 +8112,9 @@ def _autofill_materialized_row_values(self, row: dict[str, Any]) -> dict[str, An if meta.get("transformer_kind") == "row_transformer": transformer = RowTransformer.from_metadata(meta["transformer"]) row[name] = np.asarray(transformer.evaluate_row(row), dtype=meta["dtype"]) + elif meta.get("transformer_kind") == "dsl": + single = {dep: [row[dep]] for dep in meta["col_deps"]} + row[name] = self._evaluate_dsl_materialized_batch(meta, single)[0] 
else: operands = {f"o{i}": np.asarray([row[dep]]) for i, dep in enumerate(meta["col_deps"])} values = blosc2.lazyexpr(meta["expression"], operands)[:] @@ -8119,6 +8159,8 @@ def _autofill_materialized_batch_columns( if meta.get("transformer_kind") == "row_transformer": transformer = RowTransformer.from_metadata(meta["transformer"]) values = transformer.evaluate_batch(raw_columns) + elif meta.get("transformer_kind") == "dsl": + values = self._evaluate_dsl_materialized_batch(meta, raw_columns) else: operands = { f"o{i}": blosc2.asarray(raw_columns[dep], dtype=self._cols[dep].dtype) @@ -8223,8 +8265,10 @@ def _fill_stored_column_from_computed( ) -> None: """Evaluate computed column *computed_name* into stored column *target_name*.""" cc = self._require_computed_column(computed_name) - operands = {f"o{i}": self._cols[dep] for i, dep in enumerate(cc["col_deps"])} - lazy = blosc2.lazyexpr(cc["expression"], operands) + # Expression entries yield a LazyExpr (streamed per slice); DSL entries + # yield a fully materialized NDArray (the miniexpr DSL path cannot do + # partial-slice getitem). Both support the slicing used below. 
+ lazy = self._build_computed_lazy(cc) capacity = len(self._valid_rows) step = int(self._valid_rows.chunks[0]) if self._valid_rows.chunks else 65536 @@ -8295,12 +8339,23 @@ def materialize_computed_column( target_dtype = np.dtype(dtype) if dtype is not None else np.dtype(cc["dtype"]) self._create_empty_stored_column(target_name, target_dtype, cparams=cparams) - self._materialized_cols[target_name] = { - "computed_column": name, - "expression": cc["expression"], - "col_deps": list(cc["col_deps"]), - "dtype": target_dtype, - } + if cc.get("kind") == "dsl": + self._materialized_cols[target_name] = { + "computed_column": name, + "expression": None, + "dsl_source": cc["dsl_source"], + "col_deps": list(cc["col_deps"]), + "dtype": target_dtype, + "transformer_kind": "dsl", + "stale": False, + } + else: + self._materialized_cols[target_name] = { + "computed_column": name, + "expression": cc["expression"], + "col_deps": list(cc["col_deps"]), + "dtype": target_dtype, + } if isinstance(self._storage, FileTableStorage): self._storage.save_schema(self._schema_dict_with_computed()) try: @@ -8350,6 +8405,121 @@ def _normalize_expression_transformer(self, expr) -> tuple[blosc2.LazyExpr, list col_deps.append(cname) return lazy, col_deps + def _validate_transformer_dep(self, cname: str) -> blosc2.NDArray: + """Validate that *cname* is a stored scalar column usable as a transformer + operand and return its backing NDArray.""" + if cname not in self._cols: + raise ValueError(f"Column {cname!r} is not a stored column of this table.") + self._ensure_generated_column_not_stale(cname) + col_info = self._schema.columns_by_name.get(cname) + if col_info is not None and self._is_ndarray_column(col_info): + raise TypeError( + f"Column {cname!r} is a fixed-shape ndarray column. DSL kernels only " + "support scalar columns as inputs." 
+ ) + return self._cols[cname] + + def _dsl_deps_from_lazyudf(self, lazyudf) -> list[str]: + """Return the stored-column names backing a DSL LazyUDF's inputs, in order.""" + owned_ids = {id(arr): cname for cname, arr in self._cols.items()} + col_deps = [] + for i, arr in enumerate(lazyudf.inputs): + cname = owned_ids.get(id(arr)) + if cname is None: + raise ValueError( + f"Input {i} of the DSL kernel does not reference a stored column of this table." + ) + self._validate_transformer_dep(cname) + col_deps.append(cname) + return col_deps + + def _resolve_dsl_kernel(self, kernel, inputs) -> tuple[Any, list[str]]: + """Validate a bare DSL kernel + its ``inputs`` column bindings.""" + if kernel.dsl_error is not None: + raise blosc2.DSLSyntaxError(f"Invalid DSL kernel: {kernel.dsl_error}") + if inputs is None: + raise TypeError( + "A DSL kernel passed directly requires inputs=[...] naming one source " + "column per kernel parameter." + ) + col_deps = list(inputs) + expected = kernel.input_names + if expected is not None and len(col_deps) != len(expected): + raise ValueError( + f"DSL kernel expects {len(expected)} input(s) {expected}, " + f"but inputs={col_deps} provides {len(col_deps)}." + ) + for d in col_deps: + self._validate_transformer_dep(d) + return kernel, col_deps + + def _normalize_transformer(self, expr, inputs=None) -> dict: + """Resolve *expr* into a transformer descriptor. + + Returns one of:: + + {"kind": "expression", "lazy": , "col_deps": [...]} + {"kind": "dsl", "kernel": , "col_deps": [...]} + + A ``@blosc2.dsl_kernel`` is accepted either directly (with *inputs* + naming one source column per kernel parameter) or as a ``LazyUDF`` + returned by a callable (then operands are matched to columns by + identity and *inputs* is ignored). 
+ """ + if isinstance(expr, blosc2.DSLKernel): + kernel, col_deps = self._resolve_dsl_kernel(expr, inputs) + return {"kind": "dsl", "kernel": kernel, "col_deps": col_deps} + # Resolve a callable once (a lambda may return a LazyExpr or a LazyUDF). + obj = expr(self._cols) if (callable(expr) and not isinstance(expr, blosc2.LazyExpr)) else expr + if isinstance(obj, blosc2.LazyUDF): + if not isinstance(obj.func, blosc2.DSLKernel): + raise TypeError( + "Only LazyUDFs backed by a @blosc2.dsl_kernel are supported as CTable columns." + ) + kernel = obj.func + if kernel.dsl_error is not None: + raise blosc2.DSLSyntaxError(f"Invalid DSL kernel: {kernel.dsl_error}") + return {"kind": "dsl", "kernel": kernel, "col_deps": self._dsl_deps_from_lazyudf(obj)} + lazy, col_deps = self._normalize_expression_transformer(obj) + return {"kind": "expression", "lazy": lazy, "col_deps": col_deps} + + def _dsl_result_dtype(self, kernel, col_deps, dtype): + """Resolve the result dtype for a DSL column. + + When *dtype* is omitted it is inferred by NumPy type promotion of the + dependency column dtypes — correct for elementwise arithmetic kernels. + Kernels that change the type (comparisons/``where``/explicit casts) + should pass *dtype* explicitly. + """ + if dtype is not None: + return np.dtype(dtype) + dep_dtypes = [self._cols[d].dtype for d in col_deps] + if not dep_dtypes: + raise TypeError( + f"Cannot infer dtype for DSL kernel {getattr(kernel, '__name__', '?')!r} " + "with no column inputs; pass dtype=... explicitly." + ) + return np.result_type(*dep_dtypes) + + def _build_computed_lazy(self, cc: dict): + """Return the readable array-like for a computed-column entry *cc*. + + Expression entries return their cached :class:`blosc2.LazyExpr` (which + supports partial-slice evaluation directly). 
DSL entries build a fresh + :class:`blosc2.LazyUDF` from the current column NDArrays and **eagerly + materialize** it to a concrete :class:`blosc2.NDArray` via + ``compute()``: the miniexpr DSL path only supports full-array getitem, + so a partial slice (used by reads and by ``where()`` per-chunk operand + access) cannot be evaluated lazily. Materializing also lets a DSL + computed column participate in ``where()`` as a plain NDArray operand + (the all-NDArray miniexpr fast path). The full column is recomputed on + each access — acceptable for a virtual, unstored column. + """ + if cc.get("kind") == "dsl": + operands = tuple(self._cols[d] for d in cc["col_deps"]) + return blosc2.lazyudf(cc["kernel"], operands, dtype=cc["dtype"]).compute() + return cc["lazy"] + def _evaluate_expression_materialized_batch( self, meta: dict, raw_columns: Mapping[str, Any] ) -> np.ndarray: @@ -8360,6 +8530,33 @@ def _evaluate_expression_materialized_batch( values = blosc2.lazyexpr(meta["expression"], operands)[:] return np.asarray(values, dtype=meta["dtype"]) + def _materialized_dsl_kernel(self, meta: dict): + """Return the (cached) DSLKernel for a ``transformer_kind == "dsl"`` entry.""" + kernel = meta.get("_kernel") + if kernel is None: + from blosc2.dsl_kernel import kernel_from_source + + kernel = kernel_from_source(meta["dsl_source"]) + meta["_kernel"] = kernel # not serialized (schema dump emits known keys only) + return kernel + + def _evaluate_dsl_materialized_batch(self, meta: dict, raw_columns: Mapping[str, Any]) -> np.ndarray: + kernel = self._materialized_dsl_kernel(meta) + arrays = [np.asarray(raw_columns[dep], dtype=self._cols[dep].dtype) for dep in meta["col_deps"]] + out_dtype = np.dtype(meta["dtype"]) + n = len(arrays[0]) if arrays else 0 + if n == 0: + return np.asarray([], dtype=out_dtype) + # The DSL miniexpr path rejects length-1 (shape ``(1,)``) inputs as + # "scalar-like"; pad to length 2 and slice the result back. 
+ pad = n == 1 + if pad: + arrays = [np.concatenate([arr, arr]) for arr in arrays] + operands = tuple(blosc2.asarray(arr) for arr in arrays) + result = blosc2.lazyudf(kernel, operands, dtype=out_dtype).compute()[:] + result = np.asarray(result, dtype=out_dtype) + return result[:1] if pad else result + def _generated_dependency_closure(self, source: str) -> set[str]: """Return generated columns transitively depending on *source*.""" affected: set[str] = set() @@ -8396,6 +8593,9 @@ def refresh_generated_column(self, name: str) -> None: if meta.get("transformer_kind") == "row_transformer": transformer = RowTransformer.from_metadata(meta["transformer"]) values = np.asarray(transformer.evaluate_existing(self), dtype=meta["dtype"]) + elif meta.get("transformer_kind") == "dsl": + raw_columns = {dep: self[dep][:] for dep in meta["col_deps"]} + values = self._evaluate_dsl_materialized_batch(meta, raw_columns) else: raw_columns = {dep: self[dep][:] for dep in meta["col_deps"]} values = self._evaluate_expression_materialized_batch(meta, raw_columns) @@ -8418,9 +8618,14 @@ def add_generated_column( # noqa: C901 self, name: str, *, - values: str | blosc2.LazyExpr | Callable[[dict[str, Any]], blosc2.LazyExpr] | RowTransformer, + values: str + | blosc2.LazyExpr + | blosc2.DSLKernel + | Callable[[dict[str, Any]], blosc2.LazyExpr] + | RowTransformer, dtype=None, create_index: bool = False, + inputs: list[str] | None = None, ) -> None: """Add a stored generated column maintained by the table. @@ -8437,6 +8642,7 @@ def add_generated_column( # noqa: C901 add_generated_column(name, *, values="price * qty", dtype=..., create_index=False) add_generated_column(name, *, values=lazy_expr, dtype=..., create_index=False) add_generated_column(name, *, values=lambda cols: cols["price"] * 1.21, dtype=...) + add_generated_column(name, *, values=dsl_kernel, inputs=["price", "qty"], dtype=...) add_generated_column(name, *, values=t.embedding.row_transformer.norm(axis=0), dtype=...) 
add_generated_column(name, *, values=t.image.row_transformer.mean(axis=(0, 1)), dtype=blosc2.ndarray((3,), dtype=...)) @@ -8456,7 +8662,14 @@ def add_generated_column( # noqa: C901 * :class:`blosc2.LazyExpr`: scalar lazy expression over stored columns of this table. It must produce a 1-D scalar stream. * callable: called as ``values(self._cols)`` and must return a - :class:`blosc2.LazyExpr` over stored columns of this table. + :class:`blosc2.LazyExpr` (or a :class:`blosc2.LazyUDF`) over stored + columns of this table. + * :func:`blosc2.dsl_kernel`-decorated kernel: pass it directly with + ``inputs=[...]`` naming one stored scalar column per kernel + parameter. Produces one scalar per row. The kernel source is + persisted and recompiled on open; appended rows are auto-filled + and :meth:`refresh_generated_column` recomputes after in-place + edits. * :class:`RowTransformer`: row-wise projection/reduction bound to a fixed-shape ndarray column, e.g. ``t.embedding.row_transformer.norm(axis=0)`` or @@ -8464,8 +8677,8 @@ def add_generated_column( # noqa: C901 may produce either one scalar per row or one fixed-shape ndarray item per row. - Expression forms currently cannot depend on computed columns and - cannot directly consume fixed-shape ndarray columns; use a + Expression and DSL forms currently cannot depend on computed columns + and cannot directly consume fixed-shape ndarray columns; use a row-transformer for ndarray row projections/reductions. dtype: Output schema or dtype. Scalar outputs may pass a NumPy dtype or a @@ -8480,6 +8693,11 @@ def add_generated_column( # noqa: C901 Only scalar generated columns can be indexed; fixed-shape ndarray generated columns raise :class:`ValueError` when indexing is requested. + inputs: + Only used when *values* is a :func:`blosc2.dsl_kernel`-decorated + kernel passed directly: a list of stored scalar column names, one per + kernel parameter, bound positionally. Ignored for other *values* + forms. 
Examples -------- @@ -8586,8 +8804,35 @@ def add_generated_column( # noqa: C901 "transformer": transformer.to_metadata(), "stale": False, } + elif (desc := self._normalize_transformer(values, inputs))["kind"] == "dsl": + kernel = desc["kernel"] + col_deps = desc["col_deps"] + compute_dtype = ( + np.dtype(getattr(dtype, "dtype", dtype)) + if dtype is not None + else self._dsl_result_dtype(kernel, col_deps, None) + ) + operands = tuple(self._cols[d] for d in col_deps) + generated_values = np.asarray(blosc2.lazyudf(kernel, operands, dtype=compute_dtype).compute()[:]) + if generated_values.ndim != 1: + raise TypeError("DSL generated columns must produce a 1-D scalar result.") + generated_values = ( + generated_values[self._valid_rows[:]] + if len(generated_values) == len(self._valid_rows) + else generated_values + ) + spec = self._coerce_generated_spec(dtype, generated_values) + metadata = { + "computed_column": None, + "expression": None, + "dsl_source": kernel.dsl_source, + "col_deps": col_deps, + "dtype": np.dtype(spec.dtype), + "transformer_kind": "dsl", + "stale": False, + } else: - lazy, col_deps = self._normalize_expression_transformer(values) + lazy, col_deps = desc["lazy"], desc["col_deps"] generated_values = np.asarray(lazy[:]) if generated_values.ndim != 1: raise TypeError("Expression generated columns must produce a 1-D scalar result.") @@ -8634,9 +8879,10 @@ def add_generated_column( # noqa: C901 def add_computed_column( self, name: str, - expr: str | blosc2.LazyExpr | Callable[[dict[str, Any]], blosc2.LazyExpr], + expr: str | blosc2.LazyExpr | blosc2.DSLKernel | Callable[[dict[str, Any]], blosc2.LazyExpr], *, dtype: np.dtype | None = None, + inputs: list[str] | None = None, ) -> None: """Add a read-only virtual column computed from stored columns. 
@@ -8653,6 +8899,7 @@ def add_computed_column( add_computed_column(name, "price * qty", dtype=None) add_computed_column(name, lazy_expr, dtype=None) add_computed_column(name, lambda cols: cols["price"] * cols["qty"], dtype=None) + add_computed_column(name, dsl_kernel, inputs=["price", "qty"], dtype=None) Parameters ---------- @@ -8667,7 +8914,15 @@ def add_computed_column( * :class:`blosc2.LazyExpr`: lazy expression over stored columns of this table. * callable: called as ``expr(self._cols)`` and must return a - :class:`blosc2.LazyExpr` over stored columns of this table. + :class:`blosc2.LazyExpr` (or a :class:`blosc2.LazyUDF`) over stored + columns of this table. + * :func:`blosc2.dsl_kernel`-decorated kernel: pass it directly with + ``inputs=[...]`` naming one stored scalar column per kernel + parameter. The kernel may use loops, ``if``/``else`` and + ``where(...)``. DSL columns are persisted (their source is stored + and recompiled on open) and may be referenced inside + :meth:`where` predicates. Their values are recomputed on each + access (the column stays virtual/unstored). Expressions must depend only on stored columns of this table; computed columns cannot depend on other computed columns in this @@ -8676,10 +8931,19 @@ def add_computed_column( reductions, use :meth:`add_generated_column` with ``values=t.ndarray_col.row_transformer...``. dtype: - Optional dtype override for the computed values. When omitted, the - dtype is inferred from the resulting :class:`blosc2.LazyExpr`. - This changes the dtype reported by the CTable column wrapper; it - does not create physical storage. + Optional dtype override for the computed values. For expression + forms it is inferred from the resulting :class:`blosc2.LazyExpr` + when omitted. 
For a DSL kernel passed directly, an omitted dtype is + inferred by NumPy type promotion of the input column dtypes (correct + for elementwise arithmetic kernels); pass *dtype* explicitly for + kernels that change the type (comparisons/``where``/casts) or when + the kernel has no column inputs. This changes the dtype reported by + the CTable column wrapper; it does not create physical storage. + inputs: + Only used when *expr* is a :func:`blosc2.dsl_kernel`-decorated kernel + passed directly: a list of stored scalar column names, one per kernel + parameter, bound positionally (kernel parameter ``i`` ← ``inputs[i]``). + Ignored for the other *expr* forms. Examples -------- @@ -8756,15 +9020,26 @@ def ratio(num, den): if name in self._computed_cols: raise ValueError(f"A computed column named {name!r} already exists.") - lazy, col_deps = self._normalize_expression_transformer(expr) - result_dtype = np.dtype(dtype) if dtype is not None else lazy.dtype - - self._computed_cols[name] = { - "expression": lazy.expression, - "col_deps": col_deps, - "lazy": lazy, - "dtype": result_dtype, - } + desc = self._normalize_transformer(expr, inputs) + if desc["kind"] == "dsl": + kernel = desc["kernel"] + col_deps = desc["col_deps"] + self._computed_cols[name] = { + "kind": "dsl", + "dsl_source": kernel.dsl_source, + "kernel": kernel, + "col_deps": col_deps, + "dtype": self._dsl_result_dtype(kernel, col_deps, dtype), + } + else: + lazy = desc["lazy"] + self._computed_cols[name] = { + "kind": "expression", + "expression": lazy.expression, + "col_deps": desc["col_deps"], + "lazy": lazy, + "dtype": np.dtype(dtype) if dtype is not None else lazy.dtype, + } self.col_names.append(name) self._col_widths[name] = max(len(name), 15) @@ -9151,7 +9426,7 @@ def _sorted_positions_from_full_index(self, name: str, ascending: bool) -> np.nd target.get("source") == "expression" and candidate.get("kind") == "full" and not candidate.get("stale", False) - and target.get("expression_key") == 
cc["expression"] + and target.get("expression_key") == cc.get("expression") and list(target.get("dependencies", [])) == list(cc["col_deps"]) ): descriptor = candidate @@ -9210,7 +9485,7 @@ def _build_lex_keys( cc = self._computed_cols.get(name) if cc is not None: # Materialise computed column values at live positions - raw = np.asarray(cc["lazy"][:])[live_pos] + raw = np.asarray(self._build_computed_lazy(cc)[:])[live_pos] else: col_info = self._schema.columns_by_name.get(name) if col_info is not None and self._is_dictionary_column(col_info): @@ -9694,14 +9969,26 @@ def _empty_copy( # Rebuild computed columns with the new NDArray objects as operands obj._computed_cols = {} for cc_name, cc in self._computed_cols.items(): - operands = {f"o{i}": new_cols[dep] for i, dep in enumerate(cc["col_deps"])} - new_lazy = blosc2.lazyexpr(cc["expression"], operands) - obj._computed_cols[cc_name] = { - "expression": cc["expression"], - "col_deps": cc["col_deps"], - "lazy": new_lazy, - "dtype": cc["dtype"], - } + if cc.get("kind") == "dsl": + # DSL entries hold the live kernel; the LazyUDF is rebuilt on + # demand from obj._cols, so no operand rebinding is needed here. 
+ obj._computed_cols[cc_name] = { + "kind": "dsl", + "dsl_source": cc["dsl_source"], + "kernel": cc["kernel"], + "col_deps": cc["col_deps"], + "dtype": cc["dtype"], + } + else: + operands = {f"o{i}": new_cols[dep] for i, dep in enumerate(cc["col_deps"])} + new_lazy = blosc2.lazyexpr(cc["expression"], operands) + obj._computed_cols[cc_name] = { + "kind": "expression", + "expression": cc["expression"], + "col_deps": cc["col_deps"], + "lazy": new_lazy, + "dtype": cc["dtype"], + } obj.col_names.append(cc_name) obj._col_widths.setdefault(cc_name, max(len(cc_name), 15)) obj._n_rows = 0 @@ -10236,7 +10523,7 @@ def _where_expression_operands(self) -> dict[str, blosc2.NDArray | blosc2.LazyEx or self._is_ndarray_column(col) ): operands[name] = arr - operands.update({name: cc["lazy"] for name, cc in self._computed_cols.items()}) + operands.update({name: self._build_computed_lazy(cc) for name, cc in self._computed_cols.items()}) return operands def _rewrite_nested_expression( diff --git a/src/blosc2/dsl_kernel.py b/src/blosc2/dsl_kernel.py index 6919fb938..d4ad0309c 100644 --- a/src/blosc2/dsl_kernel.py +++ b/src/blosc2/dsl_kernel.py @@ -618,6 +618,56 @@ def validate_dsl(func): } +def kernel_from_source(source: str, name: str | None = None) -> DSLKernel: + """Reconstruct a :class:`DSLKernel` from its stored source text. + + Executes *source* in a restricted namespace (builtins minus ``__import__``, + plus ``np`` and ``blosc2``), extracts the defined function, and wraps it in + a :class:`DSLKernel`. This is the inverse of persisting + :attr:`DSLKernel.dsl_source` and is shared by the persisted-``LazyUDF`` + decoder and the CTable DSL-column loaders. + + Parameters + ---------- + source + Complete, standalone function-definition source (as produced by + :attr:`DSLKernel.dsl_source` or :func:`inspect.getsource`). + name + Name of the function to extract from *source*. When omitted, the + single top-level function definition in *source* is used. 
+ """ + import builtins + import linecache + + import numpy as np + + import blosc2 + + if name is None: + tree = ast.parse(source) + func_defs = [n for n in tree.body if isinstance(n, ast.FunctionDef)] + if len(func_defs) != 1: + raise ValueError( + "kernel_from_source requires an explicit 'name' when 'source' does not " + "contain exactly one top-level function definition" + ) + name = func_defs[0].name + + local_ns: dict = {} + filename = f"<{name}>" + safe_globals = { + "__builtins__": {k: v for k, v in builtins.__dict__.items() if k != "__import__"}, + "np": np, + "blosc2": blosc2, + } + linecache.cache[filename] = (len(source), None, source.splitlines(True), filename) + exec(compile(source, filename, "exec"), safe_globals, local_ns) + func = local_ns[name] + if not isinstance(func, DSLKernel): + func = DSLKernel(func) + return func + + class DSLBuilder: _binop_map: ClassVar[dict[type[ast.operator], str]] = { ast.Add: "+", From cf763860bf287354bfac106d0aa36a729e4f7ecc Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Wed, 3 Jun 2026 13:26:29 +0200 Subject: [PATCH 02/12] Usability improvements for UDF cols; new example of how they work --- examples/ctable/udf-computed-col.py | 142 ++++++++++++++++++++++++++++ src/blosc2/ctable.py | 89 +++++++++-------- src/blosc2/lazyexpr.py | 22 ++++- 3 files changed, 213 insertions(+), 40 deletions(-) create mode 100644 examples/ctable/udf-computed-col.py diff --git a/examples/ctable/udf-computed-col.py b/examples/ctable/udf-computed-col.py new file mode 100644 index 000000000..c2536893c --- /dev/null +++ b/examples/ctable/udf-computed-col.py @@ -0,0 +1,142 @@ +####################################################################### +# Copyright (c) 2019-present, Blosc Development Team +# All rights reserved. 
+# +# SPDX-License-Identifier: BSD-3-Clause +####################################################################### + +# DSL-kernel computed columns: virtual columns backed by a +# @blosc2.dsl_kernel function instead of a LazyExpr string. +# +# Advantages over plain expression strings: +# - full Python control flow (loops, if/else, where(...)) +# - validated at decoration time; errors surface immediately +# - source is persisted and recompiled on open — no extra setup needed +# +# This example shows: +# 1. Adding a DSL computed column with add_computed_column() +# 2. Save / open round-trip (persistence) +# 3. Adding a DSL generated column (stored, auto-filled on append) + +import shutil +import tempfile +from dataclasses import dataclass + +import numpy as np + +import blosc2 + +# --------------------------------------------------------------------------- +# Schema +# --------------------------------------------------------------------------- + + +@dataclass +class Trade: + ticker: str = blosc2.field(blosc2.string(max_length=6)) + price: float = blosc2.field(blosc2.float64(ge=0)) + shares: int = blosc2.field(blosc2.int64(ge=0)) + fee_pct: float = blosc2.field(blosc2.float64(ge=0, le=5), default=0.1) + + +TRADES = [ + ("AAPL", 189.50, 100, 0.10), + ("GOOG", 175.20, 50, 0.10), + ("MSFT", 415.00, 200, 0.08), + ("AMZN", 183.75, 75, 0.10), + ("NVDA", 875.40, 30, 0.12), +] + +t = blosc2.CTable(Trade, new_data=TRADES) + +# --------------------------------------------------------------------------- +# 1. DSL kernel as a computed column (virtual, unstored) +# --------------------------------------------------------------------------- + + +@blosc2.dsl_kernel +def net_value(price, shares, fee_pct): + return price * shares * (1.0 - fee_pct / 100.0) + + +# Pass inputs= to bind kernel parameters to stored columns positionally. +# dtype is inferred from the input column dtypes when omitted (float64 here). 
+t.add_computed_column("net_value", net_value, inputs=["price", "shares", "fee_pct"]) +# This works too; use whatever you prefer +# t.add_computed_column("net_value", blosc2.lazyudf(net_value, (t.price, t.shares, t.fee_pct))) + +print("Computed net_value (virtual, no storage added):") +for ticker, nv in zip(t.ticker[:], t["net_value"][:], strict=True): + print(f" {ticker:6s} {nv:>10.2f}") + + +# Kernels can use if/else and where() — not possible with a plain expression: +@blosc2.dsl_kernel +def tier(price, shares): + mv = price * shares + return where(mv > 50000, 2.0, where(mv > 10000, 1.0, 0.0)) # noqa: F821 + + +t.add_computed_column("tier", tier, inputs=["price", "shares"], dtype=np.float64) +print("\nTier (0=small, 1=mid, 2=large):") +for ticker, tv in zip(t.ticker[:], t.tier[:], strict=True): + print(f" {ticker:6s} {tv:.0f}") + +# --------------------------------------------------------------------------- +# 2. Persistence round-trip +# --------------------------------------------------------------------------- + +tmpdir = tempfile.mkdtemp() +path = f"{tmpdir}/trades.b2d" +try: + t.save(path) + t2 = blosc2.open(path) + + print("\nAfter save/open — net_value still available:") + print(" ", t2["net_value"][:]) + + print("\nAfter save/open — tier still available:") + print(" ", t2["tier"][:]) +finally: + shutil.rmtree(tmpdir) + +# --------------------------------------------------------------------------- +# 3. DSL generated column (stored, auto-filled on append) +# --------------------------------------------------------------------------- + +t3 = blosc2.CTable(Trade, new_data=TRADES) +t3.add_generated_column("net_value", values=net_value, inputs=["price", "shares", "fee_pct"]) + +print("\nGenerated net_value (stored):") +print(" ", t3["net_value"][:]) + +# New rows are auto-filled — the kernel runs for each appended row. 
+t3.append({"ticker": "TSLA", "price": 248.0, "shares": 120, "fee_pct": 0.10}) +print("\nAfter append — auto-filled row added:") +print(" ", t3["net_value"][:]) + +# --------------------------------------------------------------------------- +# 4. Append rows after reopening a persisted table +# --------------------------------------------------------------------------- + +tmpdir2 = tempfile.mkdtemp() +path2 = f"{tmpdir2}/trades_gen.b2d" +try: + t4 = blosc2.CTable(Trade, new_data=TRADES) + t4.add_generated_column( + "net_value", + values=blosc2.lazyudf(net_value, (t4.price, t4.shares, t4.fee_pct)), + ) + t4.save(path2) + + # Reopen in append mode — the DSL kernel is reconstructed from stored source. + t5 = blosc2.open(path2, mode="a") + print("\nReopened table net_value:") + print(" ", t5["net_value"][:]) + + t5.append({"ticker": "TSLA", "price": 248.0, "shares": 120, "fee_pct": 0.10}) + t5.append({"ticker": "NFLX", "price": 630.8, "shares": 40, "fee_pct": 0.10}) + print("\nAfter appending two rows — net_value auto-filled from persisted kernel:") + print(" ", t5["net_value"][:]) +finally: + shutil.rmtree(tmpdir2) diff --git a/src/blosc2/ctable.py b/src/blosc2/ctable.py index 9b364eb27..57aaa60ab 100644 --- a/src/blosc2/ctable.py +++ b/src/blosc2/ctable.py @@ -8640,9 +8640,10 @@ def add_generated_column( # noqa: C901 Supported signatures are:: add_generated_column(name, *, values="price * qty", dtype=..., create_index=False) - add_generated_column(name, *, values=lazy_expr, dtype=..., create_index=False) - add_generated_column(name, *, values=lambda cols: cols["price"] * 1.21, dtype=...) + add_generated_column(name, *, values=lazy_expr, dtype=...) add_generated_column(name, *, values=dsl_kernel, inputs=["price", "qty"], dtype=...) + add_generated_column(name, *, values=blosc2.lazyudf(dsl_kernel, (t.price, t.qty))) + add_generated_column(name, *, values=lambda cols: cols["price"] * 1.21, dtype=...) 
add_generated_column(name, *, values=t.embedding.row_transformer.norm(axis=0), dtype=...) add_generated_column(name, *, values=t.image.row_transformer.mean(axis=(0, 1)), dtype=blosc2.ndarray((3,), dtype=...)) @@ -8661,15 +8662,20 @@ def add_generated_column( # noqa: C901 per row. * :class:`blosc2.LazyExpr`: scalar lazy expression over stored columns of this table. It must produce a 1-D scalar stream. + * :func:`blosc2.dsl_kernel`-decorated kernel passed directly with + ``inputs=[...]`` — one stored scalar column name per kernel + parameter, bound positionally. Produces one scalar per row. + The kernel source is persisted and recompiled on open; appended + rows are auto-filled and :meth:`refresh_generated_column` + recomputes after in-place edits. + * :class:`blosc2.LazyUDF` built from a :func:`blosc2.dsl_kernel` via + :func:`blosc2.lazyudf` — column bindings are inferred by identity + from the operands, so ``inputs=`` is not needed. Accepts + :class:`Column` accessors (``t.col1``) or raw NDArrays as + operands. Same persistence and auto-fill behaviour as above. * callable: called as ``values(self._cols)`` and must return a - :class:`blosc2.LazyExpr` (or a :class:`blosc2.LazyUDF`) over stored - columns of this table. - * :func:`blosc2.dsl_kernel`-decorated kernel: pass it directly with - ``inputs=[...]`` naming one stored scalar column per kernel - parameter. Produces one scalar per row. The kernel source is - persisted and recompiled on open; appended rows are auto-filled - and :meth:`refresh_generated_column` recomputes after in-place - edits. + :class:`blosc2.LazyExpr` or a :class:`blosc2.LazyUDF` backed by a + :func:`blosc2.dsl_kernel`. * :class:`RowTransformer`: row-wise projection/reduction bound to a fixed-shape ndarray column, e.g. ``t.embedding.row_transformer.norm(axis=0)`` or @@ -8694,10 +8700,10 @@ def add_generated_column( # noqa: C901 generated columns raise :class:`ValueError` when indexing is requested. 
inputs: - Only used when *values* is a :func:`blosc2.dsl_kernel`-decorated - kernel passed directly: a list of stored scalar column names, one per - kernel parameter, bound positionally. Ignored for other *values* - forms. + Only used when *values* is a bare :func:`blosc2.dsl_kernel`: a list + of stored scalar column names, one per kernel parameter, bound + positionally. Not needed when passing a :class:`blosc2.LazyUDF` or + a callable — bindings are inferred from the operands in those cases. Examples -------- @@ -8896,10 +8902,11 @@ def add_computed_column( Supported signatures are:: - add_computed_column(name, "price * qty", dtype=None) - add_computed_column(name, lazy_expr, dtype=None) - add_computed_column(name, lambda cols: cols["price"] * cols["qty"], dtype=None) - add_computed_column(name, dsl_kernel, inputs=["price", "qty"], dtype=None) + add_computed_column(name, "price * qty") + add_computed_column(name, lazy_expr) + add_computed_column(name, dsl_kernel, inputs=["price", "qty"]) + add_computed_column(name, blosc2.lazyudf(dsl_kernel, (t.price, t.qty))) + add_computed_column(name, lambda cols: cols["price"] * cols["qty"]) Parameters ---------- @@ -8913,16 +8920,23 @@ def add_computed_column( ``"price * qty"``. * :class:`blosc2.LazyExpr`: lazy expression over stored columns of this table. + * :func:`blosc2.dsl_kernel`-decorated kernel passed directly with + ``inputs=[...]`` — one stored scalar column name per kernel + parameter, bound positionally. The kernel may use loops, + ``if``/``else`` and ``where(...)``. Its source is persisted and + recompiled on open; the column stays virtual/unstored. + * :class:`blosc2.LazyUDF` built from a :func:`blosc2.dsl_kernel` via + :func:`blosc2.lazyudf` — column bindings are inferred by identity + from the operands, so ``inputs=`` is not needed. Accepted forms + include ``blosc2.lazyudf(kernel, (t.col1, t.col2))`` (using + :class:`Column` accessors) or the raw NDArray equivalents. 
* callable: called as ``expr(self._cols)`` and must return a - :class:`blosc2.LazyExpr` (or a :class:`blosc2.LazyUDF`) over stored - columns of this table. - * :func:`blosc2.dsl_kernel`-decorated kernel: pass it directly with - ``inputs=[...]`` naming one stored scalar column per kernel - parameter. The kernel may use loops, ``if``/``else`` and - ``where(...)``. DSL columns are persisted (their source is stored - and recompiled on open) and may be referenced inside - :meth:`where` predicates. Their values are recomputed on each - access (the column stays virtual/unstored). + :class:`blosc2.LazyExpr` or a :class:`blosc2.LazyUDF` backed by a + :func:`blosc2.dsl_kernel`. + + DSL columns (last three forms) are persisted — their source is stored + and recompiled on open — and may be referenced inside :meth:`where` + predicates. Expressions must depend only on stored columns of this table; computed columns cannot depend on other computed columns in this @@ -8933,17 +8947,18 @@ def add_computed_column( dtype: Optional dtype override for the computed values. For expression forms it is inferred from the resulting :class:`blosc2.LazyExpr` - when omitted. For a DSL kernel passed directly, an omitted dtype is - inferred by NumPy type promotion of the input column dtypes (correct - for elementwise arithmetic kernels); pass *dtype* explicitly for - kernels that change the type (comparisons/``where``/casts) or when - the kernel has no column inputs. This changes the dtype reported by - the CTable column wrapper; it does not create physical storage. + when omitted. For DSL forms, an omitted dtype is inferred by NumPy + type promotion of the input column dtypes (correct for elementwise + arithmetic kernels); pass *dtype* explicitly for kernels that change + the type (comparisons/``where``/casts) or when the kernel has no + column inputs. This changes the dtype reported by the CTable column + wrapper; it does not create physical storage. 
inputs: - Only used when *expr* is a :func:`blosc2.dsl_kernel`-decorated kernel - passed directly: a list of stored scalar column names, one per kernel - parameter, bound positionally (kernel parameter ``i`` ← ``inputs[i]``). - Ignored for the other *expr* forms. + Only used when *expr* is a bare :func:`blosc2.dsl_kernel`: a list of + stored scalar column names, one per kernel parameter, bound + positionally (kernel parameter ``i`` ← ``inputs[i]``). Not needed + when passing a :class:`blosc2.LazyUDF` or a callable — bindings are + inferred from the operands in those cases. Examples -------- diff --git a/src/blosc2/lazyexpr.py b/src/blosc2/lazyexpr.py index 538cef4b7..de185b7dc 100644 --- a/src/blosc2/lazyexpr.py +++ b/src/blosc2/lazyexpr.py @@ -682,6 +682,9 @@ def convert_inputs(inputs): return [] inputs_ = [] for obj in inputs: + # CTable Column — unwrap to the backing NDArray so shape and identity match. + if hasattr(obj, "_raw_col"): + obj = obj._raw_col if not isinstance(obj, np.ndarray | blosc2.Operand) and not np.isscalar(obj): try: obj = blosc2.SimpleProxy(obj) @@ -4755,7 +4758,7 @@ def _numpy_eval_expr(expression, operands, prefer_blosc=False): def lazyudf( func: Callable[[tuple, np.ndarray, tuple[int]], None], inputs: Sequence[Any] | None, - dtype: np.dtype, + dtype: np.dtype | None = None, shape: tuple | list | None = None, chunked_eval: bool = True, jit: bool | None = None, @@ -4786,8 +4789,11 @@ def myudf(inputs_tuple, output, offset): any other object is supported too, and it will be passed as-is to the user-defined function. If not needed, this can be empty, but `shape` must be provided. - dtype: np.dtype - The resulting ndarray dtype in NumPy format. + dtype: np.dtype, optional + The resulting ndarray dtype in NumPy format. When omitted and *func* + is a :class:`DSLKernel`, the dtype is inferred by NumPy type promotion + of the input dtypes. For type-changing kernels (comparisons, casts) + pass *dtype* explicitly. Required for plain Python UDFs. 
shape: tuple, optional The shape of the resulting array. If None, the shape will be guessed from inputs. chunked_eval: bool, optional @@ -4845,6 +4851,16 @@ def inplace_udf(inputs_tuple, output, offset): if isinstance(func, DSLKernel) and func.dsl_error is not None: udf_name = getattr(func.func, "__name__", func.__name__) raise DSLSyntaxError(f"Invalid DSL kernel '{udf_name}'.\n{func.dsl_error}") from None + if dtype is None: + if isinstance(func, DSLKernel): + dep_dtypes = [arr.dtype for arr in (inputs or []) if hasattr(arr, "dtype")] + if not dep_dtypes: + raise TypeError( + "Cannot infer dtype for DSL kernel with no array inputs; pass dtype= explicitly." + ) + dtype = np.result_type(*dep_dtypes) + else: + raise TypeError("dtype is required for non-DSL UDFs.") return LazyUDF(func, inputs, dtype, shape, chunked_eval, jit, jit_backend, **kwargs) From eff437fc56aa5d70a0904e2ca597a02a9478f1cc Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Wed, 3 Jun 2026 13:31:18 +0200 Subject: [PATCH 03/12] Practical info on when should generated cols be used instead of cheaper computed cols --- doc/reference/ctable.rst | 38 ++++++++++++++++++++++++++++---------- src/blosc2/ctable.py | 15 ++++++++------- 2 files changed, 36 insertions(+), 17 deletions(-) diff --git a/doc/reference/ctable.rst b/doc/reference/ctable.rst index 3dbbcda3c..959fb3964 100644 --- a/doc/reference/ctable.rst +++ b/doc/reference/ctable.rst @@ -315,16 +315,34 @@ Mutations --------- In addition to physical schema changes such as :meth:`CTable.add_column`, -CTables can host **computed columns** backed by a lazy expression over stored -columns. Computed columns are read-only, use no extra storage, participate in -display, filtering, sorting, and aggregates, and are persisted across -:meth:`CTable.save`, :meth:`CTable.load`, and :meth:`CTable.open`. - -When a computed result should become a normal stored column, use -:meth:`CTable.materialize_computed_column`. 
The materialized column is a stored -snapshot that can be indexed like any other stored column. New rows inserted -later via :meth:`CTable.append` or :meth:`CTable.extend` auto-fill omitted -materialized-column values from the recorded expression metadata. +CTables support two kinds of derived columns: + +**Computed columns** (:meth:`CTable.add_computed_column`) are purely virtual — +they use no extra storage, are evaluated on demand, and are read-only. They +participate in display, filtering, sorting, and aggregates, and are persisted +across save/open round-trips. Because they have no physical storage, they +**cannot be indexed**. + +**Generated columns** (:meth:`CTable.add_generated_column`) are physically +stored. Their values are computed once and written to disk; new rows appended +later are auto-filled. Because the data is real, generated +columns **can be indexed** with :meth:`CTable.create_index`, which makes +``where()`` queries on them fast. + +**Practical rule**: use a computed column when you just need a derived value +available for display, export, or occasional reads. Use a generated column +(optionally with ``create_index=True``) when you need to filter or sort by a +derived value frequently — the index pays for itself after the first few +queries. + +Both forms accept plain expression strings, :func:`blosc2.dsl_kernel`-decorated +functions, and :class:`blosc2.LazyUDF` objects. DSL kernels support full Python +control flow (``if``/``else``, ``where()``, loops) and have their source +persisted and recompiled on open. + +When a computed result should become a stored snapshot rather than a live +virtual column, use :meth:`CTable.materialize_computed_column` to convert it +in place. ..
autosummary:: diff --git a/src/blosc2/ctable.py b/src/blosc2/ctable.py index 57aaa60ab..17b2bec81 100644 --- a/src/blosc2/ctable.py +++ b/src/blosc2/ctable.py @@ -8998,13 +8998,9 @@ def ratio(num, den): expensive = t.where(t.total > 100) total_revenue = t.total.sum() - Computed columns are virtual and read-only. Materialize one when a - stored snapshot or an indexable column is needed:: - - t.materialize_computed_column("total", new_name="total_stored") - t.create_index("total_stored") - - For maintained stored results, prefer generated columns:: + Computed columns are virtual and read-only and cannot be indexed. If + you need to filter or sort by this value frequently, use a generated + column instead — it is physically stored and can be indexed:: t.add_generated_column( "total_stored", @@ -9013,6 +9009,11 @@ def ratio(num, den): create_index=True, ) + Or convert an existing computed column to a stored snapshot:: + + t.materialize_computed_column("total", new_name="total_stored") + t.create_index("total_stored") + Raises ------ ValueError From 5e0ac46fb4c5a916203cb458d1d748cc5f578eef Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Wed, 3 Jun 2026 13:54:13 +0200 Subject: [PATCH 04/12] Persist jit_backend= in UDF kernel cols --- doc/reference/ctable.rst | 12 +++++++ examples/ctable/udf-computed-col.py | 24 +++++++++++++ src/blosc2/ctable.py | 53 ++++++++++++++++++++++------- 3 files changed, 76 insertions(+), 13 deletions(-) diff --git a/doc/reference/ctable.rst b/doc/reference/ctable.rst index 959fb3964..789ce0b72 100644 --- a/doc/reference/ctable.rst +++ b/doc/reference/ctable.rst @@ -340,6 +340,18 @@ functions, and :class:`blosc2.LazyUDF` objects. DSL kernels support full Python control flow (``if``/``else``, ``where()``, loops) and have their source persisted and recompiled on open. +When passing a :class:`blosc2.LazyUDF` built with an explicit ``jit_backend=`` +(e.g. 
``jit_backend="cc"`` to use the system C compiler instead of the default +TCC), that choice is persisted in the column metadata and automatically restored +on :func:`blosc2.open`. This matters for kernels where one backend produces +measurably faster code — the optimised backend stays active for the lifetime of +the table without any extra configuration:: + + t.add_generated_column( + "score", + values=blosc2.lazyudf(my_kernel, (t.col_a, t.col_b), jit_backend="cc"), + ) + When a computed result should become a stored snapshot rather than a live virtual column, use :meth:`CTable.materialize_computed_column` to convert it in place. diff --git a/examples/ctable/udf-computed-col.py b/examples/ctable/udf-computed-col.py index c2536893c..f457bc906 100644 --- a/examples/ctable/udf-computed-col.py +++ b/examples/ctable/udf-computed-col.py @@ -140,3 +140,27 @@ def tier(price, shares): print(" ", t5["net_value"][:]) finally: shutil.rmtree(tmpdir2) + +# --------------------------------------------------------------------------- +# 5. Persisting jit_backend — use cc compiler for better code on this kernel +# --------------------------------------------------------------------------- + +tmpdir3 = tempfile.mkdtemp() +path3 = f"{tmpdir3}/trades_cc.b2d" +try: + t6 = blosc2.CTable(Trade, new_data=TRADES) + # jit_backend="cc" uses the system C compiler (gcc/clang) instead of TCC. + # This choice is persisted in the column metadata and restored on open — + # no need to set BLOSC_ME_JIT=cc or repeat jit_backend= after reloading. 
+ t6.add_generated_column( + "net_value", + values=blosc2.lazyudf(net_value, (t6.price, t6.shares, t6.fee_pct), jit_backend="cc"), + ) + t6.save(path3) + + t7 = blosc2.open(path3, mode="a") + t7.append({"ticker": "TSLA", "price": 248.0, "shares": 120, "fee_pct": 0.10}) + print("\nAfter reload (jit_backend=cc persisted) — auto-filled row:") + print(" ", t7["net_value"][:]) +finally: + shutil.rmtree(tmpdir3) diff --git a/src/blosc2/ctable.py b/src/blosc2/ctable.py index 17b2bec81..c2c2596a5 100644 --- a/src/blosc2/ctable.py +++ b/src/blosc2/ctable.py @@ -7823,15 +7823,16 @@ def _schema_dict_with_computed(self) -> dict: computed = [] for name, cc in self._computed_cols.items(): if cc.get("kind") == "dsl": - computed.append( - { - "name": name, - "kind": "dsl", - "dsl_source": cc["dsl_source"], - "col_deps": cc["col_deps"], - "dtype": str(cc["dtype"]), - } - ) + entry = { + "name": name, + "kind": "dsl", + "dsl_source": cc["dsl_source"], + "col_deps": cc["col_deps"], + "dtype": str(cc["dtype"]), + } + if cc.get("jit_backend") is not None: + entry["jit_backend"] = cc["jit_backend"] + computed.append(entry) else: computed.append( { @@ -7859,6 +7860,8 @@ def _schema_dict_with_computed(self) -> dict: entry["transformer"] = meta["transformer"] if meta.get("dsl_source") is not None: entry["dsl_source"] = meta["dsl_source"] + if meta.get("jit_backend") is not None: + entry["jit_backend"] = meta["jit_backend"] materialized.append(entry) d["materialized_columns"] = materialized return d @@ -8057,6 +8060,7 @@ def _load_computed_cols_from_schema(self, schema_dict: dict) -> None: "kernel": kernel_from_source(dsl_source), "col_deps": col_deps, "dtype": dtype, + "jit_backend": cc_meta.get("jit_backend"), } else: expression = cc_meta["expression"] @@ -8087,6 +8091,8 @@ def _load_materialized_cols_from_schema(self, schema_dict: dict) -> None: loaded["transformer"] = dict(meta["transformer"]) if meta.get("dsl_source") is not None: loaded["dsl_source"] = meta["dsl_source"] + if 
meta.get("jit_backend") is not None: + loaded["jit_backend"] = meta["jit_backend"] self._materialized_cols[meta["name"]] = loaded def _require_computed_column(self, name: str) -> dict: @@ -8348,6 +8354,7 @@ def materialize_computed_column( "dtype": target_dtype, "transformer_kind": "dsl", "stale": False, + "jit_backend": cc.get("jit_backend"), } else: self._materialized_cols[target_name] = { @@ -8479,7 +8486,12 @@ def _normalize_transformer(self, expr, inputs=None) -> dict: kernel = obj.func if kernel.dsl_error is not None: raise blosc2.DSLSyntaxError(f"Invalid DSL kernel: {kernel.dsl_error}") - return {"kind": "dsl", "kernel": kernel, "col_deps": self._dsl_deps_from_lazyudf(obj)} + return { + "kind": "dsl", + "kernel": kernel, + "col_deps": self._dsl_deps_from_lazyudf(obj), + "jit_backend": obj.kwargs.get("jit_backend"), + } lazy, col_deps = self._normalize_expression_transformer(obj) return {"kind": "expression", "lazy": lazy, "col_deps": col_deps} @@ -8517,7 +8529,12 @@ def _build_computed_lazy(self, cc: dict): """ if cc.get("kind") == "dsl": operands = tuple(self._cols[d] for d in cc["col_deps"]) - return blosc2.lazyudf(cc["kernel"], operands, dtype=cc["dtype"]).compute() + return blosc2.lazyudf( + cc["kernel"], + operands, + dtype=cc["dtype"], + jit_backend=cc.get("jit_backend"), + ).compute() return cc["lazy"] def _evaluate_expression_materialized_batch( @@ -8553,7 +8570,12 @@ def _evaluate_dsl_materialized_batch(self, meta: dict, raw_columns: Mapping[str, if pad: arrays = [np.concatenate([arr, arr]) for arr in arrays] operands = tuple(blosc2.asarray(arr) for arr in arrays) - result = blosc2.lazyudf(kernel, operands, dtype=out_dtype).compute()[:] + result = blosc2.lazyudf( + kernel, + operands, + dtype=out_dtype, + jit_backend=meta.get("jit_backend"), + ).compute()[:] result = np.asarray(result, dtype=out_dtype) return result[:1] if pad else result @@ -8818,8 +8840,11 @@ def add_generated_column( # noqa: C901 if dtype is not None else 
self._dsl_result_dtype(kernel, col_deps, None) ) + jit_backend = desc.get("jit_backend") operands = tuple(self._cols[d] for d in col_deps) - generated_values = np.asarray(blosc2.lazyudf(kernel, operands, dtype=compute_dtype).compute()[:]) + generated_values = np.asarray( + blosc2.lazyudf(kernel, operands, dtype=compute_dtype, jit_backend=jit_backend).compute()[:] + ) if generated_values.ndim != 1: raise TypeError("DSL generated columns must produce a 1-D scalar result.") generated_values = ( @@ -8836,6 +8861,7 @@ def add_generated_column( # noqa: C901 "dtype": np.dtype(spec.dtype), "transformer_kind": "dsl", "stale": False, + "jit_backend": jit_backend, } else: lazy, col_deps = desc["lazy"], desc["col_deps"] @@ -9046,6 +9072,7 @@ def ratio(num, den): "kernel": kernel, "col_deps": col_deps, "dtype": self._dsl_result_dtype(kernel, col_deps, dtype), + "jit_backend": desc.get("jit_backend"), } else: lazy = desc["lazy"] From 41e095fc74d9461b84e3401d9b97c7becbd43c89 Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Wed, 3 Jun 2026 14:21:51 +0200 Subject: [PATCH 05/12] Make CTable.where to accept UDF kernels too --- src/blosc2/ctable.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/blosc2/ctable.py b/src/blosc2/ctable.py index c2c2596a5..b27b17c08 100644 --- a/src/blosc2/ctable.py +++ b/src/blosc2/ctable.py @@ -10624,7 +10624,7 @@ def _guard_varlen_scalar_expression(self, expr: str) -> None: def where( # noqa: C901 self, - expr_result: str | np.ndarray | blosc2.NDArray | blosc2.LazyExpr | Column, + expr_result: str | np.ndarray | blosc2.NDArray | blosc2.LazyExpr | blosc2.LazyUDF | Column, *, columns: list[str] | tuple[str, ...] | None = None, ) -> CTable: @@ -10636,9 +10636,10 @@ def where( # noqa: C901 The predicate can be supplied as a boolean :class:`blosc2.LazyExpr`, a boolean :class:`blosc2.NDArray`, a boolean NumPy array, a boolean - ``Column``, or a string expression evaluated against this table's - columns. 
String expressions can reference stored and computed columns - directly by name. + ``Column``, a :class:`blosc2.LazyUDF` (including those backed by a + :func:`blosc2.dsl_kernel`), or a string expression evaluated against + this table's columns. String expressions can reference stored and + computed columns directly by name. The returned object is a :class:`CTable` view sharing the original column data. The row-selection mask is evaluated immediately and @@ -10722,6 +10723,11 @@ def where( # noqa: C901 expr_result = ( expr_result._raw_col == 1 if expr_result._is_nullable_bool else expr_result._raw_col ) + if isinstance(expr_result, blosc2.LazyUDF): + # DSL miniexpr only supports full-array getitem, so we cannot stream + # a LazyUDF chunk-by-chunk the way LazyExpr does. Materialise the + # full boolean array upfront and let the NDArray path handle it. + expr_result = expr_result.compute() if not ( isinstance(expr_result, (blosc2.NDArray, blosc2.LazyExpr)) From 613415ff5440343b4abd12002c8e0ac4a056207c Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Wed, 3 Jun 2026 14:28:17 +0200 Subject: [PATCH 06/12] New tests for CTable.where accepting UDF kernels --- tests/ctable/test_ctable_computed_cols.py | 252 ++++++++++++++++++++++ 1 file changed, 252 insertions(+) diff --git a/tests/ctable/test_ctable_computed_cols.py b/tests/ctable/test_ctable_computed_cols.py index db2e383de..256be754b 100644 --- a/tests/ctable/test_ctable_computed_cols.py +++ b/tests/ctable/test_ctable_computed_cols.py @@ -9,6 +9,7 @@ from __future__ import annotations +import sys from dataclasses import dataclass import numpy as np @@ -795,3 +796,254 @@ def test_computed_column_compact(): # After compact, live rows are price=1,3,4,5 and qty=1,3,4,5 expected = np.array([1.0, 9.0, 16.0, 25.0]) np.testing.assert_allclose(arr, expected) + + +# --------------------------------------------------------------------------- +# 26. 
DSL kernels as computed columns +# --------------------------------------------------------------------------- + +# Shared DSL kernels used across the DSL tests + + +@blosc2.dsl_kernel +def total(price, qty): + return price * qty + + +@blosc2.dsl_kernel +def total_with_tax(price, qty, tax): + return price * qty * (1.0 + tax) + + +@blosc2.dsl_kernel +def is_large(price, qty): + return price * qty > 10 + + +def test_dsl_computed_column_inputs_form(): + """Bare DSLKernel + inputs= binds parameters positionally.""" + t = _make_invoice_table(5) + t.add_computed_column("total", total, inputs=["price", "qty"]) + np.testing.assert_allclose(t["total"][:], [1.0, 4.0, 9.0, 16.0, 25.0]) + + +def test_dsl_computed_column_lazyudf_form(): + """LazyUDF form infers col_deps by identity — no inputs= needed.""" + t = _make_invoice_table(5) + t.add_computed_column("total", blosc2.lazyudf(total, (t.price, t.qty))) + np.testing.assert_allclose(t["total"][:], [1.0, 4.0, 9.0, 16.0, 25.0]) + + +def test_dsl_computed_column_dtype_inferred(): + """dtype is inferred from input column dtypes when omitted.""" + t = _make_invoice_table(3) + t.add_computed_column("total", total, inputs=["price", "qty"]) + assert t._computed_cols["total"]["dtype"] == np.dtype(np.float64) + + +def test_dsl_computed_column_dtype_explicit(): + """Explicit dtype overrides inference — needed for type-changing kernels.""" + t = _make_invoice_table(5) # totals: 1, 4, 9, 16, 25 — last two exceed 10 + t.add_computed_column("is_large", is_large, inputs=["price", "qty"], dtype=bool) + assert t._computed_cols["is_large"]["dtype"] == np.dtype(bool) + result = t["is_large"][:] + np.testing.assert_array_equal(result, [False, False, False, True, True]) + + +def test_dsl_computed_column_where_via_string(): + """DSL computed column is reachable from a where() string predicate.""" + t = _make_invoice_table(5) + t.add_computed_column("total", total, inputs=["price", "qty"]) + view = t.where("total > 10") + 
np.testing.assert_allclose(view["total"][:], [16.0, 25.0]) + + +def test_dsl_computed_column_where_via_column(): + """DSL computed column participates in column-expression where().""" + t = _make_invoice_table(5) + t.add_computed_column("total", total, inputs=["price", "qty"]) + view = t.where(t.total > 10) + assert len(view) == 2 + + +def test_dsl_computed_column_no_storage(): + """DSL computed column must not add physical storage.""" + t = _make_invoice_table(3) + nb_before = t.nbytes + t.add_computed_column("total", total, inputs=["price", "qty"]) + assert t.nbytes == nb_before + assert "total" not in t._cols + + +def test_dsl_computed_column_persistence(tmp_path): + """DSL kernel source is persisted and the column is available after open.""" + t = _make_invoice_table(5) + t.add_computed_column("total", total, inputs=["price", "qty"]) + path = str(tmp_path / "tbl") + t.save(path) + + t2 = blosc2.open(path) + assert "total" in t2._computed_cols + assert t2._computed_cols["total"]["kind"] == "dsl" + np.testing.assert_allclose(t2["total"][:], [1.0, 4.0, 9.0, 16.0, 25.0]) + + +def test_dsl_computed_column_inputs_mismatch_raises(): + """inputs= count not matching kernel parameter count raises ValueError.""" + t = _make_invoice_table(3) + with pytest.raises(ValueError, match="inputs"): + t.add_computed_column("total", total, inputs=["price"]) # total needs 2 + + +def test_dsl_computed_column_inputs_required_raises(): + """Bare DSLKernel without inputs= raises TypeError.""" + t = _make_invoice_table(3) + with pytest.raises(TypeError, match="inputs"): + t.add_computed_column("total", total) + + +# --------------------------------------------------------------------------- +# 27. 
LazyUDF directly in where() +# --------------------------------------------------------------------------- + + +def test_lazyudf_in_where_direct(): + """LazyUDF passed directly to where() — no .compute() needed.""" + t = _make_invoice_table(5) + cond = blosc2.lazyudf(is_large, (t.price, t.qty), dtype=bool) + view = t.where(cond) + assert len(view) == 2 + np.testing.assert_allclose(view["price"][:], [4.0, 5.0]) + + +def test_lazyudf_in_where_column_inputs(): + """Column accessors are accepted directly as lazyudf inputs in where().""" + t = _make_invoice_table(5) + cond = blosc2.lazyudf(is_large, (t.price, t.qty), dtype=bool) + view = t.where(cond) + assert len(view) == 2 + + +# --------------------------------------------------------------------------- +# 28. lazyudf dtype inference +# --------------------------------------------------------------------------- + + +def test_lazyudf_dtype_inferred_from_inputs(): + """dtype is inferred from NDArray input dtypes for DSL kernels.""" + a = blosc2.arange(5, dtype=np.float64) + b = blosc2.arange(1, 6, dtype=np.float64) + result = blosc2.lazyudf(total, (a, b)).compute() + assert result.dtype == np.dtype(np.float64) + np.testing.assert_allclose(result[:5], [0.0, 2.0, 6.0, 12.0, 20.0]) + + +def test_lazyudf_dtype_required_for_plain_udf(): + """Plain (non-DSL) UDF without dtype= raises TypeError.""" + a = blosc2.arange(5, dtype=np.float64) + with pytest.raises(TypeError, match="dtype is required"): + blosc2.lazyudf(lambda ins, out, off: None, [a]) + + +def test_lazyudf_column_inputs_accepted(): + """Column instances are unwrapped to their backing NDArray automatically.""" + t = _make_invoice_table(5) + result = blosc2.lazyudf(total, (t.price, t.qty)).compute() + np.testing.assert_allclose(result[:5], [1.0, 4.0, 9.0, 16.0, 25.0]) + + +# --------------------------------------------------------------------------- +# 29. 
DSL kernels as generated columns +# --------------------------------------------------------------------------- + + +def test_dsl_generated_column_inputs_form(): + """DSL kernel + inputs= creates a stored generated column.""" + t = _make_invoice_table(5) + t.add_generated_column("total", values=total, inputs=["price", "qty"]) + assert "total" in t._cols + np.testing.assert_allclose(t["total"][:], [1.0, 4.0, 9.0, 16.0, 25.0]) + + +def test_dsl_generated_column_lazyudf_form(): + """LazyUDF form creates a stored generated column without inputs=.""" + t = _make_invoice_table(5) + t.add_generated_column("total", values=blosc2.lazyudf(total, (t.price, t.qty))) + assert "total" in t._cols + np.testing.assert_allclose(t["total"][:], [1.0, 4.0, 9.0, 16.0, 25.0]) + + +def test_dsl_generated_column_autofill_append(): + """Appended rows are auto-filled using the persisted DSL kernel.""" + t = _make_invoice_table(3) + t.add_generated_column("total", values=total, inputs=["price", "qty"]) + t.append((4.0, 4, 0.1)) + np.testing.assert_allclose(t["total"][:], [1.0, 4.0, 9.0, 16.0]) + + +def test_dsl_generated_column_persistence_and_append(tmp_path): + """DSL generated column survives save/open and auto-fills after reload.""" + t = _make_invoice_table(3) + t.add_generated_column("total", values=total, inputs=["price", "qty"]) + path = str(tmp_path / "tbl") + t.save(path) + + t2 = blosc2.open(path, mode="a") + assert "total" in t2._materialized_cols + assert t2._materialized_cols["total"]["transformer_kind"] == "dsl" + t2.append((4.0, 4, 0.1)) + np.testing.assert_allclose(t2["total"][:], [1.0, 4.0, 9.0, 16.0]) + + +def test_dsl_generated_column_refresh(tmp_path): + """refresh_generated_column recomputes from the DSL kernel.""" + t = _make_invoice_table(3) + t.add_generated_column("total", values=total, inputs=["price", "qty"]) + # Force-write wrong values directly + t._cols["total"][:3] = np.array([0.0, 0.0, 0.0]) + t.refresh_generated_column("total") + 
np.testing.assert_allclose(t["total"][:], [1.0, 4.0, 9.0]) + + +# --------------------------------------------------------------------------- +# 30. jit_backend persistence +# --------------------------------------------------------------------------- + + +@pytest.mark.skipif(sys.platform == "win32", reason="cc backend requires a system C compiler") +def test_dsl_jit_backend_persisted_in_schema(): + """jit_backend set via lazyudf() is stored in the column schema.""" + t = _make_invoice_table(3) + t.add_generated_column( + "total", + values=blosc2.lazyudf(total, (t.price, t.qty), jit_backend="cc"), + ) + schema = t._schema_dict_with_computed() + mat = {m["name"]: m for m in schema["materialized_columns"]} + assert mat["total"]["jit_backend"] == "cc" + + +@pytest.mark.skipif(sys.platform == "win32", reason="cc backend requires a system C compiler") +def test_dsl_jit_backend_restored_after_open(tmp_path): + """jit_backend is restored from schema on open and used for auto-fill.""" + t = _make_invoice_table(3) + t.add_generated_column( + "total", + values=blosc2.lazyudf(total, (t.price, t.qty), jit_backend="cc"), + ) + path = str(tmp_path / "tbl") + t.save(path) + + t2 = blosc2.open(path, mode="a") + assert t2._materialized_cols["total"].get("jit_backend") == "cc" + t2.append((4.0, 4, 0.1)) + np.testing.assert_allclose(t2["total"][:], [1.0, 4.0, 9.0, 16.0]) + + +def test_dsl_no_jit_backend_not_in_schema(): + """jit_backend is absent from schema when not specified (keeps files clean).""" + t = _make_invoice_table(3) + t.add_generated_column("total", values=total, inputs=["price", "qty"]) + schema = t._schema_dict_with_computed() + mat = {m["name"]: m for m in schema["materialized_columns"]} + assert "jit_backend" not in mat["total"] From 6a5ab284a0484afa7d586bf52ec71c1405b78a2d Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Wed, 3 Jun 2026 17:28:56 +0200 Subject: [PATCH 07/12] Add tests for DSL kernels as columns --- tests/ctable/test_ctable_dsl_columns.py | 244 
++++++++++++++++++++++++ 1 file changed, 244 insertions(+) create mode 100644 tests/ctable/test_ctable_dsl_columns.py diff --git a/tests/ctable/test_ctable_dsl_columns.py b/tests/ctable/test_ctable_dsl_columns.py new file mode 100644 index 000000000..6aaf5c2b0 --- /dev/null +++ b/tests/ctable/test_ctable_dsl_columns.py @@ -0,0 +1,244 @@ +####################################################################### +# Copyright (c) 2019-present, Blosc Development Team +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause +####################################################################### + +"""Tests for DSL-kernel-backed computed and generated columns on CTable.""" + +from __future__ import annotations + +import json +from dataclasses import dataclass + +import numpy as np +import pytest + +import blosc2 +from blosc2 import CTable + + +@dataclass +class Row: + a: int = blosc2.field(blosc2.int64()) + b: int = blosc2.field(blosc2.int64()) + + +@blosc2.dsl_kernel +def k_add(x, y): + return x + y + + +@blosc2.dsl_kernel +def k_loop(x, y): + # x + 2*y, exercising a while loop + assignments + acc = x + i = 0 + while i < 2: + acc = acc + y + i = i + 1 + return acc + + +@blosc2.dsl_kernel +def k_where(x, y): + # where(x < y, x + 1, x - 1) + return where(x < y, x + 1, x - 1) # noqa: F821 (DSL builtin) + + +def _make_table(n=20, b=10, urlpath=None, mode="w"): + data = [[i, b] for i in range(n)] + if urlpath is not None: + return CTable(Row, data, urlpath=urlpath, mode=mode) + return CTable(Row, data) + + +# --------------------------------------------------------------------------- +# add_computed_column — direct kernel + inputs +# --------------------------------------------------------------------------- + + +def test_dsl_computed_direct_kernel_values(): + t = _make_table() + t.add_computed_column("r", k_loop, inputs=["a", "b"]) + ref = np.arange(20) + 20 + np.testing.assert_array_equal(np.asarray(t["r"][:]), ref) + + +def 
test_dsl_computed_dtype_inferred_from_inputs(): + t = _make_table() + t.add_computed_column("r", k_add, inputs=["a", "b"]) + assert t._computed_cols["r"]["dtype"] == np.dtype(np.int64) + + +def test_dsl_computed_dtype_explicit(): + t = _make_table() + t.add_computed_column("r", k_add, inputs=["a", "b"], dtype=np.float64) + got = np.asarray(t["r"][:]) + assert got.dtype == np.float64 + np.testing.assert_allclose(got, np.arange(20) + 10) + + +def test_dsl_computed_callable_lazyudf_form(): + t = _make_table() + t.add_computed_column("r", lambda c: blosc2.lazyudf(k_loop, (c["a"], c["b"]), dtype=np.int64)) + np.testing.assert_array_equal(np.asarray(t["r"][:]), np.arange(20) + 20) + + +def test_dsl_computed_partial_slice(): + t = _make_table() + t.add_computed_column("r", k_add, inputs=["a", "b"]) + np.testing.assert_array_equal(np.asarray(t["r"][5:8]), np.array([15, 16, 17])) + + +def test_dsl_computed_where_kernel(): + t = _make_table(n=6, b=3) # a=0..5, b=3 + t.add_computed_column("r", k_where, inputs=["a", "b"]) + a = np.arange(6) + ref = np.where(a < 3, a + 1, a - 1) + np.testing.assert_array_equal(np.asarray(t["r"][:]), ref) + + +def test_dsl_computed_requires_inputs(): + t = _make_table() + with pytest.raises(TypeError, match="inputs="): + t.add_computed_column("r", k_add) + + +def test_dsl_computed_inputs_arity_mismatch(): + t = _make_table() + with pytest.raises(ValueError, match="input"): + t.add_computed_column("r", k_add, inputs=["a"]) + + +def test_dsl_computed_dtype_inferred_on_empty_table(): + t = CTable(Row, []) # no rows + # Inference uses the input column dtypes (always available), so it works even + # with zero rows — no explicit dtype needed for this elementwise int64 kernel. 
+ t.add_computed_column("r", k_add, inputs=["a", "b"]) + assert t._computed_cols["r"]["dtype"] == np.dtype(np.int64) + + +# --------------------------------------------------------------------------- +# where() referencing a DSL computed column +# --------------------------------------------------------------------------- + + +def test_dsl_computed_in_where_predicate(): + t = _make_table() # a=0..19, b=10 ; r = a + 20 + t.add_computed_column("r", k_loop, inputs=["a", "b"]) + a = np.arange(20) + r = a + 20 + sel = t.where("(r > 25) & (a < 18)")[:] + expected = int(((r > 25) & (a < 18)).sum()) + assert len(sel) == expected + + +def test_dsl_computed_in_where_streams_multichunk(): + # Larger than a single chunk to exercise streamed co-evaluation. + t = _make_table(n=5000, b=1) + t.add_computed_column("r", k_add, inputs=["a", "b"]) # r = a + 1 + a = np.arange(5000) + sel = t.where("r > 2500")[:] + assert len(sel) == int((a + 1 > 2500).sum()) + + +# --------------------------------------------------------------------------- +# Persistence round-trip (.b2d) +# --------------------------------------------------------------------------- + + +def test_dsl_computed_roundtrip(tmp_path): + path = str(tmp_path / "dsl.b2d") + t = _make_table(urlpath=path, mode="w") + t.add_computed_column("r", k_loop, inputs=["a", "b"]) + t.close() + + # Stored schema carries kind:dsl + dsl_source and no expression. 
+ meta = blosc2.open(f"{path}/_meta.b2f") + sd = json.loads(meta.vlmeta["schema"]) + (cc,) = sd["computed_columns"] + assert cc["kind"] == "dsl" + assert "dsl_source" in cc + assert "expression" not in cc + + t2 = blosc2.open(path) + np.testing.assert_array_equal(np.asarray(t2["r"][:]), np.arange(20) + 20) + a = np.arange(20) + r = a + 20 + sel = t2.where("(r > 25) & (a < 18)")[:] + assert len(sel) == int(((r > 25) & (a < 18)).sum()) + t2.close() + + +def test_dsl_computed_compact_preserved(): + t = _make_table() + t.add_computed_column("r", k_add, inputs=["a", "b"]) + t2 = t.copy(compact=True) + np.testing.assert_array_equal(np.asarray(t2["r"][:]), np.arange(20) + 10) + + +def test_dsl_computed_materialize(): + t = _make_table() + t.add_computed_column("r", k_add, inputs=["a", "b"]) + t.materialize_computed_column("r", new_name="r_stored") + np.testing.assert_array_equal(np.asarray(t["r_stored"][:]), np.arange(20) + 10) + + +# --------------------------------------------------------------------------- +# Generated / stored DSL columns +# --------------------------------------------------------------------------- + + +def test_dsl_generated_initial_values(): + t = _make_table() + t.add_generated_column("g", values=k_add, inputs=["a", "b"], dtype=blosc2.int64()) + np.testing.assert_array_equal(np.asarray(t["g"][:]), np.arange(20) + 10) + + +def test_dsl_generated_append_autofill(): + t = _make_table(n=10) + t.add_generated_column("g", values=k_add, inputs=["a", "b"], dtype=blosc2.int64()) + t.append([100, 5]) + assert int(t["g"][:][-1]) == 105 + + +def test_dsl_generated_extend_autofill(): + t = _make_table(n=5) + t.add_generated_column("g", values=k_add, inputs=["a", "b"], dtype=blosc2.int64()) + t.extend([[50, 7], [60, 8]]) + np.testing.assert_array_equal(np.asarray(t["g"][:])[-2:], np.array([57, 68])) + + +def test_dsl_generated_refresh_after_inplace_edit(): + t = _make_table(n=10) + t.add_generated_column("g", values=k_add, inputs=["a", "b"], 
dtype=blosc2.int64()) + t["a"][0] = 1000 + t.refresh_generated_column("g") + assert int(t["g"][:][0]) == 1010 + + +def test_dsl_generated_create_index(): + t = _make_table() + t.add_generated_column("g", values=k_add, inputs=["a", "b"], dtype=blosc2.int64(), create_index=True) + # Index-backed filter on the stored generated column. + sel = t.where("g > 25")[:] + assert len(sel) == int((np.arange(20) + 10 > 25).sum()) + + +def test_dsl_generated_roundtrip(tmp_path): + path = str(tmp_path / "gen.b2d") + t = _make_table(n=10, urlpath=path, mode="w") + t.add_generated_column("g", values=k_add, inputs=["a", "b"], dtype=blosc2.int64()) + t.close() + + meta = blosc2.open(f"{path}/_meta.b2f") + sd = json.loads(meta.vlmeta["schema"]) + (m,) = sd["materialized_columns"] + assert m["transformer_kind"] == "dsl" + assert "dsl_source" in m + + t2 = blosc2.open(path) + np.testing.assert_array_equal(np.asarray(t2["g"][:]), np.arange(10) + 10) + t2.close() From 1177f9f9419ee85cbe70d3cc51d169628231c3de Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Wed, 3 Jun 2026 17:40:42 +0200 Subject: [PATCH 08/12] Add more blosc2.array() tests --- tests/ndarray/test_ndarray.py | 48 +++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/tests/ndarray/test_ndarray.py b/tests/ndarray/test_ndarray.py index c21c774a0..672d4b545 100644 --- a/tests/ndarray/test_ndarray.py +++ b/tests/ndarray/test_ndarray.py @@ -128,6 +128,54 @@ def test_asarray_ndarray_copies_for_dtype_changes_and_rejects_copy_false(tmp_pat blosc2.asarray(array, urlpath=tmp_path / "persisted_copy_false.b2nd", mode="w", copy=False) +def test_array_creates_ndarray_from_sequence(): + a = blosc2.array([1, 2, 3]) + + assert isinstance(a, blosc2.NDArray) + np.testing.assert_array_equal(a[:], np.array([1, 2, 3])) + + +def test_array_copies_ndarray_by_default(): + a = blosc2.asarray([1, 2, 3]) + + b = blosc2.array(a) + + assert b is not a + np.testing.assert_array_equal(b[:], a[:]) + + +def 
test_array_copy_false_reuses_compatible_ndarray(): + a = blosc2.asarray([1, 2, 3]) + + b = blosc2.array(a, copy=False) + + assert b is a + + +def test_array_copy_false_rejects_required_copy(): + a = blosc2.asarray([1, 2, 3]) + + with pytest.raises(ValueError, match="copy=False"): + blosc2.array(a, dtype=np.float64, copy=False) + + +def test_array_copy_none_matches_asarray_for_compatible_ndarray(): + a = blosc2.asarray([1, 2, 3]) + + b = blosc2.array(a, copy=None) + + assert b is a + + +def test_array_honors_constructor_kwargs(): + a = blosc2.array([1, 2, 3, 4], dtype=np.float32, chunks=(4,), blocks=(2,)) + + assert a.dtype == np.dtype(np.float32) + assert a.chunks == (4,) + assert a.blocks == (2,) + np.testing.assert_array_equal(a[:], np.array([1, 2, 3, 4], dtype=np.float32)) + + def test_ndarray_info_has_human_sizes(): array = blosc2.asarray(np.arange(16, dtype=np.int32)) From 2826d347a7671d4a9525998c0dc5d509930acc8d Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Wed, 3 Jun 2026 18:41:56 +0200 Subject: [PATCH 09/12] Make BLOSC_ME_JIT env var take full priority over jit= and jit_backend= params This lets users switch JIT on/off and change backends entirely from the command line without touching code, which is the natural experimentation workflow. --- src/blosc2/lazyexpr.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/blosc2/lazyexpr.py b/src/blosc2/lazyexpr.py index de185b7dc..3ce9d169d 100644 --- a/src/blosc2/lazyexpr.py +++ b/src/blosc2/lazyexpr.py @@ -493,10 +493,10 @@ def compute( default (``"tcc"``). - ``BLOSC_ME_JIT`` environment variable: when set to ``"1"``, ``"true"``, - ``"on"``, ``"tcc"``, or ``"cc"``, it forces ``jit=True`` for all - ``compute()`` and ``__getitem__`` calls where ``jit`` is not explicitly - passed. Setting it to ``"tcc"`` or ``"cc"`` also selects that backend - unless ``jit_backend`` is given explicitly. 
+ ``"on"``, ``"tcc"``, or ``"cc"``, it forces ``jit=True`` and overrides + both the ``jit`` and ``jit_backend`` arguments — this lets you switch + JIT on or change backends from the command line without touching code. + Setting it to ``"tcc"`` or ``"cc"`` also selects that backend. - ``BLOSC_ME_JIT_TRACE`` environment variable: when set to ``"1"``, ``"true"``, or ``"on"``, prints a one-line diagnostic to stdout @@ -709,15 +709,14 @@ def compute_broadcast_shape(arrays): def _jit_from_env(jit, jit_backend): """Apply BLOSC_ME_JIT environment variable to jit/jit_backend defaults.""" - if jit is not None: - return jit, jit_backend env_jit = os.environ.get("BLOSC_ME_JIT", "") if not env_jit: return jit, jit_backend env_jit_lower = env_jit.lower() + # Env var always wins over both jit= and jit_backend= for easy CLI experimentation. if env_jit_lower in ("1", "true", "on", "tcc", "cc"): jit = True - if jit_backend is None and env_jit_lower in ("tcc", "cc"): + if env_jit_lower in ("tcc", "cc"): jit_backend = env_jit_lower return jit, jit_backend From 5f75c685594b59bf54ab82c5e34db042a200dc4b Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Wed, 3 Jun 2026 19:46:13 +0200 Subject: [PATCH 10/12] Add a new section on CTable in overview, with performance tips --- bench/ctable/query-backends.py | 142 +++++++++++++++++++++++++++++++ doc/getting_started/overview.rst | 92 ++++++++++++++++++++ 2 files changed, 234 insertions(+) create mode 100644 bench/ctable/query-backends.py diff --git a/bench/ctable/query-backends.py b/bench/ctable/query-backends.py new file mode 100644 index 000000000..8f9270c7b --- /dev/null +++ b/bench/ctable/query-backends.py @@ -0,0 +1,142 @@ +#!/usr/bin/env python3 +""" +Benchmark: CTable.where() across three evaluation backends. 
+ +Tests how performance scales with table size (10M–500M rows) for the query: + (tips > 477) & (km > 190) & (lon < -140) + +Backends: + interpreted : miniexpr bytecode interpreter (default, no JIT) + tcc : Tiny C Compiler JIT (fast compile, modest code quality) + cc : system C compiler JIT (clang/gcc, -O3 + auto-vectorisation) + +Two timings are shown per backend: + cold – first call, includes JIT compilation cost for tcc/cc + warm – second call, kernel is cached (shared library already loaded) +""" + +import os +import sys +import time +from dataclasses import dataclass + +import numpy as np + +import blosc2 + + +SIZES = [10_000_000, 50_000_000, 100_000_000, 200_000_000, 500_000_000] +BUILD_CHUNK = 10_000_000 # rows per extend() call to avoid large temp arrays + +BACKENDS = [ + ("interpreted", None), + ("tcc", "tcc"), + ("cc", "cc"), +] + +NP_DTYPE = np.dtype([ + ("passenger_count", np.int32), + ("shared", np.bool_), + ("tips", np.float32), + ("km", np.float32), + ("lon", np.float32), +]) + + +@dataclass +class Row: + passenger_count: int = blosc2.field(blosc2.int32()) + shared: bool = blosc2.field(blosc2.bool()) + tips: float = blosc2.field(blosc2.float32()) + km: float = blosc2.field(blosc2.float32()) + lon: float = blosc2.field(blosc2.float32()) + + +def make_chunk(n: int, rng: np.random.Generator) -> np.ndarray: + # Integer-valued floats: mantissa low bytes are zero → high compression ratio.
+ chunk = np.empty(n, dtype=NP_DTYPE) + chunk["passenger_count"] = rng.integers(1, 7, n, dtype=np.int32) + chunk["shared"] = rng.integers(0, 2, n, dtype=np.bool_) + chunk["tips"] = rng.integers(0, 501, n).astype(np.float32) # 501 distinct values + chunk["km"] = rng.integers(-10, 201, n).astype(np.float32) # 211 distinct values + chunk["lon"] = rng.integers(-150, 51, n).astype(np.float32) # 201 distinct values + return chunk + + +def build_table(n_rows: int, rng: np.random.Generator) -> blosc2.CTable: + ct = blosc2.CTable(Row, expected_size=n_rows) + remaining = n_rows + while remaining > 0: + batch = min(remaining, BUILD_CHUNK) + ct.extend(make_chunk(batch, rng)) + remaining -= batch + return ct + + +def run_where(ct: blosc2.CTable, blosc_me_jit: str | None) -> tuple[float, int]: + """Run the where() query under the given BLOSC_ME_JIT setting. + + Thresholds are chosen so that each sub-condition passes ~4.6% of rows + independently, giving a combined selectivity of ~0.01%: + tips ~ U(0, 500): tips > 477 passes (500-477)/500 = 4.6% + km ~ U(-10, 200): km > 190 passes (200-190)/210 = 4.8% + lon ~ U(-150, 50): lon < -140 passes (-140+150)/200 = 5.0% + Combined: 0.046 * 0.048 * 0.050 ≈ 0.011% + """ + saved = os.environ.pop("BLOSC_ME_JIT", None) + try: + if blosc_me_jit is not None: + os.environ["BLOSC_ME_JIT"] = blosc_me_jit + condition = (ct.tips > 477) & (ct.km > 190) & (ct.lon < -140) + t0 = time.perf_counter() + result = ct.where(condition) + elapsed = time.perf_counter() - t0 + return elapsed, len(result) + finally: + os.environ.pop("BLOSC_ME_JIT", None) + if saved is not None: + os.environ["BLOSC_ME_JIT"] = saved + + +def fmt_row(n: int, timings: list[tuple[float, float]], n_matched: int) -> str: + parts = [f"{n:>12,}"] + for cold, warm in timings: + parts.append(f"{cold:>8.3f} {warm:>8.3f}") + parts.append(f" ({n_matched:,} matched)") + return " | ".join(parts) + + +def main(): + rng = np.random.default_rng(42) + + # Header + backend_header = " | ".join(f"{'--- 
' + name + ' ---':>17}" for name, _ in BACKENDS) + print(f"\n{'':>12} | {backend_header}") + subheader = " | ".join(f"{'cold(s)':>8} {'warm(s)':>8}" for _ in BACKENDS) + print(f"{'rows':>12} | {subheader}") + print("-" * (14 + 19 * len(BACKENDS))) + + for n in SIZES: + print(f" building {n:,} rows...", end=" ", flush=True) + ct = build_table(n, rng) + print("done", flush=True) + + timings = [] + n_matched = None + for _name, backend in BACKENDS: + cold, n_matched = run_where(ct, backend) + warm, _ = run_where(ct, backend) + timings.append((cold, warm)) + + print(fmt_row(n, timings, n_matched)) + sys.stdout.flush() + + del ct # free memory before building the next (larger) table + + print() + print("cold = first call (includes JIT compilation for tcc/cc)") + print("warm = second call (kernel cached, compilation cost amortised)") + + +if __name__ == "__main__": + main() diff --git a/doc/getting_started/overview.rst b/doc/getting_started/overview.rst index 04344768c..0082532ad 100644 --- a/doc/getting_started/overview.rst +++ b/doc/getting_started/overview.rst @@ -77,6 +77,8 @@ container objects in Python-Blosc2: `buffer protocol `_. * ``NDArray``: An N-Dimensional store that mirrors the NumPy API, enhanced with efficient compressed data storage. +* ``CTable``: A columnar table for structured, record-oriented data with a + powerful query engine built on top of compressed ``NDArray`` columns. These containers are described in more detail below. @@ -257,6 +259,96 @@ parameter in ``compute()`` or ``sum()`` functions). For a more in-depth look at this example, with performance comparisons, see this `compute-bigger blog post `_. +Querying Columnar Data with CTable +=================================== + +``CTable`` is Python-Blosc2's columnar store for structured, record-oriented +data. 
Each column is a compressed ``NDArray``, so the same chunking, +compression, and compute-engine machinery that powers ``NDArray`` expressions +is available for tabular queries — with no data copy required. + +Schemas are defined with plain Python dataclasses, supporting a rich mix of +types including integers, floats, booleans, and strings: + +.. code-block:: python + + from dataclasses import dataclass + import blosc2 + + + @dataclass + class Row: + passenger_count: int = blosc2.field(blosc2.int32()) + shared: bool = blosc2.field(blosc2.bool()) + tips: float = blosc2.field(blosc2.float32()) + km: float = blosc2.field(blosc2.float32()) + lon: float = blosc2.field(blosc2.float32()) + company: str = blosc2.field(blosc2.string(max_length=50)) + + + t = blosc2.CTable(Row, expected_size=10_000_000) + +Columns support the full lazy-expression syntax, so compound boolean filters +are written naturally and evaluated in a single pass over the compressed data: + +.. code-block:: python + + condition = (t.tips > 100) & (t.km > 0) & (t.lon < -10) + result = t.where(condition).sort_by("km") + +Beyond filtering and sorting, ``CTable`` offers: + +* **Aggregations and group-by** — ``groupby()``, ``sum()``, ``mean()``, + ``min()``, ``max()``, ``std()`` and more, optionally with a ``where=`` + mask for conditional aggregation. +* **Computed and generated columns** — columns whose values are derived from + other columns via a lazy expression or DSL kernel; computed columns are + evaluated on the fly without storing extra data, generated ones are stored. +* **Automatic SUMMARY indexes** — per-block min/max indexes built + transparently at write time, enabling ``where()`` to skip entire blocks + that cannot contain matching rows, dramatically reducing I/O for + high-selectivity queries. +* **Schema validation** — type and constraint checking (``ge=``, ``le=``, + nullable, etc.) enforced at insert time, keeping data quality guarantees + inside the table itself.
+* **Null handling** — first-class nullable columns with ``notnull()``, + ``null_count``, and null-aware aggregations. +* **Nested field paths** — hierarchically structured schemas expose columns + as ``t.payment.tips``, ``t.trip.begin.lon``, etc., keeping query code + readable even for wide, deeply nested records. +* **Parquet and Arrow round-trips** — load from and save to Parquet or Apache + Arrow with a single call, making it easy to interoperate with the broader + data ecosystem. +* **Persistent storage** — open and save tables to disk (``CTable.open()``, + ``CTable.save()``); in-memory and on-disk tables share the same API. + +.. code-block:: python + + # Load from Parquet, filter, and persist the result + t = blosc2.CTable.from_parquet("trips.parquet") + result = t.where((t.tips > 100) & (t.km > 0)).sort_by("km") + result.save("filtered_trips.b2z") + +.. tip:: + + **Free ~30% speedup for large tables:** set the ``BLOSC_ME_JIT=cc`` + environment variable to have filter expressions JIT-compiled by the system C + compiler (clang/gcc) with ``-O3`` and auto-vectorisation, instead of the + default bytecode interpreter. The compiled kernel is cached on disk so + subsequent runs pay no compilation cost. + + .. code-block:: bash + + BLOSC_ME_JIT=cc python my_script.py + + Benchmarks on tables from 50 M to 500 M rows show a consistent ~30% + speedup across Intel, AMD, and Apple Silicon hardware. The one-time + compilation cost on Linux (gcc, ~30 ms) is negligible; on macOS (clang, + ~400 ms) it is only worth paying for large tables or repeated queries. + For small tables (< ~50 M rows) the default bytecode interpreter is + perfectly adequate. See the :py:meth:`blosc2.LazyArray.compute` docstring + for the full list of ``BLOSC_ME_JIT`` values and options. + Hopefully, this overview has provided a good understanding of Python-Blosc2's capabilities. To begin your journey with Python-Blosc2, proceed to the `installation instructions `_. 
Then explore the From e7df9fb9bc4deaf1889e1e54389f0182e3d00447 Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Thu, 4 Jun 2026 10:09:31 +0200 Subject: [PATCH 11/12] Update to use node v24 --- .github/workflows/cibuildwheels.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/cibuildwheels.yml b/.github/workflows/cibuildwheels.yml index 5f7077489..8d86f7837 100644 --- a/.github/workflows/cibuildwheels.yml +++ b/.github/workflows/cibuildwheels.yml @@ -87,7 +87,7 @@ jobs: run: echo "C:\\Program Files\\LLVM\\bin" >> $env:GITHUB_PATH - name: Install MSVC amd64 - uses: ilammy/msvc-dev-cmd@v1 + uses: ilammy/msvc-dev-cmd@v1.13.0 with: arch: amd64 From 0f5b92f578182dd1cead7390639218be345f4aff Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Thu, 4 Jun 2026 10:54:34 +0200 Subject: [PATCH 12/12] Address Copilot review comments on DSL CTable columns - lazyexpr.py: avoid double _raw_col property lookup (hasattr + access) - ctable.py: only materialise referenced DSL computed columns in _where_expression_operands, not all of them eagerly - ctable.py: preserve jit_backend in DSL computed column metadata during _empty_copy (was silently dropped) - dsl_kernel.py: kernel_from_source() now validates source contains only a function definition (rejects side-effectful top-level nodes) - dsl_kernel.py: raise ValueError with clear message when source does not define the requested function name (was a cryptic KeyError) - ctable.rst: add security warning about opening .b2d files from untrusted sources when DSL columns are present --- doc/reference/ctable.rst | 8 ++++++++ src/blosc2/ctable.py | 18 ++++++++++++------ src/blosc2/dsl_kernel.py | 28 +++++++++++++++++++++++++--- src/blosc2/lazyexpr.py | 3 ++- 4 files changed, 47 insertions(+), 10 deletions(-) diff --git a/doc/reference/ctable.rst b/doc/reference/ctable.rst index 789ce0b72..10481afbd 100644 --- a/doc/reference/ctable.rst +++ b/doc/reference/ctable.rst @@ -340,6 +340,14 @@ functions, and 
:class:`blosc2.LazyUDF` objects. DSL kernels support full Python control flow (``if``/``else``, ``where()``, loops) and have their source persisted and recompiled on open. +.. warning:: + + Because DSL kernel source is persisted in the table metadata and re-executed + during :func:`blosc2.open`, **do not open** ``.b2d`` files from untrusted + sources if they may contain DSL computed or generated columns. The kernel + source runs with restricted builtins (no ``__import__``), but arbitrary + Python code execution still carries risk. + When passing a :class:`blosc2.LazyUDF` built with an explicit ``jit_backend=`` (e.g. ``jit_backend="cc"`` to use the system C compiler instead of the default TCC), that choice is persisted in the column metadata and automatically restored diff --git a/src/blosc2/ctable.py b/src/blosc2/ctable.py index b27b17c08..45aceab75 100644 --- a/src/blosc2/ctable.py +++ b/src/blosc2/ctable.py @@ -1914,7 +1914,7 @@ def _normalize_sum_where(self, where): return None if isinstance(where, str): self._table._guard_varlen_scalar_expression(where) - operands = self._table._where_expression_operands() + operands = self._table._where_expression_operands(where) where, operands = self._table._rewrite_nested_expression(where, operands) where = blosc2.lazyexpr(where, operands) if isinstance(where, np.ndarray) and where.dtype == np.bool_: @@ -8383,7 +8383,7 @@ def _normalize_expression_transformer(self, expr) -> tuple[blosc2.LazyExpr, list lazy = expr(self._cols) elif isinstance(expr, str): self._guard_scalar_expression(expr) - operands = self._where_expression_operands() + operands = self._where_expression_operands(expr) expr, operands = self._rewrite_nested_expression(expr, operands) lazy = blosc2.lazyexpr(expr, operands) else: @@ -10015,13 +10015,15 @@ def _empty_copy( if cc.get("kind") == "dsl": # DSL entries hold the live kernel; the LazyUDF is rebuilt on # demand from obj._cols, so no operand rebinding is needed here. 
- obj._computed_cols[cc_name] = { + dsl_entry: dict[str, Any] = { "kind": "dsl", "dsl_source": cc["dsl_source"], "kernel": cc["kernel"], "col_deps": cc["col_deps"], "dtype": cc["dtype"], + **({"jit_backend": cc["jit_backend"]} if "jit_backend" in cc else {}), } + obj._computed_cols[cc_name] = dsl_entry else: operands = {f"o{i}": new_cols[dep] for i, dep in enumerate(cc["col_deps"])} new_lazy = blosc2.lazyexpr(cc["expression"], operands) @@ -10555,7 +10557,9 @@ def extend(self, data: list | CTable | Any, *, validate: bool | None = None) -> # Filtering # ------------------------------------------------------------------ - def _where_expression_operands(self) -> dict[str, blosc2.NDArray | blosc2.LazyExpr]: + def _where_expression_operands( + self, expr: str | None = None + ) -> dict[str, blosc2.NDArray | blosc2.LazyExpr]: operands = {} for name, arr in self._cols.items(): col = self._schema.columns_by_name.get(name) @@ -10566,7 +10570,9 @@ def _where_expression_operands(self) -> dict[str, blosc2.NDArray | blosc2.LazyEx or self._is_ndarray_column(col) ): operands[name] = arr - operands.update({name: self._build_computed_lazy(cc) for name, cc in self._computed_cols.items()}) + for name, cc in self._computed_cols.items(): + if expr is None or self._expression_references_name(expr, name): + operands[name] = self._build_computed_lazy(cc) return operands def _rewrite_nested_expression( @@ -10714,7 +10720,7 @@ def where( # noqa: C901 """ if isinstance(expr_result, str): self._guard_varlen_scalar_expression(expr_result) - operands = self._where_expression_operands() + operands = self._where_expression_operands(expr_result) expr_result, operands = self._rewrite_nested_expression(expr_result, operands) expr_result = blosc2.lazyexpr(expr_result, operands) if isinstance(expr_result, np.ndarray) and expr_result.dtype == np.bool_: diff --git a/src/blosc2/dsl_kernel.py b/src/blosc2/dsl_kernel.py index d4ad0309c..4360a7e81 100644 --- a/src/blosc2/dsl_kernel.py +++ 
b/src/blosc2/dsl_kernel.py @@ -643,9 +643,9 @@ def kernel_from_source(source: str, name: str | None = None) -> DSLKernel: import blosc2 + tree = ast.parse(source) + func_defs = [n for n in tree.body if isinstance(n, ast.FunctionDef)] if name is None: - tree = ast.parse(source) - func_defs = [n for n in tree.body if isinstance(n, ast.FunctionDef)] if len(func_defs) != 1: raise ValueError( "kernel_from_source requires an explicit 'name' when 'source' does not " @@ -653,6 +653,25 @@ def kernel_from_source(source: str, name: str | None = None) -> DSLKernel: ) name = func_defs[0].name + # Security: reject sources that include top-level statements with + # side effects (assignments, calls, loops, etc.) beyond the function + # definition itself. Imports and docstrings are permitted. + allowed_nodes = (ast.FunctionDef, ast.Import, ast.ImportFrom) + for node in tree.body: + if isinstance(node, allowed_nodes): + continue + # A bare string literal at module level is a docstring — safe. + if ( + isinstance(node, ast.Expr) + and isinstance(node.value, ast.Constant) + and isinstance(node.value.value, str) + ): + continue + raise ValueError( + f"kernel_from_source source must contain only a function definition " + f"(optionally preceded by imports), but found: {ast.dump(node)[:120]}" + ) + local_ns: dict = {} filename = f"<{name}>" safe_globals = { @@ -662,7 +681,10 @@ def kernel_from_source(source: str, name: str | None = None) -> DSLKernel: } linecache.cache[filename] = (len(source), None, source.splitlines(True), filename) exec(compile(source, filename, "exec"), safe_globals, local_ns) - func = local_ns[name] + try: + func = local_ns[name] + except KeyError: + raise ValueError(f"kernel_from_source: source did not define function {name!r}") from None if not isinstance(func, DSLKernel): func = DSLKernel(func) return func diff --git a/src/blosc2/lazyexpr.py b/src/blosc2/lazyexpr.py index 3ce9d169d..8d23d8f60 100644 --- a/src/blosc2/lazyexpr.py +++ b/src/blosc2/lazyexpr.py @@ 
-12,6 +12,7 @@ import asyncio import builtins import concurrent.futures +import contextlib import copy import enum import inspect @@ -683,7 +684,7 @@ def convert_inputs(inputs): inputs_ = [] for obj in inputs: # CTable Column — unwrap to the backing NDArray so shape and identity match. - if hasattr(obj, "_raw_col"): + with contextlib.suppress(AttributeError): obj = obj._raw_col if not isinstance(obj, np.ndarray | blosc2.Operand) and not np.isscalar(obj): try: