Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/app/piped/cmd/piped/piped.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,10 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) {
decrypter,
appManifestsCache,
cfg,
planpreview.WithWorkerNum(cfg.PlanPreview.WorkerNum),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, this is dangerous, I would think a safer approach is checking whether these values from PlanPreview options are not zero value first: if it != 0, then add the option

For example

if cfg.PlanPreview.WorkerNum > 0 {
    planpreview.WithWorkerNum(cfg.PlanPreview.WorkerNum),
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahh, good one. Yeah, how about I frame it like this, does it seem right now:

ppOpts := []planpreview.Option{
			planpreview.WithLogger(input.Logger),
		}
		if cfg.PlanPreview.WorkerNum > 0 {
			ppOpts = append(ppOpts, planpreview.WithWorkerNum(cfg.PlanPreview.WorkerNum))
		}
		if cfg.PlanPreview.CommandQueueBufferSize > 0 {
			ppOpts = append(ppOpts, planpreview.WithCommandQueueBufferSize(cfg.PlanPreview.CommandQueueBufferSize))
		}
		if cfg.PlanPreview.CommandCheckInterval > 0 {
			ppOpts = append(ppOpts, planpreview.WithCommandCheckInterval(cfg.PlanPreview.CommandCheckInterval.Duration()))
		}
		if cfg.PlanPreview.CommandHandleTimeout > 0 {
			ppOpts = append(ppOpts, planpreview.WithCommandHandleTimeout(cfg.PlanPreview.CommandHandleTimeout.Duration()))
		}

h := planpreview.NewHandler(
			//....
			ppOpts...,
		)

Copy link
Copy Markdown
Contributor

@armistcxy armistcxy Apr 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems like a good approach now, i think you should verify whether CommandQueueBufferSize = 0 is valid. Go has buffer size = 0 for channel right ^ ^, maybe this case can be similar

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I checked this, since we do make(chan ..., commandQueueBufferSize) in the handler, make(chan T, 0) is valid in Go (unbuffered), so CommandQueueBufferSize = 0 itself wouldn’t break anything.

in pkg/app/piped/planpreview/handler.go:

h := &Handler{
		gitClient:     gc,
		commandLister: cl,
		commandCh:     make(chan model.ReportableCommand, opt.commandQueueBufferSize),
		prevCommands:  map[string]struct{}{},
		options:       opt,
		logger:        opt.logger.Named("plan-preview-handler"),
	}

The tricky part is config: since it’s a plain int, omitted and 0 both become 0 after decode. So if we pass 0 through, even “not set” would behave like unbuffered instead of using the default (10), which might be unexpected.

Right now with the > 0 guard, both cases just fall back to the default, which I felt safer.

If we want to support unbuffered explicitly, we’d probably need a way to distinguish omitted vs 0.

What do you think, should we support that, or keep default-only for now?

planpreview.WithCommandQueueBufferSize(cfg.PlanPreview.CommandQueueBufferSize),
planpreview.WithCommandCheckInterval(cfg.PlanPreview.CommandCheckInterval.Duration()),
planpreview.WithCommandHandleTimeout(cfg.PlanPreview.CommandHandleTimeout.Duration()),
planpreview.WithLogger(input.Logger),
)
group.Go(func() error {
Expand Down
4 changes: 4 additions & 0 deletions pkg/app/pipedv1/cmd/piped/piped.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,10 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) {
decrypter,
cfg,
pluginRegistry,
planpreview.WithWorkerNum(cfg.PlanPreview.WorkerNum),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same reason as v0

planpreview.WithCommandQueueBufferSize(cfg.PlanPreview.CommandQueueBufferSize),
planpreview.WithCommandCheckInterval(cfg.PlanPreview.CommandCheckInterval.Duration()),
planpreview.WithCommandHandleTimeout(cfg.PlanPreview.CommandHandleTimeout.Duration()),
planpreview.WithLogger(input.Logger),
)
group.Go(func() error {
Expand Down
32 changes: 32 additions & 0 deletions pkg/config/piped.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ type PipedSpec struct {
SecretManagement *SecretManagement `json:"secretManagement,omitempty"`
// Optional settings for event watcher.
EventWatcher PipedEventWatcher `json:"eventWatcher"`
// Optional settings for plan-preview command handling.
PlanPreview PipedPlanPreview `json:"planPreview"`
// List of labels to filter all applications this piped will handle.
AppSelector map[string]string `json:"appSelector,omitempty"`
}
Expand Down Expand Up @@ -141,6 +143,9 @@ func (s *PipedSpec) Validate() error {
if err := s.EventWatcher.Validate(); err != nil {
return err
}
if err := s.PlanPreview.Validate(); err != nil {
return err
}
for _, n := range s.Notifications.Receivers {
if n.Slack != nil {
if err := n.Slack.Validate(); err != nil {
Expand Down Expand Up @@ -1211,3 +1216,30 @@ type PipedEventWatcherGitRepo struct {
// This is prioritized if both includes and this one are given.
Excludes []string `json:"excludes,omitempty"`
}

type PipedPlanPreview struct {
// Number of workers to handle plan-preview commands.
WorkerNum int `json:"workerNum,omitempty"`
// Channel buffer size used for plan-preview commands.
CommandQueueBufferSize int `json:"commandQueueBufferSize,omitempty"`
// Interval to fetch plan-preview commands.
CommandCheckInterval Duration `json:"commandCheckInterval,omitempty"`
// Timeout to handle each plan-preview command.
CommandHandleTimeout Duration `json:"commandHandleTimeout,omitempty"`
}

func (p *PipedPlanPreview) Validate() error {
if p.WorkerNum < 0 {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You allow WorkerNum = 0, I scare that commands can be block by this allowance, please verify the behavior to find the right condition for validating

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I checked the handler behavior here in piped/planpreview/handler.go:

for i := 0; i < h.options.workerNum; i++ {
		go startWorker(ctx, h.commandCh)
	}

From the handler logic, workerNum = 0 would result in no workers being started, so commands wouldn’t be processed if it reached there as is.

In the current flow, we guard this at the callsite (> 0 check in piped.go), so a zero value isn’t passed to WithWorkerNum and the handler falls back to its default worker count.

I’m happy to enforce > 0 in Validate() as well if you think that’s preferable, though it may also affect cases where the field is omitted and defaults are expected.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure the right thing to do here, but isn't that we are using defensive programming in a lot of place, I don't think this is a good idea

maybe just concentrate on validating config at one place, what do you think ?

Copy link
Copy Markdown
Contributor Author

@rawadhossain rawadhossain Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I think that would be cleaner. I’ve moved the > 0 checks into the With* option functions in the handler, so the “0 = keep default” logic is handled in one place now.

Validate() is just handling invalid cases (like negatives). Since with plain int/Duration, omitted and 0 both decode the same after YAML parsing, we can’t really reject 0 there without breaking configs where users just omit the planPreview block.

Let me know your thoughts on this.

return errors.New("planPreview.workerNum must be greater than or equal to 0")
}
if p.CommandQueueBufferSize < 0 {
return errors.New("planPreview.commandQueueBufferSize must be greater than or equal to 0")
}
if p.CommandCheckInterval < 0 {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CommandCheckInterval will later be used for creating time.Ticker

commandTicker := time.NewTicker(h.options.commandCheckInterval)

This will cause panic if h.options.commandCheckInterval = 0

Copy link
Copy Markdown
Contributor Author

@rawadhossain rawadhossain Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, time.NewTicker will panic for non positive durations, so this definitely needs to be guarded.

We handle this the same way as above in piped.go by passing the option when the value is > 0, so zero isn’t forwarded and the handler uses its default interval.

if cfg.PlanPreview.CommandCheckInterval > 0 {
    ppOpts = append(ppOpts, planpreview.WithCommandCheckInterval(cfg.PlanPreview.CommandCheckInterval.Duration()))
}

return errors.New("planPreview.commandCheckInterval must be greater than or equal to 0")
}
if p.CommandHandleTimeout < 0 {
return errors.New("planPreview.commandHandleTimeout must be greater than or equal to 0")
}
return nil
}
107 changes: 107 additions & 0 deletions pkg/config/piped_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,12 @@ func TestPipedConfig(t *testing.T) {
},
},
},
PlanPreview: PipedPlanPreview{
WorkerNum: 5,
CommandQueueBufferSize: 20,
CommandCheckInterval: Duration(5 * time.Second),
CommandHandleTimeout: Duration(10 * time.Minute),
},
},
expectedError: nil,
},
Expand All @@ -378,6 +384,95 @@ func TestPipedConfig(t *testing.T) {
}
}

func TestPipedPlanPreviewValidate(t *testing.T) {
testcases := []struct {
name string
planPreview PipedPlanPreview
wantErr bool
wantPipedPlanPreview PipedPlanPreview
}{
{
name: "negative workerNum",
wantErr: true,
planPreview: PipedPlanPreview{
WorkerNum: -1,
},
wantPipedPlanPreview: PipedPlanPreview{
WorkerNum: -1,
},
},
{
name: "negative commandQueueBufferSize",
wantErr: true,
planPreview: PipedPlanPreview{
CommandQueueBufferSize: -1,
},
wantPipedPlanPreview: PipedPlanPreview{
CommandQueueBufferSize: -1,
},
},
{
name: "negative commandCheckInterval",
wantErr: true,
planPreview: PipedPlanPreview{
CommandCheckInterval: Duration(-time.Second),
},
wantPipedPlanPreview: PipedPlanPreview{
CommandCheckInterval: Duration(-time.Second),
},
},
{
name: "negative commandHandleTimeout",
wantErr: true,
planPreview: PipedPlanPreview{
CommandHandleTimeout: Duration(-time.Minute),
},
wantPipedPlanPreview: PipedPlanPreview{
CommandHandleTimeout: Duration(-time.Minute),
},
},
{
name: "all zero",
wantErr: false,
planPreview: PipedPlanPreview{
WorkerNum: 0,
CommandQueueBufferSize: 0,
CommandCheckInterval: Duration(0),
CommandHandleTimeout: Duration(0),
},
wantPipedPlanPreview: PipedPlanPreview{
WorkerNum: 0,
CommandQueueBufferSize: 0,
CommandCheckInterval: Duration(0),
CommandHandleTimeout: Duration(0),
},
},
{
name: "valid values",
wantErr: false,
planPreview: PipedPlanPreview{
WorkerNum: 5,
CommandQueueBufferSize: 20,
CommandCheckInterval: Duration(5 * time.Second),
CommandHandleTimeout: Duration(10 * time.Minute),
},
wantPipedPlanPreview: PipedPlanPreview{
WorkerNum: 5,
CommandQueueBufferSize: 20,
CommandCheckInterval: Duration(5 * time.Second),
CommandHandleTimeout: Duration(10 * time.Minute),
},
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
err := tc.planPreview.Validate()
assert.Equal(t, tc.wantErr, err != nil)
assert.Equal(t, tc.wantPipedPlanPreview, tc.planPreview)
})
}
}

func TestPipedEventWatcherValidate(t *testing.T) {
testcases := []struct {
name string
Expand Down Expand Up @@ -1137,6 +1232,12 @@ func TestPipedSpecClone(t *testing.T) {
},
},
},
PlanPreview: PipedPlanPreview{
WorkerNum: 5,
CommandQueueBufferSize: 20,
CommandCheckInterval: Duration(5 * time.Second),
CommandHandleTimeout: Duration(10 * time.Minute),
},
},
expectedSpec: &PipedSpec{
ProjectID: "test-project",
Expand Down Expand Up @@ -1335,6 +1436,12 @@ func TestPipedSpecClone(t *testing.T) {
},
},
},
PlanPreview: PipedPlanPreview{
WorkerNum: 5,
CommandQueueBufferSize: 20,
CommandCheckInterval: Duration(5 * time.Second),
CommandHandleTimeout: Duration(10 * time.Minute),
},
},
expectedError: nil,
},
Expand Down
6 changes: 6 additions & 0 deletions pkg/config/testdata/piped/piped-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,9 @@ spec:
includes:
- event-watcher-dev.yaml
- event-watcher-stg.yaml

