diff --git a/ymir/common/product_pages.py b/ymir/common/product_pages.py
new file mode 100644
index 00000000..f95899bc
--- /dev/null
+++ b/ymir/common/product_pages.py
@@ -0,0 +1,305 @@
+"""
+Product Pages helpers for RHEL y-stream and z-stream labels.
+
+This module queries the internal Product Pages API (HTTP SPNEGO via
+``requests-gssapi``) and derives current y-streams, current z-streams, and
+upcoming z-streams from active releases and GA/ZStream release metadata.
+
+Callers must ensure a valid Kerberos ticket is available before invoking
+``fetch_rhel_streams_snapshot``; this module does not initialize Kerberos.
+
+Public API: ``await fetch_rhel_streams_snapshot()`` (async coroutine). Blocking
+HTTP (``requests``) runs in a thread pool so the event loop is not blocked.
+Everything else in this module is an implementation detail.
+"""
+
+import asyncio
+import json
+import os
+import re
+from collections import defaultdict
+from functools import cache
+
+import requests
+import requests_gssapi
+from beeai_framework.tools import ToolError
+
+_PLAIN_SHORTNAME_RE = re.compile(r"^rhel-(\d+)\.(\d+)$")
+_GA_ZSTREAM_RE = re.compile(r"\(GA\/ZStream\)")
+
+_OIDC_AUTHENTICATE_URL = "https://pp.engineering.redhat.com/oidc/authenticate"
+_RELEASES_API_URL = "https://pp.engineering.redhat.com/api/v7/releases/"
+
+# ``requests`` accepts ``(connect, read)`` in seconds. OIDC/GSSAPI can be slow to
+# establish; the releases listing can return a large JSON payload.
+_PRODUCT_PAGES_TIMEOUT = (30.0, 120.0)
+
+
+@cache
+def _product_pages_verify() -> bool | str:
+    """TLS ``verify`` argument for ``requests``: corporate CA bundle if configured.
+
+    Matches ``ymir.supervisor.errata_utils.ET_verify`` (``REDHAT_IT_CA_BUNDLE``)
+    and OpenShift-style ``REQUESTS_CA_BUNDLE`` when set.
+    """
+    for key in ("REDHAT_IT_CA_BUNDLE", "REQUESTS_CA_BUNDLE"):
+        path = os.getenv(key)
+        if path:
+            return path
+    return True
+
+
+def _rhel_sort_key(shortname: str) -> tuple[int, ...]:
+    """Sort key for RHEL shortnames by numeric major.minor (not lexicographic).
+
+    Example: rhel-10.3 sorts after rhel-9.9.
+
+    Returns:
+        Tuple of ints for lexical comparison ordering (major, minor, ...).
+    """
+    body = shortname.removeprefix("rhel-").removesuffix(".z")
+    parts = body.split(".")
+    return tuple(int(p) for p in parts)
+
+
+def _parse_plain_rhel_minor(shortname: str) -> tuple[int, int] | None:
+    """
+    Parse rhel-M.m shortname (optional .z stripped).
+
+    Args:
+        shortname: Release shortname such as ``rhel-9.6`` or ``rhel-9.6.z``.
+
+    Returns:
+        ``(major, minor)`` or None if the pattern does not match.
+    """
+    base = shortname.removesuffix(".z")
+    m = _PLAIN_SHORTNAME_RE.match(base)
+    if not m:
+        return None
+    return int(m.group(1)), int(m.group(2))
+
+
+def _format_z_label(shortname_or_stem: str) -> str:
+    """
+    Display form for z-stream maps (e.g. ``rhel-9.6`` -> ``rhel-9.6.z``).
+
+    Args:
+        shortname_or_stem: Shortname or stem; ``.z`` is appended when missing.
+
+    Returns:
+        Canonical z-stream label string.
+    """
+    s = shortname_or_stem.strip()
+    if s.endswith(".z"):
+        return s
+    return f"{s}.z"
+
+
+def _build_current_y_streams(active_releases: list[dict]) -> dict[str, str]:
+    """
+    Best current y-stream shortname per RHEL major.
+
+    Args:
+        active_releases: Active release records (must include ``shortname``).
+
+    Returns:
+        Mapping major version string -> highest ``rhel-M.m`` shortname among
+        active plain y-style names.
+    """
+    best: dict[int, tuple[tuple[int, ...], str]] = {}
+    for item in active_releases:
+        sn = item.get("shortname") or ""
+        parsed = _parse_plain_rhel_minor(sn)
+        if not parsed:
+            continue
+        maj, _ = parsed
+        key = _rhel_sort_key(sn)
+        prev = best.get(maj)
+        if prev is None or key > prev[0]:
+            best[maj] = (key, sn)
+    return {str(m): sn for m, (_, sn) in sorted(best.items())}
+
+
+def _build_upcoming_z_streams(active_releases: list[dict]) -> dict[str, str]:
+    """
+    Upcoming z-stream label per major when multiple active streams exist.
+
+    If a major has more than one active release stream, the lower version is
+    treated as the upcoming z-stream; otherwise that major is omitted.
+
+    Args:
+        active_releases: Active release records (must include ``shortname``).
+
+    Returns:
+        Mapping major version string -> upcoming z-stream label (with ``.z``).
+    """
+    by_major: defaultdict[int, list[str]] = defaultdict(list)
+    for item in active_releases:
+        sn = item.get("shortname") or ""
+        parsed = _parse_plain_rhel_minor(sn)
+        if not parsed:
+            continue
+        maj, _ = parsed
+        by_major[maj].append(sn)
+
+    out: dict[str, str] = {}
+    for maj in sorted(by_major):
+        sns = by_major[maj]
+        if len(sns) <= 1:
+            continue
+        lower = min(sns, key=_rhel_sort_key)
+        out[str(maj)] = _format_z_label(lower)
+    return out
+
+
+def _build_current_z_streams_ga_zstream(ga_zstream_rows: list[dict]) -> dict[str, str]:
+    """
+    Current z-stream labels from GA/ZStream maintenance releases.
+
+    Rows should be releases whose ``name_incl_maint`` matches (GA/ZStream).
+    If several exist per major, the highest version is used.
+
+    Args:
+        ga_zstream_rows: Filtered release dicts with ``shortname`` set.
+
+    Returns:
+        Mapping major version string -> current z-stream label (with ``.z``).
+    """
+    by_major: defaultdict[int, list[str]] = defaultdict(list)
+    for item in ga_zstream_rows:
+        sn = item.get("shortname") or ""
+        parsed = _parse_plain_rhel_minor(sn)
+        if not parsed:
+            continue
+        maj, _ = parsed
+        by_major[maj].append(sn)
+
+    out: dict[str, str] = {}
+    for maj in sorted(by_major):
+        sns = by_major[maj]
+        top = max(sns, key=_rhel_sort_key)
+        out[str(maj)] = _format_z_label(top)
+    return out
+
+
+def _require_ok(response: requests.Response, what: str) -> None:
+    """Raise ToolError unless *response* is HTTP 200."""
+    if response.status_code != 200:
+        raise ToolError(f"Product Pages API error ({what}): expected HTTP 200, got {response.status_code}")
+
+
+def _json_rows(response: requests.Response, what: str) -> list[dict]:
+    """
+    Decode *response* as JSON and require a list of release objects.
+
+    Args:
+        response: HTTP response whose body should be a JSON array.
+        what: Short label of the request, used in the error message.
+
+    Returns:
+        The decoded list of dicts.
+
+    Raises:
+        ValueError: If the payload is not a list of JSON objects.
+    """
+    rows = response.json()
+    if not isinstance(rows, list) or not all(isinstance(row, dict) for row in rows):
+        raise ValueError(f"{what}: expected a JSON list of objects")
+    return rows
+
+
+def _fetch_rhel_streams_snapshot_sync() -> dict[str, dict[str, str]]:
+    """Blocking implementation: HTTP via ``requests`` / GSSAPI."""
+    timeout = _PRODUCT_PAGES_TIMEOUT
+    try:
+        with requests.Session() as s:
+            s.verify = _product_pages_verify()
+            auth = requests_gssapi.HTTPSPNEGOAuth(mutual_authentication=requests_gssapi.OPTIONAL)
+            auth_resp = s.post(_OIDC_AUTHENTICATE_URL, auth=auth, timeout=timeout)
+            _require_ok(auth_resp, "OIDC authenticate")
+
+            # Multiple active releases per major: lower stream is finishing; higher is main y-stream.
+            response_active = s.get(
+                _RELEASES_API_URL,
+                params={
+                    "fields": "shortname",
+                    "active": "",
+                    "product__shortname": "rhel",
+                },
+                timeout=timeout,
+            )
+            _require_ok(response_active, "active releases")
+            active_data = _json_rows(response_active, "active releases")
+
+            current_y_streams = _build_current_y_streams(active_data)
+            upcoming_z_streams = _build_upcoming_z_streams(active_data)
+
+            response_zstream = s.get(
+                _RELEASES_API_URL,
+                params={
+                    "fields": "shortname,name_incl_maint,name",
+                    "product__shortname": "rhel",
+                },
+                timeout=timeout,
+            )
+            _require_ok(response_zstream, "releases for z-stream filtering")
+            z_data = _json_rows(response_zstream, "releases for z-stream filtering")
+
+            # Only ``shortname`` is read downstream, so rows are kept as returned;
+            # projecting fixed keys raised KeyError when a row lacked one of them.
+            filtered = [
+                item
+                for item in z_data
+                if _GA_ZSTREAM_RE.search(str(item.get("name_incl_maint") or ""))
+            ]
+
+            current_z_streams = _build_current_z_streams_ga_zstream(filtered)
+
+            return {
+                "current_y_streams": current_y_streams,
+                "current_z_streams": current_z_streams,
+                "upcoming_z_streams": upcoming_z_streams,
+            }
+    except requests.Timeout as e:
+        raise ToolError(
+            f"Product Pages API request timed out (connect {timeout[0]}s, read {timeout[1]}s)"
+        ) from e
+    except (requests.exceptions.JSONDecodeError, json.JSONDecodeError) as e:
+        # Must precede ``RequestException``: requests' JSONDecodeError subclasses it,
+        # so the generic branch would otherwise report bad JSON as a network error.
+        raise ToolError("Product Pages API returned a response body that is not valid JSON") from e
+    except requests.RequestException as e:
+        msg = f"Product Pages API network error: {e}"
+        err_chain = f"{e!s} {e.__cause__!s}" if e.__cause__ else str(e)
+        err_lower = err_chain.lower()
+        if "certificate" in err_lower or "ssl" in err_lower:
+            msg += (
+                " If this is a corporate TLS trust issue, set REDHAT_IT_CA_BUNDLE or "
+                "REQUESTS_CA_BUNDLE to a CA bundle path (e.g. /etc/pki/tls/certs/ca-bundle.crt)."
+            )
+        raise ToolError(msg) from e
+    except ValueError as e:
+        raise ToolError(f"Product Pages API response could not be processed: {e}") from e
+
+
+async def fetch_rhel_streams_snapshot() -> dict[str, dict[str, str]]:
+    """
+    Query Product Pages and return y-stream and z-stream snapshot maps.
+
+    Uses GSSAPI session authentication, then loads active releases and
+    GA/ZStream-filtered releases to compute stream labels.
+
+    Requires a valid Kerberos ticket in the environment; this module does not
+    initialize Kerberos itself.
+
+    Returns:
+        Dict with keys ``current_y_streams``, ``current_z_streams``, and
+        ``upcoming_z_streams``; each value maps major version strings to
+        shortname labels.
+
+    Raises:
+        ToolError: On non-success HTTP responses, timeouts, transport errors
+            (``requests.RequestException``), invalid JSON, or unexpected response
+            shape (``ValueError``).
+    """
+    return await asyncio.to_thread(_fetch_rhel_streams_snapshot_sync)
diff --git a/ymir/common/pyproject.toml b/ymir/common/pyproject.toml
index 08733a7a..a67856ef 100644
--- a/ymir/common/pyproject.toml
+++ b/ymir/common/pyproject.toml
@@ -30,6 +30,7 @@ packages = []
 "config.py" = "ymir/common/config.py"
 "constants.py" = "ymir/common/constants.py"
 "models.py" = "ymir/common/models.py"
