diff --git a/packit_service/celery_config.py b/packit_service/celery_config.py index b17855c41..2e819fdb0 100644 --- a/packit_service/celery_config.py +++ b/packit_service/celery_config.py @@ -54,6 +54,11 @@ "schedule": 10800.0, "options": {"queue": "long-running", "time_limit": 3600}, }, + "push-ogr-namespace-metrics": { + "task": "packit_service.worker.tasks.push_ogr_namespace_metrics", + "schedule": 300.0, + "options": {"queue": "short-running"}, + }, } # http://mher.github.io/flower/prometheus-integration.html#set-up-your-celery-application diff --git a/packit_service/worker/monitoring.py b/packit_service/worker/monitoring.py index 7beb8a8f9..5d46435f4 100644 --- a/packit_service/worker/monitoring.py +++ b/packit_service/worker/monitoring.py @@ -4,7 +4,7 @@ import logging import os -from prometheus_client import CollectorRegistry, Counter, Histogram, push_to_gateway +from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram, push_to_gateway logger = logging.getLogger(__name__) @@ -265,6 +265,13 @@ def __init__(self): buckets=(5, 15, 20, 25, 30, 40, 60, float("inf")), ) + self.ogr_namespace_requests = Gauge( + "ogr_namespace_requests_total", + "Total number of ogr API requests per instance URL and namespace in the last 5 minutes", + ["instance_url", "namespace"], + registry=self.registry, + ) + def push(self): if not (self.pushgateway_address and self.worker_name): logger.debug("Pushgateway address or worker name not defined.") diff --git a/packit_service/worker/tasks.py b/packit_service/worker/tasks.py index 83baf3c3f..66f25076a 100644 --- a/packit_service/worker/tasks.py +++ b/packit_service/worker/tasks.py @@ -13,6 +13,7 @@ from copr.v3 import CoprException from ogr import __version__ as ogr_version from ogr.exceptions import OgrException +from ogr.metrics import get_metrics_tracker from packit import __version__ as packit_version from packit.exceptions import PackitException from sqlalchemy import __version__ as sqlal_version @@ -101,6 +102,7 @@ update_vm_image_build, ) from packit_service.worker.jobs import SteveJobs +from packit_service.worker.monitoring import Pushgateway from packit_service.worker.result import TaskResults logger = logging.getLogger(__name__) @@ -970,3 +972,37 @@ def get_usage_statistics() -> None: logger.debug(f"Getting usage data from datetime_from {day}.") get_usage_data(datetime_from=day) logger.debug("Got usage data.") + + +@celery_app.task +def push_ogr_namespace_metrics() -> None: + """ + Collect ogr namespace request metrics and push them to pushgateway. + + This task queries the ogr metrics tracker for request counts per instance URL + and namespace, then updates the Prometheus metrics and pushes them. + After pushing, the counters are reset for the next collection period. + """ + logger.debug("Collecting ogr namespace request metrics.") + + try: + metrics_tracker = get_metrics_tracker() + counts = metrics_tracker.get_all_counts() + + pushgateway = Pushgateway() + + for (instance_url, namespace), count in counts.items(): + pushgateway.ogr_namespace_requests.labels( + instance_url=instance_url, + namespace=namespace, + ).set(count) + + pushgateway.push() + + metrics_tracker.reset() + + logger.info( + f"Pushed ogr namespace metrics: {len(counts)} instance/namespace combinations", + ) + except Exception as e: + logger.error(f"Failed to push ogr namespace metrics: {e}", exc_info=True) diff --git a/tests/unit/test_tasks.py b/tests/unit/test_tasks.py index ac5ff6717..f506f97c3 100644 --- a/tests/unit/test_tasks.py +++ b/tests/unit/test_tasks.py @@ -8,7 +8,7 @@ from packit.exceptions import PackitException from packit_service.worker.handlers import CoprBuildHandler -from packit_service.worker.tasks import run_copr_build_handler +from packit_service.worker.tasks import push_ogr_namespace_metrics, run_copr_build_handler def test_autoretry(): @@ -21,3 +21,47 @@ def test_autoretry(): flexmock(Task).should_receive("retry").and_raise(PackitException).once() with pytest.raises(PackitException): run_copr_build_handler({}, {}, {}) + + +def test_push_metrics_handles_exception(): + """Test that exceptions are handled gracefully.""" + from packit_service.worker import tasks + + mock_tracker = flexmock() + mock_tracker.should_receive("get_all_counts").and_raise(Exception("Test error")).once() + + flexmock(tasks).should_receive("get_metrics_tracker").and_return(mock_tracker).once() + + push_ogr_namespace_metrics() + + +def test_push_metrics_resets_after_push(): + """Test that reset is called after pushing metrics.""" + from packit_service.worker import tasks + + mock_tracker = flexmock() + mock_tracker.should_receive("get_all_counts").and_return( + {("https://github.com", "packit"): 1} + ).once() + # Verify reset is called after push + mock_tracker.should_receive("reset").once() + + flexmock(tasks).should_receive("get_metrics_tracker").and_return(mock_tracker).once() + + # Mock Pushgateway + mock_gauge = flexmock() + mock_gauge.should_receive("set").with_args(1).once() + + mock_pushgateway = flexmock() + mock_pushgateway.ogr_namespace_requests = flexmock() + mock_pushgateway.ogr_namespace_requests.should_receive("labels").with_args( + instance_url="https://github.com", namespace="packit" + ).and_return(mock_gauge).once() + mock_pushgateway.should_receive("push").once() + + from packit_service.worker.monitoring import Pushgateway + + flexmock(Pushgateway).should_receive("__init__").and_return(None) + flexmock(Pushgateway).new_instances(mock_pushgateway) + + push_ogr_namespace_metrics()