Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ Some examples require extra dependencies. See each sample's directory for specif
* [gevent_async](gevent_async) - Combine gevent and Temporal.
* [hello_standalone_activity](hello_standalone_activity) - Use activities without using a workflow.
* [langchain](langchain) - Orchestrate workflows for LangChain.
* [langgraph_plugin](langgraph_plugin) - Run LangGraph workflows as durable Temporal workflows (Graph API and Functional API).
* [message_passing/introduction](message_passing/introduction/) - Introduction to queries, signals, and updates.
* [message_passing/safe_message_handlers](message_passing/safe_message_handlers/) - Safely handling updates and signals.
* [message_passing/update_with_start/lazy_initialization](message_passing/update_with_start/lazy_initialization/) - Use update-with-start to update a Shopping Cart, starting it if it does not exist.
Expand Down
65 changes: 65 additions & 0 deletions langgraph_plugin/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# LangGraph Plugin Samples

These samples demonstrate the [Temporal LangGraph plugin](https://github.com/temporalio/sdk-python/pull/1448), which runs LangGraph workflows as durable Temporal workflows. Each LangGraph graph node (Graph API) or `@task` (Functional API) executes as a Temporal activity with automatic retries, timeouts, and crash recovery.

Samples are organized by API style:

- **Graph API** (`graph_api/`) -- Define workflows as `StateGraph` with nodes and edges.
- **Functional API** (`functional_api/`) -- Define workflows with `@task` and `@entrypoint` decorators for an imperative programming style.

## Samples

| Sample | Graph API | Functional API | Description |
|--------|:---------:|:--------------:|-------------|
| **Human-in-the-loop** | [graph_api/human_in_the_loop](graph_api/human_in_the_loop) | [functional_api/human_in_the_loop](functional_api/human_in_the_loop) | Chatbot that uses `interrupt()` to pause for human approval, Temporal signals to receive feedback, and queries to expose the pending draft. |
| **Continue-as-new** | [graph_api/continue_as_new](graph_api/continue_as_new) | [functional_api/continue_as_new](functional_api/continue_as_new) | Multi-stage data pipeline that uses `continue-as-new` with task result caching so previously-completed stages are not re-executed. |
| **ReAct Agent** | [graph_api/react_agent](graph_api/react_agent) | [functional_api/react_agent](functional_api/react_agent) | Tool-calling agent loop. Graph API uses conditional edges; Functional API uses a `while` loop. |
| **Control Flow** | -- | [functional_api/control_flow](functional_api/control_flow) | Demonstrates parallel task execution, `for` loops, and `if/else` branching -- patterns that are natural in the Functional API. |
Comment thread
DABH marked this conversation as resolved.

## Prerequisites

1. Install dependencies:

```bash
uv sync --group langgraph
```

Comment thread
DABH marked this conversation as resolved.
2. Start a [Temporal dev server](https://docs.temporal.io/cli#start-dev-server):

```bash
temporal server start-dev
```

## Running a Sample

Each sample has two scripts -- start the worker first, then the workflow starter in a separate terminal.

```bash
# Terminal 1: start the worker
uv run langgraph_plugin/<api>/<sample>/run_worker.py

# Terminal 2: start the workflow
uv run langgraph_plugin/<api>/<sample>/run_workflow.py
```

For example, to run the Graph API human-in-the-loop chatbot:

```bash
# Terminal 1
uv run langgraph_plugin/graph_api/human_in_the_loop/run_worker.py

# Terminal 2
uv run langgraph_plugin/graph_api/human_in_the_loop/run_workflow.py
```

## Key Features Demonstrated

- **Durable execution** -- Every graph node / `@task` runs as a Temporal activity with configurable timeouts and retry policies.
- **Human-in-the-loop** -- LangGraph's `interrupt()` pauses the graph; Temporal signals deliver human input; queries expose pending state to UIs.
- **Continue-as-new with caching** -- `get_cache()` captures completed task results; passing the cache to the next execution avoids re-running them.
- **Conditional routing** -- Graph API's `add_conditional_edges` and Functional API's native `if/else`/`while` for agent loops.
- **Parallel execution** -- Functional API launches multiple tasks concurrently by creating futures before awaiting them.

## Related

- [SDK PR: LangGraph plugin](https://github.com/temporalio/sdk-python/pull/1448)
Empty file added langgraph_plugin/__init__.py
Empty file.
Empty file.
Empty file.
36 changes: 36 additions & 0 deletions langgraph_plugin/functional_api/continue_as_new/run_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""Worker for the continue-as-new pipeline (Functional API)."""

import asyncio

from temporalio.client import Client
from temporalio.contrib.langgraph import LangGraphPlugin
from temporalio.worker import Worker

from langgraph_plugin.functional_api.continue_as_new.workflow import (
PipelineFunctionalWorkflow,
activity_options,
all_tasks,
pipeline_entrypoint,
)


async def main() -> None:
client = await Client.connect("localhost:7233")
plugin = LangGraphPlugin(
entrypoints={"pipeline": pipeline_entrypoint},
tasks=all_tasks,
activity_options=activity_options,
)

worker = Worker(
client,
task_queue="langgraph-pipeline-functional",
workflows=[PipelineFunctionalWorkflow],
plugins=[plugin],
)
print("Worker started. Ctrl+C to exit.")
await worker.run()


if __name__ == "__main__":
asyncio.run(main())
30 changes: 30 additions & 0 deletions langgraph_plugin/functional_api/continue_as_new/run_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""Start the continue-as-new pipeline workflow (Functional API)."""

import asyncio
from datetime import timedelta

from temporalio.client import Client

from langgraph_plugin.functional_api.continue_as_new.workflow import (
PipelineFunctionalWorkflow,
PipelineInput,
)


async def main() -> None:
client = await Client.connect("localhost:7233")

result = await client.execute_workflow(
PipelineFunctionalWorkflow.run,
PipelineInput(data=10),
id="pipeline-functional-workflow",
task_queue="langgraph-pipeline-functional",
execution_timeout=timedelta(seconds=60),
)

# 10*2=20 -> 20+50=70 -> 70*3=210
print(f"Pipeline result: {result}")


if __name__ == "__main__":
asyncio.run(main())
81 changes: 81 additions & 0 deletions langgraph_plugin/functional_api/continue_as_new/workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""Continue-as-new with caching using the LangGraph Functional API with Temporal.

Same pattern as the Graph API version, but using @task and @entrypoint decorators.
"""

from dataclasses import dataclass
from datetime import timedelta
from typing import Any

from langgraph.func import entrypoint as lg_entrypoint
from langgraph.func import task
from temporalio import workflow
from temporalio.contrib.langgraph import entrypoint, get_cache


@task
def extract(data: int) -> int:
"""Stage 1: Extract -- simulate data extraction by doubling the input."""
return data * 2


@task
def transform(data: int) -> int:
"""Stage 2: Transform -- simulate transformation by adding 50."""
return data + 50


@task
def load(data: int) -> int:
"""Stage 3: Load -- simulate loading by tripling the result."""
return data * 3


@lg_entrypoint()
async def pipeline_entrypoint(data: int) -> dict:
"""Run the 3-stage pipeline: extract -> transform -> load."""
extracted = await extract(data)
transformed = await transform(extracted)
loaded = await load(transformed)
return {"result": loaded}


all_tasks = [extract, transform, load]

activity_options = {
t.func.__name__: {"start_to_close_timeout": timedelta(seconds=30)}
for t in all_tasks
}


@dataclass
class PipelineInput:
data: int
cache: dict[str, Any] | None = None
phase: int = 1


@workflow.defn
class PipelineFunctionalWorkflow:
"""Runs the pipeline, continuing-as-new after each phase.

Input 10: 10*2=20 -> 20+50=70 -> 70*3=210
Each task executes once; phases 2 and 3 use cached results.
"""

@workflow.run
async def run(self, input_data: PipelineInput) -> dict[str, Any]:
result = await entrypoint("pipeline", cache=input_data.cache).ainvoke(
input_data.data
)

if input_data.phase < 3:
workflow.continue_as_new(
PipelineInput(
data=input_data.data,
cache=get_cache(),
phase=input_data.phase + 1,
)
)

return result
Comment thread
DABH marked this conversation as resolved.
Empty file.
36 changes: 36 additions & 0 deletions langgraph_plugin/functional_api/control_flow/run_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""Worker for the control flow pipeline (Functional API)."""

import asyncio

from temporalio.client import Client
from temporalio.contrib.langgraph import LangGraphPlugin
from temporalio.worker import Worker

from langgraph_plugin.functional_api.control_flow.workflow import (
ControlFlowWorkflow,
activity_options,
all_tasks,
control_flow_pipeline,
)


async def main() -> None:
client = await Client.connect("localhost:7233")
plugin = LangGraphPlugin(
entrypoints={"control_flow": control_flow_pipeline},
tasks=all_tasks,
activity_options=activity_options,
)

worker = Worker(
client,
task_queue="langgraph-control-flow",
workflows=[ControlFlowWorkflow],
plugins=[plugin],
)
print("Worker started. Ctrl+C to exit.")
await worker.run()


if __name__ == "__main__":
asyncio.run(main())
37 changes: 37 additions & 0 deletions langgraph_plugin/functional_api/control_flow/run_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""Start the control flow pipeline workflow (Functional API)."""

import asyncio

from temporalio.client import Client

from langgraph_plugin.functional_api.control_flow.workflow import (
ControlFlowWorkflow,
)


async def main() -> None:
client = await Client.connect("localhost:7233")

items = [
"Fix login bug",
"URGENT: Production outage in payments",
"Update README",
"INVALID:",
"Urgent: Security patch needed",
"Refactor test suite",
]

result = await client.execute_workflow(
ControlFlowWorkflow.run,
items,
id="control-flow-workflow",
task_queue="langgraph-control-flow",
)

print(f"Summary: {result['summary']}")
for r in result["results"]:
print(f" {r}")


if __name__ == "__main__":
asyncio.run(main())
96 changes: 96 additions & 0 deletions langgraph_plugin/functional_api/control_flow/workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
"""Control flow sample using the LangGraph Functional API with Temporal.

Demonstrates the Functional API's advantage for complex control flow:
- Parallel task execution (launch multiple tasks concurrently)
- Sequential for-loop processing
- Conditional if/else branching based on intermediate results
"""

from datetime import timedelta

from langgraph.func import entrypoint as lg_entrypoint
from langgraph.func import task
from temporalio import workflow
from temporalio.contrib.langgraph import entrypoint


@task
def validate_item(item: str) -> bool:
"""Validate an item. Returns True if the item is non-empty and well-formed."""
return len(item.strip()) > 0 and not item.startswith("INVALID:")


@task
def classify_item(item: str) -> str:
"""Classify an item as 'urgent' or 'normal' based on its content."""
return "urgent" if "urgent" in item.lower() else "normal"


@task
def process_urgent(item: str) -> str:
"""Process an urgent item with priority handling."""
return f"[PRIORITY] Processed: {item}"


@task
def process_normal(item: str) -> str:
"""Process a normal item with standard handling."""
return f"[STANDARD] Processed: {item}"


@task
def summarize(results: list[str]) -> str:
"""Produce a summary of all processed results."""
urgent_count = sum(1 for r in results if r.startswith("[PRIORITY]"))
normal_count = sum(1 for r in results if r.startswith("[STANDARD]"))
return (
f"Processed {len(results)} items ({urgent_count} urgent, {normal_count} normal)"
)


@lg_entrypoint()
async def control_flow_pipeline(items: list[str]) -> dict:
"""Process a batch of items with parallel validation, sequential
classification, and conditional routing.
"""
# PARALLEL: Validate all items concurrently.
# Creating task futures without awaiting launches them in parallel.
validation_futures = [validate_item(item) for item in items]
valid_flags = [await f for f in validation_futures]
valid_items = [item for item, is_valid in zip(items, valid_flags) if is_valid]

# SEQUENTIAL + CONDITIONAL: Process each valid item
results = []
for item in valid_items:
category = await classify_item(item)
if category == "urgent":
result = await process_urgent(item)
else:
result = await process_normal(item)
results.append(result)

# Aggregate all results
summary_text = await summarize(results)

return {"results": results, "summary": summary_text, "total": len(results)}


all_tasks = [
validate_item,
classify_item,
process_urgent,
process_normal,
summarize,
]

activity_options = {
t.func.__name__: {"start_to_close_timeout": timedelta(seconds=30)}
for t in all_tasks
}
Comment on lines +86 to +92
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels like we're hacking around our own limitations here. Let's just be explicit everywhere or make this more user friendly.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(My vote is to be more explicit, that was the point of making this change anyways)



@workflow.defn
class ControlFlowWorkflow:
@workflow.run
async def run(self, items: list[str]) -> dict:
return await entrypoint("control_flow").ainvoke(items)
Empty file.
Loading
Loading