Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -3773,6 +3773,7 @@ type TemporalWorkflowShowCommand struct {
WorkflowReferenceOptions
Follow bool
Detailed bool
Reverse bool
}

func NewTemporalWorkflowShowCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowShowCommand {
Expand All @@ -3789,6 +3790,7 @@ func NewTemporalWorkflowShowCommand(cctx *CommandContext, parent *TemporalWorkfl
s.Command.Args = cobra.NoArgs
s.Command.Flags().BoolVarP(&s.Follow, "follow", "f", false, "Follow the Workflow Execution progress in real time. Does not apply to JSON output.")
s.Command.Flags().BoolVar(&s.Detailed, "detailed", false, "Display events as detailed sections instead of table. Does not apply to JSON output.")
s.Command.Flags().BoolVar(&s.Reverse, "reverse", false, "Fetch Event History newest-event-first. Cannot be combined with --follow.")
s.WorkflowReferenceOptions.BuildFlags(s.Command.Flags())
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
Expand Down
56 changes: 51 additions & 5 deletions internal/temporalcli/commands.workflow_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,16 +707,24 @@ func coloredEventType(e enums.EventType) string {
type structuredHistoryIter struct {
ctx context.Context
client client.Client
namespace string
workflowID string
runID string
includeDetails bool
// If set true, long poll the history for updates
follow bool
// If set true, fetch history newest-event-first via GetWorkflowExecutionHistoryReverse
reverse bool
// If and when the iterator encounters a workflow-terminating event, it will store it here
wfResult *history.HistoryEvent

// Internal
// Internal (forward)
iter client.HistoryEventIterator

// Internal (reverse)
reverseBuf []*history.HistoryEvent
reverseNextToken []byte
reverseStarted bool
}

func (s *structuredHistoryIter) print(cctx *CommandContext) error {
Expand Down Expand Up @@ -784,15 +792,20 @@ func (s *structuredHistoryIter) Next() (any, error) {
Type: coloredEventType(event.EventType),
}

// Follow continue as new
if attr := event.GetWorkflowExecutionContinuedAsNewEventAttributes(); attr != nil {
s.runID = attr.NewExecutionRunId
s.iter = nil
// Follow continue as new (forward only; reverse traversal stays within the requested run)
if !s.reverse {
if attr := event.GetWorkflowExecutionContinuedAsNewEventAttributes(); attr != nil {
s.runID = attr.NewExecutionRunId
s.iter = nil
}
}
return data, nil
}

func (s *structuredHistoryIter) NextRawEvent() (*history.HistoryEvent, error) {
if s.reverse {
return s.nextRawEventReverse()
}
// Load iter
if s.iter == nil {
s.iter = s.client.GetWorkflowHistory(
Expand All @@ -811,6 +824,39 @@ func (s *structuredHistoryIter) NextRawEvent() (*history.HistoryEvent, error) {
return event, nil
}

func (s *structuredHistoryIter) nextRawEventReverse() (*history.HistoryEvent, error) {
for len(s.reverseBuf) == 0 {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a question: Under what conditions will this loop? Is this a polling thing only?

if s.reverseStarted && len(s.reverseNextToken) == 0 {
return nil, nil
}
s.reverseStarted = true
resp, err := s.client.WorkflowService().GetWorkflowExecutionHistoryReverse(
s.ctx,
&workflowservice.GetWorkflowExecutionHistoryReverseRequest{
Namespace: s.namespace,
Execution: &common.WorkflowExecution{
WorkflowId: s.workflowID,
RunId: s.runID,
},
NextPageToken: s.reverseNextToken,
},
)
if err != nil {
return nil, err
}
s.reverseNextToken = resp.GetNextPageToken()
if h := resp.GetHistory(); h != nil {
s.reverseBuf = h.GetEvents()
}
}
event := s.reverseBuf[0]
s.reverseBuf = s.reverseBuf[1:]
if isWorkflowTerminatingEvent(event.EventType) {
s.wfResult = event
}
return event, nil
}

type eventFieldValue struct {
field string
value string
Expand Down
6 changes: 6 additions & 0 deletions internal/temporalcli/commands.workflow_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,10 @@ func (c *TemporalWorkflowResultCommand) run(cctx *CommandContext, _ []string) er
}

func (c *TemporalWorkflowShowCommand) run(cctx *CommandContext, _ []string) error {
if c.Reverse && c.Follow {
return fmt.Errorf("--reverse cannot be combined with --follow")
}

// Call describe
cl, err := dialClient(cctx, &c.Parent.ClientOptions)
if err != nil {
Expand All @@ -583,10 +587,12 @@ func (c *TemporalWorkflowShowCommand) run(cctx *CommandContext, _ []string) erro
iter := &structuredHistoryIter{
ctx: cctx,
client: cl,
namespace: c.Parent.Namespace,
workflowID: c.WorkflowId,
runID: c.RunId,
includeDetails: c.Detailed,
follow: c.Follow,
reverse: c.Reverse,
}
if !cctx.JSONOutput {
cctx.Printer.Println(color.MagentaString("Progress:"))
Expand Down
87 changes: 87 additions & 0 deletions internal/temporalcli/commands.workflow_view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,93 @@ func (s *SharedServerSuite) TestWorkflow_Show_JSON() {
s.NotContains(out, "Results:")
}

func (s *SharedServerSuite) TestWorkflow_Show_Reverse() {
s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) {
workflow.GetSignalChannel(ctx, "my-signal").Receive(ctx, nil)
return "hi!", nil
})

run, err := s.Client.ExecuteWorkflow(
s.Context,
client.StartWorkflowOptions{TaskQueue: s.Worker().Options.TaskQueue},
DevWorkflow,
"ignored",
)
s.NoError(err)
s.NoError(s.Client.SignalWorkflow(s.Context, run.GetID(), "", "my-signal", nil))
s.NoError(run.Get(s.Context, nil))

res := s.Execute(
"workflow", "show",
"--address", s.Address(),
"-w", run.GetID(),
"--reverse",
)
s.NoError(res.Err)
out := res.Stdout.String()
completedIdx := strings.Index(out, "WorkflowExecutionCompleted")
startedIdx := strings.Index(out, "WorkflowExecutionStarted")
s.Greater(completedIdx, -1, "output should include the completed event")
s.Greater(startedIdx, -1, "output should include the started event")
s.Less(completedIdx, startedIdx, "completed event should appear before started event in reverse order")
}

func (s *SharedServerSuite) TestWorkflow_Show_Reverse_JSON() {
s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) {
workflow.GetSignalChannel(ctx, "my-signal").Receive(ctx, nil)
return "hi!", nil
})

run, err := s.Client.ExecuteWorkflow(
s.Context,
client.StartWorkflowOptions{TaskQueue: s.Worker().Options.TaskQueue},
DevWorkflow,
"workflow-param",
)
s.NoError(err)
s.NoError(s.Client.SignalWorkflow(s.Context, run.GetID(), "", "my-signal", nil))
s.NoError(run.Get(s.Context, nil))

res := s.Execute(
"workflow", "show",
"--address", s.Address(),
"-w", run.GetID(),
"--reverse",
"-o", "json",
)
s.NoError(res.Err)
out := res.Stdout.String()

var parsed struct {
Events []struct {
EventId string `json:"eventId"`
} `json:"events"`
}
s.NoError(json.Unmarshal([]byte(out), &parsed))
s.GreaterOrEqual(len(parsed.Events), 2)
prev := int64(-1)
for i, e := range parsed.Events {
id, err := strconv.ParseInt(e.EventId, 10, 64)
s.NoError(err)
if i > 0 {
s.Less(id, prev, "event %d (id=%d) should have smaller eventId than previous (%d) in reverse order", i, id, prev)
}
prev = id
}
}

func (s *SharedServerSuite) TestWorkflow_Show_Reverse_RejectsFollow() {
res := s.Execute(
"workflow", "show",
"--address", s.Address(),
"-w", "does-not-matter",
"--reverse",
"--follow",
)
s.Error(res.Err)
s.ErrorContains(res.Err, "--reverse cannot be combined with --follow")
}

func (s *SharedServerSuite) TestWorkflow_List() {
s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) {
return a, nil
Expand Down
4 changes: 4 additions & 0 deletions internal/temporalcli/commands.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3759,6 +3759,10 @@ commands:
description: |
Display events as detailed sections instead of table.
Does not apply to JSON output.
- name: reverse
type: bool
description: |
Fetch Event History newest-event-first. Cannot be combined with --follow.
option-sets:
- workflow-reference

Expand Down
Loading