Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
286 changes: 286 additions & 0 deletions ymir/common/product_pages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
"""
Product Pages helpers for RHEL y-stream and z-stream labels.

This module queries the internal Product Pages API (HTTP SPNEGO via
``requests-gssapi``) and derives current y-streams, current z-streams, and
upcoming z-streams from active releases and GA/ZStream release metadata.

Callers must ensure a valid Kerberos ticket is available before invoking
``fetch_rhel_streams_snapshot``; this module does not initialize Kerberos.

Public API: ``await fetch_rhel_streams_snapshot()`` (async coroutine). Blocking
HTTP (``requests``) runs in a thread pool so the event loop is not blocked.
Everything else in this module is an implementation detail.
"""

import asyncio
import json
import os
import re
from collections import defaultdict
from functools import cache

import requests
import requests_gssapi
from beeai_framework.tools import ToolError
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing from BeeAI should be used here, including ToolError.


# Matches a plain shortname such as ``rhel-9.6``; the two groups are the major
# and minor numbers. Anchored at both ends, so a ``.z`` suffix must be removed
# by the caller before matching.
_PLAIN_SHORTNAME_RE = re.compile(r"^rhel-(\d+)\.(\d+)$")
# Matches the literal ``(GA/ZStream)`` marker; searched for in ``name_incl_maint``.
_GA_ZSTREAM_RE = re.compile(r"\(GA\/ZStream\)")

# Endpoint POSTed to with SPNEGO auth on the shared session before any releases query.
_OIDC_AUTHENTICATE_URL = "https://pp.engineering.redhat.com/oidc/authenticate"
# Releases listing endpoint (API v7); queried with ``product__shortname=rhel``.
_RELEASES_API_URL = "https://pp.engineering.redhat.com/api/v7/releases/"

# ``requests`` accepts ``(connect, read)`` in seconds. OIDC/GSSAPI can be slow to
# establish; the releases listing can return a large JSON payload.
_PRODUCT_PAGES_TIMEOUT = (30.0, 120.0)


@cache
def _product_pages_verify() -> bool | str:
"""TLS ``verify`` argument for ``requests``: corporate CA bundle if configured.

Matches ``ymir.supervisor.errata_utils.ET_verify`` (``REDHAT_IT_CA_BUNDLE``)
and OpenShift-style ``REQUESTS_CA_BUNDLE`` when set.
"""
for key in ("REDHAT_IT_CA_BUNDLE", "REQUESTS_CA_BUNDLE"):
path = os.getenv(key)
if path:
return path
return True


def _rhel_sort_key(shortname: str) -> tuple[int, ...]:
    """Sort key for RHEL shortnames by numeric major.minor (not lexicographic).

    A leading ``rhel-`` and a trailing ``.z`` are dropped, then each remaining
    dot-separated component is converted to ``int``.

    Example: rhel-10.3 sorts after rhel-9.9.

    Args:
        shortname: Release shortname such as ``rhel-9.6`` or ``rhel-9.6.z``.

    Returns:
        Tuple of ints for numeric comparison ordering (major, minor, ...).

    Raises:
        ValueError: If a remaining component cannot be converted to ``int``.
    """
    body = shortname.removeprefix("rhel-").removesuffix(".z")
    return tuple(int(part) for part in body.split("."))


def _parse_plain_rhel_minor(shortname: str) -> tuple[int, int] | None:
    """
    Extract ``(major, minor)`` from an ``rhel-M.m`` shortname.

    A trailing ``.z`` is removed first, and what remains is matched against
    ``_PLAIN_SHORTNAME_RE``.

    Args:
        shortname: Release shortname such as ``rhel-9.6`` or ``rhel-9.6.z``.

    Returns:
        ``(major, minor)`` as ints, or None when the shortname does not have
        the plain ``rhel-M.m`` form.
    """
    match = _PLAIN_SHORTNAME_RE.match(shortname.removesuffix(".z"))
    if match is None:
        return None
    major, minor = match.groups()
    return int(major), int(minor)


def _format_z_label(shortname_or_stem: str) -> str:
    """
    Normalize a shortname to its z-stream display form.

    Surrounding whitespace is stripped, and ``.z`` is appended unless the
    value already ends with it (e.g. ``rhel-9.6`` -> ``rhel-9.6.z``).

    Args:
        shortname_or_stem: Shortname or stem, with or without a ``.z`` suffix.

    Returns:
        The stripped label, guaranteed to end in ``.z``.
    """
    label = shortname_or_stem.strip()
    return label if label.endswith(".z") else label + ".z"


