diff --git a/.all-contributorsrc b/.all-contributorsrc index 9821a70c4..2f0ccb897 100644 --- a/.all-contributorsrc +++ b/.all-contributorsrc @@ -258,6 +258,15 @@ ] }, { + "login": "arnavk23", + "name": "Arnav Kapoor", + "avatar_url": "https://avatars.githubusercontent.com/u/arnavk23?v=4", + "profile": "https://github.com/arnavk23", + "contributions": [ + "code" + ] + }, + { "login": "kabirvashisht4-glitch", "name": "Kabir Vashisht", "contributions": [ diff --git a/docs/source/api_reference/regression.rst b/docs/source/api_reference/regression.rst index d312b46a1..28c8a206b 100644 --- a/docs/source/api_reference/regression.rst +++ b/docs/source/api_reference/regression.rst @@ -149,6 +149,14 @@ Formally, these algorithms are reduction algorithms, to tabular regression. CyclicBoosting +.. currentmodule:: skpro.regression.shrinking_interval + +.. autosummary:: + :toctree: auto_generated/ + :template: class.rst + + ShrinkingNormalIntervalRegressor + Reduction to probabilistic classification ----------------------------------------- diff --git a/skpro/regression/__init__.py b/skpro/regression/__init__.py index 8ceb27150..eea9c7eea 100644 --- a/skpro/regression/__init__.py +++ b/skpro/regression/__init__.py @@ -7,6 +7,7 @@ ) from skpro.regression.jackknife import MapieJackknifeAfterBootstrapRegressor from skpro.regression.nonparametric import NadarayaWatsonCDE +from skpro.regression.shrinking_interval import ShrinkingNormalIntervalRegressor __all__ = [ "MapieSplitConformalRegressor", @@ -14,4 +15,5 @@ "MapieConformalizedQuantileRegressor", "MapieJackknifeAfterBootstrapRegressor", "NadarayaWatsonCDE", + "ShrinkingNormalIntervalRegressor", ] diff --git a/skpro/regression/shrinking_interval.py b/skpro/regression/shrinking_interval.py new file mode 100644 index 000000000..39fa2de0b --- /dev/null +++ b/skpro/regression/shrinking_interval.py @@ -0,0 +1,149 @@ +"""Shrinking normal interval regressor with a static quantile baseline.""" + +from typing import Any, List, Optional + +import 
numpy as np +import pandas as pd +from scipy.stats import norm + +from skpro.regression.base import BaseProbaRegressor + + +class ShrinkingNormalIntervalRegressor(BaseProbaRegressor): + """Probabilistic regressor with shrinking intervals in ``mean_sd`` mode. + + Parameters + ---------- + method : str, default="mean_sd" + "mean_sd" for normal-approximation intervals that shrink with ``n``; + "quantile" for a static empirical-quantile baseline. + + Examples + -------- + >>> import pandas as pd + >>> from sklearn.datasets import load_diabetes + >>> from sklearn.model_selection import train_test_split + >>> from skpro.regression.shrinking_interval import ShrinkingNormalIntervalRegressor + >>> X, y = load_diabetes(return_X_y=True, as_frame=True) + >>> y = pd.DataFrame(y) + >>> X_train, X_test, y_train, y_test = train_test_split(X, y) + >>> reg = ShrinkingNormalIntervalRegressor(method="mean_sd") + >>> reg.fit(X_train, y_train) + ShrinkingNormalIntervalRegressor(...) + >>> y_pred = reg.predict(X_test) + >>> intervals = reg.predict_interval(X_test, coverage=[0.9]) + >>> quantiles = reg.predict_quantiles(X_test, alpha=[0.05, 0.5, 0.95]) + """ + + _tags = { + "authors": ["arnavk23"], + "capability:multioutput": False, + "capability:missing": True, + } + + def __init__(self, method: str = "mean_sd"): + if method not in ("mean_sd", "quantile"): + raise ValueError(f"method must be 'mean_sd' or 'quantile', got {method}") + self.method = method + super().__init__() + + def _fit(self, X: pd.DataFrame, y: pd.DataFrame, C: Optional[Any] = None): + # Input validation + if not isinstance(y, (pd.DataFrame, pd.Series, np.ndarray)) or len(y) == 0: + raise ValueError("y must be a non-empty DataFrame, Series, or ndarray") + self._X = X.copy() + self._y = y.copy() if hasattr(y, "copy") else np.array(y) + self._mean = np.asarray( + y.mean().values if hasattr(y, "mean") else np.mean(y, axis=0) + ) + self._std = np.asarray( + y.std(ddof=1).values if hasattr(y, "std") else np.std(y, ddof=1, 
axis=0) + ) + if np.any(~np.isfinite(self._std)): + self._std = np.nan_to_num(self._std, nan=0.0, posinf=0.0, neginf=0.0) + self._n = len(y) + self._y_cols = y.columns if hasattr(y, "columns") else ["y"] + return self + + def _predict(self, X: pd.DataFrame) -> pd.DataFrame: + # Predict mean for all X (featureless baseline) + mean = pd.DataFrame( + np.tile(self._mean, (len(X), 1)), index=X.index, columns=self._y_cols + ) + return mean + + def _predict_interval(self, X: pd.DataFrame, coverage: List[float]) -> pd.DataFrame: + n = self._n + mean = np.tile(self._mean, (len(X), 1)) + std = np.tile(self._std, (len(X), 1)) + intervals = [] + for c in coverage: + if not (0 < c <= 1): + raise ValueError(f"coverage must be in (0, 1], got {c}") + if self.method == "mean_sd": + z = abs(norm.ppf((1 + c) / 2)) + half_width = z * std / np.sqrt(n) + lower = mean - half_width + upper = mean + half_width + elif self.method == "quantile": + # Use empirical quantiles from training data, not shrinking + lower = np.tile( + np.percentile(self._y.values, 100 * (1 - c) / 2, axis=0), + (len(X), 1), + ) + upper = np.tile( + np.percentile(self._y.values, 100 * (1 + c) / 2, axis=0), + (len(X), 1), + ) + else: + raise NotImplementedError(f"Unknown method: {self.method}") + intervals.append((lower, upper)) + # Build MultiIndex columns + arrays = [ + np.repeat(self._y_cols, len(coverage) * 2), + np.tile(np.repeat(coverage, 2), len(self._y_cols)), + np.tile(["lower", "upper"], len(self._y_cols) * len(coverage)), + ] + columns = pd.MultiIndex.from_arrays(arrays, names=["var", "coverage", "bound"]) + data = np.hstack( + [np.column_stack([lower, upper]) for lower, upper in intervals] + ) + return pd.DataFrame(data, index=X.index, columns=columns) + + def _predict_quantiles(self, X: pd.DataFrame, alpha: List[float]) -> pd.DataFrame: + quantiles = [] + for a in alpha: + if not (0 <= a <= 1): + raise ValueError(f"alpha must be in [0, 1], got {a}") + if self.method == "quantile": + q = 
np.percentile(self._y.values, 100 * a, axis=0) + elif self.method == "mean_sd": + # For mean_sd, return mean for median + # or mean +/- z*std/sqrt(n) for tails + if a == 0.5: + q = self._mean + else: + z = abs(norm.ppf(a)) + q = self._mean + np.sign(a - 0.5) * z * self._std / np.sqrt(self._n) + else: + raise NotImplementedError(f"Unknown method: {self.method}") + quantiles.append(np.tile(q, (len(X), 1))) + # Build MultiIndex columns + arrays = [ + np.repeat(self._y_cols, len(alpha)), + np.tile(alpha, len(self._y_cols)), + ] + columns = pd.MultiIndex.from_arrays(arrays, names=["var", "alpha"]) + data = np.hstack(quantiles) + return pd.DataFrame(data, index=X.index, columns=columns) + + @classmethod + def get_test_params(cls, parameter_set: str = "default"): + """Return testing parameter sets for automated tests. + + Returns two parameter sets: one for mean/sd, one for quantile method. + """ + return [ + {"method": "mean_sd"}, + {"method": "quantile"}, + ] diff --git a/skpro/regression/tests/test_reducing_interval.py b/skpro/regression/tests/test_reducing_interval.py new file mode 100644 index 000000000..37e301aa5 --- /dev/null +++ b/skpro/regression/tests/test_reducing_interval.py @@ -0,0 +1,143 @@ +import numpy as np +import pandas as pd +import pandas.testing as pdt +from scipy.stats import norm + +from skpro.regression.shrinking_interval import ShrinkingNormalIntervalRegressor + + +def _make_xy(values): + X = pd.DataFrame({"x": np.arange(len(values))}) + y = pd.DataFrame({"y": values}) + return X, y + + +def test_mean_sd_interval_width_shrinks_with_n(): + X_test = pd.DataFrame({"x": [0, 1]}) + + X_small, y_small = _make_xy([0.0, 2.0, 0.0, 2.0]) + X_large, y_large = _make_xy([0.0, 2.0] * 8) + + reg_small = ShrinkingNormalIntervalRegressor(method="mean_sd").fit(X_small, y_small) + reg_large = ShrinkingNormalIntervalRegressor(method="mean_sd").fit(X_large, y_large) + + int_small = reg_small.predict_interval(X_test, coverage=[0.9]) + int_large = 
reg_large.predict_interval(X_test, coverage=[0.9]) + + width_small = int_small.iloc[:, 1] - int_small.iloc[:, 0] + width_large = int_large.iloc[:, 1] - int_large.iloc[:, 0] + + assert (width_large < width_small).all() + + +def test_predict_interval_and_quantiles_shapes_and_values(): + X_train, y_train = _make_xy([0.0, 2.0, 0.0, 2.0]) + X_test = pd.DataFrame({"x": [10, 11, 12]}) + + coverage = [0.8] + alpha = [0.1, 0.5, 0.9] + + for method in ["mean_sd", "quantile"]: + reg = ShrinkingNormalIntervalRegressor(method=method).fit(X_train, y_train) + + pred_int = reg.predict_interval(X_test, coverage=coverage) + pred_q = reg.predict_quantiles(X_test, alpha=alpha) + + assert pred_int.shape == (len(X_test), 2) + assert pred_q.shape == (len(X_test), 3) + + expected_int_cols = pd.MultiIndex.from_product( + [["y"], coverage, ["lower", "upper"]], names=["var", "coverage", "bound"] + ) + expected_q_cols = pd.MultiIndex.from_product( + [["y"], alpha], names=["var", "alpha"] + ) + + pdt.assert_index_equal(pred_int.columns, expected_int_cols) + pdt.assert_index_equal(pred_q.columns, expected_q_cols) + + if method == "mean_sd": + mean = float(np.mean(y_train.to_numpy())) + std = float(np.std(y_train.to_numpy(), ddof=1)) + n = len(y_train) + + z_interval = abs(norm.ppf((1 + coverage[0]) / 2)) + half_width = z_interval * std / np.sqrt(n) + expected_int = np.column_stack( + [ + np.full(len(X_test), mean - half_width), + np.full(len(X_test), mean + half_width), + ] + ) + + expected_q = np.column_stack( + [ + ( + np.full( + len(X_test), + mean + + np.sign(a - 0.5) * abs(norm.ppf(a)) * std / np.sqrt(n), + ) + if a != 0.5 + else np.full(len(X_test), mean) + ) + for a in alpha + ] + ) + else: + y_values = y_train.to_numpy() + expected_int = np.column_stack( + [ + np.full( + len(X_test), + np.percentile( + y_values, + 100 * (1 - coverage[0]) / 2, + axis=0, + )[0], + ), + np.full( + len(X_test), + np.percentile( + y_values, + 100 * (1 + coverage[0]) / 2, + axis=0, + )[0], + ), + ] + ) + 
expected_q = np.column_stack( + [ + np.full( + len(X_test), + np.percentile(y_values, 100 * a, axis=0)[0], + ) + for a in alpha + ] + ) + + pdt.assert_frame_equal( + pred_int.reset_index(drop=True), + pd.DataFrame(expected_int, columns=pred_int.columns), + check_dtype=False, + ) + pdt.assert_frame_equal( + pred_q.reset_index(drop=True), + pd.DataFrame(expected_q, columns=pred_q.columns), + check_dtype=False, + ) + + +def test_small_n_mean_sd_edge_case(): + X_train, y_train = _make_xy([5.0]) + X_test = pd.DataFrame({"x": [0, 1]}) + + reg = ShrinkingNormalIntervalRegressor(method="mean_sd").fit(X_train, y_train) + + pred_int = reg.predict_interval(X_test, coverage=[0.9]) + pred_q = reg.predict_quantiles(X_test, alpha=[0.1, 0.5, 0.9]) + + assert np.isfinite(pred_int.to_numpy()).all() + assert np.isfinite(pred_q.to_numpy()).all() + assert np.allclose(pred_int.to_numpy(), 5.0) + assert np.allclose(pred_q.to_numpy(), 5.0)