Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 179 additions & 0 deletions src/scope/cloud/livepeer_fal_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,121 @@
RUNNER_FAILURE_WINDOW_SECONDS = 60.0
ASSETS_DIR_PATH = "/tmp/.daydream-scope/assets"


# ---------------------------------------------------------------------------
# Kafka publisher — matches fal_app.py KafkaPublisher for event parity
# ---------------------------------------------------------------------------

# Process-wide singleton publisher; lazily created on first websocket
# connection (see websocket_handler) so import never touches Kafka.
kafka_publisher: "KafkaPublisher | None" = None


class KafkaPublisher:
    """Async Kafka event publisher for fal.ai websocket events.

    Connection settings come from the environment:
    ``KAFKA_BOOTSTRAP_SERVERS`` (required to enable publishing),
    ``KAFKA_TOPIC`` (default ``network_events``), and optional
    ``KAFKA_SASL_USERNAME`` / ``KAFKA_SASL_PASSWORD`` for SASL_SSL auth.

    All failures (missing config, missing aiokafka, broker errors) are
    reported via ``print`` and surfaced as ``False`` return values so that
    event publishing can never break the websocket proxy path.
    """

    def __init__(self):
        # AIOKafkaProducer instance; stays None until start() succeeds.
        self._producer = None
        # True only while the producer is connected and usable.
        self._started = False
        # Destination topic, resolved from KAFKA_TOPIC in start().
        self._topic = None

    async def start(self) -> bool:
        """Start the Kafka producer.

        Returns:
            True if the producer is running (including when it was already
            started), False when Kafka is unconfigured, aiokafka is not
            installed, or the broker connection fails.
        """
        import json as _json  # noqa: F811

        # Idempotence guard: the call site only checks the module-level
        # singleton for None, so without this a second call would connect
        # a second producer and leak the first.
        if self._started:
            return True

        bootstrap_servers = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
        self._topic = os.getenv("KAFKA_TOPIC", "network_events")
        sasl_username = os.getenv("KAFKA_SASL_USERNAME")
        sasl_password = os.getenv("KAFKA_SASL_PASSWORD")

        print(
            f"[Kafka] Starting publisher (KAFKA_BOOTSTRAP_SERVERS={bootstrap_servers})"
        )
        if not bootstrap_servers:
            print("[Kafka] Not configured, event publishing disabled")
            return False

        try:
            from aiokafka import AIOKafkaProducer

            config = {
                "bootstrap_servers": bootstrap_servers,
                # Events are plain dicts; serialize as UTF-8 JSON bytes.
                "value_serializer": lambda v: _json.dumps(v).encode("utf-8"),
                "key_serializer": lambda k: k.encode("utf-8") if k else None,
            }

            # SASL/PLAIN over TLS, only when both credentials are supplied.
            if sasl_username and sasl_password:
                import ssl

                ssl_context = ssl.create_default_context()
                config.update(
                    {
                        "security_protocol": "SASL_SSL",
                        "sasl_mechanism": "PLAIN",
                        "sasl_plain_username": sasl_username,
                        "sasl_plain_password": sasl_password,
                        "ssl_context": ssl_context,
                    }
                )

            self._producer = AIOKafkaProducer(**config)
            await self._producer.start()
            self._started = True
            print(f"[Kafka] Publisher started, topic: {self._topic}")
            return True

        except ImportError:
            print("[Kafka] aiokafka not installed, Kafka disabled")
            return False
        except Exception as e:
            print(f"[Kafka] Failed to start producer: {e}")
            return False

    async def stop(self):
        """Stop the Kafka producer and drop the reference to it.

        No-op when the publisher never started. Always leaves the instance
        in the stopped state, even if the underlying stop() raises.
        """
        if self._producer and self._started:
            try:
                await self._producer.stop()
                print("[Kafka] Publisher stopped")
            except Exception as e:
                print(f"[Kafka] Error stopping producer: {e}")
            finally:
                self._started = False
                self._producer = None

    async def publish(self, event_type: str, data: dict) -> bool:
        """Publish a ``stream_trace`` event wrapping *data* to Kafka.

        Args:
            event_type: Inner event type tag (e.g. ``websocket_connected``).
            data: Extra fields merged into the inner event payload.

        Returns:
            True on successful delivery, False when the publisher is not
            running or the send fails.
        """
        import uuid as _uuid

        # Fail soft when start() never succeeded.
        if not self._started or not self._producer:
            return False

        event_id = str(_uuid.uuid4())
        # Millisecond epoch timestamp, stringified to match the envelope schema.
        timestamp_ms = str(int(time.time() * 1000))

        event = {
            "id": event_id,
            "type": "stream_trace",
            "timestamp": timestamp_ms,
            "data": {
                "type": event_type,
                "client_source": "scope",
                "timestamp": timestamp_ms,
                **data,  # caller fields win over nothing; they extend the envelope
            },
        }

        try:
            # Keyed by event id; send_and_wait blocks until broker ack.
            await self._producer.send_and_wait(self._topic, value=event, key=event_id)
            print(f"[Kafka] Published event: {event_type}")
            return True
        except Exception as e:
            print(f"[Kafka] Failed to publish event {event_type}: {e}")
            return False

    @property
    def is_running(self) -> bool:
        # True between a successful start() and stop().
        return self._started


