-
Notifications
You must be signed in to change notification settings - Fork 176
AI-36: Add LangGraph plugin #1448
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 36 commits
Commits
Show all changes
62 commits
Select commit
Hold shift + click to select a range
cb0af36
add langgraph plugin
brianstrauch 8f30e56
add experimental package warnings
brianstrauch d1470c7
fix ruff lint
brianstrauch 68a0eb4
fix pyright lint errors
brianstrauch cbd066a
fixed some mypy lints
brianstrauch 5d1f182
fix docstring lints
brianstrauch ea24a13
copilot code review
brianstrauch 19a5052
fix mypy lint
brianstrauch 0201c89
separate graphs and entrypoints by task queue to avoid concurrent wri…
brianstrauch e98dd96
use graph.node or task_id for activity names to avoid collisions
brianstrauch e1a93fc
rm local conftest in favor of global and fix lint
brianstrauch 0eeeb00
Merge branch 'main' into langgraph
brianstrauch 58cbdc4
Merge branch 'main' into langgraph
brianstrauch 7d925b0
allow langgraph 1.1
brianstrauch 05bd7aa
uv lock
brianstrauch 2253a5f
add default_activity_options
brianstrauch 86837df
add replay test
brianstrauch dc2ed59
fix gaps in missing tests
brianstrauch c0525cc
introduce an interceptor to patch is_running only in the workflow, th…
brianstrauch b78d063
add interceptor
brianstrauch cabbb1a
Merge branch 'main' into langgraph
brianstrauch 8ef609f
remove graph and entrypoint functions in favor of direct graph usage
brianstrauch ec8244c
rename cache() to get_cache()
brianstrauch 7c54795
Merge branch 'main' into langgraph
brianstrauch 1da61a8
remove interceptor
brianstrauch c84c22f
allow metadata to be accessed from node func and test
brianstrauch 077452d
Fix import sorting in test_node_metadata.py
DABH 7716e47
Fix formatting in langgraph_plugin.py
DABH 30094dc
Fix mypy errors: add type params to StateGraph and use State() constr…
DABH 01d18ed
Merge branch 'main' into langgraph
DABH e0d766c
Fix langsmith sandbox crash when langchain_core is installed
DABH 38f2a18
Suppress basedpyright unused import warning for langchain_core preload
DABH aba75a3
Skip langgraph async tests on Python < 3.11 and warn plugin users
DABH 6f19fa8
Remove duplicate pytest import in test_interrupt.py
DABH 567f423
Fix basedpyright reportUnreachable warning on version check
DABH 3191912
Increase execution_timeout for OpenAI tests that call the real API
DABH d13d912
Revert "allow metadata to be accessed from node func and test"
brianstrauch 63e3d0c
Revert "rename cache() to get_cache()"
brianstrauch 21acfea
Revert "remove graph and entrypoint functions in favor of direct grap…
brianstrauch 4eb67e5
reimplement node metadata fixes
brianstrauch f62bc09
scope graphs and entrypoints to workflow, rename files
brianstrauch aa84892
test sync nodes and tasks, send
brianstrauch 4b915b1
support command goto/update
brianstrauch c36423b
add test for command
brianstrauch 04b56e7
raise error if node or task has a retry policy
brianstrauch ef96ba4
support runtime context
brianstrauch b742951
fix lint
brianstrauch 5ecd3b1
Merge branch 'main' into langgraph
brianstrauch f9c8322
Merge branch 'main' into langgraph
brianstrauch 6135fe0
Revert changes to langsmith test_integration.py
DABH 65036a0
code review
brianstrauch 44c7b88
Remove langchain_core from LangSmith plugin sandbox passthroughs (CI …
DABH d9187c8
Restore langchain_core to LangSmith plugin sandbox passthroughs
DABH 1f5b5e3
Merge branch 'main' into langgraph
DABH 872adf1
underscore py files in langgraph plugin dir
brianstrauch 4c333b2
include all serializable data for langgraph config
brianstrauch 5deb0b9
mention langsmith tracing in readme
brianstrauch 01f1312
require execute_in
brianstrauch 39850ee
Merge branch 'main' into langgraph
DABH d749fc7
fix lint
brianstrauch 5714d21
fix flaky test
brianstrauch 6a32dfd
fix docs
brianstrauch File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,201 @@ | ||
| # LangGraph Plugin for Temporal Python SDK | ||
|
|
||
| ⚠️ **This package is currently at an experimental release stage.** ⚠️ | ||
|
|
||
| This Temporal [Plugin](https://docs.temporal.io/develop/plugins-guide) allows you to run [LangGraph](https://www.langchain.com/langgraph) nodes and tasks as Temporal Activities, giving your AI workflows durable execution, automatic retries, and timeouts. It supports both the LangGraph Graph API (``StateGraph``) and Functional API (``@entrypoint`` / ``@task``). | ||
|
|
||
| ## Installation | ||
|
|
||
| ```sh | ||
| uv add temporalio[langgraph] | ||
| ``` | ||
|
|
||
| or with pip: | ||
|
|
||
| ```sh | ||
| pip install temporalio[langgraph] | ||
| ``` | ||
|
|
||
| ## Module layout | ||
|
|
||
| Define your graphs, tasks, and entrypoints in a module **separate** from your `@workflow.defn` classes — the standard Temporal split. The plugin adds the graph/task modules to the workflow sandbox's passthrough list so its in-place rewrites are visible to the workflow. Workflow modules stay sandboxed. | ||
|
|
||
| ## Graph API | ||
|
|
||
| ```python | ||
| # graphs.py | ||
| from langgraph.graph import START, StateGraph | ||
|
|
||
| my_graph = StateGraph(State) | ||
| my_graph.add_node("my_node", my_node) | ||
| my_graph.add_edge(START, "my_node") | ||
|
|
||
| # workflow.py | ||
| from temporalio import workflow | ||
| from myapp.graphs import my_graph | ||
|
|
||
| @workflow.defn | ||
| class MyWorkflow: | ||
| @workflow.run | ||
| async def run(self, input): | ||
| return await my_graph.compile().ainvoke(input) | ||
|
|
||
| # worker.py | ||
| from temporalio.contrib.langgraph import LangGraphPlugin | ||
| from myapp.graphs import my_graph | ||
|
|
||
| plugin = LangGraphPlugin(graphs=[my_graph]) | ||
| ``` | ||
|
|
||
| ## Functional API | ||
|
|
||
| ```python | ||
| # flows.py | ||
| from langgraph.func import entrypoint, task | ||
|
|
||
| @task | ||
| async def my_task(x): ... | ||
|
|
||
| @entrypoint() | ||
| async def my_flow(inputs): | ||
| return await my_task(inputs) | ||
|
|
||
| # workflow.py | ||
| from temporalio import workflow | ||
| from myapp.flows import my_flow | ||
|
|
||
| @workflow.defn | ||
| class MyWorkflow: | ||
| @workflow.run | ||
| async def run(self, input): | ||
| return await my_flow.ainvoke(input) | ||
|
|
||
| # worker.py | ||
| import datetime | ||
| from temporalio.contrib.langgraph import LangGraphPlugin | ||
| from myapp.flows import my_task | ||
|
|
||
| plugin = LangGraphPlugin( | ||
| tasks=[my_task], | ||
| activity_options={ | ||
| "my_task": { | ||
| "start_to_close_timeout": datetime.timedelta(seconds=30), | ||
| }, | ||
| }, | ||
| ) | ||
| ``` | ||
|
|
||
| ## Checkpointer | ||
|
|
||
| Use `InMemorySaver` as your checkpointer. Temporal handles durability, so third-party checkpointers (like PostgreSQL or Redis) are not needed. | ||
|
|
||
| ```python | ||
| import langgraph.checkpoint.memory | ||
|
|
||
| from temporalio import workflow | ||
| from myapp.graphs import my_graph | ||
|
|
||
| @workflow.defn | ||
| class MyWorkflow: | ||
| @workflow.run | ||
| async def run(self, input): | ||
| app = my_graph.compile( | ||
| checkpointer=langgraph.checkpoint.memory.InMemorySaver(), | ||
| ) | ||
| ... | ||
| ``` | ||
|
|
||
| ## Activity Options | ||
|
|
||
| Options are passed through to [`workflow.execute_activity()`](https://python.temporal.io/temporalio.workflow.html#execute_activity), which supports parameters like `start_to_close_timeout`, `retry_policy`, `schedule_to_close_timeout`, `heartbeat_timeout`, and more. | ||
|
|
||
| ### Graph API | ||
|
|
||
| Pass per-node options as node `metadata`, or plugin-wide defaults via `default_activity_options`: | ||
|
|
||
| ```python | ||
| import datetime | ||
| from temporalio.common import RetryPolicy | ||
|
|
||
| g = StateGraph(State) | ||
| g.add_node("my_node", my_node, metadata={ | ||
| "start_to_close_timeout": datetime.timedelta(seconds=30), | ||
| "retry_policy": RetryPolicy(maximum_attempts=3), | ||
| }) | ||
|
|
||
| plugin = LangGraphPlugin( | ||
| graphs=[g], | ||
| default_activity_options={"start_to_close_timeout": datetime.timedelta(seconds=60)}, | ||
| ) | ||
| ``` | ||
|
|
||
| ### Functional API | ||
|
|
||
| Pass activity options to the plugin, keyed by task function name: | ||
|
|
||
| ```python | ||
| import datetime | ||
| from temporalio.common import RetryPolicy | ||
| from temporalio.contrib.langgraph import LangGraphPlugin | ||
|
|
||
| plugin = LangGraphPlugin( | ||
| tasks=[my_task], | ||
| activity_options={ | ||
| "my_task": { | ||
| "start_to_close_timeout": datetime.timedelta(seconds=30), | ||
| "retry_policy": RetryPolicy(maximum_attempts=3), | ||
| }, | ||
| }, | ||
| ) | ||
| ``` | ||
|
|
||
| ### Running in the Workflow | ||
|
|
||
| To skip the Activity wrapper and run a node or task directly in the Workflow, set `execute_in` to `"workflow"`: | ||
|
|
||
| ```python | ||
| # Graph API | ||
| g.add_node("my_node", my_node, metadata={"execute_in": "workflow"}) | ||
|
|
||
| # Functional API | ||
| plugin = LangGraphPlugin( | ||
| tasks=[my_task], | ||
| activity_options={"my_task": {"execute_in": "workflow"}}, | ||
| ) | ||
| ``` | ||
|
|
||
| ## Continue-As-New | ||
|
|
||
| To carry cached task results across a continue-as-new boundary, pass the cache to your next run and restore it with `set_cache`: | ||
|
|
||
| ```python | ||
| from temporalio import workflow | ||
| from temporalio.contrib.langgraph import get_cache, set_cache | ||
| from myapp.graphs import my_graph | ||
|
|
||
| @workflow.defn | ||
| class MyWorkflow: | ||
| @workflow.run | ||
| async def run(self, input, prev_cache=None): | ||
| set_cache(prev_cache) | ||
| result = await my_graph.compile().ainvoke(input) | ||
| if should_continue(result): | ||
| workflow.continue_as_new(next_input, get_cache()) | ||
| return result | ||
| ``` | ||
|
|
||
| ## Running Tests | ||
|
|
||
| Install dependencies: | ||
|
|
||
| ```sh | ||
| uv sync | ||
|
brianstrauch marked this conversation as resolved.
Outdated
|
||
| ``` | ||
|
|
||
| Run the test suite: | ||
|
|
||
| ```sh | ||
| uv run pytest | ||
| ``` | ||
|
|
||
| Tests start a local Temporal dev server automatically — no external server needed. | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| """LangGraph plugin for Temporal SDK. | ||
|
|
||
| .. warning:: | ||
| This package is experimental and may change in future versions. | ||
| Use with caution in production environments. | ||
|
|
||
| This plugin runs `LangGraph <https://github.com/langchain-ai/langgraph>`_ nodes | ||
| and tasks as Temporal Activities, giving your AI agent workflows durable | ||
| execution, automatic retries, and timeouts. It supports both the LangGraph Graph | ||
| API (``StateGraph``) and Functional API (``@entrypoint`` / ``@task``). | ||
| """ | ||
|
|
||
| from temporalio.contrib.langgraph.langgraph_plugin import ( | ||
| LangGraphPlugin, | ||
| get_cache, | ||
| set_cache, | ||
| ) | ||
|
|
||
| __all__ = [ | ||
| "LangGraphPlugin", | ||
| "get_cache", | ||
| "set_cache", | ||
| ] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,99 @@ | ||
| """Activity wrappers for executing LangGraph nodes and tasks.""" | ||
|
|
||
| from collections.abc import Awaitable | ||
| from dataclasses import dataclass | ||
| from inspect import iscoroutinefunction | ||
| from typing import Any, Callable | ||
|
|
||
| from langgraph.errors import GraphInterrupt | ||
| from langgraph.types import Interrupt | ||
|
|
||
| from temporalio import workflow | ||
| from temporalio.contrib.langgraph.langgraph_config import ( | ||
| get_langgraph_config, | ||
| set_langgraph_config, | ||
| ) | ||
| from temporalio.contrib.langgraph.task_cache import ( | ||
| cache_key, | ||
| cache_lookup, | ||
| cache_put, | ||
| ) | ||
|
|
||
|
|
||
| @dataclass | ||
| class ActivityInput: | ||
| """Input for a LangGraph activity, containing args, kwargs, and config.""" | ||
|
|
||
| args: tuple[Any, ...] | ||
| kwargs: dict[str, Any] | ||
| langgraph_config: dict[str, Any] | ||
|
|
||
|
|
||
| @dataclass | ||
| class ActivityOutput: | ||
| """Output from a LangGraph activity, containing result or interrupts.""" | ||
|
|
||
| result: Any = None | ||
| langgraph_interrupts: tuple[Interrupt] | None = None | ||
|
|
||
|
|
||
| def wrap_activity( | ||
| func: Callable, | ||
| ) -> Callable[[ActivityInput], Awaitable[ActivityOutput]]: | ||
| """Wrap a function as a Temporal activity that handles LangGraph config and interrupts.""" | ||
|
|
||
| async def wrapper(input: ActivityInput) -> ActivityOutput: | ||
| set_langgraph_config(input.langgraph_config) | ||
| try: | ||
| if iscoroutinefunction(func): | ||
| result = await func(*input.args, **input.kwargs) | ||
| else: | ||
| result = func(*input.args, **input.kwargs) | ||
| return ActivityOutput(result=result) | ||
| except GraphInterrupt as e: | ||
| return ActivityOutput(langgraph_interrupts=e.args[0]) | ||
|
|
||
| return wrapper | ||
|
|
||
|
|
||
| def wrap_execute_activity( | ||
| afunc: Callable[[ActivityInput], Awaitable[ActivityOutput]], | ||
| task_id: str = "", | ||
| **execute_activity_kwargs: Any, | ||
| ) -> Callable[..., Any]: | ||
| """Wrap an activity function to be called via workflow.execute_activity with caching.""" | ||
|
|
||
| async def wrapper(*args: Any, **kwargs: Any) -> Any: | ||
| # LangGraph may inject a RunnableConfig as the 'config' kwarg. Strip it | ||
| # down to a serializable subset (metadata + tags) so it can cross the | ||
| # activity boundary; callbacks, stores, etc. aren't serializable. | ||
| if "config" in kwargs: | ||
| orig = kwargs["config"] or {} | ||
| kwargs["config"] = { | ||
| "metadata": dict(orig.get("metadata") or {}), | ||
| "tags": list(orig.get("tags") or []), | ||
| } | ||
|
|
||
| # Check task result cache (for continue-as-new deduplication). | ||
| key = cache_key(task_id, args, kwargs) if task_id else "" | ||
| if task_id: | ||
| found, cached = cache_lookup(key) | ||
| if found: | ||
| return cached | ||
|
|
||
| input = ActivityInput( | ||
| args=args, kwargs=kwargs, langgraph_config=get_langgraph_config() | ||
| ) | ||
| output = await workflow.execute_activity( | ||
| afunc, input, **execute_activity_kwargs | ||
| ) | ||
| if output.langgraph_interrupts is not None: | ||
| raise GraphInterrupt(output.langgraph_interrupts) | ||
|
|
||
| # Store in cache for future continue-as-new cycles. | ||
| if task_id: | ||
| cache_put(key, output.result) | ||
|
|
||
| return output.result | ||
|
|
||
| return wrapper |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.