Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions src/pyinfra/facts/util/packages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""
Unified package data model for all package managers.

Provides a common :class:`PackageInfo` dataclass and :class:`PackageStatus`
enum so that package facts can return rich, structured data instead of plain
``dict[str, set[str]]``.
"""

from dataclasses import dataclass
from enum import Enum


class PackageStatus(Enum):
"""Status of an installed package."""

INSTALLED = "installed"
UPGRADEABLE = "upgradeable"
HELD = "held"


@dataclass(frozen=True)
class PackageInfo:
"""Unified package information returned by enriched package facts."""

name: str
installed_version: str
available_version: str | None = None
status: PackageStatus = PackageStatus.INSTALLED


def build_package_map(
installed: dict[str, set[str]],
upgradeable: dict[str, str] | None = None,
held: set[str] | None = None,
) -> dict[str, PackageInfo]:
"""Build a :class:`PackageInfo` map by combining sub-fact data.

+ installed: installed packages from a fact (name to set of versions).
+ upgradeable: packages with available upgrades (name to available version).
+ held: names of held/locked/pinned packages.
"""

result: dict[str, PackageInfo] = {}
_upgradeable = upgradeable or {}
_held = held or set()

for name, versions in installed.items():
# Sort so a multi-version set produces the same PackageInfo.installed_version
# across runs (sets have hash-randomized iteration order for strings).
version = next(iter(sorted(versions)), "")

if name in _held:
status = PackageStatus.HELD
elif name in _upgradeable:
status = PackageStatus.UPGRADEABLE
else:
status = PackageStatus.INSTALLED

result[name] = PackageInfo(
name=name,
installed_version=version,
available_version=_upgradeable.get(name),
status=status,
)

return result
92 changes: 78 additions & 14 deletions src/pyinfra/operations/util/packaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from pyinfra.api.command import QuoteString, StringCommand
from pyinfra.facts.files import File
from pyinfra.facts.rpm import RpmPackage
from pyinfra.facts.util.packages import PackageInfo, PackageStatus
from pyinfra.operations import files


Expand Down Expand Up @@ -100,16 +101,21 @@ def from_pep508(cls, s: str) -> PkgInfo | None:

def _has_package(
package: str | list[str],
packages: dict[str, set[str]],
packages: dict[str, set[str]] | dict[str, PackageInfo],
expand_package_fact: Callable[[str], list[str | list[str]]] | None = None,
match_any=False,
) -> tuple[bool, dict]:
def in_packages(pkg_name, pkg_versions):
if pkg_name not in packages:
return False
value = packages[pkg_name]
if isinstance(value, PackageInfo):
if not pkg_versions:
return True
return any(version == value.installed_version for version in pkg_versions)
if not pkg_versions:
return pkg_name in packages
return pkg_name in packages and any(
version in packages[pkg_name] for version in pkg_versions
)
return True
return any(version in value for version in pkg_versions)

packages_to_check: list[str | list[str]] = [package]
if expand_package_fact:
Expand All @@ -135,10 +141,40 @@ def in_packages(pkg_name, pkg_versions):
return all(checks), package_name_to_versions


def _get_package_status(
current_packages: dict[str, set[str]] | dict[str, PackageInfo],
pkg_name: str,
) -> PackageStatus | None:
"""Return the PackageStatus for ``pkg_name``, or ``None`` for old-format dicts."""
if pkg_name not in current_packages:
return None
value = current_packages[pkg_name]
if isinstance(value, PackageInfo):
return value.status
return None


def _format_version(
current_packages: dict[str, set[str]] | dict[str, PackageInfo],
pkg_name: str,
) -> str:
"""Return a human-readable version string for noop messages.