# Gates startup cleanup so only one cleanup run executes at a time.
# Created lazily via _get_cleanup_event() rather than at import time.
_cleanup_event: asyncio.Event | None = None

Expand Down Expand Up @@ -148,6 +263,8 @@ def _build_runner_command() -> list[str]:
"run",
"--extra",
"livepeer",
"--extra",
"kafka",
"livepeer-runner",
"--host",
RUNNER_HOST,
Expand All @@ -162,6 +279,7 @@ async def _proxy_ws(client_ws: WebSocket) -> None:
Raises WebSocketDisconnect if the client disconnects.
Returns normally if the runner connection drops.
"""

import websockets
from websockets.exceptions import ConnectionClosed

Expand Down Expand Up @@ -228,6 +346,7 @@ class LivepeerScopeApp(fal.App, keep_alive=300):
requirements = [
"websockets",
"httpx",
"aiokafka",
]

def setup(self):
Expand Down Expand Up @@ -266,6 +385,11 @@ def setup(self):
"DAYDREAM_SCOPE_BUNDLED_PLUGINS_FILE",
"LIVEPEER_DEBUG",
"UV_CACHE_DIR",
# Kafka (for scope.server.kafka_publisher in the runner subprocess)
"KAFKA_BOOTSTRAP_SERVERS",
"KAFKA_TOPIC",
"KAFKA_SASL_USERNAME",
"KAFKA_SASL_PASSWORD",
]
runner_env = {k: os.environ[k] for k in env_allowlist if k in os.environ}
runner_env.setdefault("UV_CACHE_DIR", "/tmp/uv-cache")
Expand Down Expand Up @@ -317,6 +441,46 @@ async def websocket_handler(self, client_ws: WebSocket) -> None:

await client_ws.accept()

# Initialize Kafka publisher (lazy, once per process).
global kafka_publisher
if kafka_publisher is None:
kafka_publisher = KafkaPublisher()
await kafka_publisher.start()

connection_start_time = time.time()
metadata: dict = {}
manifest_id = client_ws.headers.get("manifest-id")
user_id = client_ws.headers.get("daydream-user-id")
metadata["manifest_id"] = manifest_id
metadata["user_id"] = user_id

import json

fal_log_labels_raw = os.getenv("FAL_LOG_LABELS", "unknown")
try:
fal_log_labels = json.loads(fal_log_labels_raw)
except (json.JSONDecodeError, TypeError):
fal_log_labels = fal_log_labels_raw

connection_info = {
"gpu_type": LivepeerScopeApp.machine_type,
"fal_region": os.getenv("NOMAD_DC", "unknown"),
"fal_runner_id": os.getenv(
"FAL_JOB_ID", os.getenv("FAL_RUNNER_ID", "unknown")
),
"fal_log_labels": fal_log_labels,
}
metadata["connection_info"] = connection_info
if kafka_publisher is not None and kafka_publisher.is_running:
await kafka_publisher.publish(
"websocket_connected",
{
"user_id": user_id,
"connection_id": manifest_id,
"connection_info": connection_info,
},
)

# Ensure any previous session data is cleaned up
event = _get_cleanup_event()
await event.wait()
Expand Down Expand Up @@ -358,6 +522,21 @@ async def websocket_handler(self, client_ws: WebSocket) -> None:
except Exception as exc:
print(f"Livepeer fal ws proxy error: {type(exc).__name__}: {exc}")
finally:
if kafka_publisher and kafka_publisher.is_running:
end_time = time.time()
elapsed_ms = int((end_time - connection_start_time) * 1000)
await kafka_publisher.publish(
"websocket_disconnected",
{
"user_id": metadata.get("user_id"),
"connection_id": metadata.get("manifest_id"),
"connection_info": connection_info,
"duration_ms": elapsed_ms,
"session_start_time_ms": int(connection_start_time * 1000),
"session_end_time_ms": int(end_time * 1000),
},
)

await run_cleanup()
with suppress(Exception):
await client_ws.close()
Expand Down
Loading