Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion chasm/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ const (
//
// LifecycleStateCreated LifecycleState = 1 << iota
LifecycleStateRunning LifecycleState = 2 << iota
// LifecycleStatePaused
LifecycleStatePaused
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it safe to simply uncomment this? This would change the values of the lifecycles below.


// Lifecycle states that are considered CLOSED
//
Expand All @@ -81,10 +81,16 @@ func (s LifecycleState) IsClosed() bool {
return s >= LifecycleStateCompleted
}

func (s LifecycleState) IsPaused() bool {
return s == LifecycleStatePaused
}

func (s LifecycleState) String() string {
switch s {
case LifecycleStateRunning:
return "Running"
case LifecycleStatePaused:
return "Paused"
case LifecycleStateCompleted:
return "Completed"
case LifecycleStateFailed:
Expand Down
22 changes: 22 additions & 0 deletions chasm/test_component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,23 @@ func (tc *TestComponent) LifecycleState(_ Context) LifecycleState {
switch tc.ComponentData.GetStatus() {
case enumspb.WORKFLOW_EXECUTION_STATUS_UNSPECIFIED, enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING:
return LifecycleStateRunning
case enumspb.WORKFLOW_EXECUTION_STATUS_TIMED_OUT:
return LifecycleStatePaused
case enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, enumspb.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW:
return LifecycleStateCompleted
default:
return LifecycleStateFailed
}
}

func (tc *TestComponent) Pause(_ MutableContext) {
tc.ComponentData.Status = enumspb.WORKFLOW_EXECUTION_STATUS_TIMED_OUT
}

func (tc *TestComponent) Unpause(_ MutableContext) {
tc.ComponentData.Status = enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING
}

func (tc *TestComponent) Terminate(
mutableContext MutableContext,
_ TerminateComponentRequest,
Expand Down Expand Up @@ -134,13 +144,23 @@ func (tsc1 *TestSubComponent1) LifecycleState(_ Context) LifecycleState {
switch tsc1.SubComponent1Data.GetStatus() {
case enumspb.WORKFLOW_EXECUTION_STATUS_UNSPECIFIED, enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING:
return LifecycleStateRunning
case enumspb.WORKFLOW_EXECUTION_STATUS_TIMED_OUT:
return LifecycleStatePaused
case enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, enumspb.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW:
return LifecycleStateCompleted
default:
return LifecycleStateFailed
}
}

func (tsc1 *TestSubComponent1) Pause(_ MutableContext) {
tsc1.SubComponent1Data.Status = enumspb.WORKFLOW_EXECUTION_STATUS_TIMED_OUT
}

func (tsc1 *TestSubComponent1) Unpause(_ MutableContext) {
tsc1.SubComponent1Data.Status = enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING
}

func (tsc1 *TestSubComponent1) GetData() string {
return tsc1.SubComponent1Data.GetCreateRequestId()
}
Expand All @@ -149,6 +169,8 @@ func (tsc11 *TestSubComponent11) LifecycleState(_ Context) LifecycleState {
switch tsc11.SubComponent11Data.GetStatus() {
case enumspb.WORKFLOW_EXECUTION_STATUS_UNSPECIFIED, enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING:
return LifecycleStateRunning
case enumspb.WORKFLOW_EXECUTION_STATUS_TIMED_OUT:
return LifecycleStatePaused
case enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, enumspb.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW:
return LifecycleStateCompleted
default:
Expand Down
67 changes: 66 additions & 1 deletion chasm/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -1553,7 +1553,8 @@ func (n *Node) closeTransactionHandleRootLifecycleChange(
var newState enumsspb.WorkflowExecutionState
var newStatus enumspb.WorkflowExecutionStatus
switch lifecycleState {
case LifecycleStateRunning:
case LifecycleStateRunning, LifecycleStatePaused:
// Paused is an OPEN state; the execution remains RUNNING from the persistence perspective.
newState = enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING
newStatus = enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING
case LifecycleStateCompleted:
Expand Down Expand Up @@ -1856,6 +1857,16 @@ func (n *Node) validateTask(
return false, err
}

// Paused components (and their non-detached sub-components) have all tasks invalidated.
// Component authors must re-emit tasks when transitioning back to running.
paused, err := n.isInPausedSubtree(validateContext)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm can we refactor the code a bit and check ancestor only once for both paused and access rule check? Both are based on ancestors' lifecycle state.

Not for this PR:

I think as a follow optimization for the close transaction code path, since we are already doing a prefix traversal of the tree, we know if a node is in the scope of a paused component or not and doesn't need to go back the ancestor nodes. I think this is doable by changing the andAllChildren iterator implementation to return more information.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually there's a problem for side effect tasks created before component is paused. When those tasks are executed, component may not be in paused state any more and can get executed.

so we need to check if the corresponding logical task still exists for the component or not.

if err != nil {
return false, err
}
if paused {
return false, nil
}

defer log.CapturePanic(n.logger, &retErr)

return registableTask.validateFn(
Expand All @@ -1867,6 +1878,60 @@ func (n *Node) validateTask(
)
}

// isInPausedSubtree returns true if this component node or any non-detached ancestor
// component is in the paused lifecycle state. Detached nodes do not inherit pause
// from their ancestors, matching the access-rule boundary semantics.
func (n *Node) isInPausedSubtree(ctx Context) (bool, error) {
if n.isDetached() {
// Detached nodes are isolated from ancestor pause state.
return false, nil
}

// Check non-detached ancestors first.
if n.parent != nil {
if paused, err := n.parent.isInPausedSubtreeHelper(ctx); err != nil || paused {
return paused, err
}
}

// Check this node itself.
if !n.isComponent() {
return false, nil
}
if err := n.prepareComponentValue(ctx); err != nil {
return false, err
}
comp, ok := n.value.(Component) //nolint:revive // unchecked-type-assertion
if !ok {
return false, nil
}
return comp.LifecycleState(ctx).IsPaused(), nil
Comment on lines +1897 to +1908
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it cheaper to check if the components itself is paused first?

}

// isInPausedSubtreeHelper is the recursive ancestor-traversal helper for isInPausedSubtree.
// It checks whether this node (if a component) or any of its non-detached ancestors is paused,
// stopping the traversal at detached-component boundaries.
func (n *Node) isInPausedSubtreeHelper(ctx Context) (bool, error) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the difference between this and the function above? From the comments, it seems they are the same.

// Traverse ancestors first, stopping at detached boundaries.
if !n.isDetached() && n.parent != nil {
if paused, err := n.parent.isInPausedSubtreeHelper(ctx); err != nil || paused {
return paused, err
}
}

if !n.isComponent() {
return false, nil
}
if err := n.prepareComponentValue(ctx); err != nil {
return false, err
}
comp, ok := n.value.(Component) //nolint:revive // unchecked-type-assertion
if !ok {
return false, nil
}
return comp.LifecycleState(ctx).IsPaused(), nil
Comment on lines +1922 to +1932
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks the same as above. Refactor it.

}

func (n *Node) closeTransactionCleanupInvalidTasks(
validateContext Context,
) (bool, error) {
Expand Down
200 changes: 200 additions & 0 deletions chasm/tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2337,6 +2337,206 @@ func (s *nodeSuite) TestCloseTransaction_InvalidateComponentTasks() {
s.Empty(componentAttr.SideEffectTasks)
}

// TestCloseTransaction_PausedStateInvalidatesTasks verifies that all logical tasks are
// invalidated when a component (or one of its non-detached ancestors) is paused, without
// invoking the task-specific validator.
func (s *nodeSuite) TestCloseTransaction_PausedStateInvalidatesTasks() {
payload := &commonpb.Payload{
Data: []byte("some-random-data"),
}
taskBlob, err := serialization.ProtoEncode(payload)
s.NoError(err)

makeTask := func(typeID uint32, offset int64) *persistencespb.ChasmComponentAttributes_Task {
return &persistencespb.ChasmComponentAttributes_Task{
TypeId: typeID,
VersionedTransition: &persistencespb.VersionedTransition{TransitionCount: 1},
VersionedTransitionOffset: offset,
Data: taskBlob,
PhysicalTaskStatus: physicalTaskStatusCreated,
}
}

s.Run("paused component invalidates its own tasks without calling task validator", func() {
persistenceNodes := map[string]*persistencespb.ChasmNode{
"": {
Metadata: &persistencespb.ChasmNodeMetadata{
InitialVersionedTransition: &persistencespb.VersionedTransition{TransitionCount: 1},
LastUpdateVersionedTransition: &persistencespb.VersionedTransition{TransitionCount: 1},
Attributes: &persistencespb.ChasmNodeMetadata_ComponentAttributes{
ComponentAttributes: &persistencespb.ChasmComponentAttributes{
TypeId: testComponentTypeID,
SideEffectTasks: []*persistencespb.ChasmComponentAttributes_Task{makeTask(testSideEffectTaskTypeID, 1)},
PureTasks: []*persistencespb.ChasmComponentAttributes_Task{makeTask(testPureTaskTypeID, 2)},
},
},
},
},
}
root, err := s.newTestTree(persistenceNodes)
s.NoError(err)

nextTransitionCount := int64(2)
s.nodeBackend.HandleNextTransitionCount = func() int64 { return nextTransitionCount }

// Pause the root component.
mutableContext := NewMutableContext(context.Background(), root)
tc, err := root.Component(mutableContext, ComponentRef{})
s.NoError(err)
tc.(*TestComponent).Pause(mutableContext)

// Task-specific validators must NOT be called — paused state short-circuits them.
// (no EXPECT calls on mock handlers)

mutation, err := root.CloseTransaction()
s.NoError(err)

componentAttr := root.serializedNode.Metadata.GetComponentAttributes()
s.Empty(componentAttr.SideEffectTasks, "paused component should have no side-effect tasks")
s.Empty(componentAttr.PureTasks, "paused component should have no pure tasks")

// Node must be marked updated so the invalidation is persisted.
s.Len(mutation.UpdatedNodes, 1)
})

s.Run("paused parent invalidates non-detached sub-component tasks", func() {
persistenceNodes := map[string]*persistencespb.ChasmNode{
"": {
Metadata: &persistencespb.ChasmNodeMetadata{
InitialVersionedTransition: &persistencespb.VersionedTransition{TransitionCount: 1},
LastUpdateVersionedTransition: &persistencespb.VersionedTransition{TransitionCount: 1},
Attributes: &persistencespb.ChasmNodeMetadata_ComponentAttributes{
ComponentAttributes: &persistencespb.ChasmComponentAttributes{
TypeId: testComponentTypeID,
},
},
},
},
"SubComponent1": {
Metadata: &persistencespb.ChasmNodeMetadata{
InitialVersionedTransition: &persistencespb.VersionedTransition{TransitionCount: 1},
LastUpdateVersionedTransition: &persistencespb.VersionedTransition{TransitionCount: 1},
Attributes: &persistencespb.ChasmNodeMetadata_ComponentAttributes{
ComponentAttributes: &persistencespb.ChasmComponentAttributes{
TypeId: testSubComponent1TypeID,
SideEffectTasks: []*persistencespb.ChasmComponentAttributes_Task{makeTask(testSideEffectTaskTypeID, 1)},
PureTasks: []*persistencespb.ChasmComponentAttributes_Task{makeTask(testPureTaskTypeID, 2)},
},
},
},
},
}
root, err := s.newTestTree(persistenceNodes)
s.NoError(err)

nextTransitionCount := int64(2)
s.nodeBackend.HandleNextTransitionCount = func() int64 { return nextTransitionCount }

// Pause the root — its non-detached sub-component's tasks should also be invalidated.
mutableContext := NewMutableContext(context.Background(), root)
tc, err := root.Component(mutableContext, ComponentRef{})
s.NoError(err)
tc.(*TestComponent).Pause(mutableContext)

mutation, err := root.CloseTransaction()
s.NoError(err)

subAttr := root.children["SubComponent1"].serializedNode.Metadata.GetComponentAttributes()
s.Empty(subAttr.SideEffectTasks, "non-detached sub-component tasks should be invalidated when parent is paused")
s.Empty(subAttr.PureTasks)
s.Len(mutation.UpdatedNodes, 2) // root (paused) + SubComponent1 (task cleanup)
})

s.Run("detached sub-component tasks are NOT invalidated by parent pause", func() {
persistenceNodes := map[string]*persistencespb.ChasmNode{
"": {
Metadata: &persistencespb.ChasmNodeMetadata{
InitialVersionedTransition: &persistencespb.VersionedTransition{TransitionCount: 1},
LastUpdateVersionedTransition: &persistencespb.VersionedTransition{TransitionCount: 1},
Attributes: &persistencespb.ChasmNodeMetadata_ComponentAttributes{
ComponentAttributes: &persistencespb.ChasmComponentAttributes{
TypeId: testComponentTypeID,
},
},
},
},
"SubComponent1": {
Metadata: &persistencespb.ChasmNodeMetadata{
InitialVersionedTransition: &persistencespb.VersionedTransition{TransitionCount: 1},
LastUpdateVersionedTransition: &persistencespb.VersionedTransition{TransitionCount: 1},
Attributes: &persistencespb.ChasmNodeMetadata_ComponentAttributes{
ComponentAttributes: &persistencespb.ChasmComponentAttributes{
TypeId: testSubComponent1TypeID,
Detached: true,
SideEffectTasks: []*persistencespb.ChasmComponentAttributes_Task{makeTask(testSideEffectTaskTypeID, 1)},
},
},
},
},
}
root, err := s.newTestTree(persistenceNodes)
s.NoError(err)

nextTransitionCount := int64(2)
s.nodeBackend.HandleNextTransitionCount = func() int64 { return nextTransitionCount }

// Pause the root.
mutableContext := NewMutableContext(context.Background(), root)
tc, err := root.Component(mutableContext, ComponentRef{})
s.NoError(err)
tc.(*TestComponent).Pause(mutableContext)

// The detached sub-component's validator IS called (it decides independently).
s.testLibrary.mockSideEffectTaskHandler.EXPECT().
Validate(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(true, nil).Times(1)

mutation, err := root.CloseTransaction()
s.NoError(err)

subAttr := root.children["SubComponent1"].serializedNode.Metadata.GetComponentAttributes()
s.Len(subAttr.SideEffectTasks, 1, "detached sub-component tasks should survive parent pause")
_ = mutation
})

s.Run("write access accepted on paused component", func() {
// Requirement: for now accept chasm engine requests on paused component.
root, err := s.newTestTree(testComponentSerializedNodes())
s.NoError(err)

ctx := NewContext(
newContextWithOperationIntent(context.Background(), OperationIntentProgress),
root,
)

// Pause the root.
err = root.prepareComponentValue(ctx)
s.NoError(err)
root.value.(*TestComponent).Pause(NewMutableContext(context.Background(), root))

// validateAccess should still succeed — paused does NOT block writes.
subNode, ok := root.findNode([]string{"SubComponent1"})
s.True(ok)
err = subNode.validateAccess(ctx)
s.NoError(err, "write access to sub-component of paused parent should be accepted")
})
}

func (s *nodeSuite) TestCloseTransaction_LifecycleChange_PausedRootKeepsRunning() {
// When the root component is paused, the execution state should remain RUNNING
// because paused is an OPEN lifecycle state.
node := s.testComponentTree()

chasmCtx := NewMutableContext(context.Background(), node)
rootComp, err := node.Component(chasmCtx, ComponentRef{componentPath: rootPath})
s.NoError(err)
rootComp.(*TestComponent).Pause(chasmCtx)

_, err = node.CloseTransaction()
s.NoError(err)
s.Equal(enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, s.nodeBackend.LastUpdateWorkflowState())
s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, s.nodeBackend.LastUpdateWorkflowStatus())
}

func (s *nodeSuite) TestCloseTransaction_NewComponentTasks() {
persistenceNodes := map[string]*persistencespb.ChasmNode{
"": {
Expand Down
Loading