Skip to content

feat: add Kafka event publishing to Livepeer fal runner#956

Merged
emranemran merged 3 commits intomainfrom
feat/livepeer-fal-kafka
Apr 20, 2026
Merged

feat: add Kafka event publishing to Livepeer fal runner#956
emranemran merged 3 commits intomainfrom
feat/livepeer-fal-kafka

Conversation

@emranemran
Copy link
Copy Markdown
Contributor

@emranemran emranemran commented Apr 16, 2026

Summary

Publishes session lifecycle events from the livepeer fal wrapper (src/scope/cloud/livepeer_fal_app.py) to Kafka so cloud sessions show up in ClickHouse alongside the existing fal_app.py events.

Events

  • websocket_connected — on orchestrator → fal WS accept, includes GPU type, fal region/runner/log labels.
  • websocket_disconnected — on teardown, adds duration_ms and session start/end timestamps.

Both events carry user_id (daydream user) and connection_id (Livepeer manifest ID).

How IDs are sourced

manifest_id and user_id are read from WebSocket upgrade headers (Manifest-Id, Daydream-User-Id) set by the orchestrator in livepeer/go-livepeer#3884 (already deployed to staging + prod). No message-body sniffing.

Dependencies

  • Runtime: aiokafka (added to both the fal wrapper requirements and the runner subprocess via --extra kafka).
  • Config (fal secrets): KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_SASL_USERNAME, KAFKA_SASL_PASSWORD. Kafka is disabled (events silently dropped) when unset.
  • Orchestrator-side header change from Serverless livepeer/go-livepeer#3884.

Not in scope

pipeline_loaded, session_created, stream_started, and stream_heartbeat already exist in the codebase (fired from pipeline_manager.py, webrtc.py, frame_processor.py) and will flow from the cloud runner once KAFKA_* env is set there. No changes in this PR.

Commits

  1. feat: add Kafka event publishing to livepeer fal wrapper (emran) — KafkaPublisher, env-var allowlist, aiokafka install, initial implementation.
  2. Extract manifest_id / user_id from websocket headers (Josh Allmann) — replace proxy sniff with header reads.
  3. chore: strip smoke-test and [KAFKA-DEBUG] prints (emran) — remove bootstrap scaffolding before merge.

Verification

Tested end-to-end against the deployed fal app (scope-livepeer-emran) with ./test-cloud-connect.sh (see #962 for the test harness). Both events confirmed landing in the production ClickHouse scope_cloud_events table with correct manifest_id and user_id.

🤖 Co-authored with Claude Code

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Apr 16, 2026

Important

Review skipped

Auto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 2fd44562-4181-4fb8-a1de-6ad9dc7d4ec2

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/livepeer-fal-kafka

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@emranemran emranemran force-pushed the feat/livepeer-fal-kafka branch 4 times, most recently from 364017e to 7bc13f8 Compare April 16, 2026 18:52
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Apr 16, 2026

🚀 fal.ai Preview Deployment

App ID daydream/scope-pr-956--preview
WebSocket wss://fal.run/daydream/scope-pr-956--preview/ws
Commit af5d47b

Livepeer Runner

App ID daydream/scope-livepeer-pr-956--preview
WebSocket wss://fal.run/daydream/scope-livepeer-pr-956--preview/ws
Auth private

Testing Livepeer Mode

SCOPE_CLOUD_MODE=livepeer SCOPE_CLOUD_APP_ID="daydream/scope-livepeer-pr-956--preview/ws" uv run daydream-scope

Comment thread src/scope/cloud/livepeer_fal_app.py Outdated
@emranemran emranemran force-pushed the feat/livepeer-fal-kafka branch from 3f6b790 to edd2fb3 Compare April 18, 2026 04:02
emranemran and others added 3 commits April 19, 2026 21:16
Publish lifecycle events from the fal runner proxy
(src/scope/cloud/livepeer_fal_app.py) to Kafka for cloud session
observability.

- Adds a KafkaPublisher wrapping aiokafka with env-var-based config
  (KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_SASL_USERNAME,
  KAFKA_SASL_PASSWORD) and SASL_SSL when credentials are set.
- Forwards those env vars to the runner subprocess and installs
  aiokafka via --extra kafka so scope.server.kafka_publisher in the
  runner can also emit events.
- Publishes websocket_connected on /ws accept and websocket_disconnected
  on teardown, including GPU type, fal region/runner/log labels,
  duration, and session start/end timestamps.

Initial implementation intercepts the first client message in the proxy
to learn manifest_id / user_id; a follow-up commit replaces that with
header extraction so no message parsing is needed.

Signed-off-by: emranemran <emran@livepeer.org>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Use headers instead of sniffing websocket messages.
Remove the one-time Kafka smoke-test publish and the print() tracing
that were added while bootstrapping the Kafka path. The KafkaPublisher
class retains its own [Kafka] structured logs; actual websocket_connected
and websocket_disconnected events still publish as before.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: emranemran <emran.mah@gmail.com>
@emranemran emranemran force-pushed the feat/livepeer-fal-kafka branch from be98308 to af5d47b Compare April 20, 2026 05:04
@emranemran emranemran requested a review from leszko April 20, 2026 05:05
@emranemran
Copy link
Copy Markdown
Contributor Author

Tested e2e and works as expected. I'm seeing kafka messages in clickhouse now.

print(f"[Kafka] Failed to start producer: {e}")
return False

async def stop(self):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see where this is used? Do we even need it?

Copy link
Copy Markdown
Contributor

@j0sh j0sh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small nit around the stop method possibly being unused, otherwise LGTM

@emranemran emranemran merged commit 21736af into main Apr 20, 2026
10 of 11 checks passed
emranemran added a commit that referenced this pull request Apr 21, 2026
…969)

