Merged (26 commits)
44ceff7
Add workflow_streams samples: order_workflow scenario
jssmith Apr 29, 2026
faac49f
samples: workflow_stream: add reconnecting-subscriber scenario
jssmith Apr 29, 2026
b607117
samples: workflow_stream: add external-publisher scenario
jssmith Apr 29, 2026
91233b0
samples: workflow_stream: add truncating-ticker scenario
jssmith Apr 29, 2026
78062b4
samples: rename workflow_stream → workflow_streams; migrate to topic …
jssmith Apr 30, 2026
5d67b9e
samples: workflow_streams review polish
jssmith Apr 30, 2026
6294691
workflow_streams: deliver terminal events + fix run_publisher subscri…
jssmith Apr 30, 2026
bfbb2ed
workflow_streams README: document the stream-end pattern
jssmith Apr 30, 2026
fb3c8fc
Merge main into workflow-streams-samples
jssmith May 3, 2026
0962379
samples: workflow_streams: README and wheel packages cleanup
jssmith May 3, 2026
d5cc2fe
samples: workflow_streams: drop force_flush=True from charge_card
jssmith May 3, 2026
553bfdb
samples: workflow_streams: drop temp-file resume offset; add stats co…
jssmith May 3, 2026
c107687
samples: workflow_streams: surface multiple truncation jumps in ticker
jssmith May 3, 2026
31b6cf0
samples: workflow_streams: add LLM-streaming scenario
jssmith May 3, 2026
e8620c6
samples: workflow_streams: drop chat-stream openai upper cap
jssmith May 3, 2026
0b4cbc8
samples: workflow_streams: chat consumer header + cursor save/restore
jssmith May 3, 2026
81bf605
samples: workflow_streams: rename chat -> llm in scenario 5
jssmith May 3, 2026
c8663e5
samples: workflow_streams: race the LLM consumer with workflow result
jssmith May 3, 2026
44d944b
samples: workflow_streams: drop race_with_workflow helper
jssmith May 3, 2026
a760ad3
samples: workflow_streams: reorganize README; drop closing section
jssmith May 3, 2026
dc381c5
samples: workflow_streams: drop README Notes section
jssmith May 3, 2026
7a5065e
samples: workflow_streams: lock llm-stream dependency group
jssmith May 3, 2026
51f2f2d
samples: workflow_streams: fix lint failures (ruff isort + format)
jssmith May 3, 2026
2f39146
samples: workflow_streams: drop BFF jargon and Expected output block
jssmith May 3, 2026
f1814e5
Apply suggestion from @brianstrauch
brianstrauch May 4, 2026
be8cf92
Apply suggestion from @brianstrauch
brianstrauch May 4, 2026
1 change: 1 addition & 0 deletions README.md
@@ -79,6 +79,7 @@ Some examples require extra dependencies. See each sample's directory for specif
* [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`.
* [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting for its successful completion.
* [prometheus](prometheus) - Configure Prometheus metrics on clients/workers.
* [workflow_streams](workflow_streams) - Workflow-hosted durable event stream via `temporalio.contrib.workflow_streams`. **Experimental — requires the [`contrib/pubsub` branch](https://github.com/temporalio/sdk-python/tree/contrib/pubsub) of sdk-python.**
* [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models.
* [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule.
* [sentry](sentry) - Report errors to Sentry.
170 changes: 170 additions & 0 deletions workflow_streams/README.md
@@ -0,0 +1,170 @@
# Workflow Streams

> **Experimental.** These samples target the
> `temporalio.contrib.workflow_streams` module on the
> [`contrib/pubsub` branch of sdk-python][branch], which is not yet
> released. To run them locally, install sdk-python from that branch
> (e.g. `uv pip install -e <path-to-sdk-python>` after checking out the
> branch).

[branch]: https://github.com/temporalio/sdk-python/tree/contrib/pubsub

`temporalio.contrib.workflow_streams` lets a workflow host a durable,
offset-addressed event channel. The workflow holds an append-only log;
external clients (activities, starters, backend services) publish to topics via
signals and subscribe via long-poll updates. This packages the
boilerplate — batching, offset tracking, topic filtering, continue-as-new
hand-off — into a reusable stream.
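
As a mental model, the stream is an append-only log where each item carries a topic and a monotonically increasing offset. The sketch below is plain Python with no Temporal dependency; the names (`ToyStream`, `Item`) are illustrative, not the contrib API:

```python
from dataclasses import dataclass
from typing import Any, Iterator


@dataclass
class Item:
    offset: int
    topic: str
    data: Any


class ToyStream:
    """Append-only, offset-addressed log with topic filtering."""

    def __init__(self) -> None:
        self._log: list[Item] = []
        self._base = 0  # offset of the first retained item

    def publish(self, topic: str, data: Any) -> int:
        offset = self._base + len(self._log)
        self._log.append(Item(offset, topic, data))
        return offset

    def read(self, topics: list[str], from_offset: int = 0) -> Iterator[Item]:
        # A reader starting behind the base offset is skipped forward.
        start = max(from_offset, self._base)
        for item in self._log[start - self._base :]:
            if item.topic in topics:
                yield item


stream = ToyStream()
stream.publish("status", "received")
stream.publish("progress", "charging card...")
stream.publish("status", "complete")

# Topic filtering: a status-only reader sees offsets 0 and 2.
assert [i.offset for i in stream.read(["status"])] == [0, 2]
```

Offsets are global to the log, not per topic, which is what lets a single subscriber interleave heterogeneous topics in publish order.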

This directory has four scenarios sharing one Worker.

**Scenario 1 — basic publish/subscribe with heterogeneous topics:**

* `workflows/order_workflow.py` — a workflow that hosts a
`WorkflowStream` and publishes status events as it processes an order.
* `activities/payment_activity.py` — an activity that publishes
intermediate progress to the stream via
`WorkflowStreamClient.from_within_activity()`.
* `run_publisher.py` — starts the workflow, subscribes to both topics,
decodes each by `item.topic`, and prints events as they arrive.

**Scenario 2 — reconnecting subscriber:**

* `workflows/pipeline_workflow.py` — a multi-stage pipeline that
publishes stage transitions over ~10 seconds, leaving room for a
consumer to disconnect and reconnect mid-run.
* `run_reconnecting_subscriber.py` — connects, reads a couple of
events, persists `item.offset + 1` to disk, "disconnects," then
reopens a fresh client and resumes via `subscribe(from_offset=...)`.
This is the central Workflow Streams use case: a consumer can
disappear (page refresh, server restart, laptop closed) and resume
later without missing events or seeing duplicates.
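
The resume contract can be shown without Temporal at all. In this toy model (illustrative, not the contrib API), persisting `last_offset + 1` and re-reading from it yields a continuous, duplicate-free sequence across a disconnect:

```python
# Toy event log: (offset, payload) pairs standing in for the workflow's stream.
log = [(offset, f"stage-{offset}") for offset in range(6)]


def read_from(from_offset: int, limit: int) -> list[tuple[int, str]]:
    return [e for e in log if e[0] >= from_offset][:limit]


# Phase 1: read two events, persist the next offset, "disconnect".
seen = read_from(0, limit=2)
resume_offset = seen[-1][0] + 1  # the durable cursor: last seen + 1

# Phase 2: reconnect with a fresh reader and resume from the cursor.
seen += read_from(resume_offset, limit=10)

offsets = [o for o, _ in seen]
assert offsets == [0, 1, 2, 3, 4, 5]  # continuous: no gaps, no duplicates
```

The only state the consumer must durably store is a single integer; everything else is rebuilt on reconnect.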

**Scenario 3 — external (non-Activity) publisher:**

* `workflows/hub_workflow.py` — a passive workflow that does no work
of its own; it exists only to host a `WorkflowStream` and shut down
when signaled.
* `run_external_publisher.py` — starts the hub, then publishes events
into it from a plain Python coroutine using
`WorkflowStreamClient.create(client, workflow_id)`. A subscriber
task runs alongside; when the publisher is done it emits an in-band
sentinel headline (`__done__`) into the stream, then signals
`HubWorkflow.close`. The subscriber breaks on the sentinel and
exits its `async for`. This is the shape that fits a backend
service or scheduled job pushing events into a workflow it didn't
itself start.

**Scenario 4 — bounded log via `truncate()`:**

* `workflows/ticker_workflow.py` — a long-running workflow that
publishes events at a fixed cadence and calls
`self.stream.truncate(...)` periodically to bound log growth, keeping
only the most recent N entries.
* `run_truncating_ticker.py` — runs a fast subscriber and a slow
subscriber side by side. The fast one keeps up and sees every offset
in order; the slow one sleeps between iterations, falls behind a
truncation, and silently jumps forward to the new base offset. The
output makes the trade visible: bounded log size in exchange for
intermediate events being invisible to slow consumers.
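
The truncation trade-off reduces to a small invariant: dropping old entries advances the base offset but never renumbers surviving items, so a lagging consumer jumps forward rather than erroring. A toy model (illustrative names, not the contrib API):

```python
class BoundedLog:
    """Toy model of truncate(keep_last=N): old entries drop, offsets persist."""

    def __init__(self, keep_last: int) -> None:
        self.keep_last = keep_last
        self.base = 0  # offset of the oldest retained entry
        self.items: list[int] = []

    def publish(self, n: int) -> None:
        self.items.append(n)
        if len(self.items) > self.keep_last:
            dropped = len(self.items) - self.keep_last
            self.base += dropped
            self.items = self.items[dropped:]

    def poll(self, from_offset: int) -> tuple[int, list[int]]:
        # A consumer behind the base silently jumps to the current base.
        start = max(from_offset, self.base)
        return start, self.items[start - self.base :]


log = BoundedLog(keep_last=3)
for n in range(10):
    log.publish(n)

# A slow consumer that last read offset 2 finds offsets 3..6 truncated
# away and resumes at the new base offset 7.
start, items = log.poll(from_offset=3)
assert (start, items) == (7, [7, 8, 9])
```

A fast consumer polling between publishes would see every offset in order; only consumers that fall more than `keep_last` behind observe the jump.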

`run_worker.py` registers all four workflows and the activity.

## Ending the stream

`WorkflowStreamClient.subscribe()` is a long-poll loop — it does not
exit on its own when the host workflow completes. Two things have to
happen at the end of a streamed workflow for clean shutdown:

1. **An in-band terminator that subscribers recognize.** Each scenario
here sends one before the workflow exits:
- `OrderWorkflow` and `PipelineWorkflow` publish a "complete"
status / stage event; consumers break on it.
- `run_external_publisher.py` publishes a sentinel
`NewsEvent(headline="__done__")` immediately before signaling
`HubWorkflow.close`; the consumer breaks on the sentinel.
- `TickerWorkflow`'s final tick (`n == count - 1`) is the
terminator; subscribers break when they see it. `keep_last`
guarantees that final offset survives the last truncation, so
even slow consumers reach it.

2. **A short hold-open in the workflow before returning** so that the
final publish gets fetched. Items published in the same workflow
task that returns from `@workflow.run` are abandoned: the
in-memory log dies with the workflow, and the next subscriber
poll lands on a completed workflow. Each workflow here ends with

```python
await workflow.sleep(timedelta(milliseconds=500))
return ...
```

which gives subscribers in their `poll_cooldown` interval time to
issue one more poll. With both pieces in place, subscribers
receive the terminator, break out of their `async for`, and stop
polling — by the time the workflow exits there are no in-flight
poll handlers, so the SDK does not warn about unfinished
handlers.

## Run it

```bash
# Terminal 1: worker
uv run workflow_streams/run_worker.py

# Terminal 2: pick a scenario
uv run workflow_streams/run_publisher.py
# or
uv run workflow_streams/run_reconnecting_subscriber.py
# or
uv run workflow_streams/run_external_publisher.py
# or
uv run workflow_streams/run_truncating_ticker.py
```

Expected output on the basic publisher side:

```
[status] received: order=order-1
[progress] charging card...
[progress] card charged
[status] shipped: order=order-1
[progress] charge id: charge-order-1
[status] complete: order=order-1
workflow result: charge-order-1
```

Expected output on the reconnecting subscriber side (note the offsets
are continuous across the disconnect — no events lost, none duplicated):

```
[phase 1] connecting and reading first few events
offset= 0 stage=validating
offset= 1 stage=loading data
[phase 1] persisted resume offset=2 -> /tmp/...; disconnecting

[phase 2] reconnecting and resuming from persisted offset
offset= 2 stage=transforming
offset= 3 stage=writing output
offset= 4 stage=verifying
offset= 5 stage=complete

workflow result: pipeline workflow-stream-pipeline-... done
```

## Notes

* **Subscriber start position.** `subscribe(...)` without `from_offset`
starts at the stream's current base offset and follows live — older
events that have been truncated, or that arrived before the
subscribe call, are not replayed. Pass `from_offset=N` to resume
from a known position (see `run_reconnecting_subscriber.py`); the
iterator skips forward to the current base if `N` has been
truncated.
* **Continue-as-new.** Every `*Input` dataclass carries
`stream_state: WorkflowStreamState | None = None`. To survive
continue-as-new without losing buffered items, capture the workflow's
stream state and pass it to the next run via
`WorkflowStream(prior_state=...)` in `@workflow.init`. The samples
declare the field for completeness; none of them actually trigger
continue-as-new.
Empty file added workflow_streams/__init__.py
Empty file.
31 changes: 31 additions & 0 deletions workflow_streams/activities/payment_activity.py
@@ -0,0 +1,31 @@
from __future__ import annotations

import asyncio
from datetime import timedelta

from temporalio import activity
from temporalio.contrib.workflow_streams import WorkflowStreamClient

from workflow_streams.shared import TOPIC_PROGRESS, ProgressEvent


@activity.defn
async def charge_card(order_id: str) -> str:
"""Pretend to charge a card, publishing progress to the parent workflow.

`WorkflowStreamClient.from_within_activity()` reads the parent
workflow id and the Temporal client from the activity context, so
this activity can push events back without any wiring.
"""
client = WorkflowStreamClient.from_within_activity(
batch_interval=timedelta(milliseconds=200)
)
async with client:
progress = client.topic(TOPIC_PROGRESS, type=ProgressEvent)
progress.publish(ProgressEvent(message="charging card..."))
await asyncio.sleep(1.0)
progress.publish(
ProgressEvent(message="card charged"),
force_flush=True,
)
return f"charge-{order_id}"
102 changes: 102 additions & 0 deletions workflow_streams/run_external_publisher.py
@@ -0,0 +1,102 @@
"""External publisher: a non-Activity process pushes events into a workflow.

The two earlier scenarios publish from inside the workflow itself
(``OrderWorkflow``, ``PipelineWorkflow``) or from an Activity it runs
(``charge_card``). This scenario shows the third shape: a backend
service, scheduled job, or anything else with a Temporal ``Client``
publishing into a *running* workflow it didn't start. Same factory as
the subscribe path — :py:meth:`WorkflowStreamClient.create` — used for
publishing instead.

The script starts a ``HubWorkflow`` (which does no work of its own —
it exists only to host the stream), then runs a publisher and a
subscriber concurrently. When the publisher is done it signals
``HubWorkflow.close``, the workflow's run finishes, and the
subscriber's iterator exits normally.

Run the worker first (``uv run workflow_streams/run_worker.py``), then::

uv run workflow_streams/run_external_publisher.py
"""

from __future__ import annotations

import asyncio
import uuid

from temporalio.client import Client
from temporalio.contrib.workflow_streams import WorkflowStreamClient

from workflow_streams.shared import (
TASK_QUEUE,
TOPIC_NEWS,
HubInput,
NewsEvent,
)
from workflow_streams.workflows.hub_workflow import HubWorkflow


HEADLINES = [
"rates held",
"merger announced",
"outage resolved",
"earnings beat",
"regulator opens probe",
]

# In-band terminator the publisher emits before signaling close. The
# subscriber recognizes this value and stops polling — without an
# explicit terminator the consumer would have to rely on the workflow
# returning to break the iterator, which means racing the last item
# delivery against workflow completion.
DONE_HEADLINE = "__done__"


async def main() -> None:
client = await Client.connect("localhost:7233")

workflow_id = f"workflow-stream-hub-{uuid.uuid4().hex[:8]}"
handle = await client.start_workflow(
HubWorkflow.run,
HubInput(hub_id=workflow_id),
id=workflow_id,
task_queue=TASK_QUEUE,
)

async def publish_news() -> None:
# WorkflowStreamClient.create takes a Temporal client and a
# workflow id — the same factory used elsewhere for subscribing.
# The async context manager batches publishes and flushes on
# exit; we additionally call flush() before signaling close so
# we know the events landed before the workflow shuts down.
producer = WorkflowStreamClient.create(client, workflow_id)
async with producer:
news = producer.topic(TOPIC_NEWS, type=NewsEvent)
for headline in HEADLINES:
news.publish(NewsEvent(headline=headline))
print(f"[publisher] sent: {headline}")
await asyncio.sleep(0.5)
news.publish(NewsEvent(headline=DONE_HEADLINE), force_flush=True)
await producer.flush()
# Tell the hub it can stop. The subscriber has already broken
# out of its async-for loop on the sentinel above.
await handle.signal(HubWorkflow.close)
print("[publisher] signaled close")

async def consume_news() -> None:
consumer = WorkflowStreamClient.create(client, workflow_id)
async for item in consumer.subscribe(
[TOPIC_NEWS], result_type=NewsEvent
):
if item.data.headline == DONE_HEADLINE:
return
print(f"[subscriber] offset={item.offset}: {item.data.headline}")

await asyncio.gather(publish_news(), consume_news())

result = await handle.result()
print(f"\nworkflow result: {result}")


if __name__ == "__main__":
asyncio.run(main())
58 changes: 58 additions & 0 deletions workflow_streams/run_publisher.py
@@ -0,0 +1,58 @@
from __future__ import annotations

import asyncio
import uuid

from temporalio.client import Client
from temporalio.common import RawValue
from temporalio.contrib.workflow_streams import WorkflowStreamClient

from workflow_streams.shared import (
TASK_QUEUE,
TOPIC_PROGRESS,
TOPIC_STATUS,
OrderInput,
ProgressEvent,
StatusEvent,
race_with_workflow,
)
from workflow_streams.workflows.order_workflow import OrderWorkflow


async def main() -> None:
client = await Client.connect("localhost:7233")

workflow_id = f"workflow-stream-order-{uuid.uuid4().hex[:8]}"
handle = await client.start_workflow(
OrderWorkflow.run,
OrderInput(order_id="order-1"),
id=workflow_id,
task_queue=TASK_QUEUE,
)

stream = WorkflowStreamClient.create(client, workflow_id)
converter = client.data_converter.payload_converter

async def consume() -> None:
# Single iterator over both topics — avoids a cancellation race
# between two concurrent subscribers. result_type=RawValue
# delivers the underlying Payload so we can dispatch
# heterogeneous events on item.topic.
async for item in stream.subscribe(
[TOPIC_STATUS, TOPIC_PROGRESS], result_type=RawValue
):
if item.topic == TOPIC_STATUS:
evt = converter.from_payload(item.data.payload, StatusEvent)
print(f"[status] {evt.kind}: order={evt.order_id}")
if evt.kind == "complete":
return
elif item.topic == TOPIC_PROGRESS:
progress = converter.from_payload(item.data.payload, ProgressEvent)
print(f"[progress] {progress.message}")

result = await race_with_workflow(consume(), handle)
print(f"workflow result: {result}")


if __name__ == "__main__":
asyncio.run(main())