From da6dde6352cbc667ffea8122d0c0af870c7fcd13 Mon Sep 17 00:00:00 2001 From: Matthias Bertschy Date: Tue, 16 Jun 2026 15:41:26 +0200 Subject: [PATCH] fix: clean up expired/deleted containerprofile time series database entries Signed-off-by: Matthias Bertschy --- pkg/registry/file/cleanup.go | 5 +- .../file/containerprofile_processor.go | 50 +++++++----- .../file/containerprofile_processor_test.go | 79 +++++++++++++++++-- pkg/registry/file/containerprofile_storage.go | 12 +++ .../containerprofile_user_managed_test.go | 17 ++-- pkg/registry/file/sqlite.go | 1 + pkg/registry/file/storage.go | 8 ++ pkg/registry/file/utils.go | 14 ++-- 8 files changed, 150 insertions(+), 36 deletions(-) diff --git a/pkg/registry/file/cleanup.go b/pkg/registry/file/cleanup.go index 3c0122fe1..db6399962 100644 --- a/pkg/registry/file/cleanup.go +++ b/pkg/registry/file/cleanup.go @@ -197,7 +197,10 @@ func (h *ResourcesCleanupHandler) cleanupNamespace(ctx context.Context, ns strin logger.L().Debug("deleting", helpers.String("kind", resourceKind), helpers.String("namespace", metadata.Namespace), helpers.String("name", metadata.Name)) h.deleteFunc(h.appFs, path) - metaOut := h.deleteMetadata(conn, path) + metaOut, err := h.deleteMetadata(conn, path) + if err != nil { + return fmt.Errorf("failed to delete metadata: %w", err) + } if h.watchDispatcher != nil { key := path[len(h.root) : len(path)-len(GobExt)] h.watchDispatcher.Deleted(key, metaOut) diff --git a/pkg/registry/file/containerprofile_processor.go b/pkg/registry/file/containerprofile_processor.go index f08920afd..0017d8f4a 100644 --- a/pkg/registry/file/containerprofile_processor.go +++ b/pkg/registry/file/containerprofile_processor.go @@ -668,6 +668,35 @@ func (a *ContainerProfileProcessor) consolidateContinuousTimeSeries( func (a *ContainerProfileProcessor) updateProfileStatus(ctx context.Context, key, seriesID string, profile *softwarecomposition.ContainerProfile, newTimeSeries []softwarecomposition.TimeSeriesContainers, expired bool) ([]softwarecomposition.TimeSeriesContainers, bool, error) { + // If the time series is expired, we finalize it as Completed/Partial (unless it is already Completed/Full) + // and clear the time series data so we don't leak zombie records. + if expired { + // Try to mark it as Completed/Full if we actually have a Completed status and the series is continuous + var isFull bool + if len(newTimeSeries) == 1 && isZeroTime(newTimeSeries[0].PreviousReportTimestamp) && newTimeSeries[0].Status == helpers.Completed { + isFull = profile.SetCompletedStatus(newTimeSeries[0]) + } + + if isFull { + logger.L().Debug("ContainerProfileProcessor.updateProfileStatus - expired profile is completed/full, skipping further processing", + loggerhelpers.String("key", key), loggerhelpers.String("seriesID", seriesID)) + + // Remove all time series data + if err := a.ContainerProfileStorage.DeleteTimeSeriesContainerEntries(ctx, key); err != nil { + return newTimeSeries, false, fmt.Errorf("failed to delete time series data: %w", err) + } + return newTimeSeries[:0], true, nil + } + + // Otherwise, mark it as Completed/Partial (unless already Completed/Full) + if profile.Annotations[helpers.StatusMetadataKey] != helpers.Completed || profile.Annotations[helpers.CompletionMetadataKey] != helpers.Full { + profile.Annotations[helpers.StatusMetadataKey] = helpers.Completed + profile.Annotations[helpers.CompletionMetadataKey] = helpers.Partial + } + return newTimeSeries[:0], false, nil + } + + // Normal active (non-expired) flow below: // An aggregated series is removed only if it has one element, no previous report timestamp, and is completed or failed if len(newTimeSeries) != 1 || !isZeroTime(newTimeSeries[0].PreviousReportTimestamp) { profile.SetLearningStatus(newTimeSeries[0]) // series is missing some TS entries @@ -685,15 +714,7 @@ func (a *ContainerProfileProcessor) updateProfileStatus(ctx context.Context, key if err := a.ContainerProfileStorage.DeleteTimeSeriesContainerEntries(ctx, key); err != nil { return newTimeSeries, false, fmt.Errorf("failed to delete time series data: %w", err) } - return newTimeSeries, true, nil // skip further processing - } - // If this is an expired time series, mark it as partial completed instead of clearing it - if expired { - // Safeguard: never change a completed full profile - if profile.Annotations[helpers.StatusMetadataKey] != helpers.Completed || profile.Annotations[helpers.CompletionMetadataKey] != helpers.Full { - profile.Annotations[helpers.StatusMetadataKey] = helpers.Completed - profile.Annotations[helpers.CompletionMetadataKey] = helpers.Partial - } + return newTimeSeries[:0], true, nil } // Clear this time series as it is finished newTimeSeries = newTimeSeries[:0] @@ -704,16 +725,7 @@ func (a *ContainerProfileProcessor) updateProfileStatus(ctx context.Context, key newTimeSeries = newTimeSeries[:0] default: - // If this is an expired time series, mark it as partial completed - if expired { - // Safeguard: never change a completed full profile - if profile.Annotations[helpers.StatusMetadataKey] != helpers.Completed || profile.Annotations[helpers.CompletionMetadataKey] != helpers.Full { - profile.Annotations[helpers.StatusMetadataKey] = helpers.Completed - profile.Annotations[helpers.CompletionMetadataKey] = helpers.Partial - } - } else { - profile.SetLearningStatus(newTimeSeries[0]) // series is complete but not finished - } + profile.SetLearningStatus(newTimeSeries[0]) // series is complete but not finished } return newTimeSeries, false, nil diff --git a/pkg/registry/file/containerprofile_processor_test.go b/pkg/registry/file/containerprofile_processor_test.go index 83490929e..4437e16b1 100644 --- a/pkg/registry/file/containerprofile_processor_test.go +++ b/pkg/registry/file/containerprofile_processor_test.go @@ -27,9 +27,7 @@ func TestConsolidateData(t *testing.T) { defer func(pool *sqlitemigration.Pool) { _ = pool.Close() }(pool) - conn, err := pool.Take(context.TODO()) - require.NoError(t, err) - + var err error sch := scheme.Scheme require.NoError(t, softwarecomposition.AddToScheme(sch)) processor := ContainerProfileProcessor{ @@ -82,6 +80,10 @@ func TestConsolidateData(t *testing.T) { err = processor.ConsolidateTimeSeries(ctx) assert.NoError(t, err) + conn, err := pool.Take(ctx) + require.NoError(t, err) + defer pool.Put(conn) + applicationProfile := softwarecomposition.ApplicationProfile{} key := "/spdx.softwarecomposition.kubescape.io/applicationprofiles/node-agent-test-hjjz/replicaset-multiple-containers-deployment-d4b8dd5fd" err = s.GetWithConn(ctx, conn, key, storage.GetOptions{}, &applicationProfile) @@ -109,9 +111,6 @@ func TestConsolidateData(t *testing.T) { assert.NoError(t, err) assert.Equal(t, softwarecomposition.CallID("test-call-id"), containerProfile.Spec.IdentifiedCallStacks[0].CallID) - // Clean up - pool.Put(conn) - assert.NoError(t, pool.Close()) } func Test_isZeroTime(t *testing.T) { @@ -299,3 +298,71 @@ func TestSendConsolidatedSlugToChannel(t *testing.T) { }) } } + +func TestUpdateProfileStatusExpired(t *testing.T) { + processor := ContainerProfileProcessor{} + + profile := &softwarecomposition.ContainerProfile{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{}, + }, + } + + ts := []softwarecomposition.TimeSeriesContainers{ + { + Status: helpersv1.Learning, + TsSuffix: "123", + }, + } + + res, skip, err := processor.updateProfileStatus(context.TODO(), "key", "seriesID", profile, ts, true) + assert.NoError(t, err) + assert.False(t, skip) + assert.Len(t, res, 0) // should be cleared + assert.Equal(t, helpersv1.Completed, profile.Annotations[helpersv1.StatusMetadataKey]) + assert.Equal(t, helpersv1.Partial, profile.Annotations[helpersv1.CompletionMetadataKey]) +} + +type mockContainerProfileStorage struct { + fakeStorage + deleteCalled bool + deleteKey string +} + +func (m *mockContainerProfileStorage) DeleteTimeSeriesContainerEntries(ctx context.Context, key string) error { + m.deleteCalled = true + m.deleteKey = key + return nil +} + +func TestUpdateProfileStatusExpiredFull(t *testing.T) { + mockStorage := &mockContainerProfileStorage{} + processor := ContainerProfileProcessor{ + ContainerProfileStorage: mockStorage, + } + + profile := &softwarecomposition.ContainerProfile{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{}, + }, + } + + // For isFull to be true: len(newTimeSeries) == 1, previous report timestamp is zero, status is Completed. + ts := []softwarecomposition.TimeSeriesContainers{ + { + Status: helpersv1.Completed, + Completion: helpersv1.Full, + TsSuffix: "123", + PreviousReportTimestamp: "0001-01-01 00:00:00 +0000 UTC", + }, + } + + res, skip, err := processor.updateProfileStatus(context.TODO(), "test-key", "seriesID", profile, ts, true) + assert.NoError(t, err) + assert.True(t, skip) + assert.Len(t, res, 0) // should be cleared + assert.True(t, mockStorage.deleteCalled) + assert.Equal(t, "test-key", mockStorage.deleteKey) + assert.Equal(t, helpersv1.Completed, profile.Annotations[helpersv1.StatusMetadataKey]) + assert.Equal(t, helpersv1.Full, profile.Annotations[helpersv1.CompletionMetadataKey]) +} diff --git a/pkg/registry/file/containerprofile_storage.go b/pkg/registry/file/containerprofile_storage.go index 821e12b3b..958c5fd63 100644 --- a/pkg/registry/file/containerprofile_storage.go +++ b/pkg/registry/file/containerprofile_storage.go @@ -28,6 +28,7 @@ import ( // a ug- CRD (kubescape/storage#315 review). const ( ContainerProfileKind = "containerprofile" + ContainerProfileKindPlural = "containerprofiles" ContainerProfileMergedKind = "containerprofile-merged" ) @@ -370,3 +371,14 @@ func (c *ContainerProfileStorageImpl) WriteTimeSeriesEntry(ctx context.Context, conn := ctx.Value(connKey).(*sqlite.Conn) return WriteTimeSeriesEntry(conn, kind, namespace, name, seriesID, tsSuffix, reportTimestamp, status, completion, previousReportTimestamp, hasData) } + +func IsContainerProfileKind(kind string) bool { + return kind == ContainerProfileKind || kind == ContainerProfileKindPlural +} + +func NormalizeContainerProfileKind(kind string) string { + if kind == ContainerProfileKindPlural { + return ContainerProfileKind + } + return kind +} diff --git a/pkg/registry/file/containerprofile_user_managed_test.go b/pkg/registry/file/containerprofile_user_managed_test.go index ff0467edb..7298dcbe5 100644 --- a/pkg/registry/file/containerprofile_user_managed_test.go +++ b/pkg/registry/file/containerprofile_user_managed_test.go @@ -315,6 +315,12 @@ func (h *e2eHarness) createCP(fixture string) { &profile, nil, 0)) } +func (h *e2eHarness) writeTSEntryDirect(kind, namespace, name, seriesID, tsSuffix, reportTimestamp, status, completion, previousReportTimestamp string, hasData bool) { + h.t.Helper() + err := WriteTimeSeriesEntry(h.conn, kind, namespace, name, seriesID, tsSuffix, reportTimestamp, status, completion, previousReportTimestamp, hasData) + require.NoError(h.t, err) +} + // createFreshReport ingests a TS ContainerProfile for the e2e workload with a // *current* report timestamp, so a large DeleteThreshold keeps the workload in // Learning (the expired path never marks it Completed). Each call uses a fresh @@ -592,11 +598,11 @@ func TestConsolidateUserManagedIdempotent(t *testing.T) { // the assertions below when a tick happened to straddle a second. time.Sleep(1100 * time.Millisecond) - // Second tick: no new time-series data and the ug- AP unchanged. The - // consolidator still revisits the workload (expired), re-running the merge - // with identical inputs. The rebuilt merged must be recognised as unchanged - // and NOT rewritten: no watch event fires, the persisted object is byte-for- - // byte identical, and its ResourceVersion is stable. + // Second tick: no new time-series data and the ug- AP unchanged. + // Since the expired time series was cleared on the first tick, we inject a report + // to trigger consolidation and verify that rebuilding with identical inputs is recognized + // as unchanged and NOT rewritten (no watch event, stable ResourceVersion). + h.writeTSEntryDirect("containerprofile", "kube-system", "replicaset-coredns-5d78c9869d-coredns-185f-129c", "4580f9fc-7563-41d8-bb60-e2eeca72f495", "c68b821c86194262b389d919d1355ee6", "2025-06-24 10:29:46.810421941 +0000 UTC m=+66.976503851", "ready", "partial", "0001-01-01 00:00:00 +0000 UTC", true) h.consolidate() second := h.requireMerged() assert.Equal(t, 0, drainMergedWrites(), @@ -615,6 +621,7 @@ func TestConsolidateUserManagedIdempotent(t *testing.T) { {Name: "coredns", Capabilities: []string{"USER_MANAGED_CAP_V2"}}, }, }) + h.writeTSEntryDirect("containerprofile", "kube-system", "replicaset-coredns-5d78c9869d-coredns-185f-129c", "4580f9fc-7563-41d8-bb60-e2eeca72f495", "c68b821c86194262b389d919d1355ee6", "2025-06-24 10:29:46.810421941 +0000 UTC m=+66.976503851", "ready", "partial", "0001-01-01 00:00:00 +0000 UTC", true) h.consolidate() third := h.requireMerged() assert.GreaterOrEqual(t, drainMergedWrites(), 1, diff --git a/pkg/registry/file/sqlite.go b/pkg/registry/file/sqlite.go index ad1ffe887..e2e67b266 100644 --- a/pkg/registry/file/sqlite.go +++ b/pkg/registry/file/sqlite.go @@ -286,6 +286,7 @@ func listNamespaces(conn *sqlite.Conn) ([]string, error) { // DeleteTimeSeriesContainerEntries deletes all time series entries for a completed container. func DeleteTimeSeriesContainerEntries(conn *sqlite.Conn, path string) error { _, _, kind, _, namespace, name := K8sPathToKeys(path) + kind = NormalizeContainerProfileKind(kind) err := sqlitex.Execute(conn, `DELETE FROM time_series WHERE kind = ? diff --git a/pkg/registry/file/storage.go b/pkg/registry/file/storage.go index 506b07584..9b2c0ab89 100644 --- a/pkg/registry/file/storage.go +++ b/pkg/registry/file/storage.go @@ -373,6 +373,14 @@ func (s *StorageImpl) delete(ctx context.Context, conn *sqlite.Conn, key string, if err := s.appFs.Remove(makePayloadPath(p)); err != nil { logger.L().Ctx(ctx).Error("Delete - remove json file failed", helpers.Error(err), helpers.String("key", key)) } + // delete time series entries if this is a containerprofile + _, _, kind, _, _, _ := K8sPathToKeys(key) + if IsContainerProfileKind(kind) { + if err := DeleteTimeSeriesContainerEntries(conn, key); err != nil { + logger.L().Ctx(ctx).Error("Delete - delete time series entries failed", helpers.Error(err), helpers.String("key", key)) + return fmt.Errorf("delete time series entries: %w", err) + } + } // publish event to watchers s.watchDispatcher.Deleted(key, metaOut) return nil diff --git a/pkg/registry/file/utils.go b/pkg/registry/file/utils.go index 22fdef520..993676ed7 100644 --- a/pkg/registry/file/utils.go +++ b/pkg/registry/file/utils.go @@ -7,8 +7,6 @@ import ( "path/filepath" "strings" - "github.com/kubescape/go-logger" - "github.com/kubescape/go-logger/helpers" helpers2 "github.com/kubescape/k8s-interface/instanceidhandler/v1/helpers" "github.com/olvrng/ujson" "github.com/spf13/afero" @@ -49,14 +47,20 @@ func NewKubernetesClient() (*kubernetes.Clientset, error) { return kubernetes.NewForConfig(clusterConfig) } -func (h *ResourcesCleanupHandler) deleteMetadata(conn *sqlite.Conn, path string) runtime.Object { +func (h *ResourcesCleanupHandler) deleteMetadata(conn *sqlite.Conn, path string) (runtime.Object, error) { key := payloadPathToKey(path) metaOut := &PartialObjectMetadata{} err := DeleteMetadata(conn, key, metaOut) if err != nil { - logger.L().Error("failed to delete metadata", helpers.Error(err)) + return nil, fmt.Errorf("failed to delete metadata: %w", err) } - return metaOut + _, _, kind, _, _, _ := K8sPathToKeys(key) + if IsContainerProfileKind(kind) { + if err := DeleteTimeSeriesContainerEntries(conn, key); err != nil { + return nil, fmt.Errorf("failed to delete time series entries: %w", err) + } + } + return metaOut, nil } func getConfig() (*rest.Config, error) {