From 740d05c74cc0da5edc67a951fe62ceae0edb246b Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Mon, 25 May 2026 10:39:22 -0600 Subject: [PATCH 1/3] throttle on threads --- pkg/migration/runner.go | 81 ++++--- pkg/throttler/aurora.go | 114 ++++++++++ pkg/throttler/aurora_activethreads.go | 197 ++++++++++++++++++ pkg/throttler/aurora_activethreads_test.go | 112 ++++++++++ ...mmitlatency.go => aurora_commitlatency.go} | 0 ...y_test.go => aurora_commitlatency_test.go} | 0 pkg/throttler/aurora_test.go | 89 ++++++++ 7 files changed, 568 insertions(+), 25 deletions(-) create mode 100644 pkg/throttler/aurora.go create mode 100644 pkg/throttler/aurora_activethreads.go create mode 100644 pkg/throttler/aurora_activethreads_test.go rename pkg/throttler/{commitlatency.go => aurora_commitlatency.go} (100%) rename pkg/throttler/{commitlatency_test.go => aurora_commitlatency_test.go} (100%) create mode 100644 pkg/throttler/aurora_test.go diff --git a/pkg/migration/runner.go b/pkg/migration/runner.go index 88f387ed..dacfaf04 100644 --- a/pkg/migration/runner.go +++ b/pkg/migration/runner.go @@ -42,10 +42,17 @@ var ( ) type Runner struct { - migration *Migration - db *sql.DB - dbConfig *dbconn.DBConfig - replicas []*sql.DB + migration *Migration + db *sql.DB + dbConfig *dbconn.DBConfig + replicas []*sql.DB + // monitorDB is a small dedicated connection pool used by the Aurora + // throttlers to poll perf-schema / global-status. Sharing the main + // r.db pool let throttler polls queue behind chunk writes, which + // delayed the very signal we wanted to react to (and counted the + // throttler's own SELECT as an active query thread). nil unless Aurora + // throttling is enabled. + monitorDB *sql.DB checkpointTable *table.TableInfo // Changes enccapsulates all changes @@ -670,6 +677,9 @@ func (r *Runner) closeReplicas() error { // - one replication throttler per --replica-dsn (slowest wins) // - a commit-latency throttler if the source is detected as Aurora and // --max-commit-latency is positive (issue #468) +// - an active-threads throttler if the source is detected as Aurora and +// the migration user can read the relevant perf-schema tables (issue +// #831) // // Multiple replica DSNs can be specified as a comma-separated list. // This is common logic shared between resume and new migration paths. @@ -691,26 +701,33 @@ func (r *Runner) setupThrottler(ctx context.Context) error { throttlers = append(throttlers, replicaThrottlers...) } - if r.migration.MaxCommitLatency > 0 { - isAurora, err := throttler.IsAurora(ctx, r.db) - if err != nil { - // Probe failure (e.g., performance_schema disabled, no privileges) - // is non-fatal — Aurora-only feature on a non-Aurora server, or - // a perf-schema-locked Aurora user. Log at Debug so operators can - // diagnose if they expected throttling, without alerting users - // who don't care. - r.logger.Debug("Aurora probe failed, skipping commit-latency throttler", "error", err) - } else if isAurora { - cl, err := throttler.NewCommitLatencyThrottler(r.db, r.migration.MaxCommitLatency, r.logger) - if err != nil { - _ = r.closeReplicas() - return fmt.Errorf("could not create commit-latency throttler: %w", err) - } - r.logger.Info("Aurora detected, enabling commit-latency throttler", - "threshold", r.migration.MaxCommitLatency) - throttlers = append(throttlers, cl) - } + // Aurora throttlers — assembled by the shared throttler.AuroraSetup + // helper so the move runner can use the same wiring. Build returns a + // zero result on non-Aurora sources (and when MaxCommitLatency is 0), + // which lets us treat the helper as unconditional here. + // + // OpenMonitor is invoked lazily by the helper only after IsAurora + // returns true, so non-Aurora users never pay the connect cost. + // MaxOpenConnections=2 lets both Aurora throttlers poll concurrently + // without serializing on a single conn, with a touch of headroom. + auroraRes, err := throttler.AuroraSetup{ + Source: r.db, + OpenMonitor: func() (*sql.DB, error) { + monitorCfg := *r.dbConfig // shallow copy — MaxOpenConnections is value-typed + monitorCfg.MaxOpenConnections = 2 + return dbconn.NewWithConnectionType(r.dsn(), &monitorCfg, "monitor database") + }, + CommitLatencyThreshold: r.migration.MaxCommitLatency, + Logger: r.logger, + }.Build(ctx) + if err != nil { + _ = r.closeReplicas() + return err } + if auroraRes.MonitorDB != nil { + r.monitorDB = auroraRes.MonitorDB + } + throttlers = append(throttlers, auroraRes.Throttlers...) if len(throttlers) == 0 { return nil // use default Noop throttler @@ -721,8 +738,13 @@ func (r *Runner) setupThrottler(ctx context.Context) error { if err := r.throttler.Open(ctx); err != nil { // multiThrottler already closes child throttlers on partial Open // failure, but the *sql.DB connections backing replica throttlers - // are owned by r.replicas — clean those up too rather than leaving - // them dangling until Runner.Close() runs. + // are owned by r.replicas (and the Aurora monitor pool is owned + // by r.monitorDB) — clean those up too rather than leaving them + // dangling until Runner.Close() runs. + if r.monitorDB != nil { + _ = r.monitorDB.Close() + r.monitorDB = nil + } _ = r.closeReplicas() return fmt.Errorf("opening throttlers: %w", err) } @@ -1024,6 +1046,15 @@ func (r *Runner) Close() error { errs = append(errs, err) } } + // Close the Aurora monitor pool after the throttler so its background + // pollers observe Close() / ctx cancellation before we yank the pool + // out from under them. No-op when not Aurora. + if r.monitorDB != nil { + if err := r.monitorDB.Close(); err != nil { + errs = append(errs, err) + } + r.monitorDB = nil + } if err := r.closeReplicas(); err != nil { errs = append(errs, err) } diff --git a/pkg/throttler/aurora.go b/pkg/throttler/aurora.go new file mode 100644 index 00000000..13c732d4 --- /dev/null +++ b/pkg/throttler/aurora.go @@ -0,0 +1,114 @@ +package throttler + +import ( + "context" + "database/sql" + "fmt" + "log/slog" + "time" +) + +// AuroraSetup orchestrates probing for Aurora and assembling the Aurora- +// specific throttlers (commit-latency + active-threads). It exists so both +// the migration runner and the move runner can wire up the same throttlers +// without duplicating the IsAurora / monitor-pool / probe / construct dance. +// +// The throttler package intentionally does not import dbconn — opening the +// monitor pool happens via the caller-supplied OpenMonitor closure, which +// lets the caller own DSN, TLS, and pool sizing. +type AuroraSetup struct { + // Source is the caller's main *sql.DB. Used only for the one-shot + // IsAurora and CanReadActiveThreads probes — these are cheap and run + // once at setup, so making them share the main pool is fine. + Source *sql.DB + + // OpenMonitor opens a dedicated *sql.DB used exclusively by the Aurora + // throttlers for recurring polls. Called at most once, only after + // IsAurora has returned true, so non-Aurora callers never pay the + // connect cost. The caller owns closing the returned DB — see + // AuroraResult.MonitorDB. + OpenMonitor func() (*sql.DB, error) + + // CommitLatencyThreshold gates the whole Aurora throttler family. A + // non-positive value disables both throttlers entirely (this is how + // callers opt out without touching their detection logic). + CommitLatencyThreshold time.Duration + + Logger *slog.Logger +} + +// AuroraResult is the output of AuroraSetup.Build. When Throttlers is empty +// MonitorDB is nil — there's no pool to close. When Throttlers is non-empty +// MonitorDB is non-nil and the caller owns its lifecycle. +type AuroraResult struct { + Throttlers []Throttler + MonitorDB *sql.DB +} + +// Build probes the source for Aurora and assembles the Aurora throttlers. +// +// Returns a zero AuroraResult (nil throttlers, nil monitor DB, nil error) in +// any of these benign cases: +// - CommitLatencyThreshold <= 0 (caller opted out) +// - IsAurora probe failed (non-Aurora source, or perf_schema not readable — +// logged at Debug so the non-Aurora common case stays quiet) +// - IsAurora returned false +// +// Returns a non-nil error only when something the caller almost certainly +// wants to surface goes wrong: OpenMonitor itself fails, or constructing a +// throttler fails for a reason other than "missing privileges" (which is +// expected and downgraded to Info — see CanReadActiveThreads handling). +// +// On a successful Aurora build, the monitor pool is opened and both +// throttlers (or just commit-latency if active-threads grants are missing) +// are returned. The caller composes them via NewMultiThrottler with whatever +// other throttlers it has and is responsible for calling Close on each +// throttler AND Close on MonitorDB at shutdown. +func (s AuroraSetup) Build(ctx context.Context) (AuroraResult, error) { + if s.CommitLatencyThreshold <= 0 { + return AuroraResult{}, nil + } + + isAurora, err := IsAurora(ctx, s.Source) + switch { + case err != nil: + // Non-Aurora MySQL with locked-down perf_schema lands here too; + // keep it at Debug so the common case isn't noisy. + s.Logger.Debug("Aurora probe failed, skipping Aurora throttlers", "error", err) + return AuroraResult{}, nil + case !isAurora: + return AuroraResult{}, nil + } + + monitorDB, err := s.OpenMonitor() + if err != nil { + return AuroraResult{}, fmt.Errorf("could not open monitor DB for Aurora throttlers: %w", err) + } + + cl, err := NewCommitLatencyThrottler(monitorDB, s.CommitLatencyThreshold, s.Logger) + if err != nil { + _ = monitorDB.Close() + return AuroraResult{}, fmt.Errorf("could not create commit-latency throttler: %w", err) + } + s.Logger.Info("Aurora detected, enabling commit-latency throttler", + "threshold", s.CommitLatencyThreshold) + throttlers := []Throttler{cl} + + // Active-threads has its own privilege gate — perf-schema table grants + // are independent of global_status. On confirmed Aurora a missing grant + // is operator-visible (we say so at Info), but it doesn't stop us from + // using the commit-latency signal. + if ok, err := CanReadActiveThreads(ctx, s.Source); err != nil { + s.Logger.Info("Aurora active-threads throttler disabled: grant SELECT on performance_schema.threads and performance_schema.events_waits_current to enable", + "error", err) + } else if ok { + at, err := NewActiveThreadsThrottler(monitorDB, s.Logger) + if err != nil { + _ = monitorDB.Close() + return AuroraResult{}, fmt.Errorf("could not create active-threads throttler: %w", err) + } + throttlers = append(throttlers, at) + } + + return AuroraResult{Throttlers: throttlers, MonitorDB: monitorDB}, nil +} diff --git a/pkg/throttler/aurora_activethreads.go b/pkg/throttler/aurora_activethreads.go new file mode 100644 index 00000000..068a0c5e --- /dev/null +++ b/pkg/throttler/aurora_activethreads.go @@ -0,0 +1,197 @@ +package throttler + +import ( + "context" + "database/sql" + "errors" + "fmt" + "log/slog" + "sync/atomic" + "time" +) + +// Active-thread throttling — the second Aurora signal, from issue #831. +// +// This is similar to gh-ost and pt-osc using a "threads running" metric to +// detect CPU pressure and back off when the MySQL server is busy. +// +// However, our implementation has one important difference: +// We exclude the threads that are waiting on redo-log flush. +// +// The thought process here is that redo-log waiters are parked anyway, and thus +// not consuming CPU. We can also oversubscribe them because they will group-commit +// to the underlying storage anyway. +// +// In previous experiments I have tried to also count redo-log threads at 50% etc, +// but not including them seems to be the most robust heuristic. +// +// Also different from previous tools use of "threads running" is that we compare +// the threads to the instance vCPU count, which we can deterministically get on +// Aurora via @@innodb_purge_threads. +// +// This is called *aurora* active threads, but the only thing aurora specific is that +// the vCPU count is retrievable from a variable. I've tried on MySQL to find a consistent +// way to get the vCPU count, but there doesn't seem to be a reliable way. So for now, +// it misses out on this functionality. +const ( + // activeThreadsQuery counts query-running threads on the server and + // subtracts those parked on redo-log waits. LEFT JOIN handles the case + // where events_waits_current has no row for a thread (e.g., wait + // consumer disabled) — that thread is then counted as on-CPU, which is + // the safe-conservative behavior. + activeThreadsQuery = `SELECT + GREATEST(COUNT(*) - COALESCE(SUM(CASE WHEN ewc.EVENT_NAME = 'wait/io/redo_log_flush' AND ewc.END_EVENT_ID IS NULL THEN 1 ELSE 0 END), 0), 0) AS active_cpu_threads + FROM performance_schema.threads pps + LEFT JOIN performance_schema.events_waits_current ewc ON pps.THREAD_ID = ewc.THREAD_ID + WHERE pps.PROCESSLIST_ID IS NOT NULL + AND pps.PROCESSLIST_COMMAND = 'Query'` + + // auroravCPUsQuery reads the vCPU count from @@innodb_purge_threads. + // Aurora pins this to the instance vCPU count (see issue #831). On RDS + // MySQL the var also exists but its value tracks Aurora-style sizing + // only on Aurora — we gate the throttler on IsAurora, so this query is + // only ever issued there. + auroravCPUsQuery = `SELECT @@innodb_purge_threads` +) + +// activeThreadsPollInterval mirrors commitLatencyPollInterval — we want fast +// enough to catch sustained CPU pressure without hammering the perf-schema +// join. Var (not const) so tests can shorten it. +var activeThreadsPollInterval = 5 * time.Second + +// CanReadActiveThreads probes whether the current user can run the active- +// threads query. Returns (true, nil) when the query runs cleanly, (false, +// err) when it fails (typically: SELECT on performance_schema.threads or +// events_waits_current denied). Called from runner setup so we can skip the +// throttler quietly on accounts without perf-schema read access — IsAurora +// already proved global_status is readable, but perf-schema table grants are +// independent. +func CanReadActiveThreads(ctx context.Context, db *sql.DB) (bool, error) { + var n int64 + if err := db.QueryRowContext(ctx, activeThreadsQuery).Scan(&n); err != nil { + return false, fmt.Errorf("probing active-threads query (requires SELECT on performance_schema.threads, events_waits_current): %w", err) + } + return true, nil +} + +// ActiveThreads throttles when the count of query-running threads (excluding +// those waiting on the redo log) exceeds the instance vCPU count. Aurora- +// only — depends on @@innodb_purge_threads matching vCPUs. +type ActiveThreads struct { + db *sql.DB + logger *slog.Logger + + // vCPUs is the throttle threshold, captured once at Open and treated as + // immutable for the migration's lifetime. Aurora instance type changes + // require a restart so it cannot move under us. + vCPUs int64 + + isThrottled atomic.Bool + isClosed atomic.Bool + + // lastActiveThreads holds the most recent observation for logging. + lastActiveThreads atomic.Int64 +} + +var _ Throttler = (*ActiveThreads)(nil) + +// NewActiveThreadsThrottler returns a Throttler that polls performance_schema +// to count on-CPU query threads and throttles when that count exceeds the +// instance vCPU count. +func NewActiveThreadsThrottler(db *sql.DB, logger *slog.Logger) (*ActiveThreads, error) { + if db == nil { + return nil, errors.New("active-threads throttler requires a non-nil DB") + } + return &ActiveThreads{ + db: db, + logger: logger, + }, nil +} + +func (a *ActiveThreads) Open(ctx context.Context) error { + if err := a.db.QueryRowContext(ctx, auroravCPUsQuery).Scan(&a.vCPUs); err != nil { + return fmt.Errorf("reading @@innodb_purge_threads for vCPU count: %w", err) + } + if a.vCPUs <= 0 { + return fmt.Errorf("@@innodb_purge_threads returned non-positive value %d", a.vCPUs) + } + a.logger.Info("Aurora active-threads throttler enabled", "vCPUs", a.vCPUs) + if err := a.UpdateLag(ctx); err != nil { + return err + } + go a.run(ctx) + return nil +} + +func (a *ActiveThreads) run(ctx context.Context) { + ticker := time.NewTicker(activeThreadsPollInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if a.isClosed.Load() { + return + } + if err := a.UpdateLag(ctx); err != nil { + a.logger.Error("error sampling Aurora active threads", "error", err) + } + } + } +} + +func (a *ActiveThreads) Close() error { + a.isClosed.Store(true) + return nil +} + +func (a *ActiveThreads) IsThrottled() bool { + return a.isThrottled.Load() +} + +// BlockWait blocks until active CPU threads fall to or below vCPUs, or up to +// 60s. Matches the commit-latency throttler's loop shape so the multi- +// throttler waits uniformly across signals. +func (a *ActiveThreads) BlockWait(ctx context.Context) { + timer := time.NewTimer(blockWaitInterval) + defer timer.Stop() + + for range 60 { + if !a.isThrottled.Load() { + return + } + timer.Reset(blockWaitInterval) + select { + case <-ctx.Done(): + return + case <-timer.C: + } + } + a.logger.Warn("active-threads throttler timed out", + "active_threads", a.lastActiveThreads.Load(), + "vCPUs", a.vCPUs) +} + +// UpdateLag samples active CPU threads and updates throttled state. +func (a *ActiveThreads) UpdateLag(ctx context.Context) error { + var active int64 + if err := a.db.QueryRowContext(ctx, activeThreadsQuery).Scan(&active); err != nil { + return fmt.Errorf("sampling Aurora active threads: %w", err) + } + a.applySample(active) + return nil +} + +// applySample updates state from a single observation. Split out so tests can +// drive the calculation without a real Aurora. +func (a *ActiveThreads) applySample(active int64) { + a.lastActiveThreads.Store(active) + throttled := active > a.vCPUs + prev := a.isThrottled.Swap(throttled) + if throttled && !prev { + a.logger.Warn("active CPU threads exceed vCPUs, throttling", + "active_threads", active, + "vCPUs", a.vCPUs) + } +} diff --git a/pkg/throttler/aurora_activethreads_test.go b/pkg/throttler/aurora_activethreads_test.go new file mode 100644 index 00000000..d1d5035b --- /dev/null +++ b/pkg/throttler/aurora_activethreads_test.go @@ -0,0 +1,112 @@ +package throttler + +import ( + "context" + "database/sql" + "io" + "log/slog" + "testing" + "time" + + "github.com/block/spirit/pkg/testutils" + "github.com/block/spirit/pkg/utils" + _ "github.com/go-sql-driver/mysql" + "github.com/stretchr/testify/require" +) + +func newTestActiveThreads(t *testing.T, vCPUs int64) *ActiveThreads { + t.Helper() + return &ActiveThreads{ + vCPUs: vCPUs, + logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + } +} + +func TestActiveThreads_BelowVCPUs(t *testing.T) { + a := newTestActiveThreads(t, 8) + a.applySample(4) + require.False(t, a.IsThrottled()) + require.Equal(t, int64(4), a.lastActiveThreads.Load()) +} + +func TestActiveThreads_AtVCPUsIsNotThrottled(t *testing.T) { + // Threshold is strictly greater than vCPUs — sitting exactly at vCPUs is + // not over-subscribed, so we should not throttle. + a := newTestActiveThreads(t, 8) + a.applySample(8) + require.False(t, a.IsThrottled()) +} + +func TestActiveThreads_AboveVCPUsThrottles(t *testing.T) { + a := newTestActiveThreads(t, 8) + a.applySample(9) + require.True(t, a.IsThrottled()) + require.Equal(t, int64(9), a.lastActiveThreads.Load()) +} + +func TestActiveThreads_RecoversBelowVCPUs(t *testing.T) { + a := newTestActiveThreads(t, 8) + a.applySample(20) + require.True(t, a.IsThrottled()) + a.applySample(3) + require.False(t, a.IsThrottled()) +} + +func TestActiveThreads_BlockWaitReturnsImmediatelyWhenUnthrottled(t *testing.T) { + a := newTestActiveThreads(t, 8) + start := time.Now() + a.BlockWait(t.Context()) + require.Less(t, time.Since(start), 50*time.Millisecond) +} + +func TestActiveThreads_BlockWaitRespectsContext(t *testing.T) { + a := newTestActiveThreads(t, 8) + a.isThrottled.Store(true) + + ctx, cancel := context.WithCancel(t.Context()) + cancel() + + start := time.Now() + a.BlockWait(ctx) + require.Less(t, time.Since(start), 200*time.Millisecond) +} + +func TestActiveThreads_BlockWaitReturnsWhenThrottlingClears(t *testing.T) { + prev := blockWaitInterval + blockWaitInterval = 10 * time.Millisecond + t.Cleanup(func() { blockWaitInterval = prev }) + + a := newTestActiveThreads(t, 8) + a.isThrottled.Store(true) + + go func() { + time.Sleep(30 * time.Millisecond) + a.isThrottled.Store(false) + }() + + start := time.Now() + a.BlockWait(t.Context()) + elapsed := time.Since(start) + require.GreaterOrEqual(t, elapsed, 20*time.Millisecond) + require.Less(t, elapsed, 500*time.Millisecond) +} + +func TestNewActiveThreadsThrottler_RejectsNilDB(t *testing.T) { + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + _, err := NewActiveThreadsThrottler(nil, logger) + require.ErrorContains(t, err, "non-nil DB") +} + +func TestCanReadActiveThreads_LocalMySQL(t *testing.T) { + db, err := sql.Open("mysql", testutils.DSN()) + require.NoError(t, err) + defer utils.CloseAndLog(db) + + // On a stock MySQL with perf_schema enabled the query should run cleanly + // even though there's no Aurora-specific data — it's a vanilla perf- + // schema join. If this ever returns false on a healthy MySQL we want to + // know, since runner.setupThrottler depends on it as the gate. + ok, err := CanReadActiveThreads(t.Context(), db) + require.NoError(t, err) + require.True(t, ok) +} diff --git a/pkg/throttler/commitlatency.go b/pkg/throttler/aurora_commitlatency.go similarity index 100% rename from pkg/throttler/commitlatency.go rename to pkg/throttler/aurora_commitlatency.go diff --git a/pkg/throttler/commitlatency_test.go b/pkg/throttler/aurora_commitlatency_test.go similarity index 100% rename from pkg/throttler/commitlatency_test.go rename to pkg/throttler/aurora_commitlatency_test.go diff --git a/pkg/throttler/aurora_test.go b/pkg/throttler/aurora_test.go new file mode 100644 index 00000000..16997583 --- /dev/null +++ b/pkg/throttler/aurora_test.go @@ -0,0 +1,89 @@ +package throttler + +import ( + "database/sql" + "errors" + "io" + "log/slog" + "testing" + "time" + + "github.com/block/spirit/pkg/testutils" + "github.com/block/spirit/pkg/utils" + _ "github.com/go-sql-driver/mysql" + "github.com/stretchr/testify/require" +) + +func discardLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(io.Discard, nil)) +} + +func TestAuroraSetup_DisabledByThresholdReturnsEmpty(t *testing.T) { + // CommitLatencyThreshold <= 0 means the operator opted out. The helper + // must not probe the source, must not call OpenMonitor, and must + // return a zero result so the caller's append() / nil-check logic + // stays a no-op. + openCalled := false + res, err := AuroraSetup{ + Source: nil, // unused — proves we never touched it + OpenMonitor: func() (*sql.DB, error) { + openCalled = true + return nil, nil + }, + CommitLatencyThreshold: 0, + Logger: discardLogger(), + }.Build(t.Context()) + require.NoError(t, err) + require.Nil(t, res.Throttlers) + require.Nil(t, res.MonitorDB) + require.False(t, openCalled, "OpenMonitor must not be called when threshold disables the helper") +} + +func TestAuroraSetup_NonAuroraReturnsEmpty(t *testing.T) { + // Real local MySQL: IsAurora returns false (no AuroraDb_* vars). The + // helper must skip and not open a monitor pool — that's the gate we + // rely on to keep non-Aurora callers from paying any cost. + db, err := sql.Open("mysql", testutils.DSN()) + require.NoError(t, err) + defer utils.CloseAndLog(db) + + openCalled := false + res, err := AuroraSetup{ + Source: db, + OpenMonitor: func() (*sql.DB, error) { + openCalled = true + return nil, errors.New("must not be called") + }, + CommitLatencyThreshold: 100 * time.Millisecond, + Logger: discardLogger(), + }.Build(t.Context()) + require.NoError(t, err) + require.Nil(t, res.Throttlers) + require.Nil(t, res.MonitorDB) + require.False(t, openCalled, "OpenMonitor must not be called on a non-Aurora source") +} + +func TestAuroraSetup_IsAuroraProbeFailureIsNonFatal(t *testing.T) { + // Closed DB → IsAurora returns an error (driver/sql.ErrConnDone-style). + // The helper logs at Debug and returns a zero result rather than + // failing the whole migration; we'd rather skip Aurora throttling + // than refuse to migrate on a transient probe failure. + db, err := sql.Open("mysql", testutils.DSN()) + require.NoError(t, err) + require.NoError(t, db.Close()) + + openCalled := false + res, err := AuroraSetup{ + Source: db, + OpenMonitor: func() (*sql.DB, error) { + openCalled = true + return nil, errors.New("must not be called") + }, + CommitLatencyThreshold: 100 * time.Millisecond, + Logger: discardLogger(), + }.Build(t.Context()) + require.NoError(t, err, "probe failure should be non-fatal") + require.Nil(t, res.Throttlers) + require.Nil(t, res.MonitorDB) + require.False(t, openCalled, "OpenMonitor must not be called when the probe fails") +} From be7346966768f677345769c0bf3d72b10c9db787 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Mon, 25 May 2026 12:14:37 -0600 Subject: [PATCH 2/3] copilot --- pkg/migration/runner.go | 17 +++++--- pkg/throttler/aurora.go | 78 ++++++++++++++++++++++-------------- pkg/throttler/aurora_test.go | 36 ++++++++++------- 3 files changed, 81 insertions(+), 50 deletions(-) diff --git a/pkg/migration/runner.go b/pkg/migration/runner.go index dacfaf04..e9559e23 100644 --- a/pkg/migration/runner.go +++ b/pkg/migration/runner.go @@ -702,14 +702,19 @@ func (r *Runner) setupThrottler(ctx context.Context) error { } // Aurora throttlers — assembled by the shared throttler.AuroraSetup - // helper so the move runner can use the same wiring. Build returns a - // zero result on non-Aurora sources (and when MaxCommitLatency is 0), - // which lets us treat the helper as unconditional here. + // helper so the move runner can use the same wiring. The two Aurora + // throttlers (commit-latency, active-threads) have independent gates: + // setting MaxCommitLatency=0 disables only commit-latency, leaving + // active-threads enabled when Aurora is detected and the user has + // SELECT on performance_schema.threads + events_waits_current. Build + // returns a zero result on non-Aurora sources so this call is safe + // to make unconditionally. // // OpenMonitor is invoked lazily by the helper only after IsAurora - // returns true, so non-Aurora users never pay the connect cost. - // MaxOpenConnections=2 lets both Aurora throttlers poll concurrently - // without serializing on a single conn, with a touch of headroom. + // returns true AND at least one throttler will be built, so non- + // Aurora users never pay the connect cost. MaxOpenConnections=2 lets + // both Aurora throttlers poll concurrently without serializing on a + // single conn, with a touch of headroom. auroraRes, err := throttler.AuroraSetup{ Source: r.db, OpenMonitor: func() (*sql.DB, error) { diff --git a/pkg/throttler/aurora.go b/pkg/throttler/aurora.go index 13c732d4..e07c6651 100644 --- a/pkg/throttler/aurora.go +++ b/pkg/throttler/aurora.go @@ -16,6 +16,9 @@ import ( // The throttler package intentionally does not import dbconn — opening the // monitor pool happens via the caller-supplied OpenMonitor closure, which // lets the caller own DSN, TLS, and pool sizing. +// +// The two Aurora throttlers are independent signals and have independent +// gates. Disabling one does not disable the other — see Build for details. type AuroraSetup struct { // Source is the caller's main *sql.DB. Used only for the one-shot // IsAurora and CanReadActiveThreads probes — these are cheap and run @@ -24,14 +27,16 @@ type AuroraSetup struct { // OpenMonitor opens a dedicated *sql.DB used exclusively by the Aurora // throttlers for recurring polls. Called at most once, only after - // IsAurora has returned true, so non-Aurora callers never pay the + // IsAurora has returned true and at least one Aurora throttler is + // going to be constructed, so non-Aurora callers never pay the // connect cost. The caller owns closing the returned DB — see // AuroraResult.MonitorDB. OpenMonitor func() (*sql.DB, error) - // CommitLatencyThreshold gates the whole Aurora throttler family. A - // non-positive value disables both throttlers entirely (this is how - // callers opt out without touching their detection logic). + // CommitLatencyThreshold gates the commit-latency throttler. A non- + // positive value disables that throttler only — the active-threads + // throttler is independent and is still enabled when Aurora is + // detected and the privilege probe succeeds. CommitLatencyThreshold time.Duration Logger *slog.Logger @@ -47,28 +52,29 @@ type AuroraResult struct { // Build probes the source for Aurora and assembles the Aurora throttlers. // +// The two Aurora throttlers have independent gates: +// - Commit-latency is enabled when CommitLatencyThreshold > 0. +// - Active-threads is enabled when Aurora is detected and the user has +// SELECT on performance_schema.threads and events_waits_current. +// // Returns a zero AuroraResult (nil throttlers, nil monitor DB, nil error) in // any of these benign cases: -// - CommitLatencyThreshold <= 0 (caller opted out) // - IsAurora probe failed (non-Aurora source, or perf_schema not readable — // logged at Debug so the non-Aurora common case stays quiet) // - IsAurora returned false +// - Aurora was detected but neither gate produced a throttler (commit- +// latency disabled by threshold AND active-threads denied by privilege) // // Returns a non-nil error only when something the caller almost certainly // wants to surface goes wrong: OpenMonitor itself fails, or constructing a // throttler fails for a reason other than "missing privileges" (which is // expected and downgraded to Info — see CanReadActiveThreads handling). // -// On a successful Aurora build, the monitor pool is opened and both -// throttlers (or just commit-latency if active-threads grants are missing) -// are returned. The caller composes them via NewMultiThrottler with whatever -// other throttlers it has and is responsible for calling Close on each -// throttler AND Close on MonitorDB at shutdown. +// On a successful Aurora build, the monitor pool is opened and the enabled +// throttlers are returned. The caller composes them via NewMultiThrottler +// with whatever other throttlers it has and is responsible for calling +// Close on each throttler AND Close on MonitorDB at shutdown. func (s AuroraSetup) Build(ctx context.Context) (AuroraResult, error) { - if s.CommitLatencyThreshold <= 0 { - return AuroraResult{}, nil - } - isAurora, err := IsAurora(ctx, s.Source) switch { case err != nil: @@ -80,28 +86,42 @@ func (s AuroraSetup) Build(ctx context.Context) (AuroraResult, error) { return AuroraResult{}, nil } + // Decide independently which throttlers will be built before opening + // the monitor pool, so we don't open a pool we'd immediately discard. + enableCommitLatency := s.CommitLatencyThreshold > 0 + enableActiveThreads := true + if ok, probeErr := CanReadActiveThreads(ctx, s.Source); probeErr != nil { + // Surface at Info because Aurora is confirmed and the operator + // likely expected this throttler to be enabled. + s.Logger.Info("Aurora active-threads throttler disabled: grant SELECT on performance_schema.threads and performance_schema.events_waits_current to enable", + "error", probeErr) + enableActiveThreads = false + } else if !ok { + enableActiveThreads = false + } + if !enableCommitLatency && !enableActiveThreads { + return AuroraResult{}, nil + } + monitorDB, err := s.OpenMonitor() if err != nil { return AuroraResult{}, fmt.Errorf("could not open monitor DB for Aurora throttlers: %w", err) } - cl, err := NewCommitLatencyThrottler(monitorDB, s.CommitLatencyThreshold, s.Logger) - if err != nil { - _ = monitorDB.Close() - return AuroraResult{}, fmt.Errorf("could not create commit-latency throttler: %w", err) + var throttlers []Throttler + + if enableCommitLatency { + cl, err := NewCommitLatencyThrottler(monitorDB, s.CommitLatencyThreshold, s.Logger) + if err != nil { + _ = monitorDB.Close() + return AuroraResult{}, fmt.Errorf("could not create commit-latency throttler: %w", err) + } + s.Logger.Info("Aurora detected, enabling commit-latency throttler", + "threshold", s.CommitLatencyThreshold) + throttlers = append(throttlers, cl) } - s.Logger.Info("Aurora detected, enabling commit-latency throttler", - "threshold", s.CommitLatencyThreshold) - throttlers := []Throttler{cl} - // Active-threads has its own privilege gate — perf-schema table grants - // are independent of global_status. On confirmed Aurora a missing grant - // is operator-visible (we say so at Info), but it doesn't stop us from - // using the commit-latency signal. - if ok, err := CanReadActiveThreads(ctx, s.Source); err != nil { - s.Logger.Info("Aurora active-threads throttler disabled: grant SELECT on performance_schema.threads and performance_schema.events_waits_current to enable", - "error", err) - } else if ok { + if enableActiveThreads { at, err := NewActiveThreadsThrottler(monitorDB, s.Logger) if err != nil { _ = monitorDB.Close() diff --git a/pkg/throttler/aurora_test.go b/pkg/throttler/aurora_test.go index 16997583..db07d945 100644 --- a/pkg/throttler/aurora_test.go +++ b/pkg/throttler/aurora_test.go @@ -18,31 +18,37 @@ func discardLogger() *slog.Logger { return slog.New(slog.NewTextHandler(io.Discard, nil)) } -func TestAuroraSetup_DisabledByThresholdReturnsEmpty(t *testing.T) { - // CommitLatencyThreshold <= 0 means the operator opted out. The helper - // must not probe the source, must not call OpenMonitor, and must - // return a zero result so the caller's append() / nil-check logic - // stays a no-op. +func TestAuroraSetup_NonAuroraReturnsEmpty(t *testing.T) { + // Real local MySQL: IsAurora returns false (no AuroraDb_* vars). The + // helper must skip and not open a monitor pool — that's the gate we + // rely on to keep non-Aurora callers from paying any cost. + db, err := sql.Open("mysql", testutils.DSN()) + require.NoError(t, err) + defer utils.CloseAndLog(db) + openCalled := false res, err := AuroraSetup{ - Source: nil, // unused — proves we never touched it + Source: db, OpenMonitor: func() (*sql.DB, error) { openCalled = true - return nil, nil + return nil, errors.New("must not be called") }, - CommitLatencyThreshold: 0, + CommitLatencyThreshold: 100 * time.Millisecond, Logger: discardLogger(), }.Build(t.Context()) require.NoError(t, err) require.Nil(t, res.Throttlers) require.Nil(t, res.MonitorDB) - require.False(t, openCalled, "OpenMonitor must not be called when threshold disables the helper") + require.False(t, openCalled, "OpenMonitor must not be called on a non-Aurora source") } -func TestAuroraSetup_NonAuroraReturnsEmpty(t *testing.T) { - // Real local MySQL: IsAurora returns false (no AuroraDb_* vars). The - // helper must skip and not open a monitor pool — that's the gate we - // rely on to keep non-Aurora callers from paying any cost. +func TestAuroraSetup_NonAuroraSkipsMonitorEvenWithZeroThreshold(t *testing.T) { + // Regression for PR #880: CommitLatencyThreshold=0 used to short- + // circuit Build before probing IsAurora, which silently disabled the + // active-threads throttler too. The two gates are now independent, + // so Build must still probe IsAurora when threshold is 0 — but on a + // non-Aurora source it still ends up with no throttlers and must not + // open a monitor pool. db, err := sql.Open("mysql", testutils.DSN()) require.NoError(t, err) defer utils.CloseAndLog(db) @@ -54,13 +60,13 @@ func TestAuroraSetup_NonAuroraReturnsEmpty(t *testing.T) { openCalled = true return nil, errors.New("must not be called") }, - CommitLatencyThreshold: 100 * time.Millisecond, + CommitLatencyThreshold: 0, Logger: discardLogger(), }.Build(t.Context()) require.NoError(t, err) require.Nil(t, res.Throttlers) require.Nil(t, res.MonitorDB) - require.False(t, openCalled, "OpenMonitor must not be called on a non-Aurora source") + require.False(t, openCalled, "OpenMonitor must not be called on a non-Aurora source even with threshold=0") } func TestAuroraSetup_IsAuroraProbeFailureIsNonFatal(t *testing.T) { From a0a336c5977d6c4b6b86307f80c6a8afb628ef54 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Mon, 25 May 2026 13:22:06 -0600 Subject: [PATCH 3/3] copilot --- pkg/throttler/aurora.go | 80 +++++++++++++++++----- pkg/throttler/aurora_activethreads.go | 16 ++--- pkg/throttler/aurora_activethreads_test.go | 8 +-- pkg/throttler/aurora_test.go | 19 +++++ 4 files changed, 94 insertions(+), 29 deletions(-) diff --git a/pkg/throttler/aurora.go b/pkg/throttler/aurora.go index e07c6651..da294b67 100644 --- a/pkg/throttler/aurora.go +++ b/pkg/throttler/aurora.go @@ -3,9 +3,23 @@ package throttler import ( "context" "database/sql" + "errors" "fmt" "log/slog" "time" + + "github.com/go-sql-driver/mysql" +) + +// MySQL error codes for "you don't have permission" failures. The active- +// threads probe distinguishes these from other errors so the log message +// can suggest a concrete fix (grant SELECT) when it's actually a grants +// problem, and avoid that misleading suggestion otherwise. +const ( + errAccessDenied = 1045 // ER_ACCESS_DENIED_ERROR + errDBAccessDenied = 1044 // ER_DBACCESS_DENIED_ERROR + errTableAccessDenied = 1142 // ER_TABLEACCESS_DENIED_ERROR (SELECT on a table denied) + errSpecificAccessDenied = 1227 // ER_SPECIFIC_ACCESS_DENIED_ERROR ) // AuroraSetup orchestrates probing for Aurora and assembling the Aurora- @@ -63,18 +77,28 @@ type AuroraResult struct { // logged at Debug so the non-Aurora common case stays quiet) // - IsAurora returned false // - Aurora was detected but neither gate produced a throttler (commit- -// latency disabled by threshold AND active-threads denied by privilege) -// -// Returns a non-nil error only when something the caller almost certainly -// wants to surface goes wrong: OpenMonitor itself fails, or constructing a -// throttler fails for a reason other than "missing privileges" (which is -// expected and downgraded to Info — see CanReadActiveThreads handling). +// latency disabled by threshold AND active-threads probe failed for +// any reason — privilege denied or otherwise, both downgraded to Info) // -// On a successful Aurora build, the monitor pool is opened and the enabled -// throttlers are returned. The caller composes them via NewMultiThrottler -// with whatever other throttlers it has and is responsible for calling -// Close on each throttler AND Close on MonitorDB at shutdown. +// Returns a non-nil error only for setup failures the caller almost +// certainly wants to surface: nil required fields, OpenMonitor failing, or +// throttler construction failing. The active-threads privilege probe is +// best-effort by design — its failure (whatever the cause) disables that +// throttler but never aborts the migration. func (s AuroraSetup) Build(ctx context.Context) (AuroraResult, error) { + // Validate required fields up-front. AuroraSetup is an exported struct + // and these are all dereferenced unconditionally inside Build; a + // descriptive error beats a nil-pointer panic. + if s.Source == nil { + return AuroraResult{}, errors.New("AuroraSetup.Source is required") + } + if s.OpenMonitor == nil { + return AuroraResult{}, errors.New("AuroraSetup.OpenMonitor is required") + } + if s.Logger == nil { + return AuroraResult{}, errors.New("AuroraSetup.Logger is required") + } + isAurora, err := IsAurora(ctx, s.Source) switch { case err != nil: @@ -90,13 +114,20 @@ func (s AuroraSetup) Build(ctx context.Context) (AuroraResult, error) { // the monitor pool, so we don't open a pool we'd immediately discard. enableCommitLatency := s.CommitLatencyThreshold > 0 enableActiveThreads := true - if ok, probeErr := CanReadActiveThreads(ctx, s.Source); probeErr != nil { + if probeErr := CanReadActiveThreads(ctx, s.Source); probeErr != nil { // Surface at Info because Aurora is confirmed and the operator - // likely expected this throttler to be enabled. - s.Logger.Info("Aurora active-threads throttler disabled: grant SELECT on performance_schema.threads and performance_schema.events_waits_current to enable", - "error", probeErr) - enableActiveThreads = false - } else if !ok { + // likely expected this throttler to be enabled. Distinguish + // "looks like a grants problem" from other failures so the log + // message only suggests `GRANT SELECT` when that's plausibly + // the fix — a transient network error shouldn't send operators + // down a fruitless permissions investigation. + if isPrivilegeDeniedError(probeErr) { + s.Logger.Info("Aurora active-threads throttler disabled: grant SELECT on performance_schema.threads and performance_schema.events_waits_current to enable", + "error", probeErr) + } else { + s.Logger.Info("Aurora active-threads throttler disabled: probe failed", + "error", probeErr) + } enableActiveThreads = false } if !enableCommitLatency && !enableActiveThreads { @@ -132,3 +163,20 @@ func (s AuroraSetup) Build(ctx context.Context) (AuroraResult, error) { return AuroraResult{Throttlers: throttlers, MonitorDB: monitorDB}, nil } + +// isPrivilegeDeniedError reports whether err looks like the MySQL server +// refusing the query for permissions reasons (vs. network, syntax, or +// missing-table errors). Used to tailor the active-threads probe-failure +// log message. +func isPrivilegeDeniedError(err error) bool { + var me *mysql.MySQLError + if !errors.As(err, &me) { + return false + } + switch me.Number { + case errAccessDenied, errDBAccessDenied, errTableAccessDenied, errSpecificAccessDenied: + return true + default: + return false + } +} diff --git a/pkg/throttler/aurora_activethreads.go b/pkg/throttler/aurora_activethreads.go index 068a0c5e..53aee378 100644 --- a/pkg/throttler/aurora_activethreads.go +++ b/pkg/throttler/aurora_activethreads.go @@ -46,12 +46,12 @@ const ( WHERE pps.PROCESSLIST_ID IS NOT NULL AND pps.PROCESSLIST_COMMAND = 'Query'` - // auroravCPUsQuery reads the vCPU count from @@innodb_purge_threads. + // auroraVCPUsQuery reads the vCPU count from @@innodb_purge_threads. // Aurora pins this to the instance vCPU count (see issue #831). On RDS // MySQL the var also exists but its value tracks Aurora-style sizing // only on Aurora — we gate the throttler on IsAurora, so this query is // only ever issued there. - auroravCPUsQuery = `SELECT @@innodb_purge_threads` + auroraVCPUsQuery = `SELECT @@innodb_purge_threads` ) // activeThreadsPollInterval mirrors commitLatencyPollInterval — we want fast @@ -60,18 +60,18 @@ const ( var activeThreadsPollInterval = 5 * time.Second // CanReadActiveThreads probes whether the current user can run the active- -// threads query. Returns (true, nil) when the query runs cleanly, (false, -// err) when it fails (typically: SELECT on performance_schema.threads or +// threads query. Returns nil when the query runs cleanly, a wrapped error +// when it fails (typically: SELECT on performance_schema.threads or // events_waits_current denied). Called from runner setup so we can skip the // throttler quietly on accounts without perf-schema read access — IsAurora // already proved global_status is readable, but perf-schema table grants are // independent. -func CanReadActiveThreads(ctx context.Context, db *sql.DB) (bool, error) { +func CanReadActiveThreads(ctx context.Context, db *sql.DB) error { var n int64 if err := db.QueryRowContext(ctx, activeThreadsQuery).Scan(&n); err != nil { - return false, fmt.Errorf("probing active-threads query (requires SELECT on performance_schema.threads, events_waits_current): %w", err) + return fmt.Errorf("probing active-threads query (requires SELECT on performance_schema.threads, events_waits_current): %w", err) } - return true, nil + return nil } // ActiveThreads throttles when the count of query-running threads (excluding @@ -109,7 +109,7 @@ func NewActiveThreadsThrottler(db *sql.DB, logger *slog.Logger) (*ActiveThreads, } func (a *ActiveThreads) Open(ctx context.Context) error { - if err := a.db.QueryRowContext(ctx, auroravCPUsQuery).Scan(&a.vCPUs); err != nil { + if err := a.db.QueryRowContext(ctx, auroraVCPUsQuery).Scan(&a.vCPUs); err != nil { return fmt.Errorf("reading @@innodb_purge_threads for vCPU count: %w", err) } if a.vCPUs <= 0 { diff --git a/pkg/throttler/aurora_activethreads_test.go b/pkg/throttler/aurora_activethreads_test.go index d1d5035b..faf6a674 100644 --- a/pkg/throttler/aurora_activethreads_test.go +++ b/pkg/throttler/aurora_activethreads_test.go @@ -104,9 +104,7 @@ func TestCanReadActiveThreads_LocalMySQL(t *testing.T) { // On a stock MySQL with perf_schema enabled the query should run cleanly // even though there's no Aurora-specific data — it's a vanilla perf- - // schema join. If this ever returns false on a healthy MySQL we want to - // know, since runner.setupThrottler depends on it as the gate. - ok, err := CanReadActiveThreads(t.Context(), db) - require.NoError(t, err) - require.True(t, ok) + // schema join. If this ever errors on a healthy MySQL we want to know, + // since runner.setupThrottler depends on it as the gate. + require.NoError(t, CanReadActiveThreads(t.Context(), db)) } diff --git a/pkg/throttler/aurora_test.go b/pkg/throttler/aurora_test.go index db07d945..c87ac623 100644 --- a/pkg/throttler/aurora_test.go +++ b/pkg/throttler/aurora_test.go @@ -69,6 +69,25 @@ func TestAuroraSetup_NonAuroraSkipsMonitorEvenWithZeroThreshold(t *testing.T) { require.False(t, openCalled, "OpenMonitor must not be called on a non-Aurora source even with threshold=0") } +func TestAuroraSetup_RejectsNilRequiredFields(t *testing.T) { + db, err := sql.Open("mysql", testutils.DSN()) + require.NoError(t, err) + defer utils.CloseAndLog(db) + openMon := func() (*sql.DB, error) { return nil, nil } + logger := discardLogger() + + // Each required field, exercised in isolation. Without these guards + // Build would NPE deep in the probe / OpenMonitor / log calls. + _, err = AuroraSetup{Source: nil, OpenMonitor: openMon, Logger: logger}.Build(t.Context()) + require.ErrorContains(t, err, "Source is required") + + _, err = AuroraSetup{Source: db, OpenMonitor: nil, Logger: logger}.Build(t.Context()) + require.ErrorContains(t, err, "OpenMonitor is required") + + _, err = AuroraSetup{Source: db, OpenMonitor: openMon, Logger: nil}.Build(t.Context()) + require.ErrorContains(t, err, "Logger is required") +} + func TestAuroraSetup_IsAuroraProbeFailureIsNonFatal(t *testing.T) { // Closed DB → IsAurora returns an error (driver/sql.ErrConnDone-style). // The helper logs at Debug and returns a zero result rather than