def _build_current_y_streams(active_releases: list[dict]) -> dict[str, str]:
    """
    Pick the highest active plain shortname for each RHEL major.

    Records whose ``shortname`` is missing, empty, or not of the plain
    ``rhel-M.m`` form (after an optional ``.z`` is stripped) are ignored.

    Args:
        active_releases: Active release records; ``shortname`` is read from each.

    Returns:
        Mapping of major version (as a string) to the shortname with the
        highest numeric version for that major, ordered by ascending major.
    """
    winners: dict[int, str] = {}
    for release in active_releases:
        shortname = release.get("shortname") or ""
        version = _parse_plain_rhel_minor(shortname)
        if version is None:
            continue
        major = version[0]
        incumbent = winners.get(major)
        # Strict comparison: among equal versions the first record seen wins.
        if incumbent is None or _rhel_sort_key(shortname) > _rhel_sort_key(incumbent):
            winners[major] = shortname
    return {str(major): winners[major] for major in sorted(winners)}


def _build_upcoming_z_streams(active_releases: list[dict]) -> dict[str, str]:
    """
    Derive the upcoming z-stream label for each major with several active streams.

    Active shortnames are grouped by major. A major with a single active
    shortname is left out; otherwise the lowest version in the group is
    reported, formatted with a ``.z`` suffix.

    Args:
        active_releases: Active release records; ``shortname`` is read from each.

    Returns:
        Mapping of major version (as a string) to the upcoming z-stream label,
        ordered by ascending major.
    """
    grouped: dict[int, list[str]] = {}
    for release in active_releases:
        shortname = release.get("shortname") or ""
        version = _parse_plain_rhel_minor(shortname)
        if version is not None:
            grouped.setdefault(version[0], []).append(shortname)

    return {
        str(major): _format_z_label(min(grouped[major], key=_rhel_sort_key))
        for major in sorted(grouped)
        if len(grouped[major]) > 1
    }


def _build_current_z_streams_ga_zstream(ga_zstream_rows: list[dict]) -> dict[str, str]:
    """
    Derive the current z-stream label per major from GA/ZStream rows.

    The caller is expected to pass only releases whose ``name_incl_maint``
    matched (GA/ZStream). Rows whose ``shortname`` is missing or not of the
    plain ``rhel-M.m`` form are ignored. When a major has several rows, the
    highest version is used.

    Args:
        ga_zstream_rows: Pre-filtered release dicts; ``shortname`` is read from each.

    Returns:
        Mapping of major version (as a string) to the current z-stream label
        (always ending in ``.z``), ordered by ascending major.
    """
    leaders: dict[int, str] = {}
    for row in ga_zstream_rows:
        candidate = row.get("shortname") or ""
        version = _parse_plain_rhel_minor(candidate)
        if version is None:
            continue
        major = version[0]
        holder = leaders.get(major)
        # Strict comparison: among equal versions the first row seen wins.
        if holder is None or _rhel_sort_key(candidate) > _rhel_sort_key(holder):
            leaders[major] = candidate
    return {str(major): _format_z_label(leaders[major]) for major in sorted(leaders)}


def _require_ok(response: requests.Response, what: str) -> None:
    """Ensure *response* came back with HTTP 200.

    Args:
        response: Response to check.
        what: Short description of the request, included in the error message.

    Raises:
        ToolError: If the status code is anything other than 200.
    """
    status = response.status_code
    if status == 200:
        return
    raise ToolError(f"Product Pages API error ({what}): expected HTTP 200, got {status}")


def _decode_release_list(response: requests.Response, what: str) -> list[dict]:
    """Decode *response* as JSON and require a list of release objects.

    Args:
        response: Successful response from the releases endpoint.
        what: Short description of the request, used in the error message.

    Returns:
        The decoded payload, a list whose items are all dicts.

    Raises:
        ValueError: If the body is not valid JSON (raised by ``Response.json``)
            or the decoded payload is not a list of dicts.
    """
    payload = response.json()
    if not isinstance(payload, list) or not all(isinstance(row, dict) for row in payload):
        raise ValueError(f"expected a JSON list of release objects for {what}")
    return payload


