Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .all-contributorsrc
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,12 @@
]
},
{
"login": "arnavk23",
"name": "Arnav Kapoor",
"avatar_url": "https://avatars.githubusercontent.com/u/arnavk23?v=4",
"profile": "https://github.com/arnavk23",
"contributions": [
"code",
"login": "kabirvashisht4-glitch",
"name": "Kabir Vashisht",
"contributions": [
Expand Down
8 changes: 8 additions & 0 deletions docs/source/api_reference/regression.rst
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,14 @@ Formally, these algorithms are reduction algorithms, to tabular regression.

CyclicBoosting

.. currentmodule:: skpro.regression.shrinking_interval

.. autosummary::
:toctree: auto_generated/
:template: class.rst

ShrinkingNormalIntervalRegressor

Reduction to probabilistic classification
-----------------------------------------

Expand Down
2 changes: 2 additions & 0 deletions skpro/regression/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
)
from skpro.regression.jackknife import MapieJackknifeAfterBootstrapRegressor
from skpro.regression.nonparametric import NadarayaWatsonCDE
from skpro.regression.shrinking_interval import ShrinkingNormalIntervalRegressor

__all__ = [
"MapieSplitConformalRegressor",
"MapieCrossConformalRegressor",
"MapieConformalizedQuantileRegressor",
"MapieJackknifeAfterBootstrapRegressor",
"NadarayaWatsonCDE",
"ShrinkingNormalIntervalRegressor",
]
149 changes: 149 additions & 0 deletions skpro/regression/shrinking_interval.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
"""Shrinking normal interval regressor with a static quantile baseline."""

from typing import Any, List, Optional

import numpy as np
import pandas as pd
from scipy.stats import norm

from skpro.regression.base import BaseProbaRegressor


class ShrinkingNormalIntervalRegressor(BaseProbaRegressor):
    """Probabilistic regressor with shrinking intervals in ``mean_sd`` mode.

    This is a featureless baseline: every prediction is derived solely from
    the training target ``y``.  The features ``X`` only determine the number
    of rows and the index of the output.

    Parameters
    ----------
    method : str, default="mean_sd"
        "mean_sd" for normal-approximation intervals around the training
        mean with half-width ``z * sd / sqrt(n)``, so intervals shrink as
        the training sample size ``n`` grows;
        "quantile" for a static empirical-quantile baseline that does not
        shrink with ``n``.

    Examples
    --------
    >>> import pandas as pd
    >>> from sklearn.datasets import load_diabetes
    >>> from sklearn.model_selection import train_test_split
    >>> from skpro.regression.shrinking_interval import ShrinkingNormalIntervalRegressor
    >>> X, y = load_diabetes(return_X_y=True, as_frame=True)
    >>> y = pd.DataFrame(y)
    >>> X_train, X_test, y_train, y_test = train_test_split(X, y)
    >>> reg = ShrinkingNormalIntervalRegressor(method="mean_sd")
    >>> reg.fit(X_train, y_train)
    ShrinkingNormalIntervalRegressor(...)
    >>> y_pred = reg.predict(X_test)
    >>> intervals = reg.predict_interval(X_test, coverage=[0.9])
    >>> quantiles = reg.predict_quantiles(X_test, alpha=[0.05, 0.5, 0.95])
    """

    _tags = {
        "authors": ["arnavk23"],
        "capability:multioutput": False,
        "capability:missing": True,
    }

    def __init__(self, method: str = "mean_sd"):
        if method not in ("mean_sd", "quantile"):
            raise ValueError(f"method must be 'mean_sd' or 'quantile', got {method}")
        self.method = method
        super().__init__()

    def _fit(self, X: pd.DataFrame, y: pd.DataFrame, C: Optional[Any] = None):
        """Fit by memorizing mean, sd, sample size, and the training targets.

        Parameters
        ----------
        X : pd.DataFrame
            Feature frame; stored for reference, not used by this baseline.
        y : pd.DataFrame, pd.Series, or np.ndarray
            Training targets; must be non-empty.
        C : any, optional
            Ignored; accepted for interface compatibility.

        Returns
        -------
        self
        """
        # Input validation
        if not isinstance(y, (pd.DataFrame, pd.Series, np.ndarray)) or len(y) == 0:
            raise ValueError("y must be a non-empty DataFrame, Series, or ndarray")
        # BUGFIX: coerce y to a DataFrame up front.  The previous version
        # called ``y.mean().values`` / ``y.std(ddof=1).values``, which raises
        # AttributeError for Series and ndarray inputs (their mean/std return
        # scalars or plain ndarrays that have no ``.values``), and
        # ``self._y.values`` later failed for ndarray input in the
        # "quantile" branches of _predict_interval / _predict_quantiles.
        if isinstance(y, pd.DataFrame):
            y_df = y.copy()
        else:
            arr = np.asarray(y)
            if arr.ndim == 1:
                arr = arr.reshape(-1, 1)
            # single "y" column name mirrors the original fallback label
            cols = ["y"] if arr.shape[1] == 1 else list(range(arr.shape[1]))
            y_df = pd.DataFrame(arr, columns=cols)
        self._X = X.copy()
        self._y = y_df
        self._mean = y_df.mean().to_numpy()
        self._std = y_df.std(ddof=1).to_numpy()
        # ddof=1 yields NaN for n == 1; replace non-finite spread with 0 so
        # downstream intervals degenerate to a point instead of NaN
        if np.any(~np.isfinite(self._std)):
            self._std = np.nan_to_num(self._std, nan=0.0, posinf=0.0, neginf=0.0)
        self._n = len(y_df)
        self._y_cols = y_df.columns
        return self

    def _predict(self, X: pd.DataFrame) -> pd.DataFrame:
        """Return the training mean for every row of X (featureless baseline)."""
        mean = pd.DataFrame(
            np.tile(self._mean, (len(X), 1)), index=X.index, columns=self._y_cols
        )
        return mean

    def _predict_interval(self, X: pd.DataFrame, coverage: List[float]) -> pd.DataFrame:
        """Return prediction intervals for each requested coverage level.

        For "mean_sd", interval half-width is ``z * sd / sqrt(n)`` (normal
        approximation for the mean, shrinking with n); for "quantile", the
        bounds are static empirical quantiles of the training targets.

        Returns
        -------
        pd.DataFrame with (var, coverage, lower/upper) MultiIndex columns.
        """
        n = self._n
        mean = np.tile(self._mean, (len(X), 1))
        std = np.tile(self._std, (len(X), 1))
        intervals = []
        for c in coverage:
            if not (0 < c <= 1):
                raise ValueError(f"coverage must be in (0, 1], got {c}")
            if self.method == "mean_sd":
                # two-sided interval: z is the (1+c)/2 standard normal quantile
                z = abs(norm.ppf((1 + c) / 2))
                half_width = z * std / np.sqrt(n)
                lower = mean - half_width
                upper = mean + half_width
            elif self.method == "quantile":
                # Use empirical quantiles from training data, not shrinking
                lower = np.tile(
                    np.percentile(self._y.values, 100 * (1 - c) / 2, axis=0),
                    (len(X), 1),
                )
                upper = np.tile(
                    np.percentile(self._y.values, 100 * (1 + c) / 2, axis=0),
                    (len(X), 1),
                )
            else:
                raise NotImplementedError(f"Unknown method: {self.method}")
            intervals.append((lower, upper))
        # Build MultiIndex columns: var-major, then coverage, then bound
        arrays = [
            np.repeat(self._y_cols, len(coverage) * 2),
            np.tile(np.repeat(coverage, 2), len(self._y_cols)),
            np.tile(["lower", "upper"], len(self._y_cols) * len(coverage)),
        ]
        columns = pd.MultiIndex.from_arrays(arrays, names=["var", "coverage", "bound"])
        data = np.hstack(
            [np.column_stack([lower, upper]) for lower, upper in intervals]
        )
        return pd.DataFrame(data, index=X.index, columns=columns)

    def _predict_quantiles(self, X: pd.DataFrame, alpha: List[float]) -> pd.DataFrame:
        """Return predicted quantiles at each requested probability level.

        For "quantile", these are empirical quantiles of the training
        targets; for "mean_sd", the normal approximation
        ``mean +/- z * sd / sqrt(n)`` (the median is the mean itself).

        Returns
        -------
        pd.DataFrame with (var, alpha) MultiIndex columns.
        """
        quantiles = []
        for a in alpha:
            if not (0 <= a <= 1):
                raise ValueError(f"alpha must be in [0, 1], got {a}")
            if self.method == "quantile":
                q = np.percentile(self._y.values, 100 * a, axis=0)
            elif self.method == "mean_sd":
                # For mean_sd, return mean for median
                # or mean +/- z*std/sqrt(n) for tails
                if a == 0.5:
                    q = self._mean
                else:
                    z = abs(norm.ppf(a))
                    q = self._mean + np.sign(a - 0.5) * z * self._std / np.sqrt(self._n)
            else:
                raise NotImplementedError(f"Unknown method: {self.method}")
            quantiles.append(np.tile(q, (len(X), 1)))
        # Build MultiIndex columns
        arrays = [
            np.repeat(self._y_cols, len(alpha)),
            np.tile(alpha, len(self._y_cols)),
        ]
        columns = pd.MultiIndex.from_arrays(arrays, names=["var", "alpha"])
        data = np.hstack(quantiles)
        return pd.DataFrame(data, index=X.index, columns=columns)

    @classmethod
    def get_test_params(cls, parameter_set: str = "default"):
        """Return testing parameter sets for automated tests.

        Returns two parameter sets: one for mean/sd, one for quantile method.
        """
        return [
            {"method": "mean_sd"},
            {"method": "quantile"},
        ]
143 changes: 143 additions & 0 deletions skpro/regression/tests/test_reducing_interval.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import numpy as np
import pandas as pd
import pandas.testing as pdt
from scipy.stats import norm

from skpro.regression.shrinking_interval import ShrinkingNormalIntervalRegressor


def _make_xy(values):
X = pd.DataFrame({"x": np.arange(len(values))})
y = pd.DataFrame({"y": values})
return X, y


def test_mean_sd_interval_width_shrinks_with_n():
    """Interval width in mean_sd mode must strictly decrease as training n grows."""
    X_query = pd.DataFrame({"x": [0, 1]})

    # same values, so identical sd — width differs only through sqrt(n)
    X_few, y_few = _make_xy([0.0, 2.0, 0.0, 2.0])
    X_many, y_many = _make_xy([0.0, 2.0] * 8)

    fitted_few = ShrinkingNormalIntervalRegressor(method="mean_sd").fit(X_few, y_few)
    fitted_many = ShrinkingNormalIntervalRegressor(method="mean_sd").fit(X_many, y_many)

    pred_few = fitted_few.predict_interval(X_query, coverage=[0.9])
    pred_many = fitted_many.predict_interval(X_query, coverage=[0.9])

    wide = pred_few.iloc[:, 1] - pred_few.iloc[:, 0]
    narrow = pred_many.iloc[:, 1] - pred_many.iloc[:, 0]

    assert (narrow < wide).all()


def test_predict_interval_and_quantiles_shapes_and_values():
    """Check shapes, column MultiIndex, and exact values for both methods.

    For each of "mean_sd" and "quantile", the expected interval and quantile
    frames are reconstructed independently from the training data and compared
    elementwise against the regressor's output.
    """
    X_train, y_train = _make_xy([0.0, 2.0, 0.0, 2.0])
    # test features are distinct from training; predictions are featureless,
    # so only the row count and index of X_test matter
    X_test = pd.DataFrame({"x": [10, 11, 12]})

    coverage = [0.8]
    alpha = [0.1, 0.5, 0.9]

    for method in ["mean_sd", "quantile"]:
        reg = ShrinkingNormalIntervalRegressor(method=method).fit(X_train, y_train)

        pred_int = reg.predict_interval(X_test, coverage=coverage)
        pred_q = reg.predict_quantiles(X_test, alpha=alpha)

        # one coverage -> lower+upper = 2 columns; three alphas -> 3 columns
        assert pred_int.shape == (len(X_test), 2)
        assert pred_q.shape == (len(X_test), 3)

        # skpro column conventions: (var, coverage, bound) and (var, alpha)
        expected_int_cols = pd.MultiIndex.from_product(
            [["y"], coverage, ["lower", "upper"]], names=["var", "coverage", "bound"]
        )
        expected_q_cols = pd.MultiIndex.from_product(
            [["y"], alpha], names=["var", "alpha"]
        )

        pdt.assert_index_equal(pred_int.columns, expected_int_cols)
        pdt.assert_index_equal(pred_q.columns, expected_q_cols)

        if method == "mean_sd":
            # normal approximation: mean +/- z * sd / sqrt(n), rows constant
            mean = float(np.mean(y_train.to_numpy()))
            std = float(np.std(y_train.to_numpy(), ddof=1))
            n = len(y_train)

            z_interval = abs(norm.ppf((1 + coverage[0]) / 2))
            half_width = z_interval * std / np.sqrt(n)
            expected_int = np.column_stack(
                [
                    np.full(len(X_test), mean - half_width),
                    np.full(len(X_test), mean + half_width),
                ]
            )

            # median (a == 0.5) is exactly the mean; tails use the signed
            # z-quantile offset, mirroring the implementation's formula
            expected_q = np.column_stack(
                [
                    (
                        np.full(
                            len(X_test),
                            mean
                            + np.sign(a - 0.5) * abs(norm.ppf(a)) * std / np.sqrt(n),
                        )
                        if a != 0.5
                        else np.full(len(X_test), mean)
                    )
                    for a in alpha
                ]
            )
        else:
            # static empirical-quantile baseline: bounds are percentiles of
            # the training targets, independent of n and constant across rows
            y_values = y_train.to_numpy()
            expected_int = np.column_stack(
                [
                    np.full(
                        len(X_test),
                        np.percentile(
                            y_values,
                            100 * (1 - coverage[0]) / 2,
                            axis=0,
                        )[0],
                    ),
                    np.full(
                        len(X_test),
                        np.percentile(
                            y_values,
                            100 * (1 + coverage[0]) / 2,
                            axis=0,
                        )[0],
                    ),
                ]
            )
            expected_q = np.column_stack(
                [
                    np.full(
                        len(X_test),
                        np.percentile(y_values, 100 * a, axis=0)[0],
                    )
                    for a in alpha
                ]
            )

        # elementwise comparison; dtype relaxed since expected arrays are float
        pdt.assert_frame_equal(
            pred_int.reset_index(drop=True),
            pd.DataFrame(expected_int, columns=pred_int.columns),
            check_dtype=False,
        )
        pdt.assert_frame_equal(
            pred_q.reset_index(drop=True),
            pd.DataFrame(expected_q, columns=pred_q.columns),
            check_dtype=False,
        )


def test_small_n_mean_sd_edge_case():
    """With a single training point, sd is treated as 0 and outputs collapse to it."""
    X_fit, y_fit = _make_xy([5.0])
    X_query = pd.DataFrame({"x": [0, 1]})

    model = ShrinkingNormalIntervalRegressor(method="mean_sd")
    model.fit(X_fit, y_fit)

    interval_vals = model.predict_interval(X_query, coverage=[0.9]).to_numpy()
    quantile_vals = model.predict_quantiles(X_query, alpha=[0.1, 0.5, 0.9]).to_numpy()

    # no NaN/inf despite ddof=1 sd being undefined for n == 1
    for block in (interval_vals, quantile_vals):
        assert np.isfinite(block).all()
        assert np.allclose(block, 5.0)
Loading