Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pkg/registry/file/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,10 @@ func (h *ResourcesCleanupHandler) cleanupNamespace(ctx context.Context, ns strin
logger.L().Debug("deleting", helpers.String("kind", resourceKind), helpers.String("namespace", metadata.Namespace), helpers.String("name", metadata.Name))
h.deleteFunc(h.appFs, path)

metaOut := h.deleteMetadata(conn, path)
metaOut, err := h.deleteMetadata(conn, path)
if err != nil {
return fmt.Errorf("failed to delete metadata: %w", err)
}
Comment thread
matthyx marked this conversation as resolved.
if h.watchDispatcher != nil {
key := path[len(h.root) : len(path)-len(GobExt)]
h.watchDispatcher.Deleted(key, metaOut)
Expand Down
50 changes: 31 additions & 19 deletions pkg/registry/file/containerprofile_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,35 @@ func (a *ContainerProfileProcessor) consolidateContinuousTimeSeries(
func (a *ContainerProfileProcessor) updateProfileStatus(ctx context.Context, key, seriesID string,
profile *softwarecomposition.ContainerProfile, newTimeSeries []softwarecomposition.TimeSeriesContainers, expired bool) ([]softwarecomposition.TimeSeriesContainers, bool, error) {

// If the time series is expired, we finalize it as Completed/Partial (unless it is already Completed/Full)
// and clear the time series data so we don't leak zombie records.
if expired {
// Try to mark it as Completed/Full if we actually have a Completed status and the series is continuous
var isFull bool
if len(newTimeSeries) == 1 && isZeroTime(newTimeSeries[0].PreviousReportTimestamp) && newTimeSeries[0].Status == helpers.Completed {
isFull = profile.SetCompletedStatus(newTimeSeries[0])
}

if isFull {
logger.L().Debug("ContainerProfileProcessor.updateProfileStatus - expired profile is completed/full, skipping further processing",
loggerhelpers.String("key", key), loggerhelpers.String("seriesID", seriesID))

// Remove all time series data
if err := a.ContainerProfileStorage.DeleteTimeSeriesContainerEntries(ctx, key); err != nil {
return newTimeSeries, false, fmt.Errorf("failed to delete time series data: %w", err)
}
return newTimeSeries[:0], true, nil
}

// Otherwise, mark it as Completed/Partial (unless already Completed/Full)
if profile.Annotations[helpers.StatusMetadataKey] != helpers.Completed || profile.Annotations[helpers.CompletionMetadataKey] != helpers.Full {
profile.Annotations[helpers.StatusMetadataKey] = helpers.Completed
profile.Annotations[helpers.CompletionMetadataKey] = helpers.Partial
}
return newTimeSeries[:0], false, nil
}

// Normal active (non-expired) flow below:
// An aggregated series is removed only if it has one element, no previous report timestamp, and is completed or failed
if len(newTimeSeries) != 1 || !isZeroTime(newTimeSeries[0].PreviousReportTimestamp) {
profile.SetLearningStatus(newTimeSeries[0]) // series is missing some TS entries
Expand All @@ -685,15 +714,7 @@ func (a *ContainerProfileProcessor) updateProfileStatus(ctx context.Context, key
if err := a.ContainerProfileStorage.DeleteTimeSeriesContainerEntries(ctx, key); err != nil {
return newTimeSeries, false, fmt.Errorf("failed to delete time series data: %w", err)
}
return newTimeSeries, true, nil // skip further processing
}
// If this is an expired time series, mark it as partial completed instead of clearing it
if expired {
// Safeguard: never change a completed full profile
if profile.Annotations[helpers.StatusMetadataKey] != helpers.Completed || profile.Annotations[helpers.CompletionMetadataKey] != helpers.Full {
profile.Annotations[helpers.StatusMetadataKey] = helpers.Completed
profile.Annotations[helpers.CompletionMetadataKey] = helpers.Partial
}
return newTimeSeries[:0], true, nil
}
// Clear this time series as it is finished
newTimeSeries = newTimeSeries[:0]
Expand All @@ -704,16 +725,7 @@ func (a *ContainerProfileProcessor) updateProfileStatus(ctx context.Context, key
newTimeSeries = newTimeSeries[:0]

default:
// If this is an expired time series, mark it as partial completed
if expired {
// Safeguard: never change a completed full profile
if profile.Annotations[helpers.StatusMetadataKey] != helpers.Completed || profile.Annotations[helpers.CompletionMetadataKey] != helpers.Full {
profile.Annotations[helpers.StatusMetadataKey] = helpers.Completed
profile.Annotations[helpers.CompletionMetadataKey] = helpers.Partial
}
} else {
profile.SetLearningStatus(newTimeSeries[0]) // series is complete but not finished
}
profile.SetLearningStatus(newTimeSeries[0]) // series is complete but not finished
}

return newTimeSeries, false, nil
Expand Down
79 changes: 73 additions & 6 deletions pkg/registry/file/containerprofile_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ func TestConsolidateData(t *testing.T) {
defer func(pool *sqlitemigration.Pool) {
_ = pool.Close()
}(pool)
conn, err := pool.Take(context.TODO())
require.NoError(t, err)

var err error
sch := scheme.Scheme
require.NoError(t, softwarecomposition.AddToScheme(sch))
processor := ContainerProfileProcessor{
Expand Down Expand Up @@ -82,6 +80,10 @@ func TestConsolidateData(t *testing.T) {
err = processor.ConsolidateTimeSeries(ctx)
assert.NoError(t, err)

conn, err := pool.Take(ctx)
require.NoError(t, err)
defer pool.Put(conn)

applicationProfile := softwarecomposition.ApplicationProfile{}
key := "/spdx.softwarecomposition.kubescape.io/applicationprofiles/node-agent-test-hjjz/replicaset-multiple-containers-deployment-d4b8dd5fd"
err = s.GetWithConn(ctx, conn, key, storage.GetOptions{}, &applicationProfile)
Expand Down Expand Up @@ -109,9 +111,6 @@ func TestConsolidateData(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, softwarecomposition.CallID("test-call-id"), containerProfile.Spec.IdentifiedCallStacks[0].CallID)

// Clean up
pool.Put(conn)
assert.NoError(t, pool.Close())
}

func Test_isZeroTime(t *testing.T) {
Expand Down Expand Up @@ -299,3 +298,71 @@ func TestSendConsolidatedSlugToChannel(t *testing.T) {
})
}
}

func TestUpdateProfileStatusExpired(t *testing.T) {
processor := ContainerProfileProcessor{}

profile := &softwarecomposition.ContainerProfile{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{},
},
}

ts := []softwarecomposition.TimeSeriesContainers{
{
Status: helpersv1.Learning,
TsSuffix: "123",
},
}

res, skip, err := processor.updateProfileStatus(context.TODO(), "key", "seriesID", profile, ts, true)
assert.NoError(t, err)
assert.False(t, skip)
assert.Len(t, res, 0) // should be cleared
assert.Equal(t, helpersv1.Completed, profile.Annotations[helpersv1.StatusMetadataKey])
assert.Equal(t, helpersv1.Partial, profile.Annotations[helpersv1.CompletionMetadataKey])
}

type mockContainerProfileStorage struct {
fakeStorage
deleteCalled bool
deleteKey string
}

func (m *mockContainerProfileStorage) DeleteTimeSeriesContainerEntries(ctx context.Context, key string) error {
m.deleteCalled = true
m.deleteKey = key
return nil
}

func TestUpdateProfileStatusExpiredFull(t *testing.T) {
mockStorage := &mockContainerProfileStorage{}
processor := ContainerProfileProcessor{
ContainerProfileStorage: mockStorage,
}

profile := &softwarecomposition.ContainerProfile{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{},
},
}

// For isFull to be true: len(newTimeSeries) == 1, previous report timestamp is zero, status is Completed.
ts := []softwarecomposition.TimeSeriesContainers{
{
Status: helpersv1.Completed,
Completion: helpersv1.Full,
TsSuffix: "123",
PreviousReportTimestamp: "0001-01-01 00:00:00 +0000 UTC",
},
}

res, skip, err := processor.updateProfileStatus(context.TODO(), "test-key", "seriesID", profile, ts, true)
assert.NoError(t, err)
assert.True(t, skip)
assert.Len(t, res, 0) // should be cleared
assert.True(t, mockStorage.deleteCalled)
assert.Equal(t, "test-key", mockStorage.deleteKey)
assert.Equal(t, helpersv1.Completed, profile.Annotations[helpersv1.StatusMetadataKey])
assert.Equal(t, helpersv1.Full, profile.Annotations[helpersv1.CompletionMetadataKey])
}
12 changes: 12 additions & 0 deletions pkg/registry/file/containerprofile_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
// a ug- CRD (kubescape/storage#315 review).
const (
ContainerProfileKind = "containerprofile"
ContainerProfileKindPlural = "containerprofiles"
ContainerProfileMergedKind = "containerprofile-merged"
)

Expand Down Expand Up @@ -370,3 +371,14 @@ func (c *ContainerProfileStorageImpl) WriteTimeSeriesEntry(ctx context.Context,
conn := ctx.Value(connKey).(*sqlite.Conn)
return WriteTimeSeriesEntry(conn, kind, namespace, name, seriesID, tsSuffix, reportTimestamp, status, completion, previousReportTimestamp, hasData)
}

func IsContainerProfileKind(kind string) bool {
return kind == ContainerProfileKind || kind == ContainerProfileKindPlural
}

func NormalizeContainerProfileKind(kind string) string {
if kind == ContainerProfileKindPlural {
return ContainerProfileKind
}
return kind
}
17 changes: 12 additions & 5 deletions pkg/registry/file/containerprofile_user_managed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,12 @@ func (h *e2eHarness) createCP(fixture string) {
&profile, nil, 0))
}

func (h *e2eHarness) writeTSEntryDirect(kind, namespace, name, seriesID, tsSuffix, reportTimestamp, status, completion, previousReportTimestamp string, hasData bool) {
h.t.Helper()
err := WriteTimeSeriesEntry(h.conn, kind, namespace, name, seriesID, tsSuffix, reportTimestamp, status, completion, previousReportTimestamp, hasData)
require.NoError(h.t, err)
}

// createFreshReport ingests a TS ContainerProfile for the e2e workload with a
// *current* report timestamp, so a large DeleteThreshold keeps the workload in
// Learning (the expired path never marks it Completed). Each call uses a fresh
Expand Down Expand Up @@ -592,11 +598,11 @@ func TestConsolidateUserManagedIdempotent(t *testing.T) {
// the assertions below when a tick happened to straddle a second.
time.Sleep(1100 * time.Millisecond)

// Second tick: no new time-series data and the ug- AP unchanged. The
// consolidator still revisits the workload (expired), re-running the merge
// with identical inputs. The rebuilt merged must be recognised as unchanged
// and NOT rewritten: no watch event fires, the persisted object is byte-for-
// byte identical, and its ResourceVersion is stable.
// Second tick: no new time-series data and the ug- AP unchanged.
// Since the expired time series was cleared on the first tick, we inject a report
// to trigger consolidation and verify that rebuilding with identical inputs is recognized
// as unchanged and NOT rewritten (no watch event, stable ResourceVersion).
h.writeTSEntryDirect("containerprofile", "kube-system", "replicaset-coredns-5d78c9869d-coredns-185f-129c", "4580f9fc-7563-41d8-bb60-e2eeca72f495", "c68b821c86194262b389d919d1355ee6", "2025-06-24 10:29:46.810421941 +0000 UTC m=+66.976503851", "ready", "partial", "0001-01-01 00:00:00 +0000 UTC", true)
h.consolidate()
second := h.requireMerged()
assert.Equal(t, 0, drainMergedWrites(),
Expand All @@ -615,6 +621,7 @@ func TestConsolidateUserManagedIdempotent(t *testing.T) {
{Name: "coredns", Capabilities: []string{"USER_MANAGED_CAP_V2"}},
},
})
h.writeTSEntryDirect("containerprofile", "kube-system", "replicaset-coredns-5d78c9869d-coredns-185f-129c", "4580f9fc-7563-41d8-bb60-e2eeca72f495", "c68b821c86194262b389d919d1355ee6", "2025-06-24 10:29:46.810421941 +0000 UTC m=+66.976503851", "ready", "partial", "0001-01-01 00:00:00 +0000 UTC", true)
h.consolidate()
third := h.requireMerged()
assert.GreaterOrEqual(t, drainMergedWrites(), 1,
Expand Down
1 change: 1 addition & 0 deletions pkg/registry/file/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ func listNamespaces(conn *sqlite.Conn) ([]string, error) {
// DeleteTimeSeriesContainerEntries deletes all time series entries for a completed container.
func DeleteTimeSeriesContainerEntries(conn *sqlite.Conn, path string) error {
_, _, kind, _, namespace, name := K8sPathToKeys(path)
kind = NormalizeContainerProfileKind(kind)
err := sqlitex.Execute(conn,
`DELETE FROM time_series
WHERE kind = ?
Expand Down
8 changes: 8 additions & 0 deletions pkg/registry/file/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,14 @@ func (s *StorageImpl) delete(ctx context.Context, conn *sqlite.Conn, key string,
if err := s.appFs.Remove(makePayloadPath(p)); err != nil {
logger.L().Ctx(ctx).Error("Delete - remove json file failed", helpers.Error(err), helpers.String("key", key))
}
// delete time series entries if this is a containerprofile
_, _, kind, _, _, _ := K8sPathToKeys(key)
if IsContainerProfileKind(kind) {
if err := DeleteTimeSeriesContainerEntries(conn, key); err != nil {
logger.L().Ctx(ctx).Error("Delete - delete time series entries failed", helpers.Error(err), helpers.String("key", key))
return fmt.Errorf("delete time series entries: %w", err)
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Comment thread
matthyx marked this conversation as resolved.
// publish event to watchers
s.watchDispatcher.Deleted(key, metaOut)
return nil
Expand Down
14 changes: 9 additions & 5 deletions pkg/registry/file/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"path/filepath"
"strings"

"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
helpers2 "github.com/kubescape/k8s-interface/instanceidhandler/v1/helpers"
"github.com/olvrng/ujson"
"github.com/spf13/afero"
Expand Down Expand Up @@ -49,14 +47,20 @@ func NewKubernetesClient() (*kubernetes.Clientset, error) {
return kubernetes.NewForConfig(clusterConfig)
}

func (h *ResourcesCleanupHandler) deleteMetadata(conn *sqlite.Conn, path string) runtime.Object {
func (h *ResourcesCleanupHandler) deleteMetadata(conn *sqlite.Conn, path string) (runtime.Object, error) {
key := payloadPathToKey(path)
metaOut := &PartialObjectMetadata{}
err := DeleteMetadata(conn, key, metaOut)
if err != nil {
logger.L().Error("failed to delete metadata", helpers.Error(err))
return nil, fmt.Errorf("failed to delete metadata: %w", err)
}
return metaOut
_, _, kind, _, _, _ := K8sPathToKeys(key)
if IsContainerProfileKind(kind) {
if err := DeleteTimeSeriesContainerEntries(conn, key); err != nil {
return nil, fmt.Errorf("failed to delete time series entries: %w", err)
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
return metaOut, nil
}

func getConfig() (*rest.Config, error) {
Expand Down
Loading