
feat: Expose prediction SSE streams #3019

Open · markphelps wants to merge 5 commits into main from cog-streaming-2-sse

Conversation

@markphelps (Collaborator) commented May 14, 2026

Summary

  • Reuse POST /predictions and PUT /predictions/{id} for Server-Sent Events by returning an SSE stream when requests send Accept: text/event-stream.
  • Preserve existing non-SSE behavior: sync prediction requests still return the blocking JSON response, and Prefer: respond-async without SSE still returns a 202 JSON response (a sketch of the negotiation follows).
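
For illustration, a minimal sketch of how this negotiation could be modeled. The `PredictionResponseMode` variants are taken from the review comments below; the function name, header parsing, and precedence are assumptions, not the PR's actual code:

```rust
enum PredictionResponseMode {
    SyncJson,  // blocking JSON response (the existing default)
    AsyncJson, // 202 Accepted JSON (Prefer: respond-async)
    AsyncSse,  // Server-Sent Events stream (Accept: text/event-stream)
}

// Hypothetical helper: choose a response mode from the two request headers.
fn negotiate(accept: Option<&str>, prefer: Option<&str>) -> PredictionResponseMode {
    let wants_sse = accept.is_some_and(|v| v.contains("text/event-stream"));
    let wants_async = prefer.is_some_and(|v| v.contains("respond-async"));
    if wants_sse {
        PredictionResponseMode::AsyncSse
    } else if wants_async {
        PredictionResponseMode::AsyncJson
    } else {
        PredictionResponseMode::SyncJson
    }
}
```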

CLI behavior

  • This PR does not add a cog predict --stream option or otherwise change the predict CLI.
  • Predictions made through the Cog CLI continue to default to the existing non-streaming behavior.

@ask-bonk (Contributor, bot) left a comment

I'm Bonk, and I've done a quick review of your PR.

This PR adds Server-Sent Events (SSE) streaming for predictions with event replay for late subscribers. The implementation looks solid overall with good test coverage. I found one potential resource concern that should be addressed:

Issues

1. Unbounded memory growth in stream_history (Medium)

The stream_history Vec in Prediction grows unbounded during prediction execution. For long-running predictions with frequent output chunks, logs, or metrics, this could cause memory exhaustion. Consider adding a limit to the history size or making it configurable.

Comment thread: crates/coglet/src/prediction.rs

@ask-bonk (Contributor, bot) commented May 14, 2026

I've completed my review of PR #3019. The PR adds SSE streaming for predictions with event replay functionality. The implementation is well-structured and includes comprehensive tests.

Summary of findings:

I identified one medium-severity issue regarding unbounded memory growth in the stream_history Vec. For long-running predictions that generate many output chunks, logs, or metrics, this could lead to memory exhaustion. I've posted a review comment suggesting either adding a configurable limit to the history size or implementing a mechanism to drop old events when a threshold is exceeded.

The rest of the implementation looks solid:

  • Good use of tokio::sync::broadcast for the streaming channel (see the sketch after this list)
  • Proper handling of late subscribers via replay
  • Correct cancellation semantics for sync vs async predictions
  • Comprehensive test coverage including integration tests
  • Clean SSE stream implementation with proper event formatting
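
To illustrate how a subscriber of that broadcast channel might tolerate lag, a sketch; the event type is left generic because the concrete wiring isn't shown in this thread, so none of this is the PR's actual code:

```rust
use tokio::sync::broadcast::{self, error::RecvError};

// Lag-tolerant forwarding loop: keep streaming after missed events
// instead of tearing down the client connection.
async fn forward_events<E: Clone>(mut rx: broadcast::Receiver<E>) {
    loop {
        match rx.recv().await {
            Ok(_event) => { /* serialize the event as an SSE frame and write it */ }
            Err(RecvError::Lagged(skipped)) => {
                // The subscriber fell behind; `skipped` events were dropped.
                tracing::warn!(skipped, "SSE subscriber lagged; continuing");
            }
            // Sender dropped: the prediction has finished and the stream ends.
            Err(RecvError::Closed) => break,
        }
    }
}
```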


@ask-bonk (Contributor, bot) commented May 14, 2026

Based on my review of PR #3019, I agree with the previous reviewer's finding about unbounded memory growth in stream_history. However, since that issue was already flagged, I have no additional actionable issues to report.

The implementation is well-structured:

  • Clean SSE stream implementation with proper event replay for late subscribers
  • Correct handling of sync vs async prediction cancellation semantics
  • Comprehensive test coverage including integration tests
  • Good documentation with working examples
  • Proper error handling for broadcast channel lag cases

LGTM


Base automatically changed from cog-streaming-1 to main May 14, 2026 19:32
markphelps force-pushed the cog-streaming-2-sse branch from cfa865e to c61e323 May 14, 2026 19:38
@ask-bonk (Contributor, bot) commented May 14, 2026

Based on my thorough review of PR #3019, I have analyzed all the changed files:

Summary of changes:
This PR adds Server-Sent Events (SSE) streaming for predictions with event replay for late subscribers. It exposes GET /predictions/{id}/stream as an SSE endpoint that streams lifecycle events (start, output, log, metric, completed) in real time.

Previous findings:
The unbounded memory growth issue in stream_history was already flagged by the previous reviewer. This is a legitimate concern where long-running predictions with frequent output chunks could cause memory exhaustion.

My independent assessment:
I've verified the following aspects of the implementation:

  1. Broadcast channel - Uses a 1024-slot channel with proper lag handling (logs a warning and continues)
  2. Stream cancellation semantics - Correctly distinguishes between sync (cancel on disconnect) and async (no cancel) predictions via PredictionStreamGuard::drop() (see the sketch after this list)
  3. Drop order - Verified that field drop order in StreamState ensures the receiver is still alive when the guard's Drop runs
  4. Event replay - Properly replays history for late subscribers before subscribing to live events
  5. Keep-alive - SSE stream includes 15-second keep-alive messages
  6. Integration test - Comprehensive test covering the full SSE flow
  7. Example - Working example with Hugging Face transformers model
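
For illustration, the disconnect-cancellation idea from item 2 could reduce to a Drop impl of roughly this shape; only the PredictionStreamGuard name comes from the PR, the fields and cancel hook are hypothetical:

```rust
use std::sync::Arc;

// Dropped when the SSE response body is dropped, i.e. the client disconnected.
struct PredictionStreamGuard {
    prediction_id: String,
    is_sync: bool,                           // sync predictions cancel on disconnect
    cancel: Arc<dyn Fn(&str) + Send + Sync>, // hook into the prediction service
}

impl Drop for PredictionStreamGuard {
    fn drop(&mut self) {
        // Async predictions keep running after the client goes away;
        // sync ones are cancelled because nobody is waiting on the result.
        if self.is_sync {
            (self.cancel)(&self.prediction_id);
        }
    }
}
```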

No additional actionable issues beyond what was already flagged.

LGTM


@ask-bonk (Contributor, bot) commented May 14, 2026

After reviewing PR #3019, I've analyzed the changes to add Server-Sent Events (SSE) streaming for predictions. The implementation reuses POST /predictions and PUT /predictions/{id} with content negotiation via Accept: text/event-stream, removing the separate GET /predictions/{id}/stream route.

Key aspects reviewed:

  • Clean SSE stream implementation with 15s keep-alive and proper event formatting
  • Correct cancellation semantics distinguishing sync vs async predictions via PredictionStreamGuard
  • Event replay for late subscribers using the stream_history Vec (sketched after this list)
  • Comprehensive test coverage including unit tests and integration tests
  • Good documentation with working examples
  • Preserved backward compatibility for non-SSE requests
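
To illustrate the replay behavior: a late subscriber gets a snapshot of the history first, then live events. A sketch, assuming both steps run under the same lock that guards event emission; names other than stream_history and stream_tx are hypothetical:

```rust
use tokio::sync::broadcast;

// Snapshot-then-subscribe: done atomically with respect to event emission,
// no event can fall into the gap between the replay and the live stream.
fn subscribe_with_replay<E: Clone>(
    stream_history: &[E],
    stream_tx: &broadcast::Sender<E>,
) -> (Vec<E>, broadcast::Receiver<E>) {
    let replay = stream_history.to_vec(); // everything emitted so far
    let live = stream_tx.subscribe();     // everything emitted from now on
    (replay, live)
}
```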

Previously flagged issue:
The unbounded memory growth in stream_history Vec was already identified by a previous reviewer. This is a legitimate concern for long-running predictions with frequent events.

No additional actionable issues found beyond what was already flagged.

LGTM


@ask-bonk (Contributor, bot) commented May 14, 2026

Based on my thorough review of PR #3019, I can see this PR adds Server-Sent Events (SSE) streaming for predictions by reusing POST /predictions and PUT /predictions/{id} with content negotiation via Accept: text/event-stream.

Key changes reviewed:

  • prediction.rs: Bounded stream history (1024 events) with proper event replay for late subscribers
  • service.rs: PredictionStreamGuard correctly distinguishes sync vs async prediction cancellation semantics
  • routes.rs: Clean SSE stream implementation with 15s keep-alive, content negotiation, removed separate /stream route
  • Tests: Comprehensive unit tests and integration test coverage

Previously flagged issue (addressed):
The unbounded memory growth concern in stream_history has been resolved with MAX_STREAM_HISTORY_EVENTS: usize = 1024 (line 13), and the emit_stream_event method properly removes old events when the limit is reached (lines 195-201).

Additional verification:

  • Terminal state guards prevent overwriting terminal states (see the sketch after this list)
  • Drop order in StreamState ensures receiver is alive when guard's Drop runs
  • Sync predictions cancel on disconnect; async predictions do not
  • Backward compatibility preserved for non-SSE requests
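
The terminal-state guard mentioned above might reduce to an early return of roughly this shape (the status names and setter are assumptions, not the PR's code):

```rust
enum PredictionStatus { Starting, Processing, Succeeded, Failed, Canceled }

impl PredictionStatus {
    fn is_terminal(&self) -> bool {
        matches!(self, Self::Succeeded | Self::Failed | Self::Canceled)
    }
}

// Once a prediction reaches a terminal state, later transitions
// (e.g. a cancel racing a completion) are ignored.
fn set_status(current: &mut PredictionStatus, next: PredictionStatus) {
    if current.is_terminal() {
        return;
    }
    *current = next;
}
```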

LGTM


@ask-bonk (Contributor, bot) commented May 14, 2026

Based on my review of PR #3019, I can confirm that the previously flagged issue regarding unbounded memory growth in stream_history has been addressed:

Verified fixes:

  • MAX_STREAM_HISTORY_EVENTS: usize = 1024 is defined at line 13 in prediction.rs
  • The emit_stream_event method (lines 199-206) correctly removes old events when the limit is reached and tracks skipped events

Additional verification:

  • Bounded replay test exists at line 727 in prediction.rs
  • Terminal state guards prevent overwriting terminal states (lines 230-231, 249-250, 262-263)
  • PredictionStreamGuard::drop() correctly distinguishes sync vs async cancellation (lines 141-153 in service.rs)
  • Proper drop order in StreamState ensures receiver stays alive during guard drop

The implementation is well-structured with comprehensive test coverage including unit tests for lagged streams, truncated replay, and integration tests.

LGTM


markphelps marked this pull request as ready for review May 14, 2026 21:37
markphelps requested a review from a team as a code owner May 14, 2026 21:37
markphelps requested a review from michaeldwan May 14, 2026 21:37
markphelps changed the title from "Expose prediction SSE streams" to "feat: Expose prediction SSE streams" May 14, 2026
@michaeldwan (Member) left a comment

Solid design. Main point discussed offline: streaming needs to be opt-in at the handler level via a decorator/attribute. Five concrete changes in the comments.


fn emit_stream_event(&mut self, event: PredictionStreamEvent) {
    if self.stream_history.len() == MAX_STREAM_HISTORY_EVENTS {
        self.stream_history.remove(0);

Vec::remove(0) is O(n) on every event once the buffer is full; for token-by-token LLM output that means thousands of element shifts per prediction. VecDeque::pop_front() is O(1).
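
A sketch of the suggested change, with stream_history as a VecDeque (the constant value is from the diff above; the function shape and broadcast call are assumptions):

```rust
use std::collections::VecDeque;
use tokio::sync::broadcast;

const MAX_STREAM_HISTORY_EVENTS: usize = 1024;

// Evicting the oldest event from a VecDeque is O(1).
fn emit_stream_event<E: Clone>(
    history: &mut VecDeque<E>,
    live: &broadcast::Sender<E>,
    event: E,
) {
    if history.len() == MAX_STREAM_HISTORY_EVENTS {
        history.pop_front(); // vs. Vec::remove(0), which shifts every element
    }
    history.push_back(event.clone());
    let _ = live.send(event); // Err only means there are no live subscribers
}
```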

prediction_id.clone(),
input.clone(),
webhook_sender,
response_mode != PredictionResponseMode::AsyncJson,

response_mode != PredictionResponseMode::AsyncJson is also true for SyncJson. It should be response_mode == PredictionResponseMode::AsyncSse; the current expression happens to be inert in sync mode, but it reads as a bug.


impl Prediction {
    pub fn new(id: String, webhook: Option<WebhookSender>) -> Self {
        let (stream_tx, _) = tokio::sync::broadcast::channel(1024);

Channel capacity and history cap are both 1024 but defined independently. Use a shared constant so they can't drift.
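
For example (MAX_STREAM_HISTORY_EVENTS is from the diff; the shared name is a suggestion):

```rust
// One constant drives both bounds, so they cannot drift apart.
const STREAM_EVENT_CAPACITY: usize = 1024;
const MAX_STREAM_HISTORY_EVENTS: usize = STREAM_EVENT_CAPACITY;

// ...and in Prediction::new:
//     let (stream_tx, _) = tokio::sync::broadcast::channel(STREAM_EVENT_CAPACITY);
```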

return;
}

if self.service.stream_receiver_count(&self.id) == 0

The async cleanup task calls remove_prediction while the SSE client may still be draining. After removal, stream_receiver_count returns unwrap_or(0) and prediction_is_terminal returns unwrap_or(true), which happen to be the safe defaults. That invariant needs a comment, or the guard should hold an Arc to the entry so it doesn't depend on a post-removal lookup.
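
A sketch of the second option, with the guard owning the entry directly (every name here is hypothetical, and Prediction is stubbed out):

```rust
use std::sync::{Arc, Mutex};

struct Prediction; // stub standing in for the real entry type

// The guard keeps the entry alive even after remove_prediction drops it
// from the service map, so Drop never depends on a by-id lookup.
struct PredictionStreamGuard {
    entry: Arc<Mutex<Prediction>>,
    is_sync: bool,
}
```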

cog serve --upload-url http://unused/

curl -H Accept:text/event-stream PUT /predictions/sse-stream-test '{"id":"sse-stream-test","input":{}}'
stdout 'event: output'

Missing stdout 'event: start'; the test should verify the full event lifecycle.