For the legacy ``set[str]`` shape, multiple versions are sorted so the
noop output is deterministic across runs.
"""
if pkg_name not in current_packages:
return ""
value = current_packages[pkg_name]
if isinstance(value, PackageInfo):
return value.installed_version
return ",".join(sorted(value))


def ensure_packages(
host: Host,
packages_to_ensure: str | list[str] | list[PkgInfo] | None,
current_packages: dict[str, set[str]],
current_packages: dict[str, set[str]] | dict[str, PackageInfo],
present: bool,
install_command: str | StringCommand,
uninstall_command: str | StringCommand,
Expand All @@ -151,14 +187,24 @@ def ensure_packages(
Handles this common scenario:

+ We have a list of packages(/versions/urls) to ensure
+ We have a map of existing package -> versions
+ We have a map of existing package -> versions (old) or PackageInfo (new)
+ We have the common command bits (install, uninstall, version "joiner")
+ Outputs commands to ensure our desired packages/versions
+ Optionally upgrades packages w/o specified version when present

When ``current_packages`` values are :class:`PackageInfo` objects, the richer
status information is used:

* **HELD** packages always produce a noop, even when ``latest=True``.
* **UPGRADEABLE** packages are upgraded when ``latest=True``.
* **INSTALLED** packages with no available upgrade produce a noop.

With the legacy ``dict[str, set[str]]`` format, behaviour is unchanged:
``latest=True`` blindly adds every versionless package to the upgrade list.

Args:
packages_to_ensure (list): list of packages or package/versions or PkgInfo's
current_packages (dict): dict of package names -> version
current_packages (dict): dict of package names -> version, or name -> PackageInfo
present (bool): whether packages should exist or not
install_command (str): command to prefix to list of packages to install
uninstall_command (str): as above for uninstalling packages
Expand Down Expand Up @@ -203,15 +249,33 @@ def ensure_packages(
diff_packages.append(package.inst_vers)
diff_expanded_packages[package.name] = expanded_packages
else:
# Present packages w/o version specified - for upgrade if latest
if not package.has_version: # don't try to upgrade if a specific version requested
upgrade_packages.append(package.inst_vers)
pkg_name = package.name
status = _get_package_status(current_packages, pkg_name)

if status == PackageStatus.HELD:
host.noop(f"package {pkg_name} is held")
continue

# Present packages w/o version specified: candidate for upgrade
if not package.has_version:
if status == PackageStatus.UPGRADEABLE:
upgrade_packages.append(package.inst_vers)
elif latest and status is None:
# Old format: try all (backward compat)
upgrade_packages.append(package.inst_vers)

if not latest:
if (pkg := package.name) in current_packages:
host.noop(f"package {pkg} is installed ({','.join(current_packages[pkg])})")
version_display = _format_version(current_packages, pkg_name)
if version_display:
host.noop(f"package {pkg_name} is installed ({version_display})")
else:
host.noop(f"package {pkg_name} is installed")
elif status == PackageStatus.INSTALLED:
version_display = _format_version(current_packages, pkg_name)
if version_display:
host.noop(f"package {pkg_name} is up to date ({version_display})")
else:
host.noop(f"package {package.name} is installed")
host.noop(f"package {pkg_name} is up to date")
if present is False:
for package in packages:
has_package, expanded_packages = _has_package(
Expand Down
63 changes: 63 additions & 0 deletions tests/test_facts_packages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from unittest import TestCase

from pyinfra.facts.util.packages import PackageInfo, PackageStatus, build_package_map


class TestPackageInfo(TestCase):
def test_defaults_to_installed_status(self):
info = PackageInfo(name="vim", installed_version="9.0")
assert info.status == PackageStatus.INSTALLED
assert info.available_version is None

def test_is_frozen(self):
info = PackageInfo(name="vim", installed_version="9.0")
with self.assertRaises(Exception):
info.installed_version = "9.1" # type: ignore[misc]


class TestBuildPackageMap(TestCase):
def test_only_installed(self):
result = build_package_map({"vim": {"9.0"}, "git": {"2.40"}})
assert set(result.keys()) == {"vim", "git"}
assert result["vim"] == PackageInfo(
name="vim", installed_version="9.0", status=PackageStatus.INSTALLED
)
assert result["git"] == PackageInfo(
name="git", installed_version="2.40", status=PackageStatus.INSTALLED
)

def test_marks_upgradeable(self):
result = build_package_map(
installed={"vim": {"9.0"}, "git": {"2.40"}},
upgradeable={"vim": "9.1"},
)
assert result["vim"].status == PackageStatus.UPGRADEABLE
assert result["vim"].available_version == "9.1"
assert result["git"].status == PackageStatus.INSTALLED
assert result["git"].available_version is None

def test_marks_held(self):
result = build_package_map(
installed={"vim": {"9.0"}, "git": {"2.40"}},
held={"vim"},
)
assert result["vim"].status == PackageStatus.HELD

def test_held_takes_precedence_over_upgradeable(self):
result = build_package_map(
installed={"vim": {"9.0"}},
upgradeable={"vim": "9.1"},
held={"vim"},
)
assert result["vim"].status == PackageStatus.HELD
assert result["vim"].available_version == "9.1"

def test_handles_empty_versions(self):
result = build_package_map({"foo": set()})
assert result["foo"].installed_version == ""

def test_multiple_versions_picks_lexicographic_min(self):
# Sets have hash-randomized iteration order; build_package_map sorts
# so the chosen installed_version is deterministic across runs.
result = build_package_map({"linux-image": {"6.1.0-13", "6.1.0-12", "5.10.0-26"}})
assert result["linux-image"].installed_version == "5.10.0-26"
Loading
Loading