## Summary

Follow-up to #956. That PR added `websocket_connected` /
`websocket_disconnected` from the livepeer fal wrapper, but the rest of
the session lifecycle (`pipeline_loaded`, `session_created`,
`stream_started`, `stream_heartbeat`, `stream_stopped`,
`playback_ready`, `error` variants) was either not firing or firing with
null identifiers in livepeer mode. Cloud-relay mode (`fal_app.py` path)
already had these events working; this PR brings livepeer mode to the
same shape.

## Root causes (all in the runner, \`src/scope/cloud/livepeer_app.py\`)

1. \`FrameProcessor(...)\` was constructed without \`user_id\` /
\`connection_id\` / \`session_id\` / \`connection_info\`, so every event
it emits (\`stream_started\`, \`stream_heartbeat\`, \`stream_stopped\`,
etc.) carried nulls.
2. \`session.manifest_id\` was never populated from
\`job_info.manifest_id\`, so there was no identifier available that
matched the wrapper's \`websocket_connected.connection_id\`.
3. The pipeline/load body injection used the runner's random internal
UUID as \`connection_id\` instead of the manifest_id, so
\`pipeline_loaded\` didn't correlate.
4. \`session_created\` / \`session_closed\` are only emitted from
\`webrtc.py\`, which isn't on the livepeer code path — they have to be
emitted explicitly.
5. \`connection_info\` env vars (\`NOMAD_DC\`, \`FAL_JOB_ID\`,
\`FAL_LOG_LABELS\`, etc.) weren't in the runner subprocess allowlist, so
the runner couldn't rebuild the dict even if asked.

## Changes

**\`src/scope/cloud/livepeer_app.py\`**
- Add \`manifest_id\` / \`session_id\` / \`connection_info\` fields to
\`LivepeerSession\`; populate them right after parsing \`ScopeJobInfo\`.
- New \`_build_connection_info()\` helper that mirrors the shape built
by \`livepeer_fal_app.py\` from env vars.
- Pass \`session_id\`, \`user_id\`, \`connection_id=manifest_id\`,
\`connection_info\` into \`FrameProcessor\` so every downstream event
from \`frame_processor.py\` / \`pipeline_processor.py\` carries
identifiers that join with \`websocket_connected\`.
- Emit \`session_created\` right after \`FrameProcessor.start()\`, and
\`session_closed\` right after \`FrameProcessor.stop()\`, using the same
shape as \`webrtc.py:731\` and \`:1150\`.
- Flip the pipeline/load body injection from \`session.connection_id\`
to \`session.manifest_id\`, and pass \`connection_info\` too.

**\`src/scope/cloud/livepeer_fal_app.py\`**
- Add \`NOMAD_DC\`, \`FAL_JOB_ID\`, \`FAL_RUNNER_ID\`,
\`FAL_LOG_LABELS\`, \`FAL_MACHINE_TYPE\` to \`env_allowlist\` so the
runner subprocess can reconstruct \`connection_info\`.

## Test plan

- [ ] \`uv run ruff check src/\` and \`ruff format --check src/\` pass
(already verified)
- [ ] \`uv run daydream-scope\` starts without import/init errors
- [ ] \`./test-cloud-connect.sh --skip-push\` after CI build-cloud
succeeds → exit 0 with CONNECTED
- [ ] Manual UI test: connect to cloud, load pipeline (\`longlive\`),
start stream ~30s, disconnect
- [ ] ClickHouse query on \`scope_cloud_events\` filtered by \`user_id\`
and \`connection_id = <manifest_id>\` shows all of:
\`websocket_connected\`, \`pipeline_load_start\`, \`pipeline_loaded\`,
\`session_created\`, \`stream_started\`, ≥2 \`stream_heartbeat\`,
\`stream_stopped\`, \`session_closed\`, \`websocket_disconnected\` — all
sharing the same connection_id
- [ ] Regression check on cloud-relay path (\`fal_app.py\` deploy): no
changes, events unchanged

## Not in scope

- Local-scope-side events (\`session_created\` from WebRTC path on
user's laptop, \`session_closed\`, webrtc connection errors) still
require the user's local env to have \`KAFKA_*\` configured; that's a
config concern, not code.
- \`/api/v1/session/start\` not being livepeer-compatible
(\`mcp_router.py:252\` TODO) — untouched; only affects headless test
flows.

🤖 Co-authored with [Claude Code](https://claude.com/claude-code)

Signed-off-by: emranemran <emran.mah@gmail.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants