diff --git a/watcher/sbomwatcher.go b/watcher/sbomwatcher.go index 6f4bc3d..7154c19 100644 --- a/watcher/sbomwatcher.go +++ b/watcher/sbomwatcher.go @@ -2,6 +2,7 @@ package watcher import ( "context" + "errors" "slices" "strings" "time" @@ -20,6 +21,14 @@ import ( "k8s.io/client-go/tools/pager" ) +// Bounded exponential backoff for SBOM events whose owning pod has not been +// observed yet. Declared as vars (not consts) so tests can shorten them. +var ( + sbomRetryInitialDelay = 2 * time.Second + sbomRetryMaxDelay = 60 * time.Second + sbomRetryMaxAttempts = 5 +) + // SBOMWatch watches and processes changes on SBOMs func (wh *WatchHandler) SBOMWatch(ctx context.Context, workerPool *ants.PoolWithFunc) { eventQueue := NewCooldownQueueWithParams(15*time.Second, 1*time.Second) @@ -169,6 +178,41 @@ func (wh *WatchHandler) HandleSBOMEvents(eventQueue *CooldownQueue, producedComm } if err := validateContainerData(containerData); err != nil { + if errors.Is(err, ErrMissingWlid) { + // The pod that owns this image hasn't been observed yet + // (initial-list race, or pod informer not yet warm). Re-enqueue + // with bounded exponential backoff rather than dispatch a scan + // command with an empty Wlid — kubevuln silently drops those + // from the platform submission path. + key := obj.ObjectMeta.Namespace + "/" + obj.ObjectMeta.Name + attempt := wh.sbomRetryAttempts.Get(key) + if attempt >= sbomRetryMaxAttempts { + wh.sbomRetryAttempts.Delete(key) + logger.L().Warning("dropping SBOM scan after exhausting retries waiting for Wlid", + helpers.String("name", obj.ObjectMeta.Name), + helpers.String("namespace", obj.ObjectMeta.Namespace), + helpers.String("imageID", containerData.ImageID), + helpers.Int("attempts", attempt)) + errorCh <- err + continue + } + wh.sbomRetryAttempts.Set(key, attempt+1) + delay := sbomRetryBackoff(attempt) + logger.L().Debug("Wlid not yet known for SBOM, re-enqueueing", + helpers.String("name", obj.ObjectMeta.Name), + helpers.String("namespace", obj.ObjectMeta.Namespace), + helpers.String("imageID", containerData.ImageID), + helpers.Int("attempt", attempt+1), + helpers.String("delay", delay.String())) + go func(ev watch.Event, d time.Duration) { + time.Sleep(d) + if eventQueue.Closed() { + return + } + eventQueue.Enqueue(ev) + }(e, delay) + continue + } logger.L().Error("failed to get container data from SBOM", helpers.String("name", obj.ObjectMeta.Name), helpers.String("namespace", obj.ObjectMeta.Namespace), @@ -177,6 +221,8 @@ func (wh *WatchHandler) HandleSBOMEvents(eventQueue *CooldownQueue, producedComm errorCh <- err continue } + // success: clear any retry bookkeeping for this SBOM + wh.sbomRetryAttempts.Delete(obj.ObjectMeta.Namespace + "/" + obj.ObjectMeta.Name) cmd := &apis.Command{ Wlid: containerData.Wlid, @@ -225,5 +271,18 @@ func validateContainerData(containerData *utils.ContainerData) error { if containerData.ImageTag == "" { return ErrMissingImageTag } + if containerData.Wlid == "" { + return ErrMissingWlid + } return nil } + +// sbomRetryBackoff returns the delay before retry number `attempt` (0-indexed). +// Doubles each time, capped at sbomRetryMaxDelay. +func sbomRetryBackoff(attempt int) time.Duration { + d := sbomRetryInitialDelay << attempt + if d <= 0 || d > sbomRetryMaxDelay { + return sbomRetryMaxDelay + } + return d +} diff --git a/watcher/sbomwatcher_test.go b/watcher/sbomwatcher_test.go index 3def963..246b364 100644 --- a/watcher/sbomwatcher_test.go +++ b/watcher/sbomwatcher_test.go @@ -3,6 +3,7 @@ package watcher import ( "context" _ "embed" + "sync" "testing" "time" @@ -21,16 +22,39 @@ import ( k8sfake "k8s.io/client-go/kubernetes/fake" ) +const ( + testImageID = "docker.io/library/nginx@sha256:aa0afebbb3cfa473099a62c4b32e9b3fb73ed23f2a75a65ce1d4b4f55a5c2ef2" + testImageHashOnly = "aa0afebbb3cfa473099a62c4b32e9b3fb73ed23f2a75a65ce1d4b4f55a5c2ef2" + testImageTag = "nginx:1.14.0" + testWlid = "wlid://cluster-test/namespace-default/deployment-nginx" + testContainerName = "nginx" +) + +func newTestHandler(t *testing.T, startingObjects ...runtime.Object) *WatchHandler { + t.Helper() + clusterConfig := utilsmetadata.ClusterConfig{} + cfg, err := config.LoadConfig("../configuration") + assert.NoError(t, err) + operatorConfig := config.NewOperatorConfig(config.CapabilitiesConfig{}, clusterConfig, &beUtils.Credentials{}, cfg) + + k8sClient := k8sfake.NewClientset() + k8sAPI := utils.NewK8sInterfaceFake(k8sClient) + storageClient := kssfake.NewSimpleClientset(startingObjects...) + return NewWatchHandler(operatorConfig, k8sAPI, storageClient, nil) +} + func TestHandleSBOMEvents(t *testing.T) { tt := []struct { name string + seedContainerData bool // pre-populate ImageToContainerData so Wlid is known inputEvents []watch.Event expectedObjectNames []string expectedCommands []*apis.Command expectedErrors []error }{ { - name: "Adding a new SBOM should produce a matching scan command", + name: "Adding a new SBOM with known Wlid produces a matching scan command", + seedContainerData: true, inputEvents: []watch.Event{ { Type: watch.Added, @@ -38,8 +62,8 @@ func TestHandleSBOMEvents(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "replicaset-nginx-6ccd565b7d-nginx-49d3-1861", Annotations: map[string]string{ - helpersv1.ImageIDMetadataKey: "docker.io/library/nginx@sha256:aa0afebbb3cfa473099a62c4b32e9b3fb73ed23f2a75a65ce1d4b4f55a5c2ef2", - helpersv1.ImageTagMetadataKey: "nginx:1.14.0", + helpersv1.ImageIDMetadataKey: testImageID, + helpersv1.ImageTagMetadataKey: testImageTag, }, }, }, @@ -50,8 +74,8 @@ func TestHandleSBOMEvents(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "replicaset-nginx-6ccd565b7d-nginx-e4ff-657a", Annotations: map[string]string{ - helpersv1.ImageIDMetadataKey: "docker.io/library/nginx@sha256:aa0afebbb3cfa473099a62c4b32e9b3fb73ed23f2a75a65ce1d4b4f55a5c2ef2", - helpersv1.ImageTagMetadataKey: "nginx:1.14.0", + helpersv1.ImageIDMetadataKey: testImageID, + helpersv1.ImageTagMetadataKey: testImageTag, }, }, }, @@ -59,20 +83,26 @@ func TestHandleSBOMEvents(t *testing.T) { }, expectedCommands: []*apis.Command{ { + Wlid: testWlid, CommandName: apis.TypeScanImages, Args: map[string]interface{}{ utils.ArgsContainerData: &utils.ContainerData{ - ImageID: "docker.io/library/nginx@sha256:aa0afebbb3cfa473099a62c4b32e9b3fb73ed23f2a75a65ce1d4b4f55a5c2ef2", - ImageTag: "nginx:1.14.0", + ContainerName: testContainerName, + ImageID: testImageID, + ImageTag: testImageTag, + Wlid: testWlid, }, }, }, { + Wlid: testWlid, CommandName: apis.TypeScanImages, Args: map[string]interface{}{ utils.ArgsContainerData: &utils.ContainerData{ - ImageID: "docker.io/library/nginx@sha256:aa0afebbb3cfa473099a62c4b32e9b3fb73ed23f2a75a65ce1d4b4f55a5c2ef2", - ImageTag: "nginx:1.14.0", + ContainerName: testContainerName, + ImageID: testImageID, + ImageTag: testImageTag, + Wlid: testWlid, }, }, }, @@ -83,7 +113,8 @@ func TestHandleSBOMEvents(t *testing.T) { }, }, { - name: "Missing image tag", + name: "Missing image tag", + seedContainerData: true, inputEvents: []watch.Event{ { Type: watch.Added, @@ -91,7 +122,7 @@ func TestHandleSBOMEvents(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "replicaset-nginx-6ccd565b7d-nginx-49d3-1861", Annotations: map[string]string{ - helpersv1.ImageIDMetadataKey: "docker.io/library/nginx@sha256:aa0afebbb3cfa473099a62c4b32e9b3fb73ed23f2a75a65ce1d4b4f55a5c2ef2", + helpersv1.ImageIDMetadataKey: testImageID, helpersv1.ImageTagMetadataKey: "", // missing image tag }, }, @@ -117,28 +148,24 @@ func TestHandleSBOMEvents(t *testing.T) { for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { - // Prepare starting startingObjects for storage var startingObjects []runtime.Object for _, e := range tc.inputEvents { startingObjects = append(startingObjects, e.Object) } ctx := context.Background() - clusterConfig := utilsmetadata.ClusterConfig{} - cfg, err := config.LoadConfig("../configuration") - assert.NoError(t, err) - operatorConfig := config.NewOperatorConfig(config.CapabilitiesConfig{}, clusterConfig, &beUtils.Credentials{}, cfg) - - k8sClient := k8sfake.NewClientset() - k8sAPI := utils.NewK8sInterfaceFake(k8sClient) - storageClient := kssfake.NewSimpleClientset(startingObjects...) + wh := newTestHandler(t, startingObjects...) + if tc.seedContainerData { + wh.ImageToContainerData.Set(testImageHashOnly, utils.ContainerData{ + ContainerName: testContainerName, + Wlid: testWlid, + }) + } eventQueue := NewCooldownQueueWithParams(1*time.Second, 1*time.Second) cmdCh := make(chan *apis.Command) errorCh := make(chan error) - wh := NewWatchHandler(operatorConfig, k8sAPI, storageClient, nil) - go wh.HandleSBOMEvents(eventQueue, cmdCh, errorCh) go func() { @@ -169,17 +196,215 @@ func TestHandleSBOMEvents(t *testing.T) { } } - actualObjects, _ := storageClient.SpdxV1beta1().SBOMSyfts("").List(ctx, metav1.ListOptions{}) + actualObjects, _ := wh.storageClient.SpdxV1beta1().SBOMSyfts("").List(ctx, metav1.ListOptions{}) var actualObjectNames []string for _, obj := range actualObjects.Items { actualObjectNames = append(actualObjectNames, obj.ObjectMeta.Name) } - assert.Equal(t, tc.expectedObjectNames, actualObjectNames, "Objects in the storage don’t match") - assert.Equal(t, tc.expectedErrors, actualErrors, "Errors don’t match") - assert.ElementsMatch(t, tc.expectedCommands, actualCommands, "Commands don’t match") + assert.Equal(t, tc.expectedObjectNames, actualObjectNames, "Objects in the storage don't match") + assert.Equal(t, tc.expectedErrors, actualErrors, "Errors don't match") + assert.ElementsMatch(t, tc.expectedCommands, actualCommands, "Commands don't match") + }) + } +} + +// TestValidateContainerData covers the validation gate directly, including the +// new ErrMissingWlid case added to fix kubescape/operator#378. +func TestValidateContainerData(t *testing.T) { + cases := []struct { + name string + data *utils.ContainerData + wantErr error + }{ + { + name: "all fields present", + data: &utils.ContainerData{ImageID: testImageID, ImageTag: testImageTag, Wlid: testWlid}, + wantErr: nil, + }, + { + name: "missing ImageID", + data: &utils.ContainerData{ImageTag: testImageTag, Wlid: testWlid}, + wantErr: ErrMissingImageID, + }, + { + name: "missing ImageTag", + data: &utils.ContainerData{ImageID: testImageID, Wlid: testWlid}, + wantErr: ErrMissingImageTag, + }, + { + name: "missing Wlid (regression for #378)", + data: &utils.ContainerData{ImageID: testImageID, ImageTag: testImageTag}, + wantErr: ErrMissingWlid, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + assert.ErrorIs(t, validateContainerData(tc.data), tc.wantErr) + }) + } +} + +// TestSBOMRetryBackoff verifies the bounded exponential backoff schedule. +func TestSBOMRetryBackoff(t *testing.T) { + // Force a known schedule. + origInit, origMax := sbomRetryInitialDelay, sbomRetryMaxDelay + sbomRetryInitialDelay = 1 * time.Second + sbomRetryMaxDelay = 8 * time.Second + t.Cleanup(func() { + sbomRetryInitialDelay = origInit + sbomRetryMaxDelay = origMax + }) + + assert.Equal(t, 1*time.Second, sbomRetryBackoff(0)) + assert.Equal(t, 2*time.Second, sbomRetryBackoff(1)) + assert.Equal(t, 4*time.Second, sbomRetryBackoff(2)) + assert.Equal(t, 8*time.Second, sbomRetryBackoff(3)) + // Capped. + assert.Equal(t, 8*time.Second, sbomRetryBackoff(4)) + assert.Equal(t, 8*time.Second, sbomRetryBackoff(20)) + // Even a huge attempt that would overflow the shift stays capped. + assert.Equal(t, 8*time.Second, sbomRetryBackoff(64)) +} + +// TestHandleSBOMEvents_WlidArrivesLate reproduces the issue: an SBOM event +// arrives before the owning pod is observed. The handler must NOT dispatch a +// scan command with Wlid="". Once the pod information lands, the re-enqueued +// SBOM should produce a correctly-attributed scan command. +func TestHandleSBOMEvents_WlidArrivesLate(t *testing.T) { + origInit, origMax, origAttempts := sbomRetryInitialDelay, sbomRetryMaxDelay, sbomRetryMaxAttempts + sbomRetryInitialDelay = 100 * time.Millisecond + sbomRetryMaxDelay = 200 * time.Millisecond + sbomRetryMaxAttempts = 10 + t.Cleanup(func() { + sbomRetryInitialDelay = origInit + sbomRetryMaxDelay = origMax + sbomRetryMaxAttempts = origAttempts + }) + + sbom := &spdxv1beta1.SBOMSyft{ + ObjectMeta: metav1.ObjectMeta{ + Name: "replicaset-nginx-late-wlid", + Namespace: "default", + Annotations: map[string]string{ + helpersv1.ImageIDMetadataKey: testImageID, + helpersv1.ImageTagMetadataKey: testImageTag, + }, + }, + } + + wh := newTestHandler(t, sbom) + eventQueue := NewCooldownQueueWithParams(150*time.Millisecond, 50*time.Millisecond) + cmdCh := make(chan *apis.Command, 4) + errorCh := make(chan error, 4) + + go wh.HandleSBOMEvents(eventQueue, cmdCh, errorCh) + + // Enqueue the SBOM while ImageToContainerData is empty. + eventQueue.Enqueue(watch.Event{Type: watch.Added, Object: sbom}) + + // After a couple of retry cycles, simulate the pod informer populating the map. + time.AfterFunc(350*time.Millisecond, func() { + wh.ImageToContainerData.Set(testImageHashOnly, utils.ContainerData{ + ContainerName: testContainerName, + Wlid: testWlid, }) + }) + + var cmd *apis.Command + select { + case cmd = <-cmdCh: + case <-time.After(5 * time.Second): + t.Fatalf("expected a scan command after Wlid became available") + } + + assert.Equal(t, testWlid, cmd.Wlid, "scan command must carry the correct Wlid, not an empty string") + assert.Equal(t, apis.TypeScanImages, cmd.CommandName) + // No errors should have been emitted (re-enqueue path does not produce errors + // until retries are exhausted). + select { + case err := <-errorCh: + t.Fatalf("unexpected error before retry exhaustion: %v", err) + default: } + + // Bookkeeping should be cleared after success. + key := sbom.Namespace + "/" + sbom.Name + assert.Equal(t, 0, wh.sbomRetryAttempts.Get(key), "retry counter must be cleared on success") + + eventQueue.Stop() +} + +// TestHandleSBOMEvents_WlidNeverArrives_ExhaustsRetries verifies the +// "don't retry forever" requirement from the issue discussion: after a bounded +// number of attempts, the SBOM is dropped with ErrMissingWlid and no malformed +// scan command is ever produced. +func TestHandleSBOMEvents_WlidNeverArrives_ExhaustsRetries(t *testing.T) { + origInit, origMax, origAttempts := sbomRetryInitialDelay, sbomRetryMaxDelay, sbomRetryMaxAttempts + sbomRetryInitialDelay = 50 * time.Millisecond + sbomRetryMaxDelay = 50 * time.Millisecond + sbomRetryMaxAttempts = 3 + t.Cleanup(func() { + sbomRetryInitialDelay = origInit + sbomRetryMaxDelay = origMax + sbomRetryMaxAttempts = origAttempts + }) + + sbom := &spdxv1beta1.SBOMSyft{ + ObjectMeta: metav1.ObjectMeta{ + Name: "replicaset-orphan-sbom", + Namespace: "default", + Annotations: map[string]string{ + helpersv1.ImageIDMetadataKey: testImageID, + helpersv1.ImageTagMetadataKey: testImageTag, + }, + }, + } + + wh := newTestHandler(t, sbom) + eventQueue := NewCooldownQueueWithParams(60*time.Millisecond, 20*time.Millisecond) + cmdCh := make(chan *apis.Command, 4) + errorCh := make(chan error, 4) + + // Drain cmdCh in the background so we can assert nothing was sent. + var cmdMu sync.Mutex + var receivedCmds []*apis.Command + cmdDone := make(chan struct{}) + go func() { + defer close(cmdDone) + for c := range cmdCh { + cmdMu.Lock() + receivedCmds = append(receivedCmds, c) + cmdMu.Unlock() + } + }() + + go wh.HandleSBOMEvents(eventQueue, cmdCh, errorCh) + + eventQueue.Enqueue(watch.Event{Type: watch.Added, Object: sbom}) + + var finalErr error + select { + case finalErr = <-errorCh: + case <-time.After(5 * time.Second): + t.Fatalf("expected ErrMissingWlid after retries exhausted") + } + assert.ErrorIs(t, finalErr, ErrMissingWlid) + + cmdMu.Lock() + assert.Empty(t, receivedCmds, "no scan commands should be dispatched when Wlid never resolves") + cmdMu.Unlock() + + // Bookkeeping must be cleared on exhaustion to avoid leaking memory if the + // SBOM is later re-observed. + key := sbom.Namespace + "/" + sbom.Name + assert.Equal(t, 0, wh.sbomRetryAttempts.Get(key), "retry counter must be cleared on exhaustion") + + eventQueue.Stop() + // HandleSBOMEvents closes cmdCh implicitly? No - it only closes errorCh. + // Close cmdCh manually so the drain goroutine exits, then wait. + close(cmdCh) + <-cmdDone } diff --git a/watcher/watchhandler.go b/watcher/watchhandler.go index 5d3132d..5ec9fcd 100644 --- a/watcher/watchhandler.go +++ b/watcher/watchhandler.go @@ -17,6 +17,7 @@ const retryInterval = 1 * time.Second var ( ErrMissingWLID = fmt.Errorf("missing WLID") + ErrMissingWlid = ErrMissingWLID // alias used by SBOM watcher retry path ErrMissingSlug = fmt.Errorf("missing slug") ErrMissingImageTag = fmt.Errorf("missing image ID") ErrMissingImageID = fmt.Errorf("missing image tag") @@ -29,6 +30,7 @@ type WatchHandler struct { ImageToContainerData maps.SafeMap[string, utils.ContainerData] // map of : SlugToImageID maps.SafeMap[string, string] // map of : string WlidAndImageID mapset.Set[string] // set of + sbomRetryAttempts maps.SafeMap[string, int] // map of : retry attempts so far storageClient kssc.Interface cfg config.IConfig k8sAPI *k8sinterface.KubernetesApi