Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions cmd/entire/cli/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,49 @@ type TextGenerator interface {
GenerateText(ctx context.Context, prompt string, model string) (string, error)
}

// ProgressPhase identifies a coarse stage in streaming text generation.
type ProgressPhase string

const (
// PhaseConnecting is emitted once when the CLI signals it is making the upstream request.
PhaseConnecting ProgressPhase = "connecting"
// PhaseFirstToken is emitted once when the upstream responds with the first event,
// carrying TTFT and input/cache token counts.
PhaseFirstToken ProgressPhase = "first-token"
// PhaseGenerating is emitted repeatedly as text or thinking deltas arrive.
// OutputTokens carries a running estimate based on delta sizes.
PhaseGenerating ProgressPhase = "generating"
// PhaseDone is emitted once when the final result event is received without error.
PhaseDone ProgressPhase = "done"
)

// GenerationProgress reports a snapshot of streaming text generation progress.
// Fields not relevant to the current Phase may be zero-valued.
type GenerationProgress struct {
Phase ProgressPhase
OutputTokens int // running estimate during PhaseGenerating; final at PhaseDone
InputTokens int // populated at PhaseFirstToken
CachedInputTokens int // populated at PhaseFirstToken
TTFTms int // time-to-first-token, populated at PhaseFirstToken
DurationMs int // populated at PhaseDone (final result event)
}

// ProgressFn receives streaming progress updates. It must not block — invoke it
// from the same goroutine that reads the stream and keep handlers fast.
type ProgressFn func(GenerationProgress)

// StreamingTextGenerator is an optional interface for text generators whose
// underlying CLI exposes a streaming output mode. Callers can use AsStreamingTextGenerator
// to detect support and fall back to plain GenerateText when unavailable.
type StreamingTextGenerator interface {
Agent

// GenerateTextStreaming invokes the agent's streaming text generation and
// calls progress for each phase update. progress may be nil to suppress
// reporting. The returned string is the final response text.
GenerateTextStreaming(ctx context.Context, prompt, model string, progress ProgressFn) (string, error)
}

// HookResponseWriter is implemented by agents that support structured hook responses.
// Agents that implement this can output messages (e.g., banners) to the user via
// the agent's response protocol. For example, Claude Code outputs JSON with a
Expand Down
17 changes: 17 additions & 0 deletions cmd/entire/cli/agent/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type DeclaredCaps struct {
TranscriptPreparer bool `json:"transcript_preparer"`
TokenCalculator bool `json:"token_calculator"`
TextGenerator bool `json:"text_generator"`
StreamingTextGenerator bool `json:"streaming_text_generator"`
HookResponseWriter bool `json:"hook_response_writer"`
SubagentAwareExtractor bool `json:"subagent_aware_extractor"`
}
Expand Down Expand Up @@ -105,6 +106,22 @@ func AsTextGenerator(ag Agent) (TextGenerator, bool) { //nolint:ireturn // type-
return tg, true
}

// AsStreamingTextGenerator returns the agent as StreamingTextGenerator if it both
// implements the interface and (for CapabilityDeclarer agents) has declared the capability.
func AsStreamingTextGenerator(ag Agent) (StreamingTextGenerator, bool) { //nolint:ireturn // type-assertion helper must return interface
if ag == nil {
return nil, false
}
stg, ok := ag.(StreamingTextGenerator)
if !ok {
return nil, false
}
if cd, ok := ag.(CapabilityDeclarer); ok {
return stg, cd.DeclaredCapabilities().StreamingTextGenerator
}
return stg, true
}