+"product_pages.py" = "ymir/common/product_pages.py"
 "utils.py" = "ymir/common/utils.py"
 "validators.py" = "ymir/common/validators.py"
 "version_utils.py" = "ymir/common/version_utils.py"
diff --git a/ymir/common/requirements.txt b/ymir/common/requirements.txt
index 98051d51..2536504d 100644
--- a/ymir/common/requirements.txt
+++ b/ymir/common/requirements.txt
@@ -1,3 +1,5 @@
 # Dependencies specific to ymir-common
 GitPython>=3.1.0
 redis>=6.4.0
+requests>=2.32.0
+requests-gssapi>=1.3.0
diff --git a/ymir/common/tests/unit/test_product_pages.py b/ymir/common/tests/unit/test_product_pages.py
new file mode 100644
index 00000000..14413417
--- /dev/null
+++ b/ymir/common/tests/unit/test_product_pages.py
@@ -0,0 +1,223 @@
+"""
+Unit tests for ``ymir.common.product_pages``.
+
+HTTP is simulated by replacing ``requests.Session`` with a small fake that returns
+fixed status codes and JSON bodies — no network calls.
+"""
+
+from __future__ import annotations
+
+from collections.abc import Iterator
+from unittest.mock import patch
+
+import pytest
+import requests.exceptions
+from beeai_framework.tools import ToolError
+
+import ymir.common.product_pages as pp
+
+
+class _JsonResponse:
+    __slots__ = ("_data", "status_code")
+
+    def __init__(self, status_code: int, data: object | None = None) -> None:
+        self.status_code = status_code
+        self._data = data
+
+    def json(self) -> object:
+        if self._data is None:
+            raise ValueError("no json payload")
+        return self._data
+
+
+class _FakeSession:
+    """Minimal session stub: one POST (OIDC), then two GETs (active releases, z-stream list)."""
+
+    def __init__(
+        self,
+        *,
+        post_response: _JsonResponse,
+        get_responses: list[_JsonResponse],
+    ) -> None:
+        self.verify: bool | str | None = None
+        self._post_response = post_response
+        self._get_responses = list(get_responses)
+
+    def __enter__(self) -> _FakeSession:
+        return self
+
+    def __exit__(self, *args: object) -> None:
+        return None
+
+    def post(self, url: str, **kwargs: object) -> _JsonResponse:
+        return self._post_response
+
+    def get(self, url: str, **kwargs: object) -> _JsonResponse:
+        return self._get_responses.pop(0)
+
+
+@pytest.fixture(autouse=True)
+def _clear_product_pages_verify_cache() -> Iterator[None]:
+    pp._product_pages_verify.cache_clear()
+    yield
+    pp._product_pages_verify.cache_clear()
+
+
+def test_product_pages_verify_prefers_redhat_bundle(monkeypatch: pytest.MonkeyPatch) -> None:
+    monkeypatch.setenv("REDHAT_IT_CA_BUNDLE", "/ca/redhat.pem")
+    monkeypatch.setenv("REQUESTS_CA_BUNDLE", "/ca/requests.pem")
+    assert pp._product_pages_verify() == "/ca/redhat.pem"
+
+
+def test_product_pages_verify_falls_back_to_requests_bundle(monkeypatch: pytest.MonkeyPatch) -> None:
+    monkeypatch.delenv("REDHAT_IT_CA_BUNDLE", raising=False)
+    monkeypatch.setenv("REQUESTS_CA_BUNDLE", "/ca/requests.pem")
+    assert pp._product_pages_verify() == "/ca/requests.pem"
+
+
+def test_product_pages_verify_default_true(monkeypatch: pytest.MonkeyPatch) -> None:
+    monkeypatch.delenv("REDHAT_IT_CA_BUNDLE", raising=False)
+    monkeypatch.delenv("REQUESTS_CA_BUNDLE", raising=False)
+    assert pp._product_pages_verify() is True
+
+
+def test_fetch_rhel_streams_snapshot_sync_happy_path(monkeypatch: pytest.MonkeyPatch) -> None:
+    monkeypatch.delenv("REDHAT_IT_CA_BUNDLE", raising=False)
+    monkeypatch.delenv("REQUESTS_CA_BUNDLE", raising=False)
+
+    active = [
+        {"shortname": "rhel-9.5"},
+        {"shortname": "rhel-9.6"},
+    ]
+    z_rows = [
+        {
+            "shortname": "rhel-9.5",
+            "name_incl_maint": "RHEL 9.5 (GA/ZStream)",
+            "name": "RHEL 9.5",
+        },
+    ]
+    fake = _FakeSession(
+        post_response=_JsonResponse(200),
+        get_responses=[
+            _JsonResponse(200, active),
+            _JsonResponse(200, z_rows),
+        ],
+    )
+
+    with patch.object(pp.requests, "Session", return_value=fake):
+        result = pp._fetch_rhel_streams_snapshot_sync()
+
+    assert fake.verify is True
+    assert result == {
+        "current_y_streams": {"9": "rhel-9.6"},
+        "current_z_streams": {"9": "rhel-9.5.z"},
+        "upcoming_z_streams": {"9": "rhel-9.5.z"},
+    }
+
+
+def test_fetch_rhel_streams_snapshot_sync_oidc_not_ok(monkeypatch: pytest.MonkeyPatch) -> None:
+    monkeypatch.delenv("REDHAT_IT_CA_BUNDLE", raising=False)
+    monkeypatch.delenv("REQUESTS_CA_BUNDLE", raising=False)
+
+    fake = _FakeSession(
+        post_response=_JsonResponse(401),
+        get_responses=[],
+    )
+
+    with (
+        patch.object(pp.requests, "Session", return_value=fake),
+        pytest.raises(
+            ToolError,
+            match="OIDC authenticate",
+        ),
+    ):
+        pp._fetch_rhel_streams_snapshot_sync()
+
+
+def test_fetch_rhel_streams_snapshot_sync_ssl_error_includes_ca_hint(
+    monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    monkeypatch.delenv("REDHAT_IT_CA_BUNDLE", raising=False)
+    monkeypatch.delenv("REQUESTS_CA_BUNDLE", raising=False)
+
+    class _SslSession(_FakeSession):
+        def post(self, url: str, **kwargs: object) -> _JsonResponse:
+            raise requests.exceptions.SSLError("certificate verify failed")
+
+    fake = _SslSession(
+        post_response=_JsonResponse(200),
+        get_responses=[],
+    )
+
+    with (
+        patch.object(pp.requests, "Session", return_value=fake),
+        pytest.raises(
+            ToolError,
+            match="REDHAT_IT_CA_BUNDLE",
+        ),
+    ):
+        pp._fetch_rhel_streams_snapshot_sync()
+
+
+def test_fetch_rhel_streams_snapshot_sync_invalid_json_is_reported_as_json_error(
+    monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    monkeypatch.delenv("REDHAT_IT_CA_BUNDLE", raising=False)
+    monkeypatch.delenv("REQUESTS_CA_BUNDLE", raising=False)
+
+    class _BadJsonResponse(_JsonResponse):
+        def json(self) -> object:
+            raise requests.exceptions.JSONDecodeError("Expecting value", "<html>", 0)
+
+    fake = _FakeSession(
+        post_response=_JsonResponse(200),
+        get_responses=[_BadJsonResponse(200)],
+    )
+
+    with (
+        patch.object(pp.requests, "Session", return_value=fake),
+        pytest.raises(ToolError, match="not valid JSON"),
+    ):
+        pp._fetch_rhel_streams_snapshot_sync()
+
+
+def test_fetch_rhel_streams_snapshot_sync_rejects_non_list_payload(
+    monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    monkeypatch.delenv("REDHAT_IT_CA_BUNDLE", raising=False)
+    monkeypatch.delenv("REQUESTS_CA_BUNDLE", raising=False)
+
+    fake = _FakeSession(
+        post_response=_JsonResponse(200),
+        get_responses=[_JsonResponse(200, {"detail": "unexpected"})],
+    )
+
+    with (
+        patch.object(pp.requests, "Session", return_value=fake),
+        pytest.raises(ToolError, match="could not be processed"),
+    ):
+        pp._fetch_rhel_streams_snapshot_sync()
+
+
+@pytest.mark.asyncio
+async def test_fetch_rhel_streams_snapshot_end_to_end(monkeypatch: pytest.MonkeyPatch) -> None:
+    monkeypatch.delenv("REDHAT_IT_CA_BUNDLE", raising=False)
+    monkeypatch.delenv("REQUESTS_CA_BUNDLE", raising=False)
+
+    active = [{"shortname": "rhel-10.0"}]
+    z_rows: list[dict] = []
+
+    fake = _FakeSession(
+        post_response=_JsonResponse(200),
+        get_responses=[
+            _JsonResponse(200, active),
+            _JsonResponse(200, z_rows),
+        ],
+    )
+
+    with patch.object(pp.requests, "Session", return_value=fake):
+        result = await pp.fetch_rhel_streams_snapshot()
+
+    assert result["current_y_streams"] == {"10": "rhel-10.0"}
+    assert result["current_z_streams"] == {}
+    assert result["upcoming_z_streams"] == {}