1 change: 1 addition & 0 deletions README.md
@@ -79,6 +79,7 @@ Some examples require extra dependencies. See each sample's directory for specifics.
* [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`.
* [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource, awaiting its successful completion.
* [prometheus](prometheus) - Configure Prometheus metrics on clients/workers.
* [workflow_streams](workflow_streams) - Workflow-hosted durable event stream via `temporalio.contrib.workflow_streams`. **Experimental — requires the [`contrib/pubsub` branch](https://github.com/temporalio/sdk-python/tree/contrib/pubsub) of sdk-python.**
* [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models.
* [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule.
* [sentry](sentry) - Report errors to Sentry.
170 changes: 170 additions & 0 deletions workflow_streams/README.md
@@ -0,0 +1,170 @@
# Workflow Streams

> **Experimental.** These samples target the
> `temporalio.contrib.workflow_streams` module on the
> [`contrib/pubsub` branch of sdk-python][branch], which is not yet
> released. To run them locally, install sdk-python from that branch
> (e.g. `uv pip install -e <path-to-sdk-python>` after checking out the
> branch).

[branch]: https://github.com/temporalio/sdk-python/tree/contrib/pubsub

`temporalio.contrib.workflow_streams` lets a workflow host a durable,
offset-addressed event channel. The workflow holds an append-only log;
external clients (activities, starters, BFFs) publish to topics via
signals and subscribe via long-poll updates. This packages the
boilerplate — batching, offset tracking, topic filtering, continue-as-new
hand-off — into a reusable stream.
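
The semantics can be pictured with a toy, in-memory model (illustrative only, not the `WorkflowStream` implementation): every published item gets the next absolute offset, and a subscriber filters by topic while tracking offsets. This sketch assumes offsets are assigned across the whole log rather than per topic, as the single offset counter in the reconnect sample suggests.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Item:
    offset: int  # absolute position in the host workflow's log
    topic: str
    data: Any


class ToyStream:
    """In-memory stand-in for a workflow-hosted stream: one append-only
    log shared by all topics, with offsets assigned across topics."""

    def __init__(self) -> None:
        self.log: list[Item] = []

    def publish(self, topic: str, data: Any) -> int:
        item = Item(offset=len(self.log), topic=topic, data=data)
        self.log.append(item)
        return item.offset

    def subscribe(self, topics: list[str], from_offset: int = 0) -> list[Item]:
        # Offsets are per-stream, not per-topic: a subscriber to one
        # topic sees gaps where other topics' events sit.
        return [
            i for i in self.log if i.offset >= from_offset and i.topic in topics
        ]


s = ToyStream()
s.publish("status", "received")
s.publish("progress", "charging card...")
s.publish("status", "shipped")
print([(i.offset, i.data) for i in s.subscribe(["status"])])
# → [(0, 'received'), (2, 'shipped')]
```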

This directory has four scenarios sharing one Worker.

**Scenario 1 — basic publish/subscribe with heterogeneous topics:**

* `workflows/order_workflow.py` — a workflow that hosts a
`WorkflowStream` and publishes status events as it processes an order.
* `activities/payment_activity.py` — an activity that publishes
intermediate progress to the stream via
`WorkflowStreamClient.from_within_activity()`.
* `run_publisher.py` — starts the workflow, subscribes to both topics,
decodes each by `item.topic`, and prints events as they arrive.

**Scenario 2 — reconnecting subscriber:**

* `workflows/pipeline_workflow.py` — a multi-stage pipeline that
publishes stage transitions over ~10 seconds, leaving room for a
consumer to disconnect and reconnect mid-run.
* `run_reconnecting_subscriber.py` — connects, reads a couple of
events, persists `item.offset + 1` to disk, "disconnects," then
reopens a fresh client and resumes via `subscribe(from_offset=...)`.
This is the central Workflow Streams use case: a consumer can
disappear (page refresh, server restart, laptop closed) and resume
later without missing events or seeing duplicates.
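
The offset bookkeeping behind that guarantee boils down to two rules, sketched here as pure functions (hypothetical helper names, for illustration only):

```python
def next_resume_offset(last_seen: int) -> int:
    """Offset to persist after processing an event: resuming at
    last_seen would replay it, so persist last_seen + 1 to resume
    exactly after it."""
    return last_seen + 1


def effective_start(persisted: int, stream_base: int) -> int:
    """Where the iterator actually starts: if truncation has outrun
    the persisted offset, subscribe(from_offset=...) skips forward to
    the stream's current base rather than failing."""
    return max(persisted, stream_base)


# After reading events at offsets 0 and 1, persist 2; on reconnect the
# iterator resumes at offset 2 — nothing lost, nothing duplicated.
persisted = next_resume_offset(1)
print(effective_start(persisted, stream_base=0))  # → 2
```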

**Scenario 3 — external (non-Activity) publisher:**

* `workflows/hub_workflow.py` — a passive workflow that does no work
of its own; it exists only to host a `WorkflowStream` and shut down
when signaled.
* `run_external_publisher.py` — starts the hub, then publishes events
into it from a plain Python coroutine using
`WorkflowStreamClient.create(client, workflow_id)`. A subscriber
task runs alongside; when the publisher is done it emits an in-band
sentinel headline (`__done__`) into the stream, then signals
`HubWorkflow.close`. The subscriber breaks on the sentinel and
exits its `async for`. This is the shape that fits a backend
service or scheduled job pushing events into a workflow it didn't
itself start.

**Scenario 4 — bounded log via `truncate()`:**

* `workflows/ticker_workflow.py` — a long-running workflow that
publishes events at a fixed cadence and calls
`self.stream.truncate(...)` periodically to bound log growth, keeping
only the most recent N entries.
* `run_truncating_ticker.py` — runs a fast subscriber and a slow
subscriber side by side. The fast one keeps up and sees every offset
in order; the slow one sleeps between iterations, falls behind a
truncation, and silently jumps forward to the new base offset. The
output makes the trade visible: bounded log size in exchange for
intermediate events being invisible to slow consumers.
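
What the two subscribers observe can be simulated without Temporal at all. This is a toy model of the truncation semantics described above, not the real `WorkflowStream` API: offsets are absolute, and truncating raises the base offset instead of renumbering surviving entries.

```python
base, entries = 0, []  # base = offset of the first retained entry


def publish(v):
    entries.append(v)


def truncate(keep_last):
    global base, entries
    drop = max(0, len(entries) - keep_last)
    entries, base = entries[drop:], base + drop


def read(cursor):
    """Return (events at or after cursor, advanced cursor); a cursor
    behind the base silently jumps forward to it."""
    cursor = max(cursor, base)
    out = [(base + i, v) for i, v in enumerate(entries) if base + i >= cursor]
    return out, base + len(entries)


fast_seen, fast_cursor, slow_cursor = [], 0, 0
for n in range(6):
    publish(f"tick-{n}")
    if n % 2 == 1:
        truncate(keep_last=2)  # bound the log to the 2 newest entries
    got, fast_cursor = read(fast_cursor)  # fast reader polls every tick
    fast_seen.extend(got)

slow_seen, _ = read(slow_cursor)  # slow reader polls once, at the end
print([o for o, _ in fast_seen])  # → [0, 1, 2, 3, 4, 5]  (kept up)
print(slow_seen)  # → [(4, 'tick-4'), (5, 'tick-5')]  (jumped past truncation)
```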

`run_worker.py` registers all four workflows and the activity.

## Ending the stream

`WorkflowStreamClient.subscribe()` is a long-poll loop — it does not
exit on its own when the host workflow completes. Two things have to
happen at the end of a streamed workflow for clean shutdown:

1. **An in-band terminator that subscribers recognize.** Each scenario
here sends one before the workflow exits:
- `OrderWorkflow` and `PipelineWorkflow` publish a "complete"
status / stage event; consumers break on it.
- `run_external_publisher.py` publishes a sentinel
`NewsEvent(headline="__done__")` immediately before signaling
`HubWorkflow.close`; the consumer breaks on the sentinel.
- `TickerWorkflow`'s final tick (`n == count - 1`) is the
terminator; subscribers break when they see it. `keep_last`
guarantees that final offset survives the last truncation, so
even slow consumers reach it.

2. **A short hold-open in the workflow before returning** so that the
final publish gets fetched. Items published in the same workflow
task that returns from `@workflow.run` are abandoned: the
in-memory log dies with the workflow, and the next subscriber
poll lands on a completed workflow. Each workflow here ends with

```python
await workflow.sleep(timedelta(milliseconds=500))
return ...
```

which gives subscribers, whose polls recur on the `poll_cooldown`
interval, time to issue one more poll. With both pieces in place, subscribers
receive the terminator, break out of their `async for`, and stop
polling — by the time the workflow exits there are no in-flight
poll handlers, so the SDK does not warn about unfinished
handlers.
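
The subscriber side of that hand-shake is the same in every scenario: break on the in-band terminator rather than waiting for the workflow to complete. A self-contained mock of the pattern, with `fake_subscribe` standing in for the real long-poll iterator (no Temporal connection needed):

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Item:
    offset: int
    data: str


async def fake_subscribe(events):
    """Stand-in for the long-poll iterator: in production it never ends
    on its own, so the consumer must break on an in-band terminator."""
    for off, data in enumerate(events):
        yield Item(off, data)
        await asyncio.sleep(0)  # yield control, as a real poll loop would


async def consume(events):
    seen = []
    async for item in fake_subscribe(events):
        if item.data == "__done__":  # in-band terminator
            break  # stop polling before the workflow completes
        seen.append(item.data)
    return seen


print(asyncio.run(consume(["a", "b", "__done__", "never-read"])))
# → ['a', 'b']
```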

## Run it

```bash
# Terminal 1: worker
uv run workflow_streams/run_worker.py

# Terminal 2: pick a scenario
uv run workflow_streams/run_publisher.py
# or
uv run workflow_streams/run_reconnecting_subscriber.py
# or
uv run workflow_streams/run_external_publisher.py
# or
uv run workflow_streams/run_truncating_ticker.py
```

Expected output on the basic publisher side:

```
[status] received: order=order-1
[progress] charging card...
[progress] card charged
[status] shipped: order=order-1
[progress] charge id: charge-order-1
[status] complete: order=order-1
workflow result: charge-order-1
```

Expected output on the reconnecting subscriber side (note the offsets
are continuous across the disconnect — no events lost, none duplicated):

```
[phase 1] connecting and reading first few events
offset= 0 stage=validating
offset= 1 stage=loading data
[phase 1] persisted resume offset=2 -> /tmp/...; disconnecting

[phase 2] reconnecting and resuming from persisted offset
offset= 2 stage=transforming
offset= 3 stage=writing output
offset= 4 stage=verifying
offset= 5 stage=complete

workflow result: pipeline workflow-stream-pipeline-... done
```

## Notes

* **Subscriber start position.** `subscribe(...)` without `from_offset`
starts at the stream's current base offset and follows live — older
events that have been truncated, or that arrived before the
subscribe call, are not replayed. Pass `from_offset=N` to resume
from a known position (see `run_reconnecting_subscriber.py`); the
iterator skips forward to the current base if `N` has been
truncated.
* **Continue-as-new.** Every `*Input` dataclass carries
`stream_state: WorkflowStreamState | None = None`. To survive
continue-as-new without losing buffered items, capture the workflow's
stream state and pass it to the next run via
`WorkflowStream(prior_state=...)` in `@workflow.init`. The samples
declare the field for completeness; none of them actually trigger
continue-as-new.
Empty file added workflow_streams/__init__.py
31 changes: 31 additions & 0 deletions workflow_streams/activities/payment_activity.py
@@ -0,0 +1,31 @@
from __future__ import annotations

import asyncio
from datetime import timedelta

from temporalio import activity
from temporalio.contrib.workflow_streams import WorkflowStreamClient

from workflow_streams.shared import TOPIC_PROGRESS, ProgressEvent


@activity.defn
async def charge_card(order_id: str) -> str:
"""Pretend to charge a card, publishing progress to the parent workflow.

`WorkflowStreamClient.from_within_activity()` reads the parent
workflow id and the Temporal client from the activity context, so
this activity can push events back without any wiring.
"""
client = WorkflowStreamClient.from_within_activity(
batch_interval=timedelta(milliseconds=200)
)
async with client:
progress = client.topic(TOPIC_PROGRESS, type=ProgressEvent)
progress.publish(ProgressEvent(message="charging card..."))
await asyncio.sleep(1.0)
progress.publish(
ProgressEvent(message="card charged"),
force_flush=True,
)
return f"charge-{order_id}"
102 changes: 102 additions & 0 deletions workflow_streams/run_external_publisher.py
@@ -0,0 +1,102 @@
"""External publisher: a non-Activity process pushes events into a workflow.

The two earlier scenarios publish from inside the workflow itself
(``OrderWorkflow``, ``PipelineWorkflow``) or from an Activity it runs
(``charge_card``). This scenario shows the third shape: a backend
service, scheduled job, or anything else with a Temporal ``Client``
publishing into a *running* workflow it didn't start. It uses the same
factory as the subscribe path, :py:meth:`WorkflowStreamClient.create`,
but for publishing instead.

The script starts a ``HubWorkflow`` (which does no work of its own —
it exists only to host the stream), then runs a publisher and a
subscriber concurrently. When the publisher is done it signals
``HubWorkflow.close``, the workflow's run finishes, and the
subscriber's iterator exits normally.

Run the worker first (``uv run workflow_streams/run_worker.py``), then::

uv run workflow_streams/run_external_publisher.py
"""

from __future__ import annotations

import asyncio
import uuid

from temporalio.client import Client
from temporalio.contrib.workflow_streams import WorkflowStreamClient

from workflow_streams.shared import (
TASK_QUEUE,
TOPIC_NEWS,
HubInput,
NewsEvent,
)
from workflow_streams.workflows.hub_workflow import HubWorkflow


HEADLINES = [
"rates held",
"merger announced",
"outage resolved",
"earnings beat",
"regulator opens probe",
]

# In-band terminator the publisher emits before signaling close. The
# subscriber recognizes this value and stops polling — without an
# explicit terminator the consumer would have to rely on the workflow
# returning to break the iterator, which means racing the last item
# delivery against workflow completion.
DONE_HEADLINE = "__done__"


async def main() -> None:
client = await Client.connect("localhost:7233")

workflow_id = f"workflow-stream-hub-{uuid.uuid4().hex[:8]}"
handle = await client.start_workflow(
HubWorkflow.run,
HubInput(hub_id=workflow_id),
id=workflow_id,
task_queue=TASK_QUEUE,
)

async def publish_news() -> None:
# WorkflowStreamClient.create takes a Temporal client and a
# workflow id — the same factory used elsewhere for subscribing.
# The async context manager batches publishes and flushes on
# exit; we additionally call flush() before signaling close so
# we know the events landed before the workflow shuts down.
producer = WorkflowStreamClient.create(client, workflow_id)
async with producer:
news = producer.topic(TOPIC_NEWS, type=NewsEvent)
for headline in HEADLINES:
news.publish(NewsEvent(headline=headline))
print(f"[publisher] sent: {headline}")
await asyncio.sleep(0.5)
news.publish(NewsEvent(headline=DONE_HEADLINE), force_flush=True)
await producer.flush()
# Tell the hub it can stop. The subscriber has already broken
# out of its async-for loop on the sentinel above.
await handle.signal(HubWorkflow.close)
print("[publisher] signaled close")

async def consume_news() -> None:
consumer = WorkflowStreamClient.create(client, workflow_id)
async for item in consumer.subscribe(
[TOPIC_NEWS], result_type=NewsEvent
):
if item.data.headline == DONE_HEADLINE:
return
print(f"[subscriber] offset={item.offset}: {item.data.headline}")

await asyncio.gather(publish_news(), consume_news())

result = await handle.result()
print(f"\nworkflow result: {result}")


if __name__ == "__main__":
asyncio.run(main())
58 changes: 58 additions & 0 deletions workflow_streams/run_publisher.py
@@ -0,0 +1,58 @@
from __future__ import annotations

import asyncio
import uuid

from temporalio.client import Client
from temporalio.common import RawValue
from temporalio.contrib.workflow_streams import WorkflowStreamClient

from workflow_streams.shared import (
TASK_QUEUE,
TOPIC_PROGRESS,
TOPIC_STATUS,
OrderInput,
ProgressEvent,
StatusEvent,
race_with_workflow,
)
from workflow_streams.workflows.order_workflow import OrderWorkflow


async def main() -> None:
client = await Client.connect("localhost:7233")

workflow_id = f"workflow-stream-order-{uuid.uuid4().hex[:8]}"
handle = await client.start_workflow(
OrderWorkflow.run,
OrderInput(order_id="order-1"),
id=workflow_id,
task_queue=TASK_QUEUE,
)

stream = WorkflowStreamClient.create(client, workflow_id)
converter = client.data_converter.payload_converter

async def consume() -> None:
# Single iterator over both topics — avoids a cancellation race
# between two concurrent subscribers. result_type=RawValue
# delivers the underlying Payload so we can dispatch
# heterogeneous events on item.topic.
async for item in stream.subscribe(
[TOPIC_STATUS, TOPIC_PROGRESS], result_type=RawValue
):
if item.topic == TOPIC_STATUS:
evt = converter.from_payload(item.data.payload, StatusEvent)
print(f"[status] {evt.kind}: order={evt.order_id}")
if evt.kind == "complete":
return
elif item.topic == TOPIC_PROGRESS:
progress = converter.from_payload(item.data.payload, ProgressEvent)
print(f"[progress] {progress.message}")

result = await race_with_workflow(consume(), handle)
print(f"workflow result: {result}")


if __name__ == "__main__":
asyncio.run(main())