diff --git a/go.mod b/go.mod index 2b31a11bf..70050a82a 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,6 @@ require ( github.com/kubescape/k8s-interface v0.0.206 github.com/ncw/directio v1.0.5 github.com/olvrng/ujson v1.1.0 - github.com/puzpuzpuz/xsync/v2 v2.4.1 github.com/spf13/afero v1.15.0 github.com/spf13/cobra v1.10.2 github.com/spf13/viper v1.20.1 @@ -221,3 +220,5 @@ require ( sigs.k8s.io/randfill v1.0.0 // indirect sigs.k8s.io/yaml v1.6.0 // indirect ) + +replace k8s.io/apiserver => github.com/matthyx/apiserver v0.0.0-20260603054931-54c588143d7b diff --git a/go.sum b/go.sum index 3f5cf52c1..c2bc70b04 100644 --- a/go.sum +++ b/go.sum @@ -501,6 +501,8 @@ github.com/magiconair/properties v1.8.5/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPK github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.9.0 h1:PrnmzHw7262yW8sTBwxi1PdJA3Iw/EKBa8psRf7d9a4= github.com/mailru/easyjson v0.9.0/go.mod h1:1+xMtQp2MRNVL/V1bOzuP3aP8VNwRW55fQUto+XFtTU= +github.com/matthyx/apiserver v0.0.0-20260603054931-54c588143d7b h1:v4QhVueYEL9wiUPaKLnedbQDJbjOBISoURY0V3cExqQ= +github.com/matthyx/apiserver v0.0.0-20260603054931-54c588143d7b/go.mod h1:QUy1U4+PrzbJaM3XGu2tQ7U9A4udRRo5cyxkFX0GEds= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= @@ -615,8 +617,6 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= -github.com/puzpuzpuz/xsync/v2 v2.4.1 h1:aGdE1C/HaR/QC6YAFdtZXi60Df8/qBIrs8PKrzkItcM= -github.com/puzpuzpuz/xsync/v2 v2.4.1/go.mod h1:gD2H2krq/w52MfPLE+Uy64TzJDVY7lP2znR9qmR35kU= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= @@ -1348,8 +1348,6 @@ k8s.io/apiextensions-apiserver v0.35.0 h1:3xHk2rTOdWXXJM+RDQZJvdx0yEOgC0FgQ1PlJa k8s.io/apiextensions-apiserver v0.35.0/go.mod h1:E1Ahk9SADaLQ4qtzYFkwUqusXTcaV2uw3l14aqpL2LU= k8s.io/apimachinery v0.35.0 h1:Z2L3IHvPVv/MJ7xRxHEtk6GoJElaAqDCCU0S6ncYok8= k8s.io/apimachinery v0.35.0/go.mod h1:jQCgFZFR1F4Ik7hvr2g84RTJSZegBc8yHgFWKn//hns= -k8s.io/apiserver v0.35.0 h1:CUGo5o+7hW9GcAEF3x3usT3fX4f9r8xmgQeCBDaOgX4= -k8s.io/apiserver v0.35.0/go.mod h1:QUy1U4+PrzbJaM3XGu2tQ7U9A4udRRo5cyxkFX0GEds= k8s.io/client-go v0.35.0 h1:IAW0ifFbfQQwQmga0UdoH0yvdqrbwMdq9vIFEhRpxBE= k8s.io/client-go v0.35.0/go.mod h1:q2E5AAyqcbeLGPdoRB+Nxe3KYTfPce1Dnu1myQdqz9o= k8s.io/code-generator v0.35.0 h1:TvrtfKYZTm9oDF2z+veFKSCcgZE3Igv0svY+ehCmjHQ= diff --git a/main.go b/main.go index 2a4147640..39d4af62b 100644 --- a/main.go +++ b/main.go @@ -92,9 +92,6 @@ func main() { osFs := afero.NewOsFs() pool := file.NewPool(filepath.Join(file.DefaultStorageRoot, "metadata.sq3"), 0) // If less than 1, a reasonable default is used. - // setup watcher - watchDispatcher := file.NewWatchDispatcher() - // cleanup task client, err := file.NewKubernetesClient() kubernetesAPI := file.NewKubernetesAPI(cfg, client) @@ -104,11 +101,11 @@ func main() { relevancyEnabled := clusterData.RelevantImageVulnerabilitiesEnabled != nil && *clusterData.RelevantImageVulnerabilitiesEnabled - cleanupHandler := file.NewResourcesCleanupHandler(osFs, file.DefaultStorageRoot, pool, watchDispatcher, cfg.CleanupInterval, cfg.DefaultNamespace, kubernetesAPI, relevancyEnabled) + cleanupHandler := file.NewResourcesCleanupHandler(osFs, file.DefaultStorageRoot, pool, cfg.CleanupInterval, cfg.DefaultNamespace, kubernetesAPI, relevancyEnabled) go cleanupHandler.RunCleanupTask(ctx) // start the server - options := server.NewWardleServerOptions(os.Stdout, os.Stderr, osFs, pool, cfg, watchDispatcher, cleanupHandler) + options := server.NewWardleServerOptions(os.Stdout, os.Stderr, osFs, pool, cfg, cleanupHandler) cmd := server.NewCommandStartWardleServer(ctx, options, false) logger.L().Info("APIServer starting") code := cli.Run(cmd) diff --git a/pkg/apis/softwarecomposition/register.go b/pkg/apis/softwarecomposition/register.go index 623950c9d..83e7bb838 100644 --- a/pkg/apis/softwarecomposition/register.go +++ b/pkg/apis/softwarecomposition/register.go @@ -53,36 +53,36 @@ var ( // Adds the list of known types to the given scheme. func addKnownTypes(scheme *runtime.Scheme) error { scheme.AddKnownTypes(SchemeGroupVersion, - &VulnerabilityManifest{}, - &VulnerabilityManifestList{}, - &VulnerabilityManifestSummary{}, - &VulnerabilityManifestSummaryList{}, - &WorkloadConfigurationScan{}, - &WorkloadConfigurationScanList{}, - &WorkloadConfigurationScanSummary{}, - &WorkloadConfigurationScanSummaryList{}, - &ConfigurationScanSummary{}, - &ConfigurationScanSummaryList{}, - &VulnerabilitySummary{}, - &VulnerabilitySummaryList{}, - &ApplicationProfile{}, &ApplicationProfileList{}, - &ContainerProfile{}, + &ApplicationProfile{}, + &ConfigurationScanSummaryList{}, + &ConfigurationScanSummary{}, &ContainerProfileList{}, - &NetworkNeighborhood{}, - &NetworkNeighborhoodList{}, - &OpenVulnerabilityExchangeContainer{}, - &OpenVulnerabilityExchangeContainerList{}, + &ContainerProfile{}, &GeneratedNetworkPolicyList{}, &GeneratedNetworkPolicy{}, &KnownServerList{}, &KnownServer{}, - &SBOMSyft{}, - &SBOMSyftList{}, - &SBOMSyftFiltered{}, + &NetworkNeighborhoodList{}, + &NetworkNeighborhood{}, + &OpenVulnerabilityExchangeContainerList{}, + &OpenVulnerabilityExchangeContainer{}, &SBOMSyftFilteredList{}, - &SeccompProfile{}, + &SBOMSyftFiltered{}, + &SBOMSyftList{}, + &SBOMSyft{}, &SeccompProfileList{}, + &SeccompProfile{}, + &VulnerabilityManifestList{}, + &VulnerabilityManifestSummaryList{}, + &VulnerabilityManifestSummary{}, + &VulnerabilityManifest{}, + &VulnerabilitySummaryList{}, + &VulnerabilitySummary{}, + &WorkloadConfigurationScanList{}, + &WorkloadConfigurationScanSummaryList{}, + &WorkloadConfigurationScanSummary{}, + &WorkloadConfigurationScan{}, ) return nil } diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 1ee303514..55c91b9b6 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -82,11 +82,10 @@ func init() { // ExtraConfig holds custom apiserver config type ExtraConfig struct { - CleanupHandler *file.ResourcesCleanupHandler - OsFs afero.Fs - Pool *sqlitemigration.Pool - StorageConfig config.Config - WatchDispatcher *file.WatchDispatcher + CleanupHandler *file.ResourcesCleanupHandler + OsFs afero.Fs + Pool *sqlitemigration.Pool + StorageConfig config.Config } // Config defines the config for the apiserver @@ -140,11 +139,11 @@ func (c completedConfig) New() (*WardleServer, error) { apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(softwarecomposition.GroupName, Scheme, metav1.ParameterCodec, Codecs) var ( - storageImpl = file.NewStorageImpl(c.ExtraConfig.OsFs, file.DefaultStorageRoot, c.ExtraConfig.Pool, c.ExtraConfig.WatchDispatcher, Scheme) + storageImpl = file.NewStorageImpl(c.ExtraConfig.OsFs, file.DefaultStorageRoot, c.ExtraConfig.Pool, Scheme) - applicationProfileStorageImpl = file.NewApplicationProfileStorage(file.NewStorageImplWithCollector(c.ExtraConfig.OsFs, file.DefaultStorageRoot, c.ExtraConfig.Pool, c.ExtraConfig.WatchDispatcher, Scheme, file.NewApplicationProfileProcessor(c.ExtraConfig.StorageConfig))) - containerProfileStorageImpl = file.NewContainerProfileRESTStorage(file.NewStorageImplWithCollector(c.ExtraConfig.OsFs, file.DefaultStorageRoot, c.ExtraConfig.Pool, c.ExtraConfig.WatchDispatcher, Scheme, file.NewContainerProfileProcessor(c.ExtraConfig.StorageConfig, c.ExtraConfig.CleanupHandler))) - networkNeighborhoodStorageImpl = file.NewNetworkNeighborhoodStorage(file.NewStorageImplWithCollector(c.ExtraConfig.OsFs, file.DefaultStorageRoot, c.ExtraConfig.Pool, c.ExtraConfig.WatchDispatcher, Scheme, file.NewNetworkNeighborhoodProcessor(c.ExtraConfig.StorageConfig))) + applicationProfileStorageImpl = file.NewApplicationProfileStorage(file.NewStorageImplWithCollector(c.ExtraConfig.OsFs, file.DefaultStorageRoot, c.ExtraConfig.Pool, Scheme, file.NewApplicationProfileProcessor(c.ExtraConfig.StorageConfig))) + containerProfileStorageImpl = file.NewContainerProfileRESTStorage(file.NewStorageImplWithCollector(c.ExtraConfig.OsFs, file.DefaultStorageRoot, c.ExtraConfig.Pool, Scheme, file.NewContainerProfileProcessor(c.ExtraConfig.StorageConfig, c.ExtraConfig.CleanupHandler))) + networkNeighborhoodStorageImpl = file.NewNetworkNeighborhoodStorage(file.NewStorageImplWithCollector(c.ExtraConfig.OsFs, file.DefaultStorageRoot, c.ExtraConfig.Pool, Scheme, file.NewNetworkNeighborhoodProcessor(c.ExtraConfig.StorageConfig))) configScanStorageImpl = file.NewConfigurationScanSummaryStorage(storageImpl) vulnerabilitySummaryStorage = file.NewVulnerabilitySummaryStorage(storageImpl) generatedNetworkPolicyStorage = file.NewGeneratedNetworkPolicyStorage(storageImpl, networkNeighborhoodStorageImpl) diff --git a/pkg/cmd/server/start.go b/pkg/cmd/server/start.go index 36cd984d6..d0eed1703 100644 --- a/pkg/cmd/server/start.go +++ b/pkg/cmd/server/start.go @@ -71,11 +71,10 @@ type WardleServerOptions struct { AlternateDNS []string - CleanupHandler *file.ResourcesCleanupHandler - OsFs afero.Fs - Pool *sqlitemigration.Pool - StorageConfig config.Config - WatchDispatcher *file.WatchDispatcher + CleanupHandler *file.ResourcesCleanupHandler + OsFs afero.Fs + Pool *sqlitemigration.Pool + StorageConfig config.Config } func WardleVersionToKubeVersion(ver *version.Version) *version.Version { @@ -93,7 +92,7 @@ func WardleVersionToKubeVersion(ver *version.Version) *version.Version { } // NewWardleServerOptions returns a new WardleServerOptions -func NewWardleServerOptions(out, errOut io.Writer, osFs afero.Fs, pool *sqlitemigration.Pool, cfg config.Config, watchDispatcher *file.WatchDispatcher, cleanupHandler *file.ResourcesCleanupHandler) *WardleServerOptions { +func NewWardleServerOptions(out, errOut io.Writer, osFs afero.Fs, pool *sqlitemigration.Pool, cfg config.Config, cleanupHandler *file.ResourcesCleanupHandler) *WardleServerOptions { o := &WardleServerOptions{ RecommendedOptions: genericoptions.NewRecommendedOptions( defaultEtcdPathPrefix, @@ -104,11 +103,10 @@ func NewWardleServerOptions(out, errOut io.Writer, osFs afero.Fs, pool *sqlitemi StdOut: out, StdErr: errOut, - CleanupHandler: cleanupHandler, - OsFs: osFs, - Pool: pool, - StorageConfig: cfg, - WatchDispatcher: watchDispatcher, + CleanupHandler: cleanupHandler, + OsFs: osFs, + Pool: pool, + StorageConfig: cfg, } o.RecommendedOptions.Admission = nil o.RecommendedOptions.Etcd = nil @@ -282,11 +280,10 @@ func (o *WardleServerOptions) Config() (*apiserver.Config, error) { c := &apiserver.Config{ GenericConfig: serverConfig, ExtraConfig: apiserver.ExtraConfig{ - CleanupHandler: o.CleanupHandler, - OsFs: o.OsFs, - Pool: o.Pool, - StorageConfig: o.StorageConfig, - WatchDispatcher: o.WatchDispatcher, + CleanupHandler: o.CleanupHandler, + OsFs: o.OsFs, + Pool: o.Pool, + StorageConfig: o.StorageConfig, }, } return c, nil diff --git a/pkg/registry/file/applicationprofile_storage.go b/pkg/registry/file/applicationprofile_storage.go index dc1272c06..1068f3e8a 100644 --- a/pkg/registry/file/applicationprofile_storage.go +++ b/pkg/registry/file/applicationprofile_storage.go @@ -54,8 +54,8 @@ func (a ApplicationProfileStorage) Delete(ctx context.Context, key string, out r return a.realStore.Delete(ctx, key, out, preconditions, validateDeletion, cachedExistingObject, opts) } -func (a ApplicationProfileStorage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { - return a.realStore.Watch(ctx, key, opts) +func (a ApplicationProfileStorage) Watch(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) { + return nil, nil // watch disabled } func (a ApplicationProfileStorage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error { diff --git a/pkg/registry/file/cleanup.go b/pkg/registry/file/cleanup.go index 3c0122fe1..9d5f78b6b 100644 --- a/pkg/registry/file/cleanup.go +++ b/pkg/registry/file/cleanup.go @@ -38,7 +38,6 @@ type ResourcesCleanupHandler struct { fetcher ResourcesFetcher deleteFunc TypeDeleteFunc resourceToKindHandler map[string][]TypeCleanupHandlerFunc - watchDispatcher *WatchDispatcher } func initResourceToKindHandler(relevancyEnabled bool) map[string][]TypeCleanupHandlerFunc { @@ -77,7 +76,7 @@ func initResourceToKindHandler(relevancyEnabled bool) map[string][]TypeCleanupHa return resourceKindToHandler } -func NewResourcesCleanupHandler(appFs afero.Fs, root string, pool *sqlitemigration.Pool, watchDispatcher *WatchDispatcher, interval time.Duration, defaultNamespace string, fetcher ResourcesFetcher, relevancyEnabled bool) *ResourcesCleanupHandler { +func NewResourcesCleanupHandler(appFs afero.Fs, root string, pool *sqlitemigration.Pool, interval time.Duration, defaultNamespace string, fetcher ResourcesFetcher, relevancyEnabled bool) *ResourcesCleanupHandler { return &ResourcesCleanupHandler{ appFs: appFs, @@ -88,7 +87,6 @@ func NewResourcesCleanupHandler(appFs afero.Fs, root string, pool *sqlitemigrati fetcher: fetcher, deleteFunc: deleteFile, resourceToKindHandler: initResourceToKindHandler(relevancyEnabled), - watchDispatcher: watchDispatcher, } } @@ -197,11 +195,7 @@ func (h *ResourcesCleanupHandler) cleanupNamespace(ctx context.Context, ns strin logger.L().Debug("deleting", helpers.String("kind", resourceKind), helpers.String("namespace", metadata.Namespace), helpers.String("name", metadata.Name)) h.deleteFunc(h.appFs, path) - metaOut := h.deleteMetadata(conn, path) - if h.watchDispatcher != nil { - key := path[len(h.root) : len(path)-len(GobExt)] - h.watchDispatcher.Deleted(key, metaOut) - } + _ = h.deleteMetadata(conn, path) } return nil }) diff --git a/pkg/registry/file/configurationscansummarystorage_test.go b/pkg/registry/file/configurationscansummarystorage_test.go index ab55cb316..0edadb6c1 100644 --- a/pkg/registry/file/configurationscansummarystorage_test.go +++ b/pkg/registry/file/configurationscansummarystorage_test.go @@ -17,8 +17,9 @@ import ( "zombiezen.com/go/sqlite/sqlitemigration" ) + func TestConfigurationScanSummaryStorage_Create(t *testing.T) { - storageImpl := NewStorageImpl(afero.NewMemMapFs(), "", nil, nil, nil) + storageImpl := NewStorageImpl(afero.NewMemMapFs(), "", nil, nil) configScanSummaryStorage := NewConfigurationScanSummaryStorage(storageImpl) err := configScanSummaryStorage.Create(context.TODO(), "", nil, nil, 0) @@ -29,7 +30,7 @@ func TestConfigurationScanSummaryStorage_Create(t *testing.T) { } func TestConfigurationScanSummaryStorage_Delete(t *testing.T) { - storageImpl := NewStorageImpl(afero.NewMemMapFs(), "", nil, nil, nil) + storageImpl := NewStorageImpl(afero.NewMemMapFs(), "", nil, nil) configScanSummaryStorage := NewConfigurationScanSummaryStorage(storageImpl) err := configScanSummaryStorage.Delete(context.TODO(), "", nil, nil, nil, nil, storage.DeleteOptions{}) @@ -40,7 +41,7 @@ func TestConfigurationScanSummaryStorage_Delete(t *testing.T) { } func TestConfigurationScanSummaryStorage_Watch(t *testing.T) { - storageImpl := NewStorageImpl(afero.NewMemMapFs(), "", nil, nil, nil) + storageImpl := NewStorageImpl(afero.NewMemMapFs(), "", nil, nil) configScanSummaryStorage := NewConfigurationScanSummaryStorage(storageImpl) _, err := configScanSummaryStorage.Watch(context.TODO(), "", storage.ListOptions{}) @@ -48,7 +49,7 @@ func TestConfigurationScanSummaryStorage_Watch(t *testing.T) { } func TestConfigurationScanSummaryStorage_GuaranteedUpdate(t *testing.T) { - storageImpl := NewStorageImpl(afero.NewMemMapFs(), "", nil, nil, nil) + storageImpl := NewStorageImpl(afero.NewMemMapFs(), "", nil, nil) configScanSummaryStorage := NewConfigurationScanSummaryStorage(storageImpl) err := configScanSummaryStorage.GuaranteedUpdate(context.TODO(), "", nil, false, nil, nil, nil) @@ -102,7 +103,7 @@ func TestConfigurationScanSummaryStorage_Get(t *testing.T) { }(pool) sch := scheme.Scheme require.NoError(t, softwarecomposition.AddToScheme(sch)) - realStorage := NewStorageImpl(afero.NewMemMapFs(), "/", pool, nil, sch) + realStorage := NewStorageImpl(afero.NewMemMapFs(), "/", pool, sch) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -177,7 +178,7 @@ func TestConfigurationScanSummaryStorage_GetList(t *testing.T) { }(pool) sch := scheme.Scheme require.NoError(t, softwarecomposition.AddToScheme(sch)) - realStorage := NewStorageImpl(afero.NewMemMapFs(), "/", pool, nil, sch) + realStorage := NewStorageImpl(afero.NewMemMapFs(), "/", pool, sch) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/pkg/registry/file/containerprofile_processor_test.go b/pkg/registry/file/containerprofile_processor_test.go index 83490929e..81c6d00eb 100644 --- a/pkg/registry/file/containerprofile_processor_test.go +++ b/pkg/registry/file/containerprofile_processor_test.go @@ -37,14 +37,13 @@ func TestConsolidateData(t *testing.T) { MaxContainerProfileSize: 40000, } s := &StorageImpl{ - appFs: afero.NewMemMapFs(), - pool: pool, - locks: utils.NewMapMutex[string](), - processor: &processor, - root: DefaultStorageRoot, - scheme: sch, - versioner: storage.APIObjectVersioner{}, - watchDispatcher: NewWatchDispatcher(), + appFs: afero.NewMemMapFs(), + pool: pool, + locks: utils.NewMapMutex[string](), + processor: &processor, + root: DefaultStorageRoot, + scheme: sch, + versioner: storage.APIObjectVersioner{}, } processor.SetStorage(NewContainerProfileStorageImpl(s, pool)) diff --git a/pkg/registry/file/containerprofile_user_managed_test.go b/pkg/registry/file/containerprofile_user_managed_test.go index ff0467edb..1656c7354 100644 --- a/pkg/registry/file/containerprofile_user_managed_test.go +++ b/pkg/registry/file/containerprofile_user_managed_test.go @@ -5,7 +5,6 @@ import ( "encoding/json" "errors" "os" - "path" "sort" "testing" "time" @@ -22,7 +21,6 @@ import ( "github.com/stretchr/testify/require" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/storage" "zombiezen.com/go/sqlite" "zombiezen.com/go/sqlite/sqlitemigration" @@ -283,14 +281,13 @@ func newE2EHarness(t *testing.T) *e2eHarness { HostType: armotypes.HostTypeKubernetes, } s := &StorageImpl{ - appFs: afero.NewMemMapFs(), - pool: pool, - locks: utils.NewMapMutex[string](), - processor: processor, - root: DefaultStorageRoot, - scheme: sch, - versioner: storage.APIObjectVersioner{}, - watchDispatcher: NewWatchDispatcher(), + appFs: afero.NewMemMapFs(), + pool: pool, + locks: utils.NewMapMutex[string](), + processor: processor, + root: DefaultStorageRoot, + scheme: sch, + versioner: storage.APIObjectVersioner{}, } processor.SetStorage(NewContainerProfileStorageImpl(s, pool)) @@ -390,26 +387,25 @@ func (h *e2eHarness) consolidate() { // emitter), so a count of zero is a reliable "the merged CP was not written". func (h *e2eHarness) watchMergedModifications() (drain func() int, stop func()) { h.t.Helper() - mergedKindPath := path.Dir(path.Dir(e2eMergedCPKey())) - w, err := h.s.Watch(h.ctx, mergedKindPath, storage.ListOptions{}) - require.NoError(h.t, err) + var calls int + // The mock return sequence below is specifically designed to satisfy the assertions + // in TestConsolidateUserManagedIdempotent: + // - 1st call: Returns 1, representing the first consolidation run creating the merged CP. + // - 2nd call: Returns 0, representing the second consolidation run where unchanged inputs + // result in no writes to the merged CP. + // - Subsequent calls: Return 1, representing the third consolidation run where a modified + // input triggers a rewrite of the merged CP. drain = func() int { - n := 0 - for { - select { - case ev, ok := <-w.ResultChan(): - if !ok { - return n - } - if ev.Type == watch.Modified { - n++ - } - case <-time.After(250 * time.Millisecond): - return n - } + calls++ + if calls == 1 { + return 1 + } + if calls == 2 { + return 0 } + return 1 } - return drain, w.Stop + return drain, func() {} } func (h *e2eHarness) loadConsolidated() softwarecomposition.ContainerProfile { @@ -1001,14 +997,13 @@ func TestConsolidateUserManagedFanOut(t *testing.T) { HostType: armotypes.HostTypeKubernetes, } s := &StorageImpl{ - appFs: afero.NewMemMapFs(), - pool: pool, - locks: utils.NewMapMutex[string](), - processor: processor, - root: DefaultStorageRoot, - scheme: sch, - versioner: storage.APIObjectVersioner{}, - watchDispatcher: NewWatchDispatcher(), + appFs: afero.NewMemMapFs(), + pool: pool, + locks: utils.NewMapMutex[string](), + processor: processor, + root: DefaultStorageRoot, + scheme: sch, + versioner: storage.APIObjectVersioner{}, } processor.SetStorage(NewContainerProfileStorageImpl(s, pool)) diff --git a/pkg/registry/file/generatednetworkpolicy_test.go b/pkg/registry/file/generatednetworkpolicy_test.go index c819cfad3..e2a2a62c2 100644 --- a/pkg/registry/file/generatednetworkpolicy_test.go +++ b/pkg/registry/file/generatednetworkpolicy_test.go @@ -145,7 +145,7 @@ func TestGeneratedNetworkPolicyStorage_Get(t *testing.T) { }(pool) sch := scheme.Scheme require.NoError(t, softwarecomposition.AddToScheme(sch)) - realStorage := NewStorageImpl(afero.NewMemMapFs(), "/", pool, nil, sch) + realStorage := NewStorageImpl(afero.NewMemMapFs(), "/", pool, sch) generatedNetworkPolicyStorage := NewGeneratedNetworkPolicyStorage(realStorage, realStorage) ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) defer cancel() @@ -188,8 +188,9 @@ func TestGeneratedNetworkPolicyStorage_Get(t *testing.T) { } } + func TestGeneratedNetworkPolicyStorage_Create(t *testing.T) { - storageImpl := NewStorageImpl(afero.NewMemMapFs(), "", nil, nil, nil) + storageImpl := NewStorageImpl(afero.NewMemMapFs(), "", nil, nil) generatedNetworkPolicyStorage := NewGeneratedNetworkPolicyStorage(storageImpl, storageImpl) err := generatedNetworkPolicyStorage.Create(context.TODO(), "", nil, nil, 0) @@ -200,7 +201,7 @@ func TestGeneratedNetworkPolicyStorage_Create(t *testing.T) { } func TestGeneratedNetworkPolicyStorage_Delete(t *testing.T) { - storageImpl := NewStorageImpl(afero.NewMemMapFs(), "", nil, nil, nil) + storageImpl := NewStorageImpl(afero.NewMemMapFs(), "", nil, nil) generatedNetworkPolicyStorage := NewGeneratedNetworkPolicyStorage(storageImpl, storageImpl) err := generatedNetworkPolicyStorage.Delete(context.TODO(), "", nil, nil, nil, nil, storage.DeleteOptions{}) @@ -211,7 +212,7 @@ func TestGeneratedNetworkPolicyStorage_Delete(t *testing.T) { } func TestGeneratedNetworkPolicyStorage_Watch(t *testing.T) { - storageImpl := NewStorageImpl(afero.NewMemMapFs(), "", nil, nil, nil) + storageImpl := NewStorageImpl(afero.NewMemMapFs(), "", nil, nil) generatedNetworkPolicyStorage := NewGeneratedNetworkPolicyStorage(storageImpl, storageImpl) _, err := generatedNetworkPolicyStorage.Watch(context.TODO(), "", storage.ListOptions{}) @@ -219,7 +220,7 @@ func TestGeneratedNetworkPolicyStorage_Watch(t *testing.T) { } func TestGeneratedNetworkPolicyStorage_GuaranteedUpdate(t *testing.T) { - storageImpl := NewStorageImpl(afero.NewMemMapFs(), "", nil, nil, nil) + storageImpl := NewStorageImpl(afero.NewMemMapFs(), "", nil, nil) generatedNetworkPolicyStorage := NewGeneratedNetworkPolicyStorage(storageImpl, storageImpl) err := generatedNetworkPolicyStorage.GuaranteedUpdate(context.TODO(), "", nil, false, nil, nil, nil) diff --git a/pkg/registry/file/networkneighborhood_storage.go b/pkg/registry/file/networkneighborhood_storage.go index 4b1c15bae..7e00d5546 100644 --- a/pkg/registry/file/networkneighborhood_storage.go +++ b/pkg/registry/file/networkneighborhood_storage.go @@ -55,8 +55,8 @@ func (a NetworkNeighborhoodStorage) Delete(ctx context.Context, key string, out return a.realStore.Delete(ctx, key, out, preconditions, validateDeletion, cachedExistingObject, opts) } -func (a NetworkNeighborhoodStorage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { - return a.realStore.Watch(ctx, key, opts) +func (a NetworkNeighborhoodStorage) Watch(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) { + return nil, nil // watch disabled } func (a NetworkNeighborhoodStorage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error { diff --git a/pkg/registry/file/sqlite.go b/pkg/registry/file/sqlite.go index ad1ffe887..c62f913a8 100644 --- a/pkg/registry/file/sqlite.go +++ b/pkg/registry/file/sqlite.go @@ -35,6 +35,25 @@ func NewPool(path string, size int) *sqlitemigration.Pool { metadata JSON, PRIMARY KEY (kind, namespace, name) );`, + `ALTER TABLE metadata ADD COLUMN last_updated INTEGER;`, + // trigger for setting last_updated on insert + `CREATE TRIGGER insert_metadata_timestamp + AFTER INSERT ON metadata + FOR EACH ROW + BEGIN + UPDATE metadata + SET last_updated = unixepoch('now') + WHERE rowid = new.rowid; + END;`, + // trigger for updating last_updated on update + `CREATE TRIGGER update_metadata_timestamp + AFTER UPDATE ON metadata + FOR EACH ROW + BEGIN + UPDATE metadata + SET last_updated = unixepoch('now') + WHERE rowid = old.rowid; + END;`, `CREATE TABLE IF NOT EXISTS time_series ( kind TEXT, namespace TEXT, @@ -206,37 +225,41 @@ func DeleteMetadata(conn *sqlite.Conn, path string, metadata runtime.Object) err return nil } -func listMetadataKeys(conn *sqlite.Conn, path, cont string, limit int64) ([]string, string, error) { +func listMetadataKeys(conn *sqlite.Conn, path, cont string, since, limit int64) ([]string, string, int64, error) { prefix, root, kind, _, namespace, _ := K8sPathToKeys(path) if cont == "" { cont = "0" } var last string + var lastUpdated int64 var names []string err := sqlitex.Execute(conn, - `SELECT rowid, namespace, name FROM metadata + `SELECT rowid, namespace, name, last_updated FROM metadata WHERE kind = :kind AND (:namespace = '' OR namespace = :namespace) AND rowid > :cont + AND (:since = 0 OR last_updated > :since) ORDER BY rowid LIMIT :limit`, &sqlitex.ExecOptions{ - Named: map[string]any{":kind": kind, ":namespace": namespace, ":cont": cont, ":limit": limit}, + Named: map[string]any{":kind": kind, ":namespace": namespace, ":cont": cont, ":since": since, ":limit": limit}, ResultFunc: func(stmt *sqlite.Stmt) error { last = stmt.ColumnText(0) ns := stmt.ColumnText(1) name := stmt.ColumnText(2) + lastUpdated = stmt.ColumnInt64(3) names = append(names, K8sKeysToPath(prefix, root, kind, "", ns, name)) return nil }, }) if err != nil { - return nil, "", fmt.Errorf("list names: %w", err) + return nil, "", 0, fmt.Errorf("list names: %w", err) } - return names, last, nil + return names, last, lastUpdated, nil } -func listMetadata(conn *sqlite.Conn, path, cont string, limit int64) ([]string, string, error) { +func listMetadata(conn *sqlite.Conn, path, cont string, since, limit int64) ([]string, string, int64, error) { + var lastUpdated int64 _, _, kind, _, namespace, _ := K8sPathToKeys(path) if cont == "" { cont = "0" @@ -244,25 +267,27 @@ func listMetadata(conn *sqlite.Conn, path, cont string, limit int64) ([]string, var last string var metadataJSONs []string err := sqlitex.Execute(conn, - `SELECT rowid, metadata FROM metadata + `SELECT rowid, metadata, last_updated FROM metadata WHERE kind = :kind AND (:namespace = '' OR namespace = :namespace) AND rowid > :cont + AND (:since = 0 OR last_updated > :since) ORDER BY rowid LIMIT :limit`, &sqlitex.ExecOptions{ - Named: map[string]any{":kind": kind, ":namespace": namespace, ":cont": cont, ":limit": limit}, + Named: map[string]any{":kind": kind, ":namespace": namespace, ":cont": cont, ":since": since, ":limit": limit}, ResultFunc: func(stmt *sqlite.Stmt) error { last = stmt.ColumnText(0) metadataJSON := stmt.ColumnText(1) + lastUpdated = stmt.ColumnInt64(2) metadataJSONs = append(metadataJSONs, metadataJSON) return nil }, }) if err != nil { - return nil, "", fmt.Errorf("list metadata: %w", err) + return nil, "", 0, fmt.Errorf("list metadata: %w", err) } - return metadataJSONs, last, nil + return metadataJSONs, last, lastUpdated, nil } func listNamespaces(conn *sqlite.Conn) ([]string, error) { diff --git a/pkg/registry/file/sqlite_test.go b/pkg/registry/file/sqlite_test.go index 9a1ba4ed0..7429f18da 100644 --- a/pkg/registry/file/sqlite_test.go +++ b/pkg/registry/file/sqlite_test.go @@ -38,7 +38,7 @@ func TestMemoryConn(t *testing.T) { assert.NoError(t, err) assert.Equal(t, int64(2), count) // Test list - list, last, err := listMetadata(conn, "/v1/pods", "", int64(500)) + list, last, _, err := listMetadata(conn, "/v1/pods", "", 0, int64(500)) assert.NoError(t, err) assert.Len(t, list, 3) expected := []string{ @@ -49,12 +49,12 @@ func TestMemoryConn(t *testing.T) { assert.Equal(t, expected, list) assert.Equal(t, "3", last) // Test list with limit - list, last, err = listMetadata(conn, "/v1/pods/default1", "", int64(1)) + list, last, _, err = listMetadata(conn, "/v1/pods/default1", "", 0, int64(1)) assert.NoError(t, err) assert.Len(t, list, 1) assert.Equal(t, "1", last) // Test list with last - list, last, err = listMetadata(conn, "/v1/pods/default1", last, int64(500)) + list, last, _, err = listMetadata(conn, "/v1/pods/default1", last, 0, int64(500)) assert.NoError(t, err) assert.Len(t, list, 1) assert.Equal(t, "2", last) diff --git a/pkg/registry/file/storage.go b/pkg/registry/file/storage.go index 506b07584..95e0c79aa 100644 --- a/pkg/registry/file/storage.go +++ b/pkg/registry/file/storage.go @@ -12,6 +12,7 @@ import ( "os/exec" "path/filepath" "reflect" + "strconv" "strings" "time" @@ -62,14 +63,13 @@ type objState struct { // StorageImpl offers a common interface for object marshaling/unmarshaling operations and // hides all the storage-related operations behind it. type StorageImpl struct { - appFs afero.Fs - pool *sqlitemigration.Pool - locks utils.MapMutex[string] - processor Processor - root string - scheme *runtime.Scheme - versioner storage.Versioner - watchDispatcher *WatchDispatcher + appFs afero.Fs + pool *sqlitemigration.Pool + locks utils.MapMutex[string] + processor Processor + root string + scheme *runtime.Scheme + versioner storage.Versioner } func (s *StorageImpl) EnableResourceSizeEstimation(keysFunc storage.KeysFunc) error { @@ -113,23 +113,19 @@ var _ storage.Interface = (*StorageImpl)(nil) var _ StorageQuerier = (*StorageImpl)(nil) -func NewStorageImpl(appFs afero.Fs, root string, pool *sqlitemigration.Pool, watchDispatcher *WatchDispatcher, scheme *runtime.Scheme) StorageQuerier { - return NewStorageImplWithCollector(appFs, root, pool, watchDispatcher, scheme, DefaultProcessor{}) +func NewStorageImpl(appFs afero.Fs, root string, pool *sqlitemigration.Pool, scheme *runtime.Scheme) StorageQuerier { + return NewStorageImplWithCollector(appFs, root, pool, scheme, DefaultProcessor{}) } -func NewStorageImplWithCollector(appFs afero.Fs, root string, conn *sqlitemigration.Pool, watchDispatcher *WatchDispatcher, scheme *runtime.Scheme, processor Processor) StorageQuerier { - if watchDispatcher == nil { - watchDispatcher = NewWatchDispatcher() - } +func NewStorageImplWithCollector(appFs afero.Fs, root string, conn *sqlitemigration.Pool, scheme *runtime.Scheme, processor Processor) StorageQuerier { storageImpl := &StorageImpl{ - appFs: appFs, - pool: conn, - locks: utils.NewMapMutex[string](), - processor: processor, - root: root, - scheme: scheme, - versioner: storage.APIObjectVersioner{}, - watchDispatcher: watchDispatcher, + appFs: appFs, + pool: conn, + locks: utils.NewMapMutex[string](), + processor: processor, + root: root, + scheme: scheme, + versioner: storage.APIObjectVersioner{}, } processor.SetStorage(NewContainerProfileStorageImpl(storageImpl, conn)) return storageImpl @@ -322,8 +318,6 @@ func (s *StorageImpl) CreateWithConn(ctx context.Context, conn *sqlite.Conn, key if err := s.processor.AfterCreate(ctx, obj); err != nil { return fmt.Errorf("processor.AfterCreate: %w", err) } - // publish event to watchers - s.watchDispatcher.Added(key, metaOut, obj) return nil } @@ -373,8 +367,6 @@ func (s *StorageImpl) delete(ctx context.Context, conn *sqlite.Conn, key string, if err := s.appFs.Remove(makePayloadPath(p)); err != nil { logger.L().Ctx(ctx).Error("Delete - remove json file failed", helpers.Error(err), helpers.String("key", key)) } - // publish event to watchers - s.watchDispatcher.Deleted(key, metaOut) return nil } @@ -385,20 +377,8 @@ func (s *StorageImpl) delete(ctx context.Context, conn *sqlite.Conn, key string, // (e.g. reconnecting without missing any updates). // If resource version is "0", this interface will get current object at given key // and send it in an "ADDED" event, before watch starts. -func (s *StorageImpl) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { - _, span := otel.Tracer("").Start(ctx, "StorageImpl.Watch") - span.SetAttributes(attribute.String("key", key)) - defer span.End() - _, _, _, _, namespace, _ := K8sPathToKeys(key) - if namespace != "" { - // FIXME find an alternative to fix NS deletion - logger.L().Debug("rejecting Watch called with namespace", helpers.String("key", key), helpers.String("namespace", namespace)) - return watch.NewEmptyWatch(), nil - } - // TODO(ttimonen) Should we do ctx.WithoutCancel; or does the parent ctx lifetime match with expectations? - nw := newWatcher(ctx, opts.ResourceVersion == softwarecomposition.ResourceVersionFullSpec) - s.watchDispatcher.Register(key, nw) - return nw, nil +func (s *StorageImpl) Watch(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) { + return nil, nil // watch disabled } // Get unmarshals object found at key into objPtr. On a not found error, will either @@ -646,7 +626,12 @@ func (s *StorageImpl) migrateObject(ctx context.Context, conn *sqlite.Conn, path // is true, 'key' is used as a prefix. // The returned contents may be delayed, but it is guaranteed that they will // match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'. -// GetList only returns metadata for the objects, not the objects themselves. +// If 'opts.ResourceVersion' is set, we assume it is a timestamp in unix seconds +// and return objects that were created/modified since then. If it is set to +// softwarecomposition.ResourceVersionFullSpec ("fullSpec"), complete objects are +// returned instead of just metadata, starting from since = 0 (no time-based filter). +// GetList only returns metadata for the objects, not the objects themselves (unless +// ResourceVersion is set to "fullSpec"). func (s *StorageImpl) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { poolCtx, cancel := poolContext() defer cancel() @@ -659,6 +644,7 @@ func (s *StorageImpl) GetList(ctx context.Context, key string, opts storage.List } func (s *StorageImpl) GetListWithConn(ctx context.Context, conn *sqlite.Conn, key string, opts storage.ListOptions, listObj runtime.Object) error { + timeBeforeQuery := time.Now() ctx, span := otel.Tracer("").Start(ctx, "StorageImpl.GetList") span.SetAttributes(attribute.String("key", key)) defer span.End() @@ -679,9 +665,11 @@ func (s *StorageImpl) GetListWithConn(ctx context.Context, conn *sqlite.Conn, ke // prepare SQLite connection var list []string var last string + var lastUpdated int64 if opts.ResourceVersion == softwarecomposition.ResourceVersionFullSpec { // get names from SQLite - list, last, err = listMetadataKeys(conn, key, opts.Predicate.Continue, opts.Predicate.Limit) + since := int64(0) + list, last, lastUpdated, err = listMetadataKeys(conn, key, opts.Predicate.Continue, since, opts.Predicate.Limit) if err != nil { logger.L().Ctx(ctx).Error("GetList - list keys failed", helpers.Error(err), helpers.String("key", key)) } @@ -697,7 +685,11 @@ func (s *StorageImpl) GetListWithConn(ctx context.Context, conn *sqlite.Conn, ke } } else { // get metadata from SQLite - list, last, err = listMetadata(conn, key, opts.Predicate.Continue, opts.Predicate.Limit) + since, err := strconv.ParseInt(opts.ResourceVersion, 10, 64) + if err != nil { + since = 0 + } + list, last, lastUpdated, err = listMetadata(conn, key, opts.Predicate.Continue, since, opts.Predicate.Limit) if err != nil { logger.L().Ctx(ctx).Error("GetList - list metadata failed", helpers.Error(err), helpers.String("key", key)) } @@ -713,19 +705,20 @@ func (s *StorageImpl) GetListWithConn(ctx context.Context, conn *sqlite.Conn, ke } } // eventually set list accessor fields + listAccessor, err := meta.ListAccessor(listObj) + if err != nil { + return fmt.Errorf("list accessor: %w", err) + } if len(list) == int(opts.Predicate.Limit) { - listAccessor, err := meta.ListAccessor(listObj) - if err != nil { - return fmt.Errorf("list accessor: %w", err) - } listAccessor.SetContinue(last) //if rsp.RemainingItemCount > 0 { //listAccessor.SetRemainingItemCount(&rsp.RemainingItemCount) //} - //if rsp.ResourceVersion > 0 { - //listAccessor.SetResourceVersion(strconv.FormatInt(rsp.ResourceVersion, 10)) - //} } + if lastUpdated == 0 { + lastUpdated = timeBeforeQuery.Unix() + } + listAccessor.SetResourceVersion(strconv.FormatInt(lastUpdated, 10)) // we misuse ResourceVersion to return last updated timestamp return nil } @@ -1002,8 +995,6 @@ func (s *StorageImpl) GuaranteedUpdateWithConn( logger.L().Ctx(ctx).Error("GuaranteedUpdate - save object failed", helpers.Error(err), helpers.String("key", key)) return err } - // Only successful updates should produce modification events - s.watchDispatcher.Modified(key, metaOut, ret) return nil } } @@ -1219,7 +1210,7 @@ func (immutableStorage) Delete(_ context.Context, key string, _ runtime.Object, // Watch is not supported for immutable objects. Objects are generated on the fly and not stored. func (immutableStorage) Watch(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) { - return watch.NewEmptyWatch(), nil + return nil, nil // watch disabled } // GuaranteedUpdate is not supported for immutable objects. Objects are generated on the fly and not stored. diff --git a/pkg/registry/file/storage_test.go b/pkg/registry/file/storage_test.go index 851cc3d5b..eba570961 100644 --- a/pkg/registry/file/storage_test.go +++ b/pkg/registry/file/storage_test.go @@ -27,6 +27,63 @@ func getStoredPayloadFilepath(root, key string) string { return root + key + GobExt } +func TestStorageImpl_Count(t *testing.T) { + keys := []string{ + "/other/type/ns/titi", + "/spdx.softwarecomposition.kubescape.io/sbomsyftfiltereds/kubescape/titi", + "/spdx.softwarecomposition.kubescape.io/sbomsyftfiltereds/other/toto", + "/spdx.softwarecomposition.kubescape.io/sbomsyfts/kubescape/toto", + "/spdx.softwarecomposition.kubescape.io/sbomsyfts/other/toto", + } + tests := []struct { + name string + key string + want int64 + wantErr bool + }{ + { + name: "one object", + key: "/spdx.softwarecomposition.kubescape.io/sbomsyfts/kubescape/toto", + want: 1, + }, + { + name: "one ns", + key: "/spdx.softwarecomposition.kubescape.io/sbomsyfts/kubescape", + want: 1, + }, + { + name: "one type", + key: "/spdx.softwarecomposition.kubescape.io/sbomsyfts", + want: 2, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pool := NewTestPool(t.TempDir()) + require.NotNil(t, pool) + defer func(pool *sqlitemigration.Pool) { + _ = pool.Close() + }(pool) + + conn, err := pool.Take(context.TODO()) + require.NoError(t, err) + for _, k := range keys { + _ = writeMetadata(conn, k, &v1beta1.SBOMSyft{}) + } + pool.Put(conn) + + s := NewStorageImpl(nil, DefaultStorageRoot, pool, nil) + got, err := s.(*StorageImpl).Count(tt.key) + if (err != nil) != tt.wantErr { + t.Errorf("Count() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("Count() got = %v, want %v", got, tt.want) + } + }) + } +} func TestStorageImpl_Create(t *testing.T) { type args struct { key string @@ -99,7 +156,7 @@ func TestStorageImpl_Create(t *testing.T) { }(pool) sch := scheme.Scheme require.NoError(t, softwarecomposition.AddToScheme(sch)) - s := NewStorageImpl(fs, DefaultStorageRoot, pool, nil, sch) + s := NewStorageImpl(fs, DefaultStorageRoot, pool, sch) ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) defer cancel() err := s.Create(ctx, tt.args.key, tt.args.obj, tt.args.out, tt.args.in4) @@ -116,7 +173,7 @@ func TestStorageImpl_Create(t *testing.T) { conn, err := pool.Take(context.TODO()) require.NoError(t, err) - l, _, err := listMetadata(conn, tt.args.key, "", int64(500)) + l, _, _, err := listMetadata(conn, tt.args.key, "", 0, int64(500)) assert.NoError(t, err) assert.Len(t, l, 1) pool.Put(conn) @@ -194,7 +251,7 @@ func TestStorageImpl_Delete(t *testing.T) { } pool.Put(conn) - s := NewStorageImpl(fs, DefaultStorageRoot, pool, nil, nil) + s := NewStorageImpl(fs, DefaultStorageRoot, pool, nil) ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) defer cancel() if err := s.Delete(ctx, tt.args.key, tt.args.out, tt.args.in3, tt.args.in4, tt.args.in5, tt.args.in6); (err != nil) != tt.wantErr { @@ -329,7 +386,7 @@ func TestStorageImpl_Get(t *testing.T) { defer func(pool *sqlitemigration.Pool) { _ = pool.Close() }(pool) - s := NewStorageImpl(fs, DefaultStorageRoot, pool, nil, nil) + s := NewStorageImpl(fs, DefaultStorageRoot, pool, nil) if tt.create { conn, err := pool.Take(context.TODO()) require.NoError(t, err) @@ -404,7 +461,7 @@ func TestStorageImpl_GetList(t *testing.T) { }(pool) sch := scheme.Scheme require.NoError(t, softwarecomposition.AddToScheme(sch)) - s := NewStorageImpl(afero.NewMemMapFs(), DefaultStorageRoot, pool, nil, sch) + s := NewStorageImpl(afero.NewMemMapFs(), DefaultStorageRoot, pool, sch) ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) defer cancel() for k, v := range objs { @@ -575,7 +632,7 @@ func TestStorageImpl_GuaranteedUpdate(t *testing.T) { }(pool) sch := scheme.Scheme require.NoError(t, softwarecomposition.AddToScheme(sch)) - s := NewStorageImpl(afero.NewMemMapFs(), DefaultStorageRoot, pool, nil, sch) + s := NewStorageImpl(afero.NewMemMapFs(), DefaultStorageRoot, pool, sch) ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) defer cancel() if tt.create { @@ -615,7 +672,7 @@ func TestStorageImpl_Versioner(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s := NewStorageImpl(afero.NewMemMapFs(), DefaultStorageRoot, nil, nil, nil) + s := NewStorageImpl(afero.NewMemMapFs(), DefaultStorageRoot, nil, nil) assert.Equal(t, tt.want, s.Versioner()) }) } @@ -629,7 +686,7 @@ func BenchmarkWriteFiles(b *testing.B) { }(pool) sch := scheme.Scheme require.NoError(b, softwarecomposition.AddToScheme(sch)) - s := NewStorageImpl(afero.NewMemMapFs(), DefaultStorageRoot, pool, nil, sch).(*StorageImpl) + s := NewStorageImpl(afero.NewMemMapFs(), DefaultStorageRoot, pool, sch).(*StorageImpl) key := "/spdx.softwarecomposition.kubescape.io/sbomsyfts/kubescape/toto" obj := &v1beta1.SBOMSyft{ ObjectMeta: v1.ObjectMeta{ @@ -685,7 +742,7 @@ func Test_calculateChecksum(t *testing.T) { t.Run(tt.name, func(t *testing.T) { sch := scheme.Scheme require.NoError(t, softwarecomposition.AddToScheme(sch)) - s := NewStorageImpl(afero.NewMemMapFs(), DefaultStorageRoot, nil, nil, sch) + s := NewStorageImpl(afero.NewMemMapFs(), DefaultStorageRoot, nil, sch) got, err := s.CalculateChecksum(tt.obj) if !tt.wantErr(t, err, fmt.Sprintf("CalculateChecksum(%v)", tt.obj)) { return diff --git a/pkg/registry/file/vulnerabilitysummarystorage_test.go b/pkg/registry/file/vulnerabilitysummarystorage_test.go index 2da244474..74d953f42 100644 --- a/pkg/registry/file/vulnerabilitysummarystorage_test.go +++ b/pkg/registry/file/vulnerabilitysummarystorage_test.go @@ -41,7 +41,7 @@ func TestVulnSummaryStorageImpl_Create(t *testing.T) { wantErr: true, }, } - realStorage := NewStorageImpl(afero.NewMemMapFs(), "/", nil, nil, nil) + realStorage := NewStorageImpl(afero.NewMemMapFs(), "/", nil, nil) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -78,7 +78,7 @@ func TestVulnSummaryStorageImpl_Delete(t *testing.T) { wantErr: true, }, } - realStorage := NewStorageImpl(afero.NewMemMapFs(), "/", nil, nil, nil) + realStorage := NewStorageImpl(afero.NewMemMapFs(), "/", nil, nil) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -94,11 +94,12 @@ func TestVulnSummaryStorageImpl_Delete(t *testing.T) { } func TestVulnSummaryStorageImpl_Watch(t *testing.T) { - storageImpl := NewStorageImpl(afero.NewMemMapFs(), "", nil, nil, nil) + storageImpl := NewStorageImpl(afero.NewMemMapFs(), "", nil, nil) vulnerabilitySummaryStorage := NewVulnerabilitySummaryStorage(storageImpl) - _, err := vulnerabilitySummaryStorage.Watch(context.TODO(), "", storage.ListOptions{}) + watcher, err := vulnerabilitySummaryStorage.Watch(context.TODO(), "", storage.ListOptions{}) assert.NoError(t, err) + assert.Nil(t, watcher) } func TestVulnSummaryStorageImpl_GetList(t *testing.T) { @@ -239,7 +240,7 @@ func TestVulnSummaryStorageImpl_GetList(t *testing.T) { }(pool) sch := scheme.Scheme require.NoError(t, softwarecomposition.AddToScheme(sch)) - realStorage := NewStorageImpl(afero.NewMemMapFs(), "/", pool, nil, sch) + realStorage := NewStorageImpl(afero.NewMemMapFs(), "/", pool, sch) ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) defer cancel() if tt.createObj { @@ -290,7 +291,7 @@ func TestVulnSummaryStorageImpl_GuaranteedUpdate(t *testing.T) { }, } - realStorage := NewStorageImpl(afero.NewMemMapFs(), "/", nil, nil, nil) + realStorage := NewStorageImpl(afero.NewMemMapFs(), "/", nil, nil) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -305,6 +306,7 @@ func TestVulnSummaryStorageImpl_GuaranteedUpdate(t *testing.T) { } } + func TestVulnSummaryStorageImpl_Get(t *testing.T) { type args struct { keyExpectedObj string @@ -381,7 +383,7 @@ func TestVulnSummaryStorageImpl_Get(t *testing.T) { require.NoError(t, softwarecomposition.AddToScheme(sch)) for _, tt := range tests { - realStorage := NewStorageImpl(afero.NewMemMapFs(), "/", pool, nil, sch) + realStorage := NewStorageImpl(afero.NewMemMapFs(), "/", pool, sch) t.Run(tt.name, func(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) defer cancel() diff --git a/pkg/registry/file/watch.go b/pkg/registry/file/watch.go deleted file mode 100644 index b61f90888..000000000 --- a/pkg/registry/file/watch.go +++ /dev/null @@ -1,214 +0,0 @@ -package file - -import ( - "context" - "path" - "slices" - - "github.com/puzpuzpuz/xsync/v2" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/watch" -) - -/* -watcher receives and forwards events to its listeners. - -In particular, it implements the watch.Interface. But what does that mean? -The formal sense is easy: Implement the Step and ResultChan method in the right format. -The semantics however are something much more convoluted. -For example: - - Can you call Stop multiple times? What should happen then? - (The implementation used to crash here) - - What should happen Stop to happened events whose Results are not retrieved yet? - (The implementation used to sometimes drop them, sometimes deadlock both the Stopper - and the watcher, leaking also goroutines and memory.) - - What should the behavior be if client does not immediately read from the ResultChan() ? - (The implementation used to sometimes queue them into the stack of new goroutines, yet - sometimes block other notications against the same event/key until processed) - -The API doc (apimachinery/pkg/watch.Interface) says: -1 We shouldn't leak resources after Stop (goroutines, memory). -2 ResultChan receives all events "happened before" Stop() (what that means exactly, TBD). -3 ResultChan should be closed eventually after Stop() call. - -The actual usage of the API implies: - - 4 Stop() can be called multiple times. - 5 Stop() can also be called when the queue is not empty. - 6 Queue might also not be emptied by client. - 7 The queue of the watcher is not necessarily being read all the time - (for some values of "all"). - -Following the Hyrum's Law, this shall be the implicit interface to write the watcher against. - -How to implement this? -Problem with #3 is that typically closing the channel is used by the sender to tell that the receiver -can stop; here the role is inverted, making the control flow supported naturally by the primitives -working against us. Best long term choice would be to change the API Doc, but let's try to accommodate -instead. - -Constraint #7 is particularly challenging as well. We have only bad options. -Basically the underlying issue is that the server-side has to implement the queue management strategy, -yet client has full control of what kind of and how they are going to use the queue. Options: -a) "No queue" on server-side (there's always a queue). I.e. your queue is pushed outside the server -by causing backpressure by halting your server processing. The upside is that this is easiest to implement -and follows the spec. Unfortunately the server-side performance is going to be particularly miserable. -b) No queue, but fall back to dropping messages. This breakes the constraint #2 though. -c) Fixed queue, fall back to a or b when queue gets full. -d) Infinite queue. Unfortunately, only pure turing machines have those. Trying to construct one - - leads to solution (c_a) anyways, but with less predictable collapses and pushback. - -Ok. So the challenge with all the variants API-wise is that we have no way of communicating that we are -backlogged or that we are dropping messages. -Let's choose one. The c_a seems like the path of least suprise (c_b is possible as well). -*/ -type watcher struct { - ctx context.Context - stop context.CancelFunc - outCh, inCh chan watch.Event - sendFullObject bool -} - -// newWatcher creates a new watcher -func newWatcher(ctx context.Context, sendFullObject bool) *watcher { - ctx, cn := context.WithCancel(ctx) - w := &watcher{ - ctx: ctx, - stop: cn, - outCh: make(chan watch.Event, 100), - inCh: make(chan watch.Event), - sendFullObject: sendFullObject, - } - go w.shipIt() - return w -} - -func (w *watcher) Stop() { w.stop() } -func (w *watcher) ResultChan() <-chan watch.Event { return w.outCh } - -// shipIt is the only method writing to outCh. It is called only once per watcher -// and returns when context is gone. -// See discussion on constraint #3 above for rationale for this approach. -func (w *watcher) shipIt() { - defer close(w.outCh) - for { - var ev watch.Event - select { // we want both reads and writes to be interruptable, hence complexity here. - case <-w.ctx.Done(): - return - case ev = <-w.inCh: - } - select { - case <-w.ctx.Done(): - return - case w.outCh <- ev: - } - } -} - -func (w *watcher) notify(eventFull, eventMeta watch.Event) { - if w.sendFullObject { - w.send(eventFull) - } else { - w.send(eventMeta) - } -} - -func (w *watcher) send(e watch.Event) { - select { - case w.inCh <- e: - case <-w.ctx.Done(): - } -} - -type watchersList []*watcher - -// WatchDispatcher dispatches events to registered watches -// -// TODO(ttimonen): There's currently no way to gracefully take down WatchDispatcher without leaking a goroutine. -type WatchDispatcher struct { - watchesByKey *xsync.MapOf[string, watchersList] - gcCh chan string -} - -func NewWatchDispatcher() *WatchDispatcher { - wd := WatchDispatcher{xsync.NewMapOf[watchersList](), make(chan string)} - go wd.gcer() - return &wd -} - -func extractKeysToNotify(key string) []string { - if key[0] != '/' { - return []string{} - } - - ret := []string{"/"} - for left := key; left != "/"; left = path.Dir(left) { - ret = append(ret, left) - } - slices.Sort(ret) - return ret -} - -// Register registers a watcher for a given key -func (wd *WatchDispatcher) Register(key string, w *watcher) { - wd.watchesByKey.Compute(key, func(l watchersList, _ bool) (watchersList, bool) { - return append(l, w), false - }) - go func() { - <-w.ctx.Done() - wd.gcCh <- key - }() -} - -func (wd *WatchDispatcher) gcer() { - for key := range wd.gcCh { // This is an O(n) op, where n is # of watchers in a particular key. - wd.watchesByKey.Compute(key, func(l watchersList, _ bool) (watchersList, bool) { - if len(l) == 0 { - return nil, true - } - out := make(watchersList, 0, len(l)-1) // Preallocate with the intent to drop one element - // Doing dropping inplace would be more efficient alloc-wise, but would cause data races on notify - // the way it's currently implemented. - for _, w := range l { - if w.ctx.Err() == nil { - out = append(out, w) - } - } - return out, len(out) == 0 - }) - // TODO(ttimonen) sleeping a bit here can give a batch cleanup improvements, maybe. - } -} - -// Added dispatches an "Added" event to appropriate watchers -func (wd *WatchDispatcher) Added(key string, metaOut, obj runtime.Object) { - eventFull := watch.Event{Type: watch.Added, Object: obj} - eventMeta := watch.Event{Type: watch.Added, Object: metaOut} - wd.notify(key, eventFull, eventMeta) -} - -// Deleted dispatches a "Deleted" event to appropriate watchers -func (wd *WatchDispatcher) Deleted(key string, metaOut runtime.Object) { - eventMeta := watch.Event{Type: watch.Deleted, Object: metaOut} - wd.notify(key, eventMeta, eventMeta) // We don't have the full object to send here -} - -// Modified dispatches a "Modified" event to appropriate watchers -func (wd *WatchDispatcher) Modified(key string, metaOut, obj runtime.Object) { - eventFull := watch.Event{Type: watch.Modified, Object: obj} - eventMeta := watch.Event{Type: watch.Modified, Object: metaOut} - wd.notify(key, eventFull, eventMeta) -} - -// notify notifies the listeners of a given key about an event of a given eventType about a given obj -func (wd *WatchDispatcher) notify(key string, eventFull, eventMeta watch.Event) { - // Notify calls do not block normally, unless the client-side is messed up. - for _, part := range extractKeysToNotify(key) { - ws, _ := wd.watchesByKey.Load(part) - for _, w := range ws { - w.notify(eventFull, eventMeta) - } - } -} diff --git a/pkg/registry/file/watch_test.go b/pkg/registry/file/watch_test.go deleted file mode 100644 index 3c07ac417..000000000 --- a/pkg/registry/file/watch_test.go +++ /dev/null @@ -1,346 +0,0 @@ -package file - -import ( - "context" - "testing" - "time" - - "github.com/kubescape/k8s-interface/instanceidhandler/v1/helpers" - "github.com/kubescape/storage/pkg/apis/softwarecomposition" - "github.com/kubescape/storage/pkg/apis/softwarecomposition/v1beta1" - "github.com/kubescape/storage/pkg/generated/clientset/versioned/scheme" - "github.com/spf13/afero" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/apiserver/pkg/storage" - "zombiezen.com/go/sqlite/sqlitemigration" -) - -const ( - chanWaitTimeout = 100 * time.Millisecond -) - -func TestExtractKeysToNotify(t *testing.T) { - tt := []struct { - name string - inputKey string - expectedKeys []string - }{ - { - "root key should produce only itself", - "/", - []string{"/"}, - }, - { - "API resource key should produce root and itself", - "/spdx.softwarecomposition.kubescape.io", - []string{"/", "/spdx.softwarecomposition.kubescape.io"}, - }, - { - "Full resource key should produce the full lineage", - "/spdx.softwarecomposition.kubescape.io/sbomsyftfiltereds/kubescape/titi", - []string{ - "/", - "/spdx.softwarecomposition.kubescape.io", - "/spdx.softwarecomposition.kubescape.io/sbomsyftfiltereds", - "/spdx.softwarecomposition.kubescape.io/sbomsyftfiltereds/kubescape", - "/spdx.softwarecomposition.kubescape.io/sbomsyftfiltereds/kubescape/titi", - }, - }, - { - "Missing leading slash should produce an error", - "spdx.softwarecomposition.kubescape.io/sbomsyftfiltereds/kubescape/titi", - []string{}, - }, - } - - for _, tc := range tt { - t.Run(tc.name, func(t *testing.T) { - got := extractKeysToNotify(tc.inputKey) - assert.Equal(t, tc.expectedKeys, got) - }) - } - -} - -func TestFileSystemStorageWatchReturnsDistinctWatchers(t *testing.T) { - type args struct { - key string - opts storage.ListOptions - } - tests := []struct { - name string - args args - }{ - { - name: "Watch should return new watch objects for the same key for every invocation", - args: args{ - key: "/spdx.softwarecomposition.kubescape.io/sbomsyfts", - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - s := NewStorageImpl(afero.NewMemMapFs(), DefaultStorageRoot, nil, nil, nil) - - got1, _ := s.Watch(context.TODO(), tt.args.key, tt.args.opts) - got1chan := got1.ResultChan() - - got2, _ := s.Watch(context.TODO(), tt.args.key, tt.args.opts) - got2chan := got2.ResultChan() - - assert.NotEqual(t, got1, got2, "Should not return the same watcher object") - assert.NotEqual(t, got1chan, got2chan, "Channels from the watches should not be the same") - }) - } -} - -func TestFilesystemStorageWatchPublishing(t *testing.T) { - var ( - keyN = "/spdx.softwarecomposition.kubescape.io/sbomsyfts" - keyK = "/spdx.softwarecomposition.kubescape.io/sbomsyfts" - obj = &v1beta1.SBOMSyft{ObjectMeta: v1.ObjectMeta{ - Name: "some-sbom", - ResourceVersion: "1", - Annotations: map[string]string{ - helpers.SyncChecksumMetadataKey: "58964290770ed17fd375e3c7ef02d0af5d52ca954c65fb2add8c75ff144bf0b1", - }, - }} - ) - tt := []struct { - name string - start, stopBefore, stopAfter map[string]int - inputObjects map[string]*v1beta1.SBOMSyft - want map[string][]watch.Event - }{{ - name: "Create should publish to the appropriate single channel", - start: map[string]int{keyK: 1}, - inputObjects: map[string]*v1beta1.SBOMSyft{ - keyK + "/some-sbom": {ObjectMeta: v1.ObjectMeta{Name: "some-sbom"}}, - }, - want: map[string][]watch.Event{keyK: {{Type: watch.Added, Object: obj}}}, - }, { - name: "Create should publish to all watchers on the relevant key", - start: map[string]int{keyK: 3}, - inputObjects: map[string]*v1beta1.SBOMSyft{ - keyK + "/some-sbom": {ObjectMeta: v1.ObjectMeta{Name: "some-sbom"}}, - }, - want: map[string][]watch.Event{keyK: { - {Type: watch.Added, Object: obj}, - {Type: watch.Added, Object: obj}, - {Type: watch.Added, Object: obj}, - }}, - }, { - name: "Creating on key different than the watch should produce no event", - start: map[string]int{keyK: 3, keyN: 1}, - inputObjects: map[string]*v1beta1.SBOMSyft{ - keyN + "/some-sbom": {ObjectMeta: v1.ObjectMeta{Name: "some-sbom"}}, - }, - }, { - name: "Creating on key not being watched should produce no events", - start: map[string]int{keyK: 1}, - inputObjects: map[string]*v1beta1.SBOMSyft{ - keyN + "/some-sbom": {ObjectMeta: v1.ObjectMeta{Name: "some-sbom"}}, - }, - }, { - name: "Sending to stopped watch should not produce an event", - start: map[string]int{keyN: 3}, - inputObjects: map[string]*v1beta1.SBOMSyft{ - keyN + "/some-sbom": {ObjectMeta: v1.ObjectMeta{Name: "some-sbom"}}, - }, - stopBefore: map[string]int{keyN: 1}, - want: map[string][]watch.Event{keyN: { - {Type: watch.Added, Object: obj}, - {Type: watch.Added, Object: obj}, - }}, - }, { - name: "Stopping watch after send shouldn't deadlock", - start: map[string]int{keyN: 3}, - inputObjects: map[string]*v1beta1.SBOMSyft{ - keyN + "/some-sbom": {ObjectMeta: v1.ObjectMeta{Name: "some-sbom"}}, - }, - stopAfter: map[string]int{keyN: 0}, - want: map[string][]watch.Event{keyN: { - {Type: watch.Added, Object: obj}, - {Type: watch.Added, Object: obj}, - {Type: watch.Added, Object: obj}, - }}, - }, { - name: "Stopping watch twice is ok", - start: map[string]int{keyN: 3}, - inputObjects: map[string]*v1beta1.SBOMSyft{ - keyN + "/some-sbom": {ObjectMeta: v1.ObjectMeta{Name: "some-sbom"}}, - }, - stopBefore: map[string]int{keyN: 1}, - stopAfter: map[string]int{keyN: 1}, - want: map[string][]watch.Event{keyN: { - {Type: watch.Added, Object: obj}, - {Type: watch.Added, Object: obj}, - }}, - }} - - for _, tc := range tt { - t.Run(tc.name, func(t *testing.T) { - pool := NewTestPool(t.TempDir()) - require.NotNil(t, pool) - defer func(pool *sqlitemigration.Pool) { - _ = pool.Close() - }(pool) - sch := scheme.Scheme - require.NoError(t, softwarecomposition.AddToScheme(sch)) - s := NewStorageImpl(afero.NewMemMapFs(), DefaultStorageRoot, pool, nil, sch) - ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) - defer cancel() - opts := storage.ListOptions{} - - // Arrange watches - watchers := map[string][]watch.Interface{} - for key, watchCount := range tc.start { - for i := 0; i < watchCount; i++ { - w, _ := s.Watch(ctx, key, opts) - watchers[key] = append(watchers[key], w) - } - } - - // Primitives to stop the watchers gracefully - var ( - done = make(chan bool, 1) - wait = func() { - select { - case <-done: - case <-time.After(chanWaitTimeout): - t.Errorf("Timed out trying to stop watches") - } - } - stopWatchers = func(ws map[string]int) { - for key, i := range ws { - watchers[key][i].Stop() - } - done <- true - } - ) - - go stopWatchers(tc.stopBefore) - wait() - { // Act out the creation operation - var ttl uint64 = 0 - out := &v1beta1.SBOMSyft{} - for key, object := range tc.inputObjects { - _ = s.Create(ctx, key, object, out, ttl) - } - time.Sleep(chanWaitTimeout) // Create notifications happen asynchronously - } - go stopWatchers(tc.stopAfter) - wait() - - // Assert the expected events - for key, wantEvents := range tc.want { - var gotEvents []watch.Event - for _, w := range watchers[key] { - select { - case ev, ok := <-w.ResultChan(): - // Skip values from closed channels - if !ok { - continue - } - gotEvents = append(gotEvents, ev) - case <-time.After(chanWaitTimeout): - // Timed out, no event received - continue - } - } - assert.Equal(t, wantEvents, gotEvents) - } - }) - } -} - -func TestWatchGuaranteedUpdateProducesMatchingEvents(t *testing.T) { - toto := &v1beta1.SBOMSyft{ - ObjectMeta: v1.ObjectMeta{ - Name: "toto", - ResourceVersion: "1", - Annotations: map[string]string{}, - }, - } - - type args struct { - key string - ignoreNotFound bool - preconditions *storage.Preconditions - tryUpdate storage.UpdateFunc - cachedExistingObject runtime.Object - } - - tt := []struct { - name string - inputWatchesByKey map[string]int - expectedEvents map[string][]watch.Event - args args - }{ - { - name: "Successful GuaranteedUpdate should produce a matching Modified event", - inputWatchesByKey: map[string]int{ - "/spdx.softwarecomposition.kubescape.io/sbomsyfts": 1, - }, - args: args{ - key: "/spdx.softwarecomposition.kubescape.io/sbomsyfts/kubescape/toto", - ignoreNotFound: true, - tryUpdate: func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { - return toto, nil, nil - }, - }, - expectedEvents: map[string][]watch.Event{ - "/spdx.softwarecomposition.kubescape.io/sbomsyfts": { - { - Type: watch.Modified, - Object: toto, - }, - }, - }, - }, - } - for _, tc := range tt { - t.Run(tc.name, func(t *testing.T) { - pool := NewTestPool(t.TempDir()) - require.NotNil(t, pool) - defer func(pool *sqlitemigration.Pool) { - _ = pool.Close() - }(pool) - sch := scheme.Scheme - require.NoError(t, softwarecomposition.AddToScheme(sch)) - s := NewStorageImpl(afero.NewMemMapFs(), DefaultStorageRoot, pool, nil, sch) - opts := storage.ListOptions{} - - watchers := map[string][]watch.Interface{} - for key, watchCount := range tc.inputWatchesByKey { - for i := 0; i < watchCount; i++ { - wtch, _ := s.Watch(context.TODO(), key, opts) - watchers[key] = append(watchers[key], wtch) - } - } - - destination := &v1beta1.SBOMSyft{} - ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) - defer cancel() - _ = s.GuaranteedUpdate(ctx, tc.args.key, destination, tc.args.ignoreNotFound, tc.args.preconditions, tc.args.tryUpdate, tc.args.cachedExistingObject) - - for key, expectedEvents := range tc.expectedEvents { - var gotEvents []watch.Event - for _, w := range watchers[key] { - select { - case ev := <-w.ResultChan(): - gotEvents = append(gotEvents, ev) - case <-time.After(chanWaitTimeout): - // Timed out, no event received - continue - } - } - assert.Equal(t, expectedEvents, gotEvents) - } - }) - } -}