101 commits
209c64b
Add backup jobs infrastructure for hourly project data exports
jirevwe Mar 30, 2026
846276e
Add models for AzureBlobStorage and BackupJob to datastore
jirevwe Mar 30, 2026
37b2f77
Add BlobStore abstraction and support for Azure Blob Storage integration
jirevwe Mar 30, 2026
93928d6
Add StreamExport for gzip-compressed JSONL backups to BlobStore
jirevwe Mar 30, 2026
b357755
Switch ExportRecords to JSONL format with snapshot consistency using …
jirevwe Mar 30, 2026
bea397c
Refactor tests to use helper functions for parsing JSONL and decompre…
jirevwe Mar 30, 2026
1c8c0d6
Switch ExportRecords to JSONL format with snapshot consistency using …
jirevwe Mar 30, 2026
1cfe7e0
Add Azure Blob Storage configuration support to queries and migrations
jirevwe Mar 30, 2026
7225cff
Add Azure Blob Storage configuration support to storage policy
jirevwe Mar 30, 2026
bb4378e
Add E2E tests for Azure Blob Storage backup functionality
jirevwe Mar 30, 2026
6b79fd5
Refactor storage policy handling to support Azure Blob Storage and si…
jirevwe Mar 30, 2026
4f1eb9c
Add unit tests for BlobStore with support for OnPrem, S3, and Azure B…
jirevwe Mar 30, 2026
4f0d221
Add support for Azurite (Azure Blob Storage emulator) for testing env…
jirevwe Mar 30, 2026
28bdc28
Handle "ContainerAlreadyExists" error when creating default Azurite c…
jirevwe Mar 31, 2026
e5edf07
Simplify `FailBackupJob` method signature by combining parameters `jo…
jirevwe Mar 31, 2026
db0bb0b
Add indexes for filtering and sorting `backup_jobs` by status and pro…
jirevwe Mar 31, 2026
c396aa3
Remove unnecessary blank line in `closeWithError` function
jirevwe Mar 31, 2026
5b89ead
Add configurable backup interval support with dynamic cron scheduling…
jirevwe Mar 31, 2026
6d2e37e
Refactor SQL queries and Go methods for `backup_jobs` to improve argu…
jirevwe Mar 31, 2026
45fe85a
Add stress test script for backups with support for S3, Azure Blob, a…
jirevwe Mar 31, 2026
e2bff73
Refactor SQL queries and Go methods for `backup_jobs` to improve argu…
jirevwe Mar 31, 2026
03e6ecb
Update Postgres version to 18 in docker-compose and configuration
jirevwe Mar 31, 2026
c8ef81f
Integrate cached repositories across services to improve query perfor…
jirevwe Apr 1, 2026
b698073
Add cached endpoint repository with comprehensive tests to improve ca…
jirevwe Apr 1, 2026
458b63d
Add cached filter repository with tests to enhance caching and minimi…
jirevwe Apr 1, 2026
3994a9b
Add cached project repository with tests to optimize caching and redu…
jirevwe Apr 1, 2026
2f73f3b
Add cached subscription repository with tests to optimize caching and…
jirevwe Apr 1, 2026
69071ea
Add cached API key repository with methods to enhance caching and red…
jirevwe Apr 1, 2026
3a492dc
Add cached organisation repository to enhance caching and reduce data…
jirevwe Apr 1, 2026
2012ef0
Add cached portal link repository to enhance caching and reduce datab…
jirevwe Apr 1, 2026
f2eaa0d
Update PgBouncer configuration to adjust pool mode, connection limits…
jirevwe Apr 1, 2026
99cff0a
Add backup collector for CDC-based WAL streaming and periodic blob st…
jirevwe Apr 1, 2026
70f90f4
Add CDC backup collector to worker for WAL streaming and periodic blo…
jirevwe Apr 1, 2026
a16b87e
Add publication for convoy tables to support CDC-based backup options
jirevwe Apr 1, 2026
1038d54
Add CDC and replication configuration options to Convoy
jirevwe Apr 1, 2026
dc72280
Add publication for convoy tables to support CDC-based backup options
jirevwe Apr 1, 2026
55274e6
Update Docker Compose: add ports and WAL configuration for Postgres; …
jirevwe Apr 1, 2026
3adad41
Add cached repositories for API keys and portal links to enhance cach…
jirevwe Apr 1, 2026
9d39d54
Remove project-scoped filtering from export functions and queries
jirevwe Apr 1, 2026
daad8f0
Update query struct: replace basic types with pgtype equivalents for …
jirevwe Apr 1, 2026
c30a8f3
Refactor repository functions to use grouped parameter declarations f…
jirevwe Apr 1, 2026
df1f45d
Add tests for backup collector, covering start/stop behavior, insert …
jirevwe Apr 2, 2026
6ec97f3
Add end-to-end tests for CDC-based backup collector on on-prem, S3, a…
jirevwe Apr 2, 2026
5d37020
Add unit tests for backup collector buffer and flush functionality
jirevwe Apr 2, 2026
feaef9b
Enable logical WAL in PostgreSQL test container for CDC backup tests
jirevwe Apr 2, 2026
ff8e6a9
Relax strict equality checks in backup export tests to allow for mult…
jirevwe Apr 2, 2026
701005b
Simplify time filtering logic in backup helper tests by replacing cut…
jirevwe Apr 2, 2026
6f35302
Add helper function to check UID existence in JSONL results in backup…
jirevwe Apr 2, 2026
4c9ebf1
Remove project-scoped filtering test from `ExportRecords` and add `Ba…
jirevwe Apr 2, 2026
f0c2db3
Add path traversal protection to `OnPremClient.Upload` to prevent ins…
jirevwe Apr 4, 2026
8878d6a
Make `OnPremClient.Upload` context-aware to handle cancellation durin…
jirevwe Apr 4, 2026
1a9c92c
Make `flushedLSN` thread-safe using `atomic.Uint64` and improve shutd…
jirevwe Apr 4, 2026
9560251
Add nil check for `storage` in `NewBlobStoreClient` to prevent nil po…
jirevwe Apr 4, 2026
d49f528
Handle `pgx.ErrNoRows` in `ClaimBackupJob` to return nil instead of a…
jirevwe Apr 4, 2026
1e066da
Add conditional creation of `convoy_backup` publication to handle exi…
jirevwe Apr 4, 2026
5e90583
Rename `BackupProjectData` task to `ExportTableData` across the codebase
jirevwe Apr 7, 2026
a5d3b57
Add `ensureContainer` to AzureBlobClient for thread-safe container cr…
jirevwe Apr 7, 2026
fbba096
Replace `ExportTableData` task with direct call to `Exporter.StreamEx…
jirevwe Apr 7, 2026
bf38375
Rename `lookback` to `lookBackDur` for consistency in variable naming
jirevwe Apr 7, 2026
974058a
Merge branch 'main' of https://github.com/frain-dev/convoy into raymo…
jirevwe Apr 7, 2026
7b0b2bb
Remove `project_id` field from `BackupJob` and drop related database …
jirevwe Apr 8, 2026
41c801d
Remove `project_id` references from backup job handling and update re…
jirevwe Apr 8, 2026
74d8145
Remove `project_id` references from backup job queries and related st…
jirevwe Apr 8, 2026
ad91823
Update blob key and file path formatting for backups to use date and …
jirevwe Apr 8, 2026
bbbc528
Remove `project_id` from backup job handling and streamline backup logic
jirevwe Apr 8, 2026
e01614b
Offset backup job scheduling to ensure enqueue runs before processing
jirevwe Apr 8, 2026
14da20b
Add new backup job utilities and remove `project_id` from backup logic
jirevwe Apr 8, 2026
8affecf
Remove `projectRepo` and `project` from `Exporter` to simplify struct…
jirevwe Apr 8, 2026
3af8459
Add support for offset in cron generation via `DurationToCronOffset`
jirevwe Apr 8, 2026
1fc402b
Remove `projectRepo` and simplify `Exporter` constructor; update back…
jirevwe Apr 8, 2026
e9c6ca1
Simplify error variable scoping in `config.Override` handling.
jirevwe Apr 8, 2026
0bb0de6
Update `Exporter` to use `expStart` and `expEnd` instead of `expDate`…
jirevwe Apr 9, 2026
391bed9
Refactor date filtering in `Exporter` queries to use start and end ti…
jirevwe Apr 9, 2026
681f512
Update `Exporter` and record-handling methods to support start and en…
jirevwe Apr 9, 2026
3d87b74
Update mock repository methods to support start and end timestamps in…
jirevwe Apr 9, 2026
c30f808
Update tests to use start and end timestamps in `ExportRecords`, repl…
jirevwe Apr 9, 2026
6cd5bd6
Add `cachedrepo` package for cache-aside repository utilities with un…
jirevwe Apr 9, 2026
a62a8c1
Refactor `CachedEndpointRepository` and tests to use `cachedrepo` uti…
jirevwe Apr 9, 2026
84b5971
Remove `CachedFilterRepository` and associated tests; migrate to `cac…
jirevwe Apr 9, 2026
5eda4f6
Remove `CachedProjectRepository` and associated tests; migrate to `ca…
jirevwe Apr 9, 2026
43c4ccf
Remove `CachedSubscriptionRepository` and associated tests; migrate t…
jirevwe Apr 9, 2026
1d234fd
Remove `CachedAPIKeyRepository` and associated implementation; migrat…
jirevwe Apr 9, 2026
e00bb76
Remove `CachedOrganisationRepository` and associated implementation; …
jirevwe Apr 9, 2026
7490c92
Remove `CachedPortalLinkRepository` and associated implementation; mi…
jirevwe Apr 9, 2026
9a3594d
Increase `PGBOUNCER_DEFAULT_POOL_SIZE` to 80 in local `.env` configur…
jirevwe Apr 9, 2026
a1709af
Clean up redundant comments in cached repository implementations.
jirevwe Apr 10, 2026
f2ee71f
Update tests to use explicit epoch start time in `ExportRecords` call…
jirevwe Apr 10, 2026
89ba623
Update tests to use current timestamps for event seeding; revise expo…
jirevwe Apr 10, 2026
c9ce84e
Add unique bucket and container creation for test isolation in backup…
jirevwe Apr 10, 2026
8b1ab12
Enable RetentionPolicy-based task registration and backup scheduling …
jirevwe Apr 10, 2026
4f49c73
Remove unused task registrations for `MonitorTwitterSources` and `Tok…
jirevwe Apr 10, 2026
bb26063
Add `NewExporterWithWindow` constructor for time-bounded manual/expor…
jirevwe Apr 10, 2026
ef74b41
Add `ManualBackup` task for on-demand, time-bounded backup operations
jirevwe Apr 10, 2026
238175e
Add `backup` command for on-demand event and delivery backup operations
jirevwe Apr 10, 2026
71068df
Add `ManualBackupJob` task type for on-demand backup operations
jirevwe Apr 10, 2026
7b4978e
Add `TriggerBackup` handler and API route for on-demand backup job cr…
jirevwe Apr 10, 2026
fa21ded
Ensure UTC timestamps in `Exporter` initialization; validate export w…
jirevwe Apr 13, 2026
ce01073
Refactor `EnqueueBackupJobIfIdle` to support time-bounded backup sche…
jirevwe Apr 13, 2026
d7edfae
Add admin guard and retention policy checks to `TriggerBackup` handler
jirevwe Apr 13, 2026
1573161
Add `notFoundErr` handling to `FetchWithNotFound` for improved cache …
jirevwe Apr 13, 2026
6f22c50
Invalidate cache entries in `DeleteFilter` using subscriptionID and e…
jirevwe Apr 13, 2026
28 changes: 5 additions & 23 deletions api/handlers/handlers.go
@@ -15,6 +15,7 @@ import (
"github.com/frain-dev/convoy/auth"
"github.com/frain-dev/convoy/config"
"github.com/frain-dev/convoy/datastore"
"github.com/frain-dev/convoy/datastore/cached"
"github.com/frain-dev/convoy/internal/organisation_members"
"github.com/frain-dev/convoy/internal/organisations"
"github.com/frain-dev/convoy/internal/pkg/middleware"
@@ -56,7 +57,7 @@ func (h *Handler) retrieveProject(r *http.Request) (*datastore.Project, error) {
var project *datastore.Project
var err error

projectRepo := projects.New(h.A.Logger, h.A.DB)
projectRepo := cached.NewCachedProjectRepository(projects.New(h.A.Logger, h.A.DB), h.A.Cache, 5*time.Minute, h.A.Logger)

switch {
case h.IsReqWithJWT(authUser), h.IsReqWithPersonalAccessToken(authUser):
@@ -81,29 +82,10 @@ func (h *Handler) retrieveProject(r *http.Request) (*datastore.Project, error) {

projectID := apiKey.Role.Project

var p datastore.Project

cacheKey := convoy.ProjectCacheKey.Get(projectID)
cacheErr := h.A.Cache.Get(r.Context(), cacheKey.String(), &p)

// If cache hit with valid data, return it
if cacheErr == nil && p.UID != "" {
h.A.Logger.Info("found item in cache")
return &p, nil
}

// Cache miss - fetch from database
pp, fetchErr := projectRepo.FetchProjectByID(r.Context(), projectID)
if fetchErr != nil {
return nil, fetchErr
}

cacheErr = h.A.Cache.Set(r.Context(), cacheKey.String(), &pp, time.Hour)
if cacheErr != nil {
h.A.Logger.Error("failed to cache item", "error", cacheErr)
project, err = projectRepo.FetchProjectByID(r.Context(), projectID)
if err != nil {
return nil, err
}

return pp, nil
case h.IsReqWithPortalLinkToken(authUser):
if len(authUser.Credential.Token) > 0 { // this is the legacy static token type
svc := portal_links.New(h.A.Logger, h.A.DB)
4 changes: 3 additions & 1 deletion api/handlers/middleware.go
@@ -4,12 +4,14 @@ import (
"context"
"errors"
"net/http"
"time"

"github.com/go-chi/chi/v5"
"github.com/go-chi/render"

"github.com/frain-dev/convoy"
"github.com/frain-dev/convoy/datastore"
"github.com/frain-dev/convoy/datastore/cached"
"github.com/frain-dev/convoy/internal/organisations"
"github.com/frain-dev/convoy/util"
)
@@ -61,7 +63,7 @@ func (h *Handler) RequireEnabledOrganisation() func(next http.Handler) http.Hand
if cachedOrg := r.Context().Value(convoy.OrganisationCtx); cachedOrg != nil {
org = cachedOrg.(*datastore.Organisation)
} else {
orgRepo := organisations.New(h.A.Logger, h.A.DB)
orgRepo := cached.NewCachedOrganisationRepository(organisations.New(h.A.Logger, h.A.DB), h.A.Cache, 5*time.Minute, h.A.Logger)
org, err = orgRepo.FetchOrganisationByID(r.Context(), project.OrganisationID)
if err != nil {
h.A.Logger.Error("Failed to fetch organisation for disabled check", "error", err)
34 changes: 30 additions & 4 deletions api/models/configuration.go
@@ -47,14 +47,17 @@
}

type StoragePolicyConfiguration struct {
// Storage policy type e.g on_prem or s3
// Storage policy type e.g on_prem, s3, or azure_blob
Type datastore.StorageType `json:"type,omitempty" valid:"supported_storage~please provide a valid storage type,required"`

// S3 Bucket creds
S3 *S3Storage `json:"s3"`

// On_Prem directory
OnPrem *OnPremStorage `json:"on_prem"`

// Azure Blob Storage creds
AzureBlob *AzureBlobStorage `json:"azure_blob"`
}

func (sc *StoragePolicyConfiguration) Transform() *datastore.StoragePolicyConfiguration {
@@ -63,9 +66,10 @@ func (sc *StoragePolicyConfiguration) Transform() *datastore.StoragePolicyConfig
}

return &datastore.StoragePolicyConfiguration{
Type: sc.Type,
S3: sc.S3.transform(),
OnPrem: sc.OnPrem.transform(),
Type: sc.Type,
S3: sc.S3.transform(),
OnPrem: sc.OnPrem.transform(),
AzureBlob: sc.AzureBlob.transform(),
}
}

@@ -119,3 +123,25 @@ func (os *OnPremStorage) transform() *datastore.OnPremStorage {

return &datastore.OnPremStorage{Path: os.Path}
}

type AzureBlobStorage struct {
AccountName null.String `json:"account_name"`
AccountKey null.String `json:"account_key,omitempty"`
ContainerName null.String `json:"container_name"`
Endpoint null.String `json:"endpoint,omitempty"`
Prefix null.String `json:"prefix,omitempty"`
}

func (az *AzureBlobStorage) transform() *datastore.AzureBlobStorage {
if az == nil {
return nil
}

return &datastore.AzureBlobStorage{
AccountName: az.AccountName,
AccountKey: az.AccountKey,
ContainerName: az.ContainerName,
Endpoint: az.Endpoint,
Prefix: az.Prefix,
}
}
5 changes: 3 additions & 2 deletions cmd/agent/agent.go
@@ -15,6 +15,7 @@ import (
ingestSrv "github.com/frain-dev/convoy/cmd/ingest"
workerSrv "github.com/frain-dev/convoy/cmd/worker"
"github.com/frain-dev/convoy/config"
"github.com/frain-dev/convoy/datastore/cached"
"github.com/frain-dev/convoy/internal/api_keys"
"github.com/frain-dev/convoy/internal/configuration"
"github.com/frain-dev/convoy/internal/pkg/cli"
@@ -150,9 +151,9 @@ func startServerComponent(_ context.Context, a *cli.App) error {
lo.Info("Starting Convoy data plane")

userRepo := users.New(a.Logger, a.DB)
apiKeyRepo := api_keys.New(a.Logger, a.DB)
apiKeyRepo := cached.NewCachedAPIKeyRepository(api_keys.New(a.Logger, a.DB), a.Cache, 5*time.Minute, a.Logger)
configRepo := configuration.New(a.Logger, a.DB)
portalLinkRepo := portal_links.New(a.Logger, a.DB)
portalLinkRepo := cached.NewCachedPortalLinkRepository(portal_links.New(a.Logger, a.DB), a.Cache, 5*time.Minute, a.Logger)
err = realm_chain.Init(&cfg.Auth, apiKeyRepo, userRepo, portalLinkRepo, a.Cache, a.Logger)
if err != nil {
return fmt.Errorf("failed to initialize realm chain: %w", err)
23 changes: 15 additions & 8 deletions cmd/server/server.go
@@ -14,9 +14,11 @@ import (
"github.com/frain-dev/convoy/auth/realm_chain"
"github.com/frain-dev/convoy/config"
"github.com/frain-dev/convoy/database/postgres"
"github.com/frain-dev/convoy/datastore/cached"
"github.com/frain-dev/convoy/internal/api_keys"
"github.com/frain-dev/convoy/internal/configuration"
"github.com/frain-dev/convoy/internal/pkg/cli"
"github.com/frain-dev/convoy/internal/pkg/exporter"
"github.com/frain-dev/convoy/internal/pkg/fflag"
"github.com/frain-dev/convoy/internal/pkg/keys"
"github.com/frain-dev/convoy/internal/pkg/metrics"
@@ -116,9 +118,9 @@ func StartConvoyServer(a *cli.App) error {
return err
}

apiKeyRepo := api_keys.New(a.Logger, a.DB)
apiKeyRepo := cached.NewCachedAPIKeyRepository(api_keys.New(a.Logger, a.DB), a.Cache, 5*time.Minute, a.Logger)
userRepo := users.New(a.Logger, a.DB)
portalLinkRepo := portal_links.New(a.Logger, a.DB)
portalLinkRepo := cached.NewCachedPortalLinkRepository(portal_links.New(a.Logger, a.DB), a.Cache, 5*time.Minute, a.Logger)
configRepo := configuration.New(a.Logger, a.DB)
err = realm_chain.Init(&cfg.Auth, apiKeyRepo, userRepo, portalLinkRepo, a.Cache, a.Logger)
if err != nil {
@@ -187,13 +189,18 @@ func StartConvoyServer(a *cli.App) error {
// lo.Infof("Registered metrics materialized view refresh every %d min", refreshInterval)
// }

// ensures that project data is backed up about 2 hours before they are deleted
if a.Licenser.RetentionPolicy() {
// runs at 10pm
s.RegisterTask("0 22 * * *", convoy.ScheduleQueue, convoy.BackupProjectData)
// runs at 1am
s.RegisterTask("0 1 * * *", convoy.ScheduleQueue, convoy.RetentionPolicies)
// Register cron-based backup tasks only when CDC backup is not enabled.
// When CDC is active, the BackupCollector in the worker handles exports continuously.
if !cfg.RetentionPolicy.CDCBackupEnabled {
backupInterval := exporter.ParseBackupInterval(cfg.RetentionPolicy.BackupInterval)
backupCron := exporter.DurationToCron(backupInterval)

s.RegisterTask(backupCron, convoy.ScheduleQueue, convoy.EnqueueBackupJobs)
s.RegisterTask(backupCron, convoy.ScheduleQueue, convoy.ProcessBackupJob)
s.RegisterTask(backupCron, convoy.ScheduleQueue, convoy.BackupProjectData)
}
// Retention always runs at 1am
s.RegisterTask("0 1 * * *", convoy.ScheduleQueue, convoy.RetentionPolicies)

err = metrics.RegisterQueueMetrics(a.Queue, a.DB, nil)
if err != nil {
60 changes: 52 additions & 8 deletions cmd/worker/worker.go
@@ -14,6 +14,8 @@ import (
"github.com/frain-dev/convoy/config"
"github.com/frain-dev/convoy/database/postgres"
"github.com/frain-dev/convoy/datastore"
"github.com/frain-dev/convoy/datastore/cached"
"github.com/frain-dev/convoy/internal/backup_jobs"
batch_retries "github.com/frain-dev/convoy/internal/batch_retries"
"github.com/frain-dev/convoy/internal/configuration"
"github.com/frain-dev/convoy/internal/delivery_attempts"
@@ -23,8 +25,11 @@
"github.com/frain-dev/convoy/internal/filters"
"github.com/frain-dev/convoy/internal/meta_events"
"github.com/frain-dev/convoy/internal/organisations"
backup_collector "github.com/frain-dev/convoy/internal/pkg/backup_collector"
"github.com/frain-dev/convoy/internal/pkg/billing"
blobstore "github.com/frain-dev/convoy/internal/pkg/blob-store"
"github.com/frain-dev/convoy/internal/pkg/cli"
"github.com/frain-dev/convoy/internal/pkg/exporter"
"github.com/frain-dev/convoy/internal/pkg/fflag"
"github.com/frain-dev/convoy/internal/pkg/keys"
"github.com/frain-dev/convoy/internal/pkg/limiter"
@@ -50,8 +55,9 @@
)

type Worker struct {
consumer *worker.Consumer
logger log.Logger
consumer *worker.Consumer
backupCollector *backup_collector.BackupCollector // nil if CDC backup disabled
logger log.Logger
}

// NewWorker initializes all worker components and returns a Worker instance.
@@ -115,16 +121,17 @@ func NewWorker(ctx context.Context, a *cli.App, cfg config.Configuration) (*Work
}
}

projectRepo := projects.New(a.Logger, a.DB)
projectRepo := cached.NewCachedProjectRepository(projects.New(a.Logger, a.DB), a.Cache, 5*time.Minute, lo)
metaEventRepo := meta_events.New(a.Logger, a.DB)
endpointRepo := endpoints.New(a.Logger, a.DB)
endpointRepo := cached.NewCachedEndpointRepository(endpoints.New(a.Logger, a.DB), a.Cache, 2*time.Minute, lo)
eventRepo := events.New(a.Logger, a.DB)
jobRepo := postgres.NewJobRepo(a.DB)
eventDeliveryRepo := event_deliveries.New(a.Logger, a.DB)
subRepo := subscriptions.New(a.Logger, a.DB)
subRepo := cached.NewCachedSubscriptionRepository(subscriptions.New(a.Logger, a.DB), a.Cache, 30*time.Second, lo)
configRepo := configuration.New(a.Logger, a.DB)
attemptRepo := delivery_attempts.New(a.Logger, a.DB)
filterRepo := filters.New(a.Logger, a.DB)
backupJobRepo := backup_jobs.New(a.Logger, a.DB)
filterRepo := cached.NewCachedFilterRepository(filters.New(a.Logger, a.DB), a.Cache, 2*time.Minute, lo)
batchRetryRepo := batch_retries.New(lo, a.DB)

rd, err := rdb.NewClientFromRedisConfig(cfg.Redis)
@@ -352,6 +359,8 @@ func NewWorker(ctx context.Context, a *cli.App, cfg config.Configuration) (*Work
if a.Licenser.RetentionPolicy() {
consumer.RegisterHandlers(convoy.RetentionPolicies, task.RetentionPolicies(rd.Client(), ret, lo), nil)
consumer.RegisterHandlers(convoy.BackupProjectData, task.BackupProjectData(configRepo, projectRepo, eventRepo, eventDeliveryRepo, attemptRepo, rd.Client(), lo), nil)
consumer.RegisterHandlers(convoy.EnqueueBackupJobs, task.EnqueueBackupJobs(configRepo, projectRepo, backupJobRepo, lo), nil)
consumer.RegisterHandlers(convoy.ProcessBackupJob, task.ProcessBackupJob(configRepo, projectRepo, eventRepo, eventDeliveryRepo, attemptRepo, backupJobRepo, lo), nil)
}

matchSubscriptionsDeps := task.MatchSubscriptionsDeps{
@@ -417,9 +426,31 @@ func NewWorker(ctx context.Context, a *cli.App, cfg config.Configuration) (*Work
return nil, fmt.Errorf("failed to register queue metrics: %w", err)
}

// Optionally start CDC-based backup collector
var collector *backup_collector.BackupCollector
lo.Info(fmt.Sprintf("CDC backup config: enabled=%v, retention=%v", cfg.RetentionPolicy.CDCBackupEnabled, cfg.RetentionPolicy.IsRetentionPolicyEnabled))
if cfg.RetentionPolicy.CDCBackupEnabled && cfg.RetentionPolicy.IsRetentionPolicyEnabled {
blobStoreClient, blobErr := blobstore.NewBlobStoreClient(configuration.StoragePolicy, lo)
if blobErr != nil {
return nil, fmt.Errorf("failed to create blob store for CDC backup: %w", blobErr)
}

flushInterval := exporter.ParseBackupInterval(cfg.RetentionPolicy.BackupInterval)

// ReplicationDSN connects directly to Postgres (bypassing pgbouncer)
// for the WAL replication protocol. Falls back to normal DSN if not set.
replDSN := cfg.RetentionPolicy.ReplicationDSN
if replDSN == "" {
replDSN = cfg.Database.BuildDsn()
}

collector = backup_collector.NewBackupCollector(a.DB.GetConn(), replDSN, blobStoreClient, flushInterval, lo)
}

return &Worker{
consumer: consumer,
logger: lo,
consumer: consumer,
backupCollector: collector,
logger: lo,
}, nil
}

@@ -429,13 +460,26 @@ func (w *Worker) Run(ctx context.Context, workerReady chan struct{}) error {
}
w.logger.Printf("Starting Convoy Consumer Pool")

// Start CDC backup collector if enabled
if w.backupCollector != nil {
if err := w.backupCollector.Start(ctx); err != nil {
w.logger.Error(fmt.Sprintf("failed to start backup collector: %v", err))
// Non-fatal — worker can still process events without CDC backup
}
}

if workerReady != nil {
close(workerReady)
}

// Wait for context to be canceled before returning
<-ctx.Done()
w.logger.Printf("Context canceled, stopping Convoy Consumer Pool...")

if w.backupCollector != nil {
w.backupCollector.Stop(ctx)
}

w.consumer.Stop()
w.logger.Printf("Convoy Consumer Pool stopped")

19 changes: 16 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -74,6 +74,7 @@ var DefaultConfiguration = Configuration{
RetentionPolicy: RetentionPolicyConfiguration{
Policy: "720h",
IsRetentionPolicyEnabled: false,
BackupInterval: "1h",
},
CircuitBreaker: CircuitBreakerConfiguration{
SampleRate: 30,
@@ -375,6 +376,9 @@ type DatadogConfiguration struct {
type RetentionPolicyConfiguration struct {
Policy string `json:"policy" envconfig:"CONVOY_RETENTION_POLICY"`
IsRetentionPolicyEnabled bool `json:"enabled" envconfig:"CONVOY_RETENTION_POLICY_ENABLED"`
BackupInterval string `json:"backup_interval" envconfig:"CONVOY_BACKUP_INTERVAL"`
CDCBackupEnabled bool `json:"cdc_backup_enabled" envconfig:"CONVOY_CDC_BACKUP_ENABLED"`
ReplicationDSN string `json:"replication_dsn" envconfig:"CONVOY_REPLICATION_DSN"`
}

type CircuitBreakerConfiguration struct {
@@ -393,9 +397,10 @@
}

type StoragePolicyConfiguration struct {
Type string `json:"type" envconfig:"CONVOY_STORAGE_POLICY_TYPE"`
S3 S3Storage `json:"s3"`
OnPrem OnPremStorage `json:"on_prem"`
Type string `json:"type" envconfig:"CONVOY_STORAGE_POLICY_TYPE"`
S3 S3Storage `json:"s3"`
OnPrem OnPremStorage `json:"on_prem"`
AzureBlob AzureBlobStorage `json:"azure_blob"`
}

type S3Storage struct {
@@ -412,6 +417,14 @@
Path string `json:"path" envconfig:"CONVOY_STORAGE_PREM_PATH"`
}

type AzureBlobStorage struct {
AccountName string `json:"account_name" envconfig:"CONVOY_STORAGE_AZURE_ACCOUNT_NAME"`
AccountKey string `json:"account_key" envconfig:"CONVOY_STORAGE_AZURE_ACCOUNT_KEY"`
ContainerName string `json:"container_name" envconfig:"CONVOY_STORAGE_AZURE_CONTAINER_NAME"`
Endpoint string `json:"endpoint" envconfig:"CONVOY_STORAGE_AZURE_ENDPOINT"`
Prefix string `json:"prefix" envconfig:"CONVOY_STORAGE_AZURE_PREFIX"`
}

type MetricsConfiguration struct {
IsEnabled bool `json:"enabled" envconfig:"CONVOY_METRICS_ENABLED"`
Backend MetricsBackend `json:"metrics_backend" envconfig:"CONVOY_METRICS_BACKEND"`
5 changes: 3 additions & 2 deletions config/config_test.go
Original file line number Diff line number Diff line change
@@ -125,6 +125,7 @@ func TestLoadConfig(t *testing.T) {
RetentionPolicy: RetentionPolicyConfiguration{
Policy: "720h",
IsRetentionPolicyEnabled: true,
BackupInterval: "1h",
},
CircuitBreaker: CircuitBreakerConfiguration{
SampleRate: 30,
@@ -208,7 +209,7 @@
APIVersion: DefaultAPIVersion,
Host: "localhost:5005",
ConsumerPoolSize: 100,
RetentionPolicy: RetentionPolicyConfiguration{Policy: "720h"},
RetentionPolicy: RetentionPolicyConfiguration{Policy: "720h", BackupInterval: "1h"},
Database: DatabaseConfiguration{
Type: PostgresDatabaseProvider,
Scheme: "postgres",
@@ -307,7 +308,7 @@
wantCfg: Configuration{
APIVersion: DefaultAPIVersion,
Host: "localhost:5005",
RetentionPolicy: RetentionPolicyConfiguration{Policy: "720h"},
RetentionPolicy: RetentionPolicyConfiguration{Policy: "720h", BackupInterval: "1h"},
ConsumerPoolSize: 100,
CircuitBreaker: CircuitBreakerConfiguration{
SampleRate: 30,