Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions packit_service/celery_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@
"schedule": 10800.0,
"options": {"queue": "long-running", "time_limit": 3600},
},
"push-ogr-namespace-metrics": {
"task": "packit_service.worker.tasks.push_ogr_namespace_metrics",
"schedule": 300.0,
"options": {"queue": "short-running"},
},
}

# http://mher.github.io/flower/prometheus-integration.html#set-up-your-celery-application
Expand Down
9 changes: 8 additions & 1 deletion packit_service/worker/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging
import os

from prometheus_client import CollectorRegistry, Counter, Histogram, push_to_gateway
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram, push_to_gateway

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -265,6 +265,13 @@ def __init__(self):
buckets=(5, 15, 20, 25, 30, 40, 60, float("inf")),
)

self.ogr_namespace_requests = Gauge(
"ogr_namespace_requests_total",
"Total number of ogr API requests per instance URL and namespace in the last 5 minutes",
["instance_url", "namespace"],
registry=self.registry,
)
Comment thread
majamassarini marked this conversation as resolved.

def push(self):
if not (self.pushgateway_address and self.worker_name):
logger.debug("Pushgateway address or worker name not defined.")
Expand Down
36 changes: 36 additions & 0 deletions packit_service/worker/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from copr.v3 import CoprException
from ogr import __version__ as ogr_version
from ogr.exceptions import OgrException
from ogr.metrics import get_metrics_tracker
from packit import __version__ as packit_version
from packit.exceptions import PackitException
from sqlalchemy import __version__ as sqlal_version
Expand Down Expand Up @@ -101,6 +102,7 @@
update_vm_image_build,
)
from packit_service.worker.jobs import SteveJobs
from packit_service.worker.monitoring import Pushgateway
from packit_service.worker.result import TaskResults

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -970,3 +972,37 @@ def get_usage_statistics() -> None:
logger.debug(f"Getting usage data from datetime_from {day}.")
get_usage_data(datetime_from=day)
logger.debug("Got usage data.")


@celery_app.task
def push_ogr_namespace_metrics() -> None:
"""
Collect ogr namespace request metrics and push them to pushgateway.

This task queries the ogr metrics tracker for request counts per instance URL
and namespace, then updates the Prometheus metrics and pushes them.
After pushing, the counters are reset for the next collection period.
"""
logger.debug("Collecting ogr namespace request metrics.")

try:
metrics_tracker = get_metrics_tracker()
counts = metrics_tracker.get_all_counts()

pushgateway = Pushgateway()

for (instance_url, namespace), count in counts.items():
pushgateway.ogr_namespace_requests.labels(
instance_url=instance_url,
namespace=namespace,
).set(count)

pushgateway.push()

metrics_tracker.reset()

logger.info(
f"Pushed ogr namespace metrics: {len(counts)} instance/namespace combinations",
)
except Exception as e:
logger.error(f"Failed to push ogr namespace metrics: {e}", exc_info=True)
Comment thread
majamassarini marked this conversation as resolved.
46 changes: 45 additions & 1 deletion tests/unit/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from packit.exceptions import PackitException

from packit_service.worker.handlers import CoprBuildHandler
from packit_service.worker.tasks import run_copr_build_handler
from packit_service.worker.tasks import push_ogr_namespace_metrics, run_copr_build_handler


def test_autoretry():
Expand All @@ -21,3 +21,47 @@ def test_autoretry():
flexmock(Task).should_receive("retry").and_raise(PackitException).once()
with pytest.raises(PackitException):
run_copr_build_handler({}, {}, {})


def test_push_metrics_handles_exception():
"""Test that exceptions are handled gracefully."""
from packit_service.worker import tasks

mock_tracker = flexmock()
mock_tracker.should_receive("get_all_counts").and_raise(Exception("Test error")).once()

flexmock(tasks).should_receive("get_metrics_tracker").and_return(mock_tracker).once()

push_ogr_namespace_metrics()


def test_push_metrics_resets_after_push():
"""Test that reset is called after pushing metrics."""
from packit_service.worker import tasks

mock_tracker = flexmock()
mock_tracker.should_receive("get_all_counts").and_return(
{("https://github.com", "packit"): 1}
).once()
# Verify reset is called after push
mock_tracker.should_receive("reset").once()

flexmock(tasks).should_receive("get_metrics_tracker").and_return(mock_tracker).once()

# Mock Pushgateway
mock_gauge = flexmock()
mock_gauge.should_receive("set").with_args(1).once()

mock_pushgateway = flexmock()
mock_pushgateway.ogr_namespace_requests = flexmock()
mock_pushgateway.ogr_namespace_requests.should_receive("labels").with_args(
instance_url="https://github.com", namespace="packit"
).and_return(mock_gauge).once()
mock_pushgateway.should_receive("push").once()

from packit_service.worker.monitoring import Pushgateway

flexmock(Pushgateway).should_receive("__init__").and_return(None)
flexmock(Pushgateway).new_instances(mock_pushgateway)

push_ogr_namespace_metrics()
Loading