// AsHookResponseWriter returns the agent as HookResponseWriter if it both
// implements the interface and (for CapabilityDeclarer agents) has declared the capability.
func AsHookResponseWriter(ag Agent) (HookResponseWriter, bool) { //nolint:ireturn // type-assertion helper must return interface
Expand Down
54 changes: 54 additions & 0 deletions cmd/entire/cli/agent/capabilities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,20 @@ func (m *mockFullAgent) CalculateTotalTokenUsage([]byte, int, string) (*TokenUsa
return nil, nil //nolint:nilnil // test mock
}

// StreamingTextGenerator
func (m *mockFullAgent) GenerateTextStreaming(context.Context, string, string, ProgressFn) (string, error) {
return "", nil
}

// mockBuiltinStreamingAgent is a built-in agent that implements StreamingTextGenerator but NOT CapabilityDeclarer.
type mockBuiltinStreamingAgent struct {
mockBaseAgent
}

func (m *mockBuiltinStreamingAgent) GenerateTextStreaming(context.Context, string, string, ProgressFn) (string, error) {
return "", nil
}

// mockBuiltinPromptAgent is a built-in agent that implements PromptExtractor but NOT CapabilityDeclarer.
type mockBuiltinPromptAgent struct {
mockBaseAgent
Expand Down Expand Up @@ -371,3 +385,43 @@ func TestAsPromptExtractor(t *testing.T) {
}
})
}

func TestAsStreamingTextGenerator(t *testing.T) {
t.Parallel()

t.Run("not implemented", func(t *testing.T) {
t.Parallel()
ag := &mockBaseAgent{}
_, ok := AsStreamingTextGenerator(ag)
if ok {
t.Error("expected false for agent not implementing StreamingTextGenerator")
}
})

t.Run("builtin agent", func(t *testing.T) {
t.Parallel()
ag := &mockBuiltinStreamingAgent{}
stg, ok := AsStreamingTextGenerator(ag)
if !ok || stg == nil {
t.Error("expected true for built-in agent implementing StreamingTextGenerator")
}
})

t.Run("declared true", func(t *testing.T) {
t.Parallel()
ag := &mockFullAgent{caps: DeclaredCaps{StreamingTextGenerator: true}}
stg, ok := AsStreamingTextGenerator(ag)
if !ok || stg == nil {
t.Error("expected true when capability declared true")
}
})

t.Run("declared false", func(t *testing.T) {
t.Parallel()
ag := &mockFullAgent{caps: DeclaredCaps{StreamingTextGenerator: false}}
_, ok := AsStreamingTextGenerator(ag)
if ok {
t.Error("expected false when capability declared false")
}
})
}
166 changes: 166 additions & 0 deletions cmd/entire/cli/agent/claudecode/generate_streaming.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package claudecode

import (
"bytes"
"context"
"errors"
"fmt"
"os"
"os/exec"
"strings"

"github.com/entireio/cli/cmd/entire/cli/agent"
)

// GenerateTextStreaming runs the Claude CLI in stream-json mode, dispatches
// progress events to the optional callback, and returns the final result text.
// Implements the agent.StreamingTextGenerator interface.
//
// If the CLI rejects the stream-json flags (older Claude CLI), this falls back
// to the non-streaming GenerateText path — without progress events.
func (c *ClaudeCodeAgent) GenerateTextStreaming(
ctx context.Context,
prompt, model string,
progress agent.ProgressFn,
) (string, error) {
if model == "" {
model = "haiku"
}

commandRunner := c.CommandRunner
if commandRunner == nil {
commandRunner = exec.CommandContext
}

cmd := commandRunner(ctx, "claude",
"--print",
"--output-format", "stream-json",
"--verbose",
"--model", model,
"--setting-sources", "")

cmd.Dir = os.TempDir()
cmd.Env = agent.StripGitEnv(os.Environ())
cmd.Stdin = strings.NewReader(prompt)

stdout, err := cmd.StdoutPipe()
if err != nil {
return "", fmt.Errorf("claude stream stdout pipe: %w", err)
}
var stderr bytes.Buffer
cmd.Stderr = &stderr

if err := cmd.Start(); err != nil {
return "", fmt.Errorf("claude stream start: %w", err)
}

final, parseErr := streamClaudeResponse(stdout, makeProgressDispatcher(progress))
waitErr := cmd.Wait()

// Context errors pass through as sentinels so callers can use errors.Is.
if ctx.Err() != nil {
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
return "", context.DeadlineExceeded
}
return "", context.Canceled
}

if final != nil {
if !final.IsError {
if final.Result == nil {
return "", errors.New("claude returned empty result")
}
if progress != nil {
progress(agent.GenerationProgress{
Phase: agent.PhaseDone,
OutputTokens: outputTokensFromUsage(final.Usage),
DurationMs: final.DurationMs,
})
}
return *final.Result, nil
}
msg := "claude CLI reported error"
if final.Result != nil && *final.Result != "" {
msg = fmt.Sprintf("%s: %s", msg, *final.Result)
}
if final.APIErrorStatus != nil {
msg = fmt.Sprintf("%s (HTTP %d)", msg, *final.APIErrorStatus)
}
return "", errors.New(msg)
}

// No envelope: check if the CLI rejected streaming flags (older version).
// If so, fall back to the non-streaming path.
if waitErr != nil {
stderrStr := stderr.String()
if looksLikeUnrecognizedFlag(stderrStr) {
return c.GenerateText(ctx, prompt, model)
}
if stderrStr != "" {
return "", fmt.Errorf("claude stream failed: %s: %w", strings.TrimSpace(stderrStr), waitErr)
}
return "", fmt.Errorf("claude stream failed: %w", waitErr)
}

if parseErr != nil {
return "", fmt.Errorf("claude stream parse: %w", parseErr)
}
return "", errors.New("claude exited without producing a result")
}

// makeProgressDispatcher returns a per-event handler that translates raw
// stream events into agent.GenerationProgress callbacks. PhaseDone is
// emitted by GenerateTextStreaming after cmd.Wait, because it needs data
// from the parsed final envelope (OutputTokens, DurationMs).
func makeProgressDispatcher(progress agent.ProgressFn) func(StreamEvent) {
if progress == nil {
return func(StreamEvent) {} // no-op: drain events
}
var outputTokensEstimate int
return func(ev StreamEvent) {
switch {
case ev.Type == "system" && ev.Subtype == "status" && ev.Status == "requesting":
progress(agent.GenerationProgress{Phase: agent.PhaseConnecting})
case ev.Type == "stream_event" && ev.Event.Type == "message_start":
p := agent.GenerationProgress{Phase: agent.PhaseFirstToken, TTFTms: ev.TTFTms}
if ev.Event.Message != nil && ev.Event.Message.Usage != nil {
p.InputTokens = ev.Event.Message.Usage.InputTokens
p.CachedInputTokens = ev.Event.Message.Usage.CacheReadInputTokens
}
progress(p)
case ev.Type == "stream_event" && ev.Event.Type == "content_block_delta" && ev.Event.Delta != nil:
text := ev.Event.Delta.Text
if text == "" {
text = ev.Event.Delta.Thinking
}
outputTokensEstimate += len(text) / 4 // rough estimate: ~4 chars/token
progress(agent.GenerationProgress{Phase: agent.PhaseGenerating, OutputTokens: outputTokensEstimate})
}
}
}

func outputTokensFromUsage(u *messageUsage) int {
if u == nil {
return 0
}
return u.OutputTokens
}

// looksLikeUnrecognizedFlag returns true if stderr indicates the CLI
// rejected one of the streaming-specific flags (older Claude CLI that
// doesn't support stream-json or --verbose). Requires both a rejection
// phrase AND a streaming flag name to avoid false-positives on unrelated
// errors that happen to contain "unknown option".
func looksLikeUnrecognizedFlag(stderr string) bool {
lower := strings.ToLower(stderr)
hasRejectPhrase := strings.Contains(lower, "unrecognized option") ||
strings.Contains(lower, "unknown flag") ||
strings.Contains(lower, "unknown option") ||
strings.Contains(lower, "invalid option")
if !hasRejectPhrase {
return false
}
return strings.Contains(lower, "stream-json") ||
strings.Contains(lower, "verbose") ||
strings.Contains(lower, "include-partial")
}
Loading