planPreview:
workerNum: 5
commandQueueBufferSize: 20
commandCheckInterval: 5s
commandHandleTimeout: 10m
32 changes: 32 additions & 0 deletions pkg/configv1/piped.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type PipedSpec struct {
SecretManagement *SecretManagement `json:"secretManagement,omitempty"`
// Optional settings for event watcher.
EventWatcher PipedEventWatcher `json:"eventWatcher"`
// Optional settings for plan-preview command handling.
PlanPreview PipedPlanPreview `json:"planPreview"`
// List of labels to filter all applications this piped will handle.
AppSelector map[string]string `json:"appSelector,omitempty"`
}
Expand Down Expand Up @@ -113,6 +115,9 @@ func (s *PipedSpec) Validate() error {
if err := s.EventWatcher.Validate(); err != nil {
return err
}
if err := s.PlanPreview.Validate(); err != nil {
return err
}
for _, n := range s.Notifications.Receivers {
if n.Slack != nil {
if err := n.Slack.Validate(); err != nil {
Expand Down Expand Up @@ -632,6 +637,33 @@ type PipedEventWatcherGitRepo struct {
Excludes []string `json:"excludes,omitempty"`
}

type PipedPlanPreview struct {
// Number of workers to handle plan-preview commands.
WorkerNum int `json:"workerNum,omitempty"`
// Channel buffer size used for plan-preview commands.
CommandQueueBufferSize int `json:"commandQueueBufferSize,omitempty"`
// Interval to fetch plan-preview commands.
CommandCheckInterval Duration `json:"commandCheckInterval,omitempty"`
// Timeout to handle each plan-preview command.
CommandHandleTimeout Duration `json:"commandHandleTimeout,omitempty"`
}

func (p *PipedPlanPreview) Validate() error {
if p.WorkerNum < 0 {
return errors.New("planPreview.workerNum must be greater than or equal to 0")
}
if p.CommandQueueBufferSize < 0 {
return errors.New("planPreview.commandQueueBufferSize must be greater than or equal to 0")
}
if p.CommandCheckInterval < 0 {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same reason with v0, commandCheckInterval = 0 would cause panic

return errors.New("planPreview.commandCheckInterval must be greater than or equal to 0")
}
if p.CommandHandleTimeout < 0 {
return errors.New("planPreview.commandHandleTimeout must be greater than or equal to 0")
}
return nil
}

// PipedPlugin defines the plugin configuration for the piped.
type PipedPlugin struct {
// The name of the plugin.
Expand Down
Loading
Loading