def _fetch_rhel_streams_snapshot_sync() -> dict[str, dict[str, str]]:
    """Blocking implementation: HTTP via ``requests`` / GSSAPI.

    Authenticates one ``requests`` session against the OIDC endpoint with
    SPNEGO, then queries the releases endpoint twice: once for active RHEL
    releases (current y-streams and upcoming z-streams) and once for all RHEL
    releases, which are filtered to those whose ``name_incl_maint`` carries the
    ``(GA/ZStream)`` marker (current z-streams).

    Returns:
        Dict with keys ``current_y_streams``, ``current_z_streams`` and
        ``upcoming_z_streams``; each value maps a major version string to a
        shortname label.

    Raises:
        ToolError: On a non-200 response, a timeout, any other
            ``requests.RequestException``, a body that is not valid JSON, or a
            payload that is not a list of release objects.
    """
    timeout = _PRODUCT_PAGES_TIMEOUT
    try:
        with requests.Session() as s:
            s.verify = _product_pages_verify()
            # NOTE(review): this relies on a Kerberos ticket already being
            # available to GSSAPI. Reviewers pointed at ``init_kerberos_ticket``
            # as the project helper that obtains/verifies one - confirm the
            # caller invokes it before reaching this point.
            auth = requests_gssapi.HTTPSPNEGOAuth(mutual_authentication=requests_gssapi.OPTIONAL)
            auth_resp = s.post(_OIDC_AUTHENTICATE_URL, auth=auth, timeout=timeout)
            _require_ok(auth_resp, "OIDC authenticate")

            # Multiple active releases per major: lower stream is finishing; higher is main y-stream.
            response_active = s.get(
                _RELEASES_API_URL,
                params={
                    "fields": "shortname",
                    "active": "",
                    "product__shortname": "rhel",
                },
                timeout=timeout,
            )
            _require_ok(response_active, "active releases")
            active_data = _decode_release_list(response_active, "active releases")

            current_y_streams = _build_current_y_streams(active_data)
            upcoming_z_streams = _build_upcoming_z_streams(active_data)

            response_zstream = s.get(
                _RELEASES_API_URL,
                params={
                    "fields": "shortname,name_incl_maint,name",
                    "product__shortname": "rhel",
                },
                timeout=timeout,
            )
            _require_ok(response_zstream, "releases for z-stream filtering")
            z_data = _decode_release_list(response_zstream, "releases for z-stream filtering")

            fields = ("shortname", "name_incl_maint", "name")
            # ``.get`` rather than indexing: a row that lacks one of the
            # requested fields must not escape as a bare KeyError.
            filtered = [
                {k: item.get(k) for k in fields}
                for item in z_data
                if _GA_ZSTREAM_RE.search(item.get("name_incl_maint") or "")
            ]

            current_z_streams = _build_current_z_streams_ga_zstream(filtered)

            return {
                "current_y_streams": current_y_streams,
                "current_z_streams": current_z_streams,
                "upcoming_z_streams": upcoming_z_streams,
            }
    except requests.Timeout as e:
        raise ToolError(
            f"Product Pages API request timed out (connect {timeout[0]}s, read {timeout[1]}s)"
        ) from e
    except (requests.exceptions.JSONDecodeError, json.JSONDecodeError) as e:
        # Must precede the RequestException handler: the error raised by
        # ``Response.json()`` is itself a RequestException subclass and would
        # otherwise be reported as a network error.
        raise ToolError("Product Pages API returned a response body that is not valid JSON") from e
    except requests.RequestException as e:
        msg = f"Product Pages API network error: {e}"
        err_chain = f"{e!s} {e.__cause__!s}" if e.__cause__ else str(e)
        err_lower = err_chain.lower()
        if "certificate" in err_lower or "ssl" in err_lower:
            msg += (
                " If this is a corporate TLS trust issue, set REDHAT_IT_CA_BUNDLE or "
                "REQUESTS_CA_BUNDLE to a CA bundle path (e.g. /etc/pki/tls/certs/ca-bundle.crt)."
            )
        raise ToolError(msg) from e
    except ValueError as e:
        raise ToolError(f"Product Pages API response could not be processed: {e}") from e


async def fetch_rhel_streams_snapshot() -> dict[str, dict[str, str]]:
    """
    Fetch the RHEL y-stream / z-stream snapshot from Product Pages.

    The blocking work (GSSAPI session authentication, loading active releases
    and GA/ZStream-filtered releases, computing the labels) is delegated to
    ``_fetch_rhel_streams_snapshot_sync`` on a worker thread via
    ``asyncio.to_thread``, so the event loop is not blocked while it runs.

    A valid Kerberos ticket must already be available in the environment;
    this module does not initialize Kerberos itself.

    Returns:
        Dict with keys ``current_y_streams``, ``current_z_streams``, and
        ``upcoming_z_streams``; each value maps major version strings to
        shortname labels.

    Raises:
        ToolError: On non-success HTTP responses, timeouts, transport errors
            (``requests.RequestException``), invalid JSON, or unexpected response
            shape (``ValueError``).
    """
    snapshot = await asyncio.to_thread(_fetch_rhel_streams_snapshot_sync)
    return snapshot
1 change: 1 addition & 0 deletions ymir/common/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ packages = []
"config.py" = "ymir/common/config.py"
"constants.py" = "ymir/common/constants.py"
"models.py" = "ymir/common/models.py"
"product_pages.py" = "ymir/common/product_pages.py"
"utils.py" = "ymir/common/utils.py"
"validators.py" = "ymir/common/validators.py"
"version_utils.py" = "ymir/common/version_utils.py"
Expand Down
2 changes: 2 additions & 0 deletions ymir/common/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# Dependencies specific to ymir-common
GitPython>=3.1.0
redis>=6.4.0
requests>=2.32.0
requests-gssapi>=1.3.0
Loading
Loading