diff --git a/cmd/server/server.go b/cmd/server/server.go index 8f5d75a249..7bc994c01d 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -203,7 +203,7 @@ func StartConvoyServer(a *cli.App) error { s.RegisterTask("0 1 * * *", convoy.ScheduleQueue, convoy.RetentionPolicies) } - err = metrics.RegisterQueueMetrics(a.Queue, a.DB, nil) + err = metrics.RegisterQueueMetrics(a.Queue, a.DB, a.Redis, nil) if err != nil { return fmt.Errorf("failed to register queue metrics: %w", err) } diff --git a/cmd/worker/worker.go b/cmd/worker/worker.go index 728ba925bb..7133f3c833 100644 --- a/cmd/worker/worker.go +++ b/cmd/worker/worker.go @@ -446,7 +446,7 @@ func NewWorker(ctx context.Context, a *cli.App, cfg config.Configuration) (*Work consumer.RegisterHandlers(convoy.UpdateOrganisationStatus, task.UpdateOrganisationStatus(a.DB, billingClient, rd, lo), nil) } - err = metrics.RegisterQueueMetrics(a.Queue, a.DB, circuitBreakerManager) + err = metrics.RegisterQueueMetrics(a.Queue, a.DB, a.Redis, circuitBreakerManager) if err != nil { return nil, fmt.Errorf("failed to register queue metrics: %w", err) } diff --git a/database/postgres/postgres_collector.go b/database/postgres/postgres_collector.go index b3ee32f47c..b9ea3ffdac 100644 --- a/database/postgres/postgres_collector.go +++ b/database/postgres/postgres_collector.go @@ -2,11 +2,14 @@ package postgres import ( "context" + "encoding/json" "fmt" "strings" + "sync" "time" "github.com/prometheus/client_golang/prometheus" + "github.com/redis/go-redis/v9" "github.com/frain-dev/convoy/config" "github.com/frain-dev/convoy/pkg/log" @@ -14,12 +17,26 @@ import ( // Namespace used in fully-qualified metrics names. const namespace = "convoy" +const queueMetricsSnapshotKey = "convoy:metrics:queue:snapshot:data" +const queueMetricsSnapshotLockKey = "convoy:metrics:queue:snapshot:refresh:lock" -var lastRun = time.Now() +var metricsConfig *config.MetricsConfiguration -var cachedMetrics *Metrics // needed to feed the UI with data when sampling time has not yet elapsed +type queueMetricsSnapshotState struct { + client redis.UniversalClient + mu sync.Mutex + started bool +} -var metricsConfig *config.MetricsConfiguration +var ( + queueMetricsSnapshotStates sync.Map + queueMetricsSnapshotLock = redis.NewScript(` +if redis.call("GET", KEYS[1]) == ARGV[1] then + return redis.call("DEL", KEYS[1]) +end +return 0 +`) +) type EventQueueMetrics struct { ProjectID string `json:"project_id" db:"project_id"` @@ -113,6 +130,121 @@ func (p *Postgres) Describe(ch chan<- *prometheus.Desc) { prometheus.DescribeByCollect(p, ch) } +func (p *Postgres) ConfigureQueueMetricsSnapshot(rdb redis.UniversalClient, refreshInterval time.Duration) { + if rdb == nil { + return + } + + state := &queueMetricsSnapshotState{client: rdb} + actual, _ := queueMetricsSnapshotStates.LoadOrStore(p, state) + + // Keep the latest client reference if it changes between invocations. + stored := actual.(*queueMetricsSnapshotState) + stored.client = rdb + + if refreshInterval <= 0 { + refreshInterval = 30 * time.Second + } + + stored.mu.Lock() + if stored.started { + stored.mu.Unlock() + return + } + stored.started = true + stored.mu.Unlock() + + go p.refreshQueueMetricsSnapshotLoop(stored, refreshInterval) +} + +func (p *Postgres) refreshQueueMetricsSnapshotLoop(state *queueMetricsSnapshotState, refreshInterval time.Duration) { + // One fast refresh at startup before periodic refreshes. + p.refreshQueueMetricsSnapshot(state) + + ticker := time.NewTicker(refreshInterval) + defer ticker.Stop() + + for range ticker.C { + p.refreshQueueMetricsSnapshot(state) + } +} + +func (p *Postgres) refreshQueueMetricsSnapshot(state *queueMetricsSnapshotState) { + if state.client == nil { + return + } + + lockCtx, lockCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer lockCancel() + + lockToken := fmt.Sprintf("%d", time.Now().UnixNano()) + lockTTL := 5 * time.Minute + lockAcquired, err := state.client.SetNX(lockCtx, queueMetricsSnapshotLockKey, lockToken, lockTTL).Result() + if err != nil || !lockAcquired { + return + } + + defer func() { + unlockCtx, unlockCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer unlockCancel() + + _, unlockErr := queueMetricsSnapshotLock.Run(unlockCtx, state.client, []string{queueMetricsSnapshotLockKey}, lockToken).Result() + if unlockErr != nil && unlockErr != redis.Nil { + log.WithError(unlockErr).Warn("failed to release queue metrics snapshot refresh lock") + } + }() + + metrics, err := p.collectMetrics() + if err != nil { + log.WithError(err).Error("failed to refresh queue metrics snapshot") + return + } + + payload, err := json.Marshal(metrics) + if err != nil { + log.WithError(err).Error("failed to marshal queue metrics snapshot") + return + } + + writeCtx, writeCancel := context.WithTimeout(context.Background(), 3*time.Second) + defer writeCancel() + + if err = state.client.Set(writeCtx, queueMetricsSnapshotKey, payload, 0).Err(); err != nil { + log.WithError(err).Error("failed to write queue metrics snapshot") + } +} + +func (p *Postgres) loadMetricsSnapshot() *Metrics { + stateRaw, ok := queueMetricsSnapshotStates.Load(p) + if !ok { + return &Metrics{} + } + + state := stateRaw.(*queueMetricsSnapshotState) + if state.client == nil { + return &Metrics{} + } + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + payload, err := state.client.Get(ctx, queueMetricsSnapshotKey).Bytes() + if err != nil { + if err != redis.Nil { + log.WithError(err).Warn("failed to load queue metrics snapshot") + } + return &Metrics{} + } + + metrics := &Metrics{} + if err = json.Unmarshal(payload, metrics); err != nil { + log.WithError(err).Warn("failed to unmarshal queue metrics snapshot") + return &Metrics{} + } + + return metrics +} + func (p *Postgres) Collect(ch chan<- prometheus.Metric) { if metricsConfig == nil { cfg, err := config.Get() @@ -125,25 +257,7 @@ func (p *Postgres) Collect(ch chan<- prometheus.Metric) { return } - var metrics *Metrics - var err error - now := time.Now() - if cachedMetrics != nil && lastRun.Add(time.Duration(metricsConfig.Prometheus.SampleTime)*time.Second).After(now) { - metrics = cachedMetrics - } else { - metrics, err = p.collectMetrics() - if err != nil { - log.Errorf("Failed to collect metrics data: %v", err) - if cachedMetrics != nil { - metrics = cachedMetrics - log.Warn("Using cached metrics due to collection failure") - } else { - // Return empty metrics to prevent blocking the endpoint - metrics = &Metrics{} - } - } - cachedMetrics = metrics - } + metrics := p.loadMetricsSnapshot() // Use unique keys per metric type to prevent collisions metricsMap := make(map[string]struct{}) @@ -238,29 +352,18 @@ func (p *Postgres) Collect(ch chan<- prometheus.Metric) { metricsMap[key] = struct{}{} } - lastRun = now -} - -// materializedViewExists checks if a materialized view exists in the database -func (p *Postgres) materializedViewExists(ctx context.Context, viewName string) bool { - query := ` - SELECT EXISTS ( - SELECT 1 - FROM pg_matviews - WHERE schemaname = 'convoy' - AND matviewname = $1 - )` - var exists bool - err := p.GetDB().GetContext(ctx, &exists, query, viewName) - if err != nil { - log.Warnf("Failed to check if materialized view %s exists: %v", viewName, err) - return false - } - return exists } // collectMetrics gathers essential metrics from the DB func (p *Postgres) collectMetrics() (*Metrics, error) { + if metricsConfig == nil { + cfg, err := config.Get() + if err != nil { + return nil, err + } + metricsConfig = &cfg.Metrics + } + queryTimeout := time.Duration(metricsConfig.Prometheus.QueryTimeout) * time.Second if queryTimeout == 0 { queryTimeout = 30 * time.Second @@ -270,20 +373,13 @@ func (p *Postgres) collectMetrics() (*Metrics, error) { metrics := &Metrics{} - useMaterializedViews := p.materializedViewExists(ctx, "event_queue_metrics_mv") - - var queryEventQueueMetrics string - if useMaterializedViews { - queryEventQueueMetrics = "SELECT project_id, source_id, total FROM convoy.event_queue_metrics_mv" - } else { - queryEventQueueMetrics = ` - SELECT DISTINCT - project_id, - COALESCE(source_id, 'http') AS source_id, - COUNT(*) AS total - FROM convoy.events - GROUP BY project_id, source_id` - } + queryEventQueueMetrics := ` + SELECT DISTINCT + project_id, + COALESCE(source_id, 'http') AS source_id, + COUNT(*) AS total + FROM convoy.events + GROUP BY project_id, source_id` rows, err := p.GetDB().QueryxContext(ctx, queryEventQueueMetrics) if err != nil { return nil, fmt.Errorf("failed to query event queue metrics: %w", err) @@ -300,42 +396,37 @@ func (p *Postgres) collectMetrics() (*Metrics, error) { } metrics.EventQueueMetrics = eventQueueMetrics - var backlogQM string - if useMaterializedViews { - backlogQM = "SELECT project_id, source_id, age_seconds FROM convoy.event_queue_backlog_metrics_mv" - } else { - backlogQM = ` - WITH a1 AS ( - SELECT ed.project_id, - COALESCE(e.source_id, 'http') AS source_id, - EXTRACT(EPOCH FROM (NOW() - MIN(ed.created_at))) AS age_seconds - FROM convoy.event_deliveries ed - LEFT JOIN convoy.events e ON e.id = ed.event_id - WHERE ed.status = 'Processing' - GROUP BY ed.project_id, e.source_id - ORDER BY age_seconds DESC, ed.project_id, e.source_id - LIMIT 1000 - ) - SELECT project_id, source_id, age_seconds - FROM ( - SELECT * FROM a1 - UNION ALL - SELECT ed.project_id, - COALESCE(e.source_id, 'http'), - 0 AS age_seconds - FROM convoy.event_deliveries ed - LEFT JOIN convoy.events e ON e.id = ed.event_id - WHERE ed.status = 'Success' - AND NOT EXISTS ( - SELECT 1 FROM a1 - WHERE a1.project_id = ed.project_id - AND a1.source_id = COALESCE(e.source_id, 'http') - ) - GROUP BY ed.project_id, e.source_id - ) AS combined - ORDER BY project_id, source_id - LIMIT 1000` - } + backlogQM := ` + WITH a1 AS ( + SELECT ed.project_id, + COALESCE(e.source_id, 'http') AS source_id, + EXTRACT(EPOCH FROM (NOW() - MIN(ed.created_at))) AS age_seconds + FROM convoy.event_deliveries ed + LEFT JOIN convoy.events e ON e.id = ed.event_id + WHERE ed.status = 'Processing' + GROUP BY ed.project_id, e.source_id + ORDER BY age_seconds DESC, ed.project_id, e.source_id + LIMIT 1000 + ) + SELECT project_id, source_id, age_seconds + FROM ( + SELECT * FROM a1 + UNION ALL + SELECT ed.project_id, + COALESCE(e.source_id, 'http'), + 0 AS age_seconds + FROM convoy.event_deliveries ed + LEFT JOIN convoy.events e ON e.id = ed.event_id + WHERE ed.status = 'Success' + AND NOT EXISTS ( + SELECT 1 FROM a1 + WHERE a1.project_id = ed.project_id + AND a1.source_id = COALESCE(e.source_id, 'http') + ) + GROUP BY ed.project_id, e.source_id + ) AS combined + ORDER BY project_id, source_id + LIMIT 1000` rows1, err := p.GetDB().QueryxContext(ctx, backlogQM) if err != nil { return nil, fmt.Errorf("failed to query backlog metrics: %w", err) @@ -352,38 +443,23 @@ func (p *Postgres) collectMetrics() (*Metrics, error) { } metrics.EventQueueBacklogMetrics = eventQueueBacklogMetrics - var queryDeliveryQ string - if useMaterializedViews { - queryDeliveryQ = `SELECT - project_id, - project_name, - endpoint_id, - status, - event_type, - source_id, - organisation_id, - organisation_name, - total - FROM convoy.event_delivery_queue_metrics_mv` - } else { - queryDeliveryQ = ` - SELECT DISTINCT - ed.project_id, - COALESCE(p.name, '') AS project_name, - ed.endpoint_id, - ed.status, - COALESCE(ed.event_type, '') AS event_type, - COALESCE(e.source_id, 'http') AS source_id, - COALESCE(p.organisation_id, '') AS organisation_id, - COALESCE(o.name, '') AS organisation_name, - COUNT(*) AS total - FROM convoy.event_deliveries ed - LEFT JOIN convoy.events e ON ed.event_id = e.id - LEFT JOIN convoy.projects p ON ed.project_id = p.id - LEFT JOIN convoy.organisations o ON p.organisation_id = o.id - WHERE ed.deleted_at IS NULL - GROUP BY ed.project_id, p.name, ed.endpoint_id, ed.status, ed.event_type, e.source_id, p.organisation_id, o.name` - } + queryDeliveryQ := ` + SELECT DISTINCT + ed.project_id, + COALESCE(p.name, '') AS project_name, + ed.endpoint_id, + ed.status, + COALESCE(ed.event_type, '') AS event_type, + COALESCE(e.source_id, 'http') AS source_id, + COALESCE(p.organisation_id, '') AS organisation_id, + COALESCE(o.name, '') AS organisation_name, + COUNT(*) AS total + FROM convoy.event_deliveries ed + LEFT JOIN convoy.events e ON ed.event_id = e.id + LEFT JOIN convoy.projects p ON ed.project_id = p.id + LEFT JOIN convoy.organisations o ON p.organisation_id = o.id + WHERE ed.deleted_at IS NULL + GROUP BY ed.project_id, p.name, ed.endpoint_id, ed.status, ed.event_type, e.source_id, p.organisation_id, o.name` rows2, err := p.GetDB().QueryxContext(ctx, queryDeliveryQ) if err != nil { return nil, fmt.Errorf("failed to query delivery queue metrics: %w", err) @@ -400,45 +476,40 @@ func (p *Postgres) collectMetrics() (*Metrics, error) { } metrics.EventDeliveryQueueMetrics = eventDeliveryQueueMetrics - var backlogEQM string - if useMaterializedViews { - backlogEQM = "SELECT project_id, source_id, endpoint_id, age_seconds FROM convoy.event_endpoint_backlog_metrics_mv" - } else { - backlogEQM = ` - WITH a1 AS ( - SELECT ed.project_id, - COALESCE(e.source_id, 'http') AS source_id, - ed.endpoint_id, - EXTRACT(EPOCH FROM (NOW() - MIN(ed.created_at))) AS age_seconds - FROM convoy.event_deliveries ed - LEFT JOIN convoy.events e ON e.id = ed.event_id - WHERE ed.status = 'Processing' - GROUP BY ed.project_id, e.source_id, ed.endpoint_id - ORDER BY age_seconds DESC, ed.project_id, e.source_id, ed.endpoint_id - LIMIT 1000 - ) - SELECT project_id, source_id, endpoint_id, age_seconds - FROM ( - SELECT * FROM a1 - UNION ALL - SELECT ed.project_id, - COALESCE(e.source_id, 'http'), - ed.endpoint_id, - 0 AS age_seconds - FROM convoy.event_deliveries ed - LEFT JOIN convoy.events e ON e.id = ed.event_id - WHERE ed.status = 'Success' - AND NOT EXISTS ( - SELECT 1 FROM a1 - WHERE a1.project_id = ed.project_id - AND a1.source_id = COALESCE(e.source_id, 'http') - AND a1.endpoint_id = ed.endpoint_id - ) - GROUP BY ed.project_id, e.source_id, ed.endpoint_id - ) AS combined - ORDER BY project_id, source_id, endpoint_id - LIMIT 1000` - } + backlogEQM := ` + WITH a1 AS ( + SELECT ed.project_id, + COALESCE(e.source_id, 'http') AS source_id, + ed.endpoint_id, + EXTRACT(EPOCH FROM (NOW() - MIN(ed.created_at))) AS age_seconds + FROM convoy.event_deliveries ed + LEFT JOIN convoy.events e ON e.id = ed.event_id + WHERE ed.status = 'Processing' + GROUP BY ed.project_id, e.source_id, ed.endpoint_id + ORDER BY age_seconds DESC, ed.project_id, e.source_id, ed.endpoint_id + LIMIT 1000 + ) + SELECT project_id, source_id, endpoint_id, age_seconds + FROM ( + SELECT * FROM a1 + UNION ALL + SELECT ed.project_id, + COALESCE(e.source_id, 'http'), + ed.endpoint_id, + 0 AS age_seconds + FROM convoy.event_deliveries ed + LEFT JOIN convoy.events e ON e.id = ed.event_id + WHERE ed.status = 'Success' + AND NOT EXISTS ( + SELECT 1 FROM a1 + WHERE a1.project_id = ed.project_id + AND a1.source_id = COALESCE(e.source_id, 'http') + AND a1.endpoint_id = ed.endpoint_id + ) + GROUP BY ed.project_id, e.source_id, ed.endpoint_id + ) AS combined + ORDER BY project_id, source_id, endpoint_id + LIMIT 1000` rows3, err := p.GetDB().QueryxContext(ctx, backlogEQM) if err != nil { return nil, fmt.Errorf("failed to query endpoint backlog metrics: %w", err) diff --git a/docs/docs.go b/docs/docs.go index ba9adc0843..2ce051f8a0 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -1,4 +1,4 @@ -// Package docs Code generated by swaggo/swag at 2026-02-10 14:18:50.71968 +0100 CET m=+1.596704293. DO NOT EDIT +// Package docs Code generated by swaggo/swag at 2026-02-17 04:41:07.508522 +0000 GMT m=+2.974422168. DO NOT EDIT package docs import "github.com/swaggo/swag" diff --git a/internal/pkg/metrics/metrics.go b/internal/pkg/metrics/metrics.go index 4ca106c13d..45df25152f 100644 --- a/internal/pkg/metrics/metrics.go +++ b/internal/pkg/metrics/metrics.go @@ -4,8 +4,10 @@ import ( "errors" "fmt" "sync" + "time" "github.com/prometheus/client_golang/prometheus" + "github.com/redis/go-redis/v9" "github.com/frain-dev/convoy/config" "github.com/frain-dev/convoy/database" @@ -33,7 +35,7 @@ func Reset() { prometheus.DefaultRegisterer = prometheus.NewRegistry() } -func RegisterQueueMetrics(q queue.Queuer, db database.Database, cbm *cb.CircuitBreakerManager) error { +func RegisterQueueMetrics(q queue.Queuer, db database.Database, rdb redis.UniversalClient, cbm *cb.CircuitBreakerManager) error { configuration, err := config.Get() if err != nil || !configuration.Metrics.IsEnabled { return err @@ -49,6 +51,12 @@ func RegisterQueueMetrics(q queue.Queuer, db database.Database, cbm *cb.CircuitB return errors.New("failed to assert postgres database") } + refreshInterval := time.Duration(configuration.Metrics.Prometheus.SampleTime) * time.Second + if refreshInterval <= 0 { + refreshInterval = 30 * time.Second + } + postgresDB.ConfigureQueueMetricsSnapshot(rdb, refreshInterval) + registry := Reg() // Register queue and database collectors