diff --git a/README.md b/README.md index d4d6a61b..80cda649 100644 --- a/README.md +++ b/README.md @@ -79,6 +79,7 @@ Some examples require extra dependencies. See each sample's directory for specif * [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`. * [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion. * [prometheus](prometheus) - Configure Prometheus metrics on clients/workers. +* [workflow_streams](workflow_streams) - Workflow-hosted durable event stream via `temporalio.contrib.workflow_streams`. **Experimental — requires the [`contrib/pubsub` branch](https://github.com/temporalio/sdk-python/tree/contrib/pubsub) of sdk-python.** * [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models. * [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule. * [sentry](sentry) - Report errors to Sentry. diff --git a/workflow_streams/README.md b/workflow_streams/README.md new file mode 100644 index 00000000..bf2466fb --- /dev/null +++ b/workflow_streams/README.md @@ -0,0 +1,170 @@ +# Workflow Streams + +> **Experimental.** These samples target the +> `temporalio.contrib.workflow_streams` module on the +> [`contrib/pubsub` branch of sdk-python][branch], which is not yet +> released. To run them locally, install sdk-python from that branch +> (e.g. `uv pip install -e ` after checking out the +> branch). + +[branch]: https://github.com/temporalio/sdk-python/tree/contrib/pubsub + +`temporalio.contrib.workflow_streams` lets a workflow host a durable, +offset-addressed event channel. The workflow holds an append-only log; +external clients (activities, starters, BFFs) publish to topics via +signals and subscribe via long-poll updates. This packages the +boilerplate — batching, offset tracking, topic filtering, continue-as-new +hand-off — into a reusable stream. + +This directory has four scenarios sharing one Worker. + +**Scenario 1 — basic publish/subscribe with heterogeneous topics:** + +* `workflows/order_workflow.py` — a workflow that hosts a + `WorkflowStream` and publishes status events as it processes an order. +* `activities/payment_activity.py` — an activity that publishes + intermediate progress to the stream via + `WorkflowStreamClient.from_within_activity()`. +* `run_publisher.py` — starts the workflow, subscribes to both topics, + decodes each by `item.topic`, and prints events as they arrive. + +**Scenario 2 — reconnecting subscriber:** + +* `workflows/pipeline_workflow.py` — a multi-stage pipeline that + publishes stage transitions over ~10 seconds, leaving room for a + consumer to disconnect and reconnect mid-run. +* `run_reconnecting_subscriber.py` — connects, reads a couple of + events, persists `item.offset + 1` to disk, "disconnects," then + reopens a fresh client and resumes via `subscribe(from_offset=...)`. + This is the central Workflow Streams use case: a consumer can + disappear (page refresh, server restart, laptop closed) and resume + later without missing events or seeing duplicates. + +**Scenario 3 — external (non-Activity) publisher:** + +* `workflows/hub_workflow.py` — a passive workflow that does no work + of its own; it exists only to host a `WorkflowStream` and shut down + when signaled. +* `run_external_publisher.py` — starts the hub, then publishes events + into it from a plain Python coroutine using + `WorkflowStreamClient.create(client, workflow_id)`. 
A subscriber + task runs alongside; when the publisher is done it emits an in-band + sentinel headline (`__done__`) into the stream, then signals + `HubWorkflow.close`. The subscriber breaks on the sentinel and + exits its `async for`. This is the shape that fits a backend + service or scheduled job pushing events into a workflow it didn't + itself start. + +**Scenario 4 — bounded log via `truncate()`:** + +* `workflows/ticker_workflow.py` — a long-running workflow that + publishes events at a fixed cadence and calls + `self.stream.truncate(...)` periodically to bound log growth, keeping + only the most recent N entries. +* `run_truncating_ticker.py` — runs a fast subscriber and a slow + subscriber side by side. The fast one keeps up and sees every offset + in order; the slow one sleeps between iterations, falls behind a + truncation, and silently jumps forward to the new base offset. The + output makes the trade visible: bounded log size in exchange for + intermediate events being invisible to slow consumers. + +`run_worker.py` registers all four workflows and the activity. + +## Ending the stream + +`WorkflowStreamClient.subscribe()` is a long-poll loop — it does not +exit on its own when the host workflow completes. Two things have to +happen at the end of a streamed workflow for clean shutdown: + +1. **An in-band terminator that subscribers recognize.** Each scenario + here sends one before the workflow exits: + - `OrderWorkflow` and `PipelineWorkflow` publish a "complete" + status / stage event; consumers break on it. + - `run_external_publisher.py` publishes a sentinel + `NewsEvent(headline="__done__")` immediately before signaling + `HubWorkflow.close`; the consumer breaks on the sentinel. + - `TickerWorkflow`'s final tick (`n == count - 1`) is the + terminator; subscribers break when they see it. `keep_last` + guarantees that final offset survives the last truncation, so + even slow consumers reach it. + +2. **A short hold-open in the workflow before returning** so that the + final publish gets fetched. Items published in the same workflow + task that returns from `@workflow.run` are abandoned: the + in-memory log dies with the workflow, and the next subscriber + poll lands on a completed workflow. Each workflow here ends with + + ```python + await workflow.sleep(timedelta(milliseconds=500)) + return ... + ``` + + which gives subscribers in their `poll_cooldown` interval time to + issue one more poll. With both pieces in place, subscribers + receive the terminator, break out of their `async for`, and stop + polling — by the time the workflow exits there are no in-flight + poll handlers, so the SDK does not warn about unfinished + handlers. + +## Run it + +```bash +# Terminal 1: worker +uv run workflow_streams/run_worker.py + +# Terminal 2: pick a scenario +uv run workflow_streams/run_publisher.py +# or +uv run workflow_streams/run_reconnecting_subscriber.py +# or +uv run workflow_streams/run_external_publisher.py +# or +uv run workflow_streams/run_truncating_ticker.py +``` + +Expected output on the basic publisher side: + +``` +[status] received: order=order-1 +[progress] charging card... 
+[progress] card charged +[status] shipped: order=order-1 +[progress] charge id: charge-order-1 +[status] complete: order=order-1 +workflow result: charge-order-1 +``` + +Expected output on the reconnecting subscriber side (note the offsets +are continuous across the disconnect — no events lost, none duplicated): + +``` +[phase 1] connecting and reading first few events + offset= 0 stage=validating + offset= 1 stage=loading data +[phase 1] persisted resume offset=2 -> /tmp/...; disconnecting + +[phase 2] reconnecting and resuming from persisted offset + offset= 2 stage=transforming + offset= 3 stage=writing output + offset= 4 stage=verifying + offset= 5 stage=complete + +workflow result: pipeline workflow-stream-pipeline-... done +``` + +## Notes + +* **Subscriber start position.** `subscribe(...)` without `from_offset` + starts at the stream's current base offset and follows live — older + events that have been truncated, or that arrived before the + subscribe call, are not replayed. Pass `from_offset=N` to resume + from a known position (see `run_reconnecting_subscriber.py`); the + iterator skips forward to the current base if `N` has been + truncated. +* **Continue-as-new.** Every `*Input` dataclass carries + `stream_state: WorkflowStreamState | None = None`. To survive + continue-as-new without losing buffered items, capture the workflow's + stream state and pass it to the next run via + `WorkflowStream(prior_state=...)` in `@workflow.init`. The samples + declare the field for completeness; none of them actually trigger + continue-as-new. diff --git a/workflow_streams/__init__.py b/workflow_streams/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/workflow_streams/activities/__init__.py b/workflow_streams/activities/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/workflow_streams/activities/payment_activity.py b/workflow_streams/activities/payment_activity.py new file mode 100644 index 00000000..2ccd708b --- /dev/null +++ b/workflow_streams/activities/payment_activity.py @@ -0,0 +1,31 @@ +from __future__ import annotations + +import asyncio +from datetime import timedelta + +from temporalio import activity +from temporalio.contrib.workflow_streams import WorkflowStreamClient + +from workflow_streams.shared import TOPIC_PROGRESS, ProgressEvent + + +@activity.defn +async def charge_card(order_id: str) -> str: + """Pretend to charge a card, publishing progress to the parent workflow. + + `WorkflowStreamClient.from_within_activity()` reads the parent + workflow id and the Temporal client from the activity context, so + this activity can push events back without any wiring. + """ + client = WorkflowStreamClient.from_within_activity( + batch_interval=timedelta(milliseconds=200) + ) + async with client: + progress = client.topic(TOPIC_PROGRESS, type=ProgressEvent) + progress.publish(ProgressEvent(message="charging card...")) + await asyncio.sleep(1.0) + progress.publish( + ProgressEvent(message="card charged"), + force_flush=True, + ) + return f"charge-{order_id}" diff --git a/workflow_streams/run_external_publisher.py b/workflow_streams/run_external_publisher.py new file mode 100644 index 00000000..bf7d98e6 --- /dev/null +++ b/workflow_streams/run_external_publisher.py @@ -0,0 +1,102 @@ +"""External publisher: a non-Activity process pushes events into a workflow. + +The two earlier scenarios publish from inside the workflow itself +(``OrderWorkflow``, ``PipelineWorkflow``) or from an Activity it runs +(``charge_card``). 
This scenario shows the third shape: a backend +service, scheduled job, or anything else with a Temporal ``Client`` +publishing into a *running* workflow it didn't start. Same factory as +the subscribe path — :py:meth:`WorkflowStreamClient.create` — used for +publishing instead. + +The script starts a ``HubWorkflow`` (which does no work of its own — +it exists only to host the stream), then runs a publisher and a +subscriber concurrently. When the publisher is done it signals +``HubWorkflow.close``, the workflow's run finishes, and the +subscriber's iterator exits normally. + +Run the worker first (``uv run workflow_streams/run_worker.py``), then:: + + uv run workflow_streams/run_external_publisher.py +""" + +from __future__ import annotations + +import asyncio +import uuid + +from temporalio.client import Client +from temporalio.contrib.workflow_streams import WorkflowStreamClient + +from workflow_streams.shared import ( + TASK_QUEUE, + TOPIC_NEWS, + HubInput, + NewsEvent, +) +from workflow_streams.workflows.hub_workflow import HubWorkflow + + +HEADLINES = [ + "rates held", + "merger announced", + "outage resolved", + "earnings beat", + "regulator opens probe", +] + +# In-band terminator the publisher emits before signaling close. The +# subscriber recognizes this value and stops polling — without an +# explicit terminator the consumer would have to rely on the workflow +# returning to break the iterator, which means racing the last item +# delivery against workflow completion. +DONE_HEADLINE = "__done__" + + +async def main() -> None: + client = await Client.connect("localhost:7233") + + workflow_id = f"workflow-stream-hub-{uuid.uuid4().hex[:8]}" + handle = await client.start_workflow( + HubWorkflow.run, + HubInput(hub_id=workflow_id), + id=workflow_id, + task_queue=TASK_QUEUE, + ) + + async def publish_news() -> None: + # WorkflowStreamClient.create takes a Temporal client and a + # workflow id — the same factory used elsewhere for subscribing. + # The async context manager batches publishes and flushes on + # exit; we additionally call flush() before signaling close so + # we know the events landed before the workflow shuts down. + producer = WorkflowStreamClient.create(client, workflow_id) + async with producer: + news = producer.topic(TOPIC_NEWS, type=NewsEvent) + for headline in HEADLINES: + news.publish(NewsEvent(headline=headline)) + print(f"[publisher] sent: {headline}") + await asyncio.sleep(0.5) + news.publish(NewsEvent(headline=DONE_HEADLINE), force_flush=True) + await producer.flush() + # Tell the hub it can stop. The subscriber has already broken + # out of its async-for loop on the sentinel above. 
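+            # (If the subscriber's latest poll has not yet returned the
+            # sentinel, HubWorkflow.run holds its run open briefly after
+            # close so one more poll can deliver it; see the README's
+            # "Ending the stream" section.)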
+ await handle.signal(HubWorkflow.close) + print("[publisher] signaled close") + + async def consume_news() -> None: + consumer = WorkflowStreamClient.create(client, workflow_id) + async for item in consumer.subscribe( + [TOPIC_NEWS], result_type=NewsEvent + ): + if item.data.headline == DONE_HEADLINE: + return + print(f"[subscriber] offset={item.offset}: {item.data.headline}") + + await asyncio.gather(publish_news(), consume_news()) + + result = await handle.result() + print(f"\nworkflow result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_streams/run_publisher.py b/workflow_streams/run_publisher.py new file mode 100644 index 00000000..2e5ddb8d --- /dev/null +++ b/workflow_streams/run_publisher.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +import asyncio +import uuid + +from temporalio.client import Client +from temporalio.common import RawValue +from temporalio.contrib.workflow_streams import WorkflowStreamClient + +from workflow_streams.shared import ( + TASK_QUEUE, + TOPIC_PROGRESS, + TOPIC_STATUS, + OrderInput, + ProgressEvent, + StatusEvent, + race_with_workflow, +) +from workflow_streams.workflows.order_workflow import OrderWorkflow + + +async def main() -> None: + client = await Client.connect("localhost:7233") + + workflow_id = f"workflow-stream-order-{uuid.uuid4().hex[:8]}" + handle = await client.start_workflow( + OrderWorkflow.run, + OrderInput(order_id="order-1"), + id=workflow_id, + task_queue=TASK_QUEUE, + ) + + stream = WorkflowStreamClient.create(client, workflow_id) + converter = client.data_converter.payload_converter + + async def consume() -> None: + # Single iterator over both topics — avoids a cancellation race + # between two concurrent subscribers. result_type=RawValue + # delivers the underlying Payload so we can dispatch + # heterogeneous events on item.topic. + async for item in stream.subscribe( + [TOPIC_STATUS, TOPIC_PROGRESS], result_type=RawValue + ): + if item.topic == TOPIC_STATUS: + evt = converter.from_payload(item.data.payload, StatusEvent) + print(f"[status] {evt.kind}: order={evt.order_id}") + if evt.kind == "complete": + return + elif item.topic == TOPIC_PROGRESS: + progress = converter.from_payload(item.data.payload, ProgressEvent) + print(f"[progress] {progress.message}") + + result = await race_with_workflow(consume(), handle) + print(f"workflow result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_streams/run_reconnecting_subscriber.py b/workflow_streams/run_reconnecting_subscriber.py new file mode 100644 index 00000000..3aae76c6 --- /dev/null +++ b/workflow_streams/run_reconnecting_subscriber.py @@ -0,0 +1,107 @@ +"""Reconnecting subscriber: persist offset, disconnect, resume. + +Demonstrates the central Workflow Streams use case: a consumer can +disappear mid-stream — page refresh, server restart, laptop closed — +and resume later without missing events or seeing duplicates. The +event log lives in the Workflow, so the consumer just remembers where +it stopped. + +The script runs the pattern in two phases inside one process to keep +the demo short. The same code shape works across actual process +restarts because the resume offset is persisted to disk between phases. 
+ +Run the worker first (``uv run workflow_streams/run_worker.py``), then:: + + uv run workflow_streams/run_reconnecting_subscriber.py +""" + +from __future__ import annotations + +import asyncio +import tempfile +import uuid +from pathlib import Path + +from temporalio.client import Client +from temporalio.contrib.workflow_streams import WorkflowStreamClient + +from workflow_streams.shared import ( + TASK_QUEUE, + TOPIC_STATUS, + PipelineInput, + StageEvent, +) +from workflow_streams.workflows.pipeline_workflow import PipelineWorkflow + +# Number of events read in phase 1 before simulating a disconnect. +# Picked small enough that the workflow is still running after. +PHASE_1_EVENTS = 2 + + +async def main() -> None: + client = await Client.connect("localhost:7233") + + workflow_id = f"workflow-stream-pipeline-{uuid.uuid4().hex[:8]}" + handle = await client.start_workflow( + PipelineWorkflow.run, + PipelineInput(pipeline_id=workflow_id), + id=workflow_id, + task_queue=TASK_QUEUE, + ) + + # Where the consumer remembers its position. In a real BFF or UI + # backend this would be a database row keyed by (user_id, run_id); + # a temp file keeps the sample self-contained. + offset_path = Path(tempfile.gettempdir()) / f"{workflow_id}.offset" + + # ---- Phase 1: connect, read a couple of events, persist offset, disconnect. + print("[phase 1] connecting and reading first few events") + stream = WorkflowStreamClient.create(client, workflow_id) + seen = 0 + next_offset = 0 + async for item in stream.subscribe([TOPIC_STATUS], result_type=StageEvent): + print(f" offset={item.offset:2d} stage={item.data.stage}") + # Persist *one past* the offset just consumed. On resume we want + # the *next* unseen event, not the one we already showed. + next_offset = item.offset + 1 + offset_path.write_text(str(next_offset)) + seen += 1 + if seen >= PHASE_1_EVENTS: + break + + print( + f"[phase 1] persisted resume offset={next_offset} -> {offset_path}; disconnecting\n" + ) + # The async for loop exits the subscribe() iterator. Any background + # poll Update is cancelled. The workflow keeps running in the + # background, accumulating events into its log. + await asyncio.sleep(3) # let the workflow publish more in our absence + + # ---- Phase 2: reconnect, read persisted offset, resume from there. + print("[phase 2] reconnecting and resuming from persisted offset") + resume_from = int(offset_path.read_text()) + # A brand-new client and stream object — same shape as a different + # process picking up where the first one left off. + client2 = await Client.connect("localhost:7233") + stream2 = WorkflowStreamClient.create(client2, workflow_id) + async for item in stream2.subscribe( + [TOPIC_STATUS], + from_offset=resume_from, + result_type=StageEvent, + ): + print(f" offset={item.offset:2d} stage={item.data.stage}") + # Continue persisting after each event so a second crash here + # would also resume cleanly. + offset_path.write_text(str(item.offset + 1)) + if item.data.stage == "complete": + break + + result = await handle.result() + print(f"\nworkflow result: {result}") + # Clean up the offset file; in a real consumer you'd retain it as + # long as the user might reconnect. 
+ offset_path.unlink(missing_ok=True) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_streams/run_truncating_ticker.py b/workflow_streams/run_truncating_ticker.py new file mode 100644 index 00000000..65f8740e --- /dev/null +++ b/workflow_streams/run_truncating_ticker.py @@ -0,0 +1,89 @@ +"""Truncating ticker: bounded log + slow vs. fast subscribers. + +The ``TickerWorkflow`` publishes ``count`` events at a fixed interval, +calling ``self.stream.truncate(...)`` periodically to bound log +growth. This script subscribes twice — once fast, once slow — and +prints both side-by-side so the trade is visible: + +* The fast subscriber keeps up and sees every published offset in + order. +* The slow subscriber sleeps between iterations. When a truncation + runs past its position, the iterator silently jumps forward to the + new base offset — the slow subscriber's offsets jump too, and + intermediate events are not visible to it. + +This is the bounded-log model: log size is capped, slow consumers may +miss intermediate events, but they always see the most recent state. +For long-running workflows pushing high event volumes this is usually +the right trade — pair with set-semantic events where each event +carries enough state to make missing the prior ones recoverable. + +Run the worker first (``uv run workflow_streams/run_worker.py``), then:: + + uv run workflow_streams/run_truncating_ticker.py +""" + +from __future__ import annotations + +import asyncio +import uuid + +from temporalio.client import Client +from temporalio.contrib.workflow_streams import WorkflowStreamClient + +from workflow_streams.shared import ( + TASK_QUEUE, + TOPIC_TICK, + TickerInput, + TickEvent, +) +from workflow_streams.workflows.ticker_workflow import TickerWorkflow + + +SLOW_SUBSCRIBER_DELAY_S = 1.5 +TICKER_COUNT = 20 + + +async def main() -> None: + client = await Client.connect("localhost:7233") + + workflow_id = f"workflow-stream-ticker-{uuid.uuid4().hex[:8]}" + handle = await client.start_workflow( + TickerWorkflow.run, + TickerInput( + count=TICKER_COUNT, + keep_last=3, + truncate_every=5, + interval_ms=400, + ), + id=workflow_id, + task_queue=TASK_QUEUE, + ) + + stream = WorkflowStreamClient.create(client, workflow_id) + last_n = TICKER_COUNT - 1 + + # Both subscribers break on the final tick (n == last_n). ``keep_last`` + # ensures that offset survives the last truncation so even the slow + # consumer reaches it. 
+ async def fast_subscriber() -> None: + async for item in stream.subscribe([TOPIC_TICK], result_type=TickEvent): + print(f"[fast] offset={item.offset:3d} n={item.data.n}") + if item.data.n == last_n: + return + + async def slow_subscriber() -> None: + async for item in stream.subscribe([TOPIC_TICK], result_type=TickEvent): + print(f"[SLOW] offset={item.offset:3d} n={item.data.n}") + if item.data.n == last_n: + return + await asyncio.sleep(SLOW_SUBSCRIBER_DELAY_S) + + await asyncio.gather(fast_subscriber(), slow_subscriber()) + + result = await handle.result() + print(f"\nworkflow result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_streams/run_worker.py b/workflow_streams/run_worker.py new file mode 100644 index 00000000..8aa12edc --- /dev/null +++ b/workflow_streams/run_worker.py @@ -0,0 +1,30 @@ +from __future__ import annotations + +import asyncio +import logging + +from temporalio.client import Client +from temporalio.worker import Worker + +from workflow_streams.activities.payment_activity import charge_card +from workflow_streams.shared import TASK_QUEUE +from workflow_streams.workflows.hub_workflow import HubWorkflow +from workflow_streams.workflows.order_workflow import OrderWorkflow +from workflow_streams.workflows.pipeline_workflow import PipelineWorkflow +from workflow_streams.workflows.ticker_workflow import TickerWorkflow + + +async def main() -> None: + logging.basicConfig(level=logging.INFO) + client = await Client.connect("localhost:7233") + worker = Worker( + client, + task_queue=TASK_QUEUE, + workflows=[HubWorkflow, OrderWorkflow, PipelineWorkflow, TickerWorkflow], + activities=[charge_card], + ) + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_streams/shared.py b/workflow_streams/shared.py new file mode 100644 index 00000000..746ee73d --- /dev/null +++ b/workflow_streams/shared.py @@ -0,0 +1,128 @@ +from __future__ import annotations + +import asyncio +from collections.abc import Coroutine +from dataclasses import dataclass +from typing import Any, TypeVar + +from temporalio.client import WorkflowHandle +from temporalio.contrib.workflow_streams import WorkflowStreamState + +TASK_QUEUE = "workflow-stream-sample-task-queue" + +# Topics published by the workflow / activity. +TOPIC_STATUS = "status" +TOPIC_PROGRESS = "progress" +TOPIC_NEWS = "news" +TOPIC_TICK = "tick" + + +@dataclass +class OrderInput: + order_id: str + # Carries stream state across continue-as-new. None on a fresh start. + stream_state: WorkflowStreamState | None = None + + +@dataclass +class StatusEvent: + kind: str + order_id: str + + +@dataclass +class ProgressEvent: + message: str + + +@dataclass +class PipelineInput: + pipeline_id: str + # Carries stream state across continue-as-new. None on a fresh start. + stream_state: WorkflowStreamState | None = None + + +@dataclass +class StageEvent: + stage: str + + +@dataclass +class HubInput: + hub_id: str + # Carries stream state across continue-as-new. None on a fresh start. + stream_state: WorkflowStreamState | None = None + + +@dataclass +class NewsEvent: + headline: str + + +@dataclass +class TickerInput: + count: int = 20 + keep_last: int = 3 + truncate_every: int = 5 + interval_ms: int = 400 + # Carries stream state across continue-as-new. None on a fresh start. 
+ stream_state: WorkflowStreamState | None = None + + +@dataclass +class TickEvent: + n: int + + +T = TypeVar("T") + + +async def race_with_workflow( + consumer: Coroutine[Any, Any, None], + handle: WorkflowHandle[Any, T], +) -> T: + """Run a subscriber concurrently with the workflow. + + If the workflow finishes before the subscriber sees its terminal + event, cancel the subscriber and surface the workflow's result + (raising on failure). If the subscriber finishes first, wait for + the workflow result. A non-cancellation failure in the subscriber + is propagated either way. + + Without this, a workflow that raises before publishing its terminal + event would leave the subscriber blocked on its next poll forever. + """ + consumer_task = asyncio.create_task(consumer) + result_task = asyncio.create_task(handle.result()) + we_cancelled_consumer = False + try: + await asyncio.wait( + [consumer_task, result_task], + return_when=asyncio.FIRST_COMPLETED, + ) + if not consumer_task.done(): + consumer_task.cancel() + we_cancelled_consumer = True + # gather(return_exceptions=True) drains both tasks. Only + # cancellation we initiated is expected — anything else + # propagates. + consumer_outcome, workflow_outcome = await asyncio.gather( + consumer_task, result_task, return_exceptions=True + ) + if isinstance(consumer_outcome, asyncio.CancelledError): + if not we_cancelled_consumer: + raise consumer_outcome + elif isinstance(consumer_outcome, BaseException): + raise consumer_outcome + if isinstance(workflow_outcome, BaseException): + raise workflow_outcome + return workflow_outcome + finally: + for task in (consumer_task, result_task): + if not task.done(): + task.cancel() + for task in (consumer_task, result_task): + try: + await task + except BaseException: + pass diff --git a/workflow_streams/workflows/__init__.py b/workflow_streams/workflows/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/workflow_streams/workflows/hub_workflow.py b/workflow_streams/workflows/hub_workflow.py new file mode 100644 index 00000000..5dcc3c5f --- /dev/null +++ b/workflow_streams/workflows/hub_workflow.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +from datetime import timedelta + +from temporalio import workflow +from temporalio.contrib.workflow_streams import WorkflowStream + +from workflow_streams.shared import HubInput + + +@workflow.defn +class HubWorkflow: + """Passive stream host: starts up, waits, closes when told. + + Unlike OrderWorkflow or PipelineWorkflow, this workflow does no + work of its own — it exists only to host a ``WorkflowStream`` that + external publishers push events into and external subscribers read + from. The shape that fits a backend service or "event bus" pattern, + where the workflow owns durable state but the events come from + outside. + """ + + @workflow.init + def __init__(self, input: HubInput) -> None: + self.stream = WorkflowStream(prior_state=input.stream_state) + self._closed = False + + @workflow.run + async def run(self, input: HubInput) -> str: + await workflow.wait_condition(lambda: self._closed) + # The publisher publishes its own terminator into the stream + # before signaling close (see run_external_publisher.py). + # Hold the run open briefly so subscribers' final poll + # delivers any items still in the log. 
+ await workflow.sleep(timedelta(milliseconds=500)) + return f"hub {input.hub_id} closed" + + @workflow.signal + def close(self) -> None: + self._closed = True diff --git a/workflow_streams/workflows/order_workflow.py b/workflow_streams/workflows/order_workflow.py new file mode 100644 index 00000000..099634cd --- /dev/null +++ b/workflow_streams/workflows/order_workflow.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +from datetime import timedelta + +from temporalio import workflow +from temporalio.contrib.workflow_streams import WorkflowStream + +from workflow_streams.shared import ( + TOPIC_PROGRESS, + TOPIC_STATUS, + OrderInput, + ProgressEvent, + StatusEvent, +) + +with workflow.unsafe.imports_passed_through(): + from workflow_streams.activities.payment_activity import charge_card + + +@workflow.defn +class OrderWorkflow: + """Process a fake order, publishing status and progress events. + + The workflow itself publishes status changes; an activity it runs + publishes finer-grained progress events using a + `WorkflowStreamClient`. A single stream carries both topics — + subscribers can filter on the topic(s) they care about. + """ + + @workflow.init + def __init__(self, input: OrderInput) -> None: + # Construct the stream from @workflow.init so it can register + # signal/update/query handlers before the workflow accepts any + # messages. Threading prior_state lets the workflow survive + # continue-as-new without losing buffered items. + self.stream = WorkflowStream(prior_state=input.stream_state) + self.status = self.stream.topic(TOPIC_STATUS, type=StatusEvent) + self.progress = self.stream.topic(TOPIC_PROGRESS, type=ProgressEvent) + + @workflow.run + async def run(self, input: OrderInput) -> str: + self.status.publish(StatusEvent(kind="received", order_id=input.order_id)) + + charge_id = await workflow.execute_activity( + charge_card, + input.order_id, + start_to_close_timeout=timedelta(seconds=30), + ) + + self.status.publish(StatusEvent(kind="shipped", order_id=input.order_id)) + self.progress.publish(ProgressEvent(message=f"charge id: {charge_id}")) + self.status.publish(StatusEvent(kind="complete", order_id=input.order_id)) + # The "complete" status event above is the in-band terminator + # subscribers break on (see run_publisher.py). Hold the run + # open briefly so subscribers' next poll delivers it before + # this task returns and the in-memory log is gone. + await workflow.sleep(timedelta(milliseconds=500)) + return charge_id diff --git a/workflow_streams/workflows/pipeline_workflow.py b/workflow_streams/workflows/pipeline_workflow.py new file mode 100644 index 00000000..83336905 --- /dev/null +++ b/workflow_streams/workflows/pipeline_workflow.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +from datetime import timedelta + +from temporalio import workflow +from temporalio.contrib.workflow_streams import WorkflowStream + +from workflow_streams.shared import ( + TOPIC_STATUS, + PipelineInput, + StageEvent, +) + + +@workflow.defn +class PipelineWorkflow: + """Multi-stage pipeline that publishes stage transitions over time. + + Stages are spaced out with ``workflow.sleep`` so a subscriber can + realistically disconnect partway through and reconnect without the + pipeline finishing in the meantime — the shape needed to demo the + "show up late and still see what happened" pattern. 
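+    (Five two-second sleeps between stages add up to the roughly ten-second
+    window described in the README.)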
+ """ + + @workflow.init + def __init__(self, input: PipelineInput) -> None: + self.stream = WorkflowStream(prior_state=input.stream_state) + self.status = self.stream.topic(TOPIC_STATUS, type=StageEvent) + + @workflow.run + async def run(self, input: PipelineInput) -> str: + stages = [ + "validating", + "loading data", + "transforming", + "writing output", + "verifying", + "complete", + ] + for stage in stages: + self.status.publish(StageEvent(stage=stage)) + if stage != "complete": + await workflow.sleep(timedelta(seconds=2)) + # The "complete" stage above is the in-band terminator + # subscribers break on. Hold the run open briefly so the final + # poll delivers it. + await workflow.sleep(timedelta(milliseconds=500)) + return f"pipeline {input.pipeline_id} done" diff --git a/workflow_streams/workflows/ticker_workflow.py b/workflow_streams/workflows/ticker_workflow.py new file mode 100644 index 00000000..566b98f1 --- /dev/null +++ b/workflow_streams/workflows/ticker_workflow.py @@ -0,0 +1,66 @@ +from __future__ import annotations + +from datetime import timedelta + +from temporalio import workflow +from temporalio.contrib.workflow_streams import WorkflowStream + +from workflow_streams.shared import ( + TOPIC_TICK, + TickEvent, + TickerInput, +) + + +@workflow.defn +class TickerWorkflow: + """Long-running ticker that bounds its event log via ``truncate``. + + Long-running workflows that publish high volumes of events would + otherwise grow their event log unboundedly. This workflow shows + the truncation pattern: every ``truncate_every`` events, drop + everything except the last ``keep_last`` entries by calling + ``self.stream.truncate(safe_offset)``. + + Subscribers that fall behind a truncation jump forward to the new + base offset transparently (the iterator handles the + ``TruncatedOffset`` error internally), so consumers stay live but + may not see every intermediate event. That is the trade: bounded + log size in exchange for at-best-effort delivery to slow + consumers. + + To compute the truncation offset the workflow tracks its own + published count. ``WorkflowStream`` does not expose a workflow-side + head-offset accessor, but the running count plus the carried + ``base_offset`` (in continue-as-new chains) is sufficient. + """ + + @workflow.init + def __init__(self, input: TickerInput) -> None: + self.stream = WorkflowStream(prior_state=input.stream_state) + self.tick = self.stream.topic(TOPIC_TICK, type=TickEvent) + # Running count of events published by THIS run. To compute a + # global offset, add the prior_state's base_offset (omitted + # here — this sample doesn't continue-as-new). + self._published = 0 + + @workflow.run + async def run(self, input: TickerInput) -> str: + for n in range(input.count): + self.tick.publish(TickEvent(n=n)) + self._published += 1 + await workflow.sleep(timedelta(milliseconds=input.interval_ms)) + if ( + self._published % input.truncate_every == 0 + and self._published > input.keep_last + ): + # Drop everything except the last `keep_last` entries. + truncate_to = self._published - input.keep_last + self.stream.truncate(truncate_to) + # The final tick (n == count - 1) is the in-band terminator + # subscribers break on. ``keep_last`` guarantees that final + # offset survives the last truncation so even slow consumers + # eventually see it. Hold the run open briefly so the final + # poll delivers it. + await workflow.sleep(timedelta(milliseconds=500)) + return f"ticker emitted {self._published} events"