diff --git a/pkg/app/piped/cmd/piped/piped.go b/pkg/app/piped/cmd/piped/piped.go index 4bfe8bd183..86909fab15 100644 --- a/pkg/app/piped/cmd/piped/piped.go +++ b/pkg/app/piped/cmd/piped/piped.go @@ -526,6 +526,13 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) { input.Logger.Info("successfully cleaned gitClient for plan-preview") }() + ppOpts := []planpreview.Option{ + planpreview.WithLogger(input.Logger), + planpreview.WithWorkerNum(cfg.PlanPreview.WorkerNum), + planpreview.WithCommandQueueBufferSize(cfg.PlanPreview.CommandQueueBufferSize), + planpreview.WithCommandCheckInterval(cfg.PlanPreview.CommandCheckInterval.Duration()), + planpreview.WithCommandHandleTimeout(cfg.PlanPreview.CommandHandleTimeout.Duration()), + } h := planpreview.NewHandler( gc, apiClient, @@ -535,7 +542,7 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) { decrypter, appManifestsCache, cfg, - planpreview.WithLogger(input.Logger), + ppOpts..., ) group.Go(func() error { return h.Run(ctx) diff --git a/pkg/app/piped/planpreview/handler.go b/pkg/app/piped/planpreview/handler.go index 395e5972aa..83dd717df3 100644 --- a/pkg/app/piped/planpreview/handler.go +++ b/pkg/app/piped/planpreview/handler.go @@ -51,25 +51,33 @@ type Option func(*options) func WithWorkerNum(n int) Option { return func(opts *options) { - opts.workerNum = n + if n > 0 { + opts.workerNum = n + } } } func WithCommandQueueBufferSize(s int) Option { return func(opts *options) { - opts.commandQueueBufferSize = s + if s > 0 { + opts.commandQueueBufferSize = s + } } } func WithCommandCheckInterval(i time.Duration) Option { return func(opts *options) { - opts.commandCheckInterval = i + if i > 0 { + opts.commandCheckInterval = i + } } } func WithCommandHandleTimeout(t time.Duration) Option { return func(opts *options) { - opts.commandHandleTimeout = t + if t > 0 { + opts.commandHandleTimeout = t + } } } diff --git a/pkg/app/pipedv1/cmd/piped/piped.go b/pkg/app/pipedv1/cmd/piped/piped.go index 099529cc23..eb10091c82 100644 --- a/pkg/app/pipedv1/cmd/piped/piped.go +++ b/pkg/app/pipedv1/cmd/piped/piped.go @@ -491,6 +491,13 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) { input.Logger.Info("successfully cleaned gitClient for plan-preview") }() + ppOpts := []planpreview.Option{ + planpreview.WithLogger(input.Logger), + planpreview.WithWorkerNum(cfg.PlanPreview.WorkerNum), + planpreview.WithCommandQueueBufferSize(cfg.PlanPreview.CommandQueueBufferSize), + planpreview.WithCommandCheckInterval(cfg.PlanPreview.CommandCheckInterval.Duration()), + planpreview.WithCommandHandleTimeout(cfg.PlanPreview.CommandHandleTimeout.Duration()), + } h := planpreview.NewHandler( gc, apiClient, @@ -500,7 +507,7 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) { decrypter, cfg, pluginRegistry, - planpreview.WithLogger(input.Logger), + ppOpts..., ) group.Go(func() error { return h.Run(ctx) diff --git a/pkg/app/pipedv1/planpreview/handler.go b/pkg/app/pipedv1/planpreview/handler.go index d2c4f99b85..ccd0260e5f 100644 --- a/pkg/app/pipedv1/planpreview/handler.go +++ b/pkg/app/pipedv1/planpreview/handler.go @@ -51,25 +51,33 @@ type Option func(*options) func WithWorkerNum(n int) Option { return func(opts *options) { - opts.workerNum = n + if n > 0 { + opts.workerNum = n + } } } func WithCommandQueueBufferSize(s int) Option { return func(opts *options) { - opts.commandQueueBufferSize = s + if s > 0 { + opts.commandQueueBufferSize = s + } } } func WithCommandCheckInterval(i time.Duration) Option { return func(opts *options) { - opts.commandCheckInterval = i + if i > 0 { + opts.commandCheckInterval = i + } } } func WithCommandHandleTimeout(t time.Duration) Option { return func(opts *options) { - opts.commandHandleTimeout = t + if t > 0 { + opts.commandHandleTimeout = t + } } } diff --git a/pkg/config/piped.go b/pkg/config/piped.go index 7bc418fe74..8219c58047 100644 --- a/pkg/config/piped.go +++ b/pkg/config/piped.go @@ -79,6 +79,8 @@ type PipedSpec struct { SecretManagement *SecretManagement `json:"secretManagement,omitempty"` // Optional settings for event watcher. EventWatcher PipedEventWatcher `json:"eventWatcher"` + // Optional settings for plan-preview command handling. + PlanPreview PipedPlanPreview `json:"planPreview"` // List of labels to filter all applications this piped will handle. AppSelector map[string]string `json:"appSelector,omitempty"` } @@ -141,6 +143,9 @@ func (s *PipedSpec) Validate() error { if err := s.EventWatcher.Validate(); err != nil { return err } + if err := s.PlanPreview.Validate(); err != nil { + return err + } for _, n := range s.Notifications.Receivers { if n.Slack != nil { if err := n.Slack.Validate(); err != nil { @@ -1211,3 +1216,30 @@ type PipedEventWatcherGitRepo struct { // This is prioritized if both includes and this one are given. Excludes []string `json:"excludes,omitempty"` } + +type PipedPlanPreview struct { + // WorkerNum is the number of worker goroutines processing plan-preview commands. + WorkerNum int `json:"workerNum,omitempty"` + // CommandQueueBufferSize is the buffer size of the internal command channel. + CommandQueueBufferSize int `json:"commandQueueBufferSize,omitempty"` + // CommandCheckInterval is how often to poll for new plan-preview commands. + CommandCheckInterval Duration `json:"commandCheckInterval,omitempty"` + // CommandHandleTimeout is the default timeout for building each plan-preview result when the command does not specify one. + CommandHandleTimeout Duration `json:"commandHandleTimeout,omitempty"` +} + +func (p *PipedPlanPreview) Validate() error { + if p.WorkerNum < 0 { + return errors.New("planPreview.workerNum must be greater than or equal to 0") + } + if p.CommandQueueBufferSize < 0 { + return errors.New("planPreview.commandQueueBufferSize must be greater than or equal to 0") + } + if p.CommandCheckInterval < 0 { + return errors.New("planPreview.commandCheckInterval must be greater than or equal to 0") + } + if p.CommandHandleTimeout < 0 { + return errors.New("planPreview.commandHandleTimeout must be greater than or equal to 0") + } + return nil +} diff --git a/pkg/config/piped_test.go b/pkg/config/piped_test.go index cabc18cb47..2cb8ab5155 100644 --- a/pkg/config/piped_test.go +++ b/pkg/config/piped_test.go @@ -361,6 +361,12 @@ func TestPipedConfig(t *testing.T) { }, }, }, + PlanPreview: PipedPlanPreview{ + WorkerNum: 5, + CommandQueueBufferSize: 20, + CommandCheckInterval: Duration(5 * time.Second), + CommandHandleTimeout: Duration(10 * time.Minute), + }, }, expectedError: nil, }, @@ -378,6 +384,95 @@ func TestPipedConfig(t *testing.T) { } } +func TestPipedPlanPreviewValidate(t *testing.T) { + testcases := []struct { + name string + planPreview PipedPlanPreview + wantErr bool + wantPipedPlanPreview PipedPlanPreview + }{ + { + name: "negative workerNum", + wantErr: true, + planPreview: PipedPlanPreview{ + WorkerNum: -1, + }, + wantPipedPlanPreview: PipedPlanPreview{ + WorkerNum: -1, + }, + }, + { + name: "negative commandQueueBufferSize", + wantErr: true, + planPreview: PipedPlanPreview{ + CommandQueueBufferSize: -1, + }, + wantPipedPlanPreview: PipedPlanPreview{ + CommandQueueBufferSize: -1, + }, + }, + { + name: "negative commandCheckInterval", + wantErr: true, + planPreview: PipedPlanPreview{ + CommandCheckInterval: Duration(-time.Second), + }, + wantPipedPlanPreview: PipedPlanPreview{ + CommandCheckInterval: Duration(-time.Second), + }, + }, + { + name: "negative commandHandleTimeout", + wantErr: true, + planPreview: PipedPlanPreview{ + CommandHandleTimeout: Duration(-time.Minute), + }, + wantPipedPlanPreview: PipedPlanPreview{ + CommandHandleTimeout: Duration(-time.Minute), + }, + }, + { + name: "all zero", + wantErr: false, + planPreview: PipedPlanPreview{ + WorkerNum: 0, + CommandQueueBufferSize: 0, + CommandCheckInterval: Duration(0), + CommandHandleTimeout: Duration(0), + }, + wantPipedPlanPreview: PipedPlanPreview{ + WorkerNum: 0, + CommandQueueBufferSize: 0, + CommandCheckInterval: Duration(0), + CommandHandleTimeout: Duration(0), + }, + }, + { + name: "valid values", + wantErr: false, + planPreview: PipedPlanPreview{ + WorkerNum: 5, + CommandQueueBufferSize: 20, + CommandCheckInterval: Duration(5 * time.Second), + CommandHandleTimeout: Duration(10 * time.Minute), + }, + wantPipedPlanPreview: PipedPlanPreview{ + WorkerNum: 5, + CommandQueueBufferSize: 20, + CommandCheckInterval: Duration(5 * time.Second), + CommandHandleTimeout: Duration(10 * time.Minute), + }, + }, + } + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + err := tc.planPreview.Validate() + assert.Equal(t, tc.wantErr, err != nil) + assert.Equal(t, tc.wantPipedPlanPreview, tc.planPreview) + }) + } +} + func TestPipedEventWatcherValidate(t *testing.T) { testcases := []struct { name string @@ -1137,6 +1232,12 @@ func TestPipedSpecClone(t *testing.T) { }, }, }, + PlanPreview: PipedPlanPreview{ + WorkerNum: 5, + CommandQueueBufferSize: 20, + CommandCheckInterval: Duration(5 * time.Second), + CommandHandleTimeout: Duration(10 * time.Minute), + }, }, expectedSpec: &PipedSpec{ ProjectID: "test-project", @@ -1335,6 +1436,12 @@ func TestPipedSpecClone(t *testing.T) { }, }, }, + PlanPreview: PipedPlanPreview{ + WorkerNum: 5, + CommandQueueBufferSize: 20, + CommandCheckInterval: Duration(5 * time.Second), + CommandHandleTimeout: Duration(10 * time.Minute), + }, }, expectedError: nil, }, diff --git a/pkg/config/testdata/piped/piped-config.yaml b/pkg/config/testdata/piped/piped-config.yaml index 9f92f437d9..610097969e 100644 --- a/pkg/config/testdata/piped/piped-config.yaml +++ b/pkg/config/testdata/piped/piped-config.yaml @@ -243,3 +243,9 @@ spec: includes: - event-watcher-dev.yaml - event-watcher-stg.yaml + + planPreview: + workerNum: 5 + commandQueueBufferSize: 20 + commandCheckInterval: 5s + commandHandleTimeout: 10m diff --git a/pkg/configv1/piped.go b/pkg/configv1/piped.go index d9fff0ee76..828c9f828d 100644 --- a/pkg/configv1/piped.go +++ b/pkg/configv1/piped.go @@ -64,6 +64,8 @@ type PipedSpec struct { SecretManagement *SecretManagement `json:"secretManagement,omitempty"` // Optional settings for event watcher. EventWatcher PipedEventWatcher `json:"eventWatcher"` + // Optional settings for plan-preview command handling. + PlanPreview PipedPlanPreview `json:"planPreview"` // List of labels to filter all applications this piped will handle. AppSelector map[string]string `json:"appSelector,omitempty"` } @@ -113,6 +115,9 @@ func (s *PipedSpec) Validate() error { if err := s.EventWatcher.Validate(); err != nil { return err } + if err := s.PlanPreview.Validate(); err != nil { + return err + } for _, n := range s.Notifications.Receivers { if n.Slack != nil { if err := n.Slack.Validate(); err != nil { @@ -632,6 +637,33 @@ type PipedEventWatcherGitRepo struct { Excludes []string `json:"excludes,omitempty"` } +type PipedPlanPreview struct { + // WorkerNum is the number of worker goroutines processing plan-preview commands. + WorkerNum int `json:"workerNum,omitempty"` + // CommandQueueBufferSize is the buffer size of the internal command channel. + CommandQueueBufferSize int `json:"commandQueueBufferSize,omitempty"` + // CommandCheckInterval is how often to poll for new plan-preview commands. + CommandCheckInterval Duration `json:"commandCheckInterval,omitempty"` + // CommandHandleTimeout is the default timeout for building each plan-preview result when the command does not specify one. + CommandHandleTimeout Duration `json:"commandHandleTimeout,omitempty"` +} + +func (p *PipedPlanPreview) Validate() error { + if p.WorkerNum < 0 { + return errors.New("planPreview.workerNum must be greater than or equal to 0") + } + if p.CommandQueueBufferSize < 0 { + return errors.New("planPreview.commandQueueBufferSize must be greater than or equal to 0") + } + if p.CommandCheckInterval < 0 { + return errors.New("planPreview.commandCheckInterval must be greater than or equal to 0") + } + if p.CommandHandleTimeout < 0 { + return errors.New("planPreview.commandHandleTimeout must be greater than or equal to 0") + } + return nil +} + // PipedPlugin defines the plugin configuration for the piped. type PipedPlugin struct { // The name of the plugin. diff --git a/pkg/configv1/piped_test.go b/pkg/configv1/piped_test.go index 0c444f877d..e24b8f578c 100644 --- a/pkg/configv1/piped_test.go +++ b/pkg/configv1/piped_test.go @@ -247,6 +247,12 @@ func TestPipedConfig(t *testing.T) { }, }, }, + PlanPreview: PipedPlanPreview{ + WorkerNum: 5, + CommandQueueBufferSize: 20, + CommandCheckInterval: Duration(5 * time.Second), + CommandHandleTimeout: Duration(10 * time.Minute), + }, }, expectedError: nil, }, @@ -264,6 +270,95 @@ func TestPipedConfig(t *testing.T) { } } +func TestPipedPlanPreviewValidate(t *testing.T) { + testcases := []struct { + name string + planPreview PipedPlanPreview + wantErr bool + wantPipedPlanPreview PipedPlanPreview + }{ + { + name: "negative workerNum", + wantErr: true, + planPreview: PipedPlanPreview{ + WorkerNum: -1, + }, + wantPipedPlanPreview: PipedPlanPreview{ + WorkerNum: -1, + }, + }, + { + name: "negative commandQueueBufferSize", + wantErr: true, + planPreview: PipedPlanPreview{ + CommandQueueBufferSize: -1, + }, + wantPipedPlanPreview: PipedPlanPreview{ + CommandQueueBufferSize: -1, + }, + }, + { + name: "negative commandCheckInterval", + wantErr: true, + planPreview: PipedPlanPreview{ + CommandCheckInterval: Duration(-time.Second), + }, + wantPipedPlanPreview: PipedPlanPreview{ + CommandCheckInterval: Duration(-time.Second), + }, + }, + { + name: "negative commandHandleTimeout", + wantErr: true, + planPreview: PipedPlanPreview{ + CommandHandleTimeout: Duration(-time.Minute), + }, + wantPipedPlanPreview: PipedPlanPreview{ + CommandHandleTimeout: Duration(-time.Minute), + }, + }, + { + name: "all zero", + wantErr: false, + planPreview: PipedPlanPreview{ + WorkerNum: 0, + CommandQueueBufferSize: 0, + CommandCheckInterval: Duration(0), + CommandHandleTimeout: Duration(0), + }, + wantPipedPlanPreview: PipedPlanPreview{ + WorkerNum: 0, + CommandQueueBufferSize: 0, + CommandCheckInterval: Duration(0), + CommandHandleTimeout: Duration(0), + }, + }, + { + name: "valid values", + wantErr: false, + planPreview: PipedPlanPreview{ + WorkerNum: 5, + CommandQueueBufferSize: 20, + CommandCheckInterval: Duration(5 * time.Second), + CommandHandleTimeout: Duration(10 * time.Minute), + }, + wantPipedPlanPreview: PipedPlanPreview{ + WorkerNum: 5, + CommandQueueBufferSize: 20, + CommandCheckInterval: Duration(5 * time.Second), + CommandHandleTimeout: Duration(10 * time.Minute), + }, + }, + } + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + err := tc.planPreview.Validate() + assert.Equal(t, tc.wantErr, err != nil) + assert.Equal(t, tc.wantPipedPlanPreview, tc.planPreview) + }) + } +} + func TestPipedEventWatcherValidate(t *testing.T) { testcases := []struct { name string @@ -765,6 +860,12 @@ func TestPipedSpecClone(t *testing.T) { }, }, }, + PlanPreview: PipedPlanPreview{ + WorkerNum: 5, + CommandQueueBufferSize: 20, + CommandCheckInterval: Duration(5 * time.Second), + CommandHandleTimeout: Duration(10 * time.Minute), + }, }, expectedSpec: &PipedSpec{ ProjectID: "test-project", @@ -855,6 +956,12 @@ func TestPipedSpecClone(t *testing.T) { }, }, }, + PlanPreview: PipedPlanPreview{ + WorkerNum: 5, + CommandQueueBufferSize: 20, + CommandCheckInterval: Duration(5 * time.Second), + CommandHandleTimeout: Duration(10 * time.Minute), + }, }, expectedError: nil, }, diff --git a/pkg/configv1/testdata/piped/piped-config.yaml b/pkg/configv1/testdata/piped/piped-config.yaml index 143007daa5..a4d451d134 100644 --- a/pkg/configv1/testdata/piped/piped-config.yaml +++ b/pkg/configv1/testdata/piped/piped-config.yaml @@ -201,3 +201,9 @@ spec: includes: - event-watcher-dev.yaml - event-watcher-stg.yaml + + planPreview: + workerNum: 5 + commandQueueBufferSize: 20 + commandCheckInterval: 5s + commandHandleTimeout: 10m