From 5c77584489570c4f7b7dbb6a24e35ecf9a913fda Mon Sep 17 00:00:00 2001
From: Adam Outler
Date: Mon, 6 Apr 2026 23:04:40 -0400
Subject: [PATCH 01/13] feat: Add targeted operations agent server
---
.github/workflows/build-branch.yml | 18 +-
.github/workflows/ci.yml | 66 +
.gitignore | 2 +-
docker-compose.yml | 31 +
plane_mcp/__main__.py | 40 +-
plane_mcp/journey/__init__.py | 0
plane_mcp/journey/__main__.py | 77 +
plane_mcp/journey/base.py | 132 ++
plane_mcp/journey/cache.py | 104 +
plane_mcp/journey/lod.py | 164 ++
plane_mcp/journey/server.py | 80 +
plane_mcp/journey/tools/__init__.py | 12 +
plane_mcp/journey/tools/create_update.py | 265 +++
plane_mcp/journey/tools/read.py | 246 +++
plane_mcp/journey/tools/workflow.py | 205 ++
plane_mcp/resolver.py | 160 ++
plane_mcp/sanitize.py | 44 +
plane_mcp/tools/generated_core.py | 276 +++
plane_mcp/tools/generated_metadata.py | 341 ++++
plane_mcp/tools/journeys.py | 314 +++
pyproject.toml | 2 +
tests/conftest.py | 13 +
tests/test_generated_tools.py | 20 +
tests/test_input_validation.py | 242 +++
tests/test_integration.py | 10 +
tests/test_journey_lod.py | 159 ++
tests/test_journey_methods_comprehensive.py | 450 +++++
tests/test_journey_tools.py | 144 ++
tests/test_resolver.py | 159 ++
tests/test_smart_updates.py | 135 ++
uv.lock | 1952 ++++++++++---------
31 files changed, 4910 insertions(+), 953 deletions(-)
create mode 100644 .github/workflows/ci.yml
create mode 100644 docker-compose.yml
create mode 100644 plane_mcp/journey/__init__.py
create mode 100644 plane_mcp/journey/__main__.py
create mode 100644 plane_mcp/journey/base.py
create mode 100644 plane_mcp/journey/cache.py
create mode 100644 plane_mcp/journey/lod.py
create mode 100644 plane_mcp/journey/server.py
create mode 100644 plane_mcp/journey/tools/__init__.py
create mode 100644 plane_mcp/journey/tools/create_update.py
create mode 100644 plane_mcp/journey/tools/read.py
create mode 100644 plane_mcp/journey/tools/workflow.py
create mode 100644 plane_mcp/resolver.py
create mode 100644 plane_mcp/sanitize.py
create mode 100644 plane_mcp/tools/generated_core.py
create mode 100644 plane_mcp/tools/generated_metadata.py
create mode 100644 plane_mcp/tools/journeys.py
create mode 100644 tests/test_generated_tools.py
create mode 100644 tests/test_input_validation.py
create mode 100644 tests/test_journey_lod.py
create mode 100644 tests/test_journey_methods_comprehensive.py
create mode 100644 tests/test_journey_tools.py
create mode 100644 tests/test_resolver.py
create mode 100644 tests/test_smart_updates.py
diff --git a/.github/workflows/build-branch.yml b/.github/workflows/build-branch.yml
index b3ceacd..ef640c2 100644
--- a/.github/workflows/build-branch.yml
+++ b/.github/workflows/build-branch.yml
@@ -94,6 +94,16 @@ jobs:
- id: checkout_files
name: Checkout Files
uses: actions/checkout@v4
+ - name: Install Modern Docker CLI
+ run: |
+ sudo apt-get update
+ sudo apt-get install -y ca-certificates curl gnupg
+ sudo install -m 0755 -d /etc/apt/keyrings
+ curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /etc/apt/keyrings/docker.gpg
+ sudo chmod a+r /etc/apt/keyrings/docker.gpg
+ echo "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.gpg] https://download.docker.com/linux/ubuntu jammy stable" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
+ sudo apt-get update
+ sudo apt-get install -y docker-ce-cli
- name: Plane MCP Server Build and Push
uses: makeplane/actions/build-push@v1.0.0
with:
@@ -102,7 +112,7 @@ jobs:
release-version: ${{ needs.release_build_setup.outputs.release_version }}
dockerhub-username: ${{ secrets.DOCKERHUB_USERNAME }}
dockerhub-token: ${{ secrets.DOCKERHUB_TOKEN }}
- docker-image-owner: makeplane
+ docker-image-owner: ${{ secrets.DOCKERHUB_USERNAME }}
docker-image-name: ${{ needs.release_build_setup.outputs.dh_img_name }}
build-context: .
dockerfile-path: ./Dockerfile
@@ -115,11 +125,7 @@ jobs:
if: ${{ needs.release_build_setup.outputs.build_type == 'Release' }}
name: Build Release
runs-on: ubuntu-22.04
- needs:
- [
- release_build_setup,
- build_and_push,
- ]
+ needs: [release_build_setup, build_and_push]
env:
REL_VERSION: ${{ needs.release_build_setup.outputs.release_version }}
steps:
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
new file mode 100644
index 0000000..cf3cbc6
--- /dev/null
+++ b/.github/workflows/ci.yml
@@ -0,0 +1,66 @@
+name: CI / CD
+
+on:
+ push:
+ branches:
+ - main
+ - master
+ - ai-journeys-rearchitecture
+ - plane-agent
+ pull_request:
+ branches:
+ - main
+ - master
+ workflow_dispatch:
+
+permissions:
+ packages: write
+ contents: write
+
+jobs:
+ test:
+ name: Run Tests
+ runs-on: ubuntu-latest
+ container:
+ image: python:3.12
+ steps:
+ - name: Setup and Checkout
+ uses: actions/checkout@v4
+
+ - name: Run tests
+ run: |
+ set +e
+ pip install '.[dev]' pytest pytest-asyncio pytest-mock pytest-timeout respx requests-mock anyio
+ pytest -v > test-output.log 2>&1
+ TEST_EXIT=$?
+ cat test-output.log
+ exit $TEST_EXIT
+
+ build-and-push:
+ name: Build & Push Docker
+ needs: test
+ runs-on: ubuntu-latest
+ if: github.event_name == 'push' || github.event_name == 'workflow_dispatch'
+ steps:
+ - name: Checkout code
+ uses: actions/checkout@v4
+
+ - name: Set up QEMU
+ uses: docker/setup-qemu-action@v3
+
+ - name: Set up Docker Buildx
+ uses: docker/setup-buildx-action@v3
+
+ - name: Log in to Docker Hub
+ uses: docker/login-action@v3
+ with:
+ username: ${{ secrets.DOCKERHUB_USERNAME }}
+ password: ${{ secrets.DOCKERHUB_TOKEN }}
+
+ - name: Build and push
+ uses: docker/build-push-action@v5
+ with:
+ context: .
+ push: true
+ platforms: linux/amd64,linux/arm64
+ tags: ${{ secrets.DOCKERHUB_USERNAME }}/plane-mcp-server:latest
diff --git a/.gitignore b/.gitignore
index 88f9cb1..fcda810 100644
--- a/.gitignore
+++ b/.gitignore
@@ -55,4 +55,4 @@ dmypy.json
.env.test.local
# Ignore cursor AI rules
-.cursor/rules/codacy.mdc
+.cursor/rules/codacy.mdc
\ No newline at end of file
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..0cda472
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,31 @@
+# This docker-compose configuration is intended for stand-alone MCP server testing.
+services:
+ plane-mcp:
+ build: .
+ container_name: plane-mcp
+ restart: unless-stopped
+ ports:
+ - "${FASTMCP_PORT:-8211}:${FASTMCP_PORT:-8211}"
+ environment:
+ - PLANE_API_KEY=${PLANE_API_KEY}
+ - PLANE_WORKSPACE_SLUG=${PLANE_WORKSPACE_SLUG}
+ - PLANE_BASE_URL=${PLANE_BASE_URL}
+ - FASTMCP_PORT=${FASTMCP_PORT:-8211}
+ - REDIS_HOST=redis
+ - REDIS_PORT=6379
+ entrypoint: ["python", "-m", "plane_mcp"]
+ command: ["http"]
+ depends_on:
+ - redis
+
+ redis:
+ image: redis:alpine
+ container_name: plane-redis
+ restart: unless-stopped
+ ports:
+ - "6379:6379"
+ volumes:
+ - redis_data:/data
+
+volumes:
+ redis_data:
diff --git a/plane_mcp/__main__.py b/plane_mcp/__main__.py
index d943d90..8af9170 100644
--- a/plane_mcp/__main__.py
+++ b/plane_mcp/__main__.py
@@ -4,7 +4,7 @@
import logging
import os
import sys
-from contextlib import asynccontextmanager
+from contextlib import AsyncExitStack, asynccontextmanager
from datetime import datetime, timezone
from enum import Enum
@@ -61,13 +61,12 @@ class ServerMode(Enum):
@asynccontextmanager
-async def combined_lifespan(oauth_app, header_app, sse_app):
- """Combine lifespans from both OAuth and Header MCP apps."""
- # Start both lifespans
- async with oauth_app.lifespan(oauth_app):
- async with header_app.lifespan(header_app):
- async with sse_app.lifespan(sse_app):
- yield
+async def combined_lifespan(apps):
+ """Combine lifespans from multiple MCP apps."""
+ async with AsyncExitStack() as stack:
+ for app in apps:
+ await stack.enter_async_context(app.lifespan(app))
+ yield
def main() -> None:
@@ -83,7 +82,10 @@ def main() -> None:
if not os.getenv("PLANE_WORKSPACE_SLUG"):
raise ValueError("PLANE_WORKSPACE_SLUG is not set")
- get_stdio_mcp().run()
+ from plane_mcp.journey.tools import register_tools as register_journey_tools
+ stdio_mcp = get_stdio_mcp()
+ register_journey_tools(stdio_mcp)
+ stdio_mcp.run()
return
if server_mode == ServerMode.HTTP:
@@ -97,17 +99,35 @@ def main() -> None:
oauth_well_known = oauth_mcp.auth.get_well_known_routes(mcp_path="/mcp")
sse_well_known = sse_mcp.auth.get_well_known_routes(mcp_path="/sse")
+ # --- AGENT JOURNEY API ---
+ from plane_mcp.journey.server import (
+ get_header_mcp as journey_get_header_mcp,
+ get_oauth_mcp as journey_get_oauth_mcp
+ )
+ journey_oauth_mcp = journey_get_oauth_mcp("/agent")
+ journey_oauth_app = journey_oauth_mcp.http_app(stateless_http=True)
+
+ journey_header_mcp = journey_get_header_mcp()
+ journey_header_app = journey_header_mcp.http_app(stateless_http=True)
+
+ journey_oauth_well_known = []
+ if hasattr(journey_oauth_mcp, 'auth') and journey_oauth_mcp.auth:
+ journey_oauth_well_known = journey_oauth_mcp.auth.get_well_known_routes(mcp_path="/agent/mcp")
+
app = Starlette(
routes=[
# Well-known routes for OAuth and Header HTTP
*oauth_well_known,
*sse_well_known,
+ *journey_oauth_well_known,
# Mount both MCP servers
Mount("/http/api-key", app=header_app),
Mount("/http", app=oauth_app),
+ Mount("/agent/api-key", app=journey_header_app),
+ Mount("/agent", app=journey_oauth_app),
Mount("/", app=sse_app),
],
- lifespan=lambda app: combined_lifespan(oauth_app, header_app, sse_app),
+ lifespan=lambda app: combined_lifespan([oauth_app, header_app, sse_app, journey_oauth_app, journey_header_app]),
)
app.add_middleware(
diff --git a/plane_mcp/journey/__init__.py b/plane_mcp/journey/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/plane_mcp/journey/__main__.py b/plane_mcp/journey/__main__.py
new file mode 100644
index 0000000..b86f5af
--- /dev/null
+++ b/plane_mcp/journey/__main__.py
@@ -0,0 +1,77 @@
+"""Main entry point for the Plane MCP Server."""
+
+import os
+import sys
+from contextlib import asynccontextmanager
+from enum import Enum
+
+import uvicorn
+from fastmcp.utilities.logging import get_logger
+from starlette.applications import Starlette
+from starlette.middleware.cors import CORSMiddleware
+from starlette.routing import Mount
+
+from plane_mcp.journey.server import get_header_mcp, get_oauth_mcp, get_stdio_mcp
+
+logger = get_logger(__name__)
+
+
+class ServerMode(Enum):
+ STDIO = "stdio"
+ SSE = "sse"
+ HTTP = "http"
+
+
+@asynccontextmanager
+async def combined_lifespan(oauth_app, header_app, sse_app):
+ """Combine lifespans from both OAuth and Header MCP apps."""
+ # Start both lifespans
+ async with oauth_app.lifespan(oauth_app):
+ async with header_app.lifespan(header_app):
+ async with sse_app.lifespan(sse_app):
+ yield
+
+
+def main() -> None:
+ """Run the MCP server."""
+ server_mode = ServerMode.STDIO
+ if len(sys.argv) > 1:
+ server_mode = ServerMode(sys.argv[1])
+
+ if server_mode == ServerMode.STDIO:
+ # Validate API_KEY and PLANE_WORKSPACE_SLUG are set
+ if not os.getenv("PLANE_API_KEY"):
+ raise ValueError("PLANE_API_KEY is not set")
+ if not os.getenv("PLANE_WORKSPACE_SLUG"):
+ raise ValueError("PLANE_WORKSPACE_SLUG is not set")
+
+ get_stdio_mcp().run()
+ return
+
+ if server_mode == ServerMode.HTTP:
+ sse_mcp = get_stdio_mcp()
+ sse_app = sse_mcp.http_app(transport="streamable-http")
+
+ app = Starlette(
+ routes=[
+ Mount("/", app=sse_app),
+ ],
+ lifespan=lambda app: sse_app.lifespan(sse_app),
+ )
+
+ app.add_middleware(
+ CORSMiddleware,
+ allow_origins=["*"],
+ allow_credentials=False,
+ allow_methods=["*"],
+ allow_headers=["*"],
+ )
+
+ port = int(os.getenv("FASTMCP_PORT", "8211"))
+ logger.info(f"Starting HTTP server for Streamable HTTP at /mcp on port {port}")
+ uvicorn.run(app, host="0.0.0.0", port=port, log_level="info")
+ return
+
+
+if __name__ == "__main__":
+ main()
diff --git a/plane_mcp/journey/base.py b/plane_mcp/journey/base.py
new file mode 100644
index 0000000..c417ded
--- /dev/null
+++ b/plane_mcp/journey/base.py
@@ -0,0 +1,132 @@
+"""Base classes and utilities for Journey-based AI tools."""
+
+from functools import wraps
+from typing import Any, Callable, Dict, Optional, TypeVar, Tuple, cast
+from plane_mcp.resolver import EntityResolver
+from plane_mcp.journey.lod import apply_lod, LODProfile
+
+T = TypeVar('T', bound=Callable[..., Any])
+
+
+class JourneyBase:
+ """
+ Base class for Journey tools. Provides EntityResolver wiring
+ and utilities to aggressively filter Level of Detail.
+ """
+
+ def __init__(self, resolver: EntityResolver):
+ self.resolver = resolver
+
+ def apply_lod(self, data: Any, profile: LODProfile = LODProfile.SUMMARY, project_identifier: Optional[str] = None) -> Any:
+ """
+ Applies LOD profile, returning clean minimized data for AI context.
+ """
+ return apply_lod(data, profile=profile, project_identifier=project_identifier)
+
+ def parse_ticket_id(self, ticket_id: str) -> Tuple[str, int]:
+ """
+ Parses a typical sequence ID (ENG-123) into project_identifier and sequence_id.
+ """
+ if not isinstance(ticket_id, str):
+ raise ValueError(f"Invalid ticket_id: expected string, got {type(ticket_id).__name__}")
+
+ parts = ticket_id.split("-")
+ if len(parts) != 2:
+ raise ValueError(f"Invalid ticket ID format: '{ticket_id}'. Expected format like 'ENG-123'.")
+
+ project_identifier = parts[0]
+ try:
+ issue_sequence = int(parts[1])
+ except ValueError:
+ raise ValueError(f"Invalid ticket sequence in '{ticket_id}'. Must be an integer (e.g., 123).")
+
+ return project_identifier, issue_sequence
+
+def with_lod(profile: LODProfile = LODProfile.SUMMARY) -> Callable[[T], T]:
+ """
+ Decorator for Journey tools to automatically apply an LOD profile to the output.
+ Can be applied to methods of JourneyBase subclasses or any function returning dict/list.
+ """
+ def decorator(func: T) -> T:
+ @wraps(func)
+ def wrapper(*args: Any, **kwargs: Any) -> Any:
+ result = func(*args, **kwargs)
+ # Try to get project_identifier from kwargs if available for sequence injection
+ project_identifier = kwargs.get("project_identifier")
+
+ # If not in kwargs but it's a typical ticket_id arg
+ if not project_identifier and "ticket_id" in kwargs:
+ try:
+ project_identifier = kwargs["ticket_id"].split("-")[0]
+ except Exception:
+ pass
+
+ # Extract self if it's a method
+ if args and hasattr(args[0], "apply_lod"):
+ # Let the base class handle it
+ return args[0].apply_lod(result, profile=profile, project_identifier=project_identifier)
+ else:
+ return apply_lod(result, profile=profile, project_identifier=project_identifier)
+ return cast(T, wrapper)
+ return decorator
+
+
+def mcp_error_boundary(func: T) -> T:
+ """
+ Decorator to wrap MCP tool executions and catch unhandled exceptions,
+ returning them as a formatted string or dict to prevent 500 errors.
+ """
+ @wraps(func)
+ def wrapper(*args: Any, **kwargs: Any) -> Any:
+ try:
+ return func(*args, **kwargs)
+ except Exception as e:
+ try:
+ import logging
+ import inspect
+ import traceback
+
+ error_details = traceback.format_exc()
+ logging.error(f"Error in {func.__name__}:\n{error_details}")
+
+ # Determine if this is a "handled" error that shouldn't pollute LLM context with stack traces
+ is_handled = isinstance(e, ValueError)
+
+ try:
+ from pydantic import ValidationError as PydanticValidationError
+ if isinstance(e, PydanticValidationError):
+ is_handled = True
+ except ImportError:
+ pass
+
+ try:
+ from pydantic_core import ValidationError as PydanticCoreValidationError
+ if isinstance(e, PydanticCoreValidationError):
+ is_handled = True
+ except ImportError:
+ pass
+
+ try:
+ from plane.errors.errors import HttpError
+ if isinstance(e, HttpError):
+ is_handled = True
+ except ImportError:
+ pass
+
+ error_msg = f"Error executing tool '{func.__name__}': {str(e)}"
+ if not is_handled:
+ error_msg += f"\n\nDetails: {error_details}"
+
+ # Try to determine return type safely
+ try:
+ sig = inspect.signature(func)
+ if sig.return_annotation == list or str(sig.return_annotation).startswith("list"):
+ return [{"error": error_msg}]
+ except Exception:
+ pass
+
+ return {"error": error_msg}
+ except Exception as inner_e:
+ # Absolute fallback if error handling itself fails
+ return {"error": f"CRITICAL: Tool {func.__name__} failed, and error handler also crashed: {str(inner_e)}"}
+ return cast(T, wrapper)
diff --git a/plane_mcp/journey/cache.py b/plane_mcp/journey/cache.py
new file mode 100644
index 0000000..e1fcc6a
--- /dev/null
+++ b/plane_mcp/journey/cache.py
@@ -0,0 +1,104 @@
+import os
+import json
+import time
+import tempfile
+import logging
+
+def get_cached_workspace_context(cache_ttl_seconds: int = 300) -> dict:
+ cache_dir = os.path.join(tempfile.gettempdir(), "plane_mcp")
+ os.makedirs(cache_dir, exist_ok=True)
+ cache_file = os.path.join(cache_dir, "workspace_context_cache.json")
+
+ context = {}
+ if os.path.exists(cache_file):
+ if time.time() - os.path.getmtime(cache_file) < cache_ttl_seconds:
+ try:
+ with open(cache_file, "r") as f:
+ context = json.load(f)
+ except Exception:
+ pass
+
+ if not context:
+ try:
+ from plane_mcp.client import get_plane_client_context
+ ctx = get_plane_client_context()
+ if ctx.client and ctx.workspace_slug:
+ response = ctx.client.projects.list(workspace_slug=ctx.workspace_slug)
+
+ projects = []
+ states_by_project = {}
+ labels_by_project = {}
+
+ for p in response.results:
+ projects.append({
+ "identifier": p.identifier,
+ "name": p.name,
+ "description": getattr(p, "description", "")
+ })
+ try:
+ s_res = ctx.client.states.list(workspace_slug=ctx.workspace_slug, project_id=p.id)
+ states_by_project[p.identifier] = [s.name for s in s_res.results if s.name]
+ except Exception:
+ states_by_project[p.identifier] = []
+
+ try:
+ l_res = ctx.client.labels.list(workspace_slug=ctx.workspace_slug, project_id=p.id)
+ labels_by_project[p.identifier] = [l.name for l in l_res.results if l.name]
+ except Exception:
+ labels_by_project[p.identifier] = []
+
+ all_states = sorted({s for ss in states_by_project.values() for s in ss})
+ all_labels = sorted({l for ll in labels_by_project.values() for l in ll})
+
+ context = {
+ "projects": projects,
+ "states_by_project": states_by_project,
+ "labels_by_project": labels_by_project,
+ "all_states": all_states,
+ "all_labels": all_labels,
+ "priorities": ["urgent", "high", "medium", "low", "none"]
+ }
+
+ with open(cache_file, "w") as f:
+ json.dump(context, f)
+ except Exception as e:
+ logging.getLogger(__name__).warning(f"Failed to fetch workspace context: {e}")
+ return {"error": f"Could not connect to Plane API: {e}. Check PLANE_API_KEY and PLANE_BASE_URL."}
+
+ return context
+
+def get_cached_project_slugs_docstring(cache_ttl_seconds: int = 300, full_descriptions: bool = False) -> str:
+ context = get_cached_workspace_context(cache_ttl_seconds)
+ projects = context.get("projects", [])
+
+ if not projects:
+ return "(e.g., 'PLANE' or 'TEST'). If you are unsure of a project_slug, make your best logical guess."
+
+ if full_descriptions:
+ lines = ["valid slugs:"]
+ for p in projects:
+ desc = f" - {p['description']}" if p.get("description") else f" - {p.get('name', '')}"
+ lines.append(f" * {p['identifier']}: {desc}")
+ return "\n".join(lines)
+ else:
+ slugs = [p["identifier"] for p in projects]
+ return f"valid slugs: {', '.join(slugs)}"
+
+def get_cached_states_string() -> str:
+ context = get_cached_workspace_context()
+ states = context.get("all_states", context.get("states", []))
+ if not states:
+ return "'In Progress', 'Backlog', 'Done'"
+ return ", ".join([f"'{s}'" for s in states])
+
+def get_cached_labels_string() -> str:
+ context = get_cached_workspace_context()
+ labels = context.get("all_labels", context.get("labels", []))
+ if not labels:
+ return "'bug', 'feature'"
+ return ", ".join([f"'{l}'" for l in labels])
+
+def get_cached_priorities_string() -> str:
+ context = get_cached_workspace_context()
+ priorities = context.get("priorities", ["urgent", "high", "medium", "low", "none"])
+ return ", ".join([f"'{p}'" for p in priorities])
diff --git a/plane_mcp/journey/lod.py b/plane_mcp/journey/lod.py
new file mode 100644
index 0000000..b395fa6
--- /dev/null
+++ b/plane_mcp/journey/lod.py
@@ -0,0 +1,164 @@
+"""Level of Detail (LOD) filtering system to strip verbose REST metadata."""
+from enum import Enum
+from typing import Any, Dict, List, Optional, Union
+from plane_mcp.client import get_plane_client_context
+from plane_mcp.resolver import EntityResolver
+from markdownify import markdownify
+
+class LODProfile(Enum):
+ SUMMARY = "summary"
+ STANDARD = "standard"
+ FULL = "full"
+
+# Summary: Minimum viable fields for AI context
+SUMMARY_FIELDS = {
+ "ticket_id", "name", "state", "priority", "assignees"
+}
+
+# Standard: Default ticket read fields (Key, Name, Details/description, priority, labels, state)
+STANDARD_FIELDS = {
+ "ticket_id", "name", "description_html", "priority", "labels", "state"
+}
+
+def inject_sequence_id(data: Dict[str, Any], project_identifier: Optional[str] = None) -> None:
+ """
+ Consistently inject sequence IDs (e.g., 'ENG-123') into the data dictionary
+ to enable zero-lookup chaining for subsequent AI actions.
+ """
+ if "ticket_id" in data:
+ return
+
+ proj_id = project_identifier or data.get("project_identifier")
+ if not proj_id and "project_detail" in data and isinstance(data["project_detail"], dict):
+ proj_id = data["project_detail"].get("identifier")
+
+ if proj_id and "sequence_id" in data:
+ data["ticket_id"] = f"{proj_id}-{data['sequence_id']}"
+
+def _hydrate_state(data: Dict[str, Any], project_identifier: Optional[str] = None) -> None:
+ """If state is a raw UUID, attempt to hydrate its name."""
+ state_val = data.get("state")
+ if state_val:
+ state_str = str(state_val)
+ if len(state_str) == 36 and "-" in state_str:
+ try:
+ client, workspace_slug = get_plane_client_context()
+ resolver = EntityResolver(client, workspace_slug)
+
+ # Figure out project UUID
+ proj_id = None
+ if data.get("project"):
+ proj_id = str(data.get("project"))
+ elif data.get("project_id"):
+ proj_id = str(data.get("project_id"))
+ elif project_identifier:
+ proj_id = str(resolver.resolve_project(project_identifier))
+
+ if proj_id:
+ state_obj = client.states.retrieve(workspace_slug=workspace_slug, project_id=proj_id, state_id=state_str)
+ data["state"] = {"name": state_obj.name, "id": state_str}
+ except Exception:
+ pass
+
+def _clean_html(html_str: str) -> str:
+ if not html_str:
+ return ""
+ return markdownify(html_str, heading_style="ATX", bullet_list_marker="-").strip()
+
+def _apply_lod_to_dict(data: Dict[str, Any], profile: LODProfile, project_identifier: Optional[str] = None) -> Dict[str, Any]:
+ inject_sequence_id(data, project_identifier)
+ _hydrate_state(data, project_identifier)
+
+ result = {}
+ if profile == LODProfile.SUMMARY:
+ for key, value in data.items():
+ if key in SUMMARY_FIELDS:
+ out_key = "key" if key == "ticket_id" else key
+ if key == "state" and isinstance(value, dict) and "name" in value:
+ result[out_key] = value["name"]
+ else:
+ result[out_key] = value
+
+ # Always prefer human-readable state name from state_detail over raw UUID
+ if "state_detail" in data and isinstance(data["state_detail"], dict) and "name" in data["state_detail"]:
+ result["state"] = data["state_detail"]["name"]
+
+ if "ticket_id" in data:
+ result["key"] = data["ticket_id"]
+
+ elif profile == LODProfile.STANDARD:
+ # Standard: Default ticket read fields (Key, Name, Details, priority, labels, state)
+
+ # Priority mapping
+ if "ticket_id" in data:
+ result["ticket_id"] = data["ticket_id"]
+ if "name" in data:
+ result["name"] = data["name"]
+
+ # Description (convert HTML to markdown)
+ if "description_html" in data and isinstance(data["description_html"], str):
+ result["description"] = _clean_html(data["description_html"])
+ elif "description" in data:
+ result["description"] = data["description"]
+
+ if "priority" in data:
+ result["priority"] = data["priority"]
+
+ if "labels" in data and isinstance(data["labels"], list):
+ result["labels"] = [l.get("name") if isinstance(l, dict) and "name" in l else l for l in data["labels"]]
+
+ # State mapping
+ if "state" in data:
+ if isinstance(data["state"], dict) and "name" in data["state"]:
+ result["state"] = data["state"]["name"]
+ else:
+ result["state"] = data["state"]
+
+ # Backup state check
+ if "state" not in result and "state_detail" in data and isinstance(data["state_detail"], dict):
+ result["state"] = data["state_detail"].get("name")
+
+ elif profile == LODProfile.FULL:
+ result = data.copy()
+ if "description_html" in result and isinstance(result["description_html"], str):
+ result["description"] = _clean_html(result["description_html"])
+ del result["description_html"]
+
+ return result
+
+def apply_lod(
+ data: Union[Dict, List, Any],
+ profile: LODProfile = LODProfile.SUMMARY,
+ project_identifier: Optional[str] = None
+) -> Union[Dict, List]:
+ """
+ Applies the LOD filter to a dictionary, list of dictionaries, or Pydantic model
+ and returns a clean JSON-serializable structure.
+ """
+ # Convert Pydantic models or objects to dict
+ if hasattr(data, "model_dump"):
+ try:
+ data = data.model_dump(mode='json')
+ except TypeError:
+ data = data.model_dump()
+ elif hasattr(data, "dict"):
+ data = data.dict()
+ elif hasattr(data, "__dict__"):
+ data = data.__dict__
+
+ filtered_data = None
+ if isinstance(data, list):
+ filtered_data = [
+ _apply_lod_to_dict(
+ item.model_dump(mode='json') if hasattr(item, "model_dump") else (item.dict() if hasattr(item, "dict") else item),
+ profile,
+ project_identifier
+ ) if hasattr(item, "model_dump") or hasattr(item, "dict") or isinstance(item, dict) else item
+ for item in data
+ ]
+ elif isinstance(data, dict):
+ filtered_data = _apply_lod_to_dict(data, profile, project_identifier)
+ else:
+ filtered_data = data
+
+ return filtered_data
diff --git a/plane_mcp/journey/server.py b/plane_mcp/journey/server.py
new file mode 100644
index 0000000..400f88a
--- /dev/null
+++ b/plane_mcp/journey/server.py
@@ -0,0 +1,80 @@
+"""Plane MCP Server implementation."""
+
+import os
+
+from fastmcp import FastMCP
+from key_value.aio.stores.memory import MemoryStore
+from key_value.aio.stores.redis import RedisStore
+from mcp.types import Icon
+
+from plane_mcp.auth import PlaneHeaderAuthProvider, PlaneOAuthProvider
+from plane_mcp.journey.tools import register_tools
+
+
+def get_oauth_mcp(base_path: str = "/"):
+ import logging
+
+ logger = logging.getLogger(__name__)
+
+ redis_host = os.getenv("REDIS_HOST")
+ redis_port = os.getenv("REDIS_PORT")
+
+ if redis_host and redis_port:
+ logger.info("Using Redis for token storage")
+ client_storage = RedisStore(host=redis_host, port=int(redis_port))
+ else:
+ logger.warning(
+ "Using in-memory storage - tokens will be lost on restart! "
+ "Set REDIS_HOST and REDIS_PORT for production."
+ )
+ client_storage = MemoryStore()
+
+ # Initialize the MCP server
+ oauth_mcp = FastMCP(
+ "Plane Journey MCP Server",
+ icons=[Icon(src="https://plane.so/favicon.ico", alt="Plane Journey MCP Server")],
+ website_url="https://plane.so",
+ auth=PlaneOAuthProvider(
+ client_id=os.getenv("PLANE_OAUTH_PROVIDER_CLIENT_ID", "dummy_client_id"),
+ client_secret=os.getenv("PLANE_OAUTH_PROVIDER_CLIENT_SECRET", "dummy_client_secret"),
+ base_url=f"{os.getenv('PLANE_OAUTH_PROVIDER_BASE_URL', 'http://localhost:8211')}{base_path}",
+ plane_base_url=os.getenv("PLANE_BASE_URL", ""),
+ plane_internal_base_url=os.getenv("PLANE_INTERNAL_BASE_URL", ""),
+ client_storage=client_storage,
+ required_scopes=["read", "write"],
+ allowed_client_redirect_uris=[
+ # Localhost only for http (dynamic ports from MCP clients)
+ "http://localhost:*",
+ "http://localhost:*/*",
+ "http://127.0.0.1:*",
+ "http://127.0.0.1:*/*",
+ # Known MCP client custom protocol schemes
+ "cursor://*",
+ "vscode://*",
+ "vscode-insiders://*",
+ "windsurf://*",
+ "claude://*",
+ ],
+ ),
+ )
+ register_tools(oauth_mcp)
+ return oauth_mcp
+
+
+def get_header_mcp():
+ header_mcp = FastMCP(
+ "Plane MCP Server (header-http)",
+ auth=PlaneHeaderAuthProvider(
+ required_scopes=["read", "write"],
+ ),
+ )
+ register_tools(header_mcp)
+ return header_mcp
+
+
+def get_stdio_mcp():
+ stdio_mcp = FastMCP(
+ "Plane Journey MCP Server (stdio)",
+ )
+ register_tools(stdio_mcp)
+ return stdio_mcp
diff --git a/plane_mcp/journey/tools/__init__.py b/plane_mcp/journey/tools/__init__.py
new file mode 100644
index 0000000..26976c4
--- /dev/null
+++ b/plane_mcp/journey/tools/__init__.py
@@ -0,0 +1,12 @@
+"""Journey tools initialization."""
+
+from fastmcp import FastMCP
+from .read import register_read_tools
+from .create_update import register_create_update_tools
+from .workflow import register_workflow_tools
+
+def register_tools(mcp: FastMCP) -> None:
+ """Register all journey tools with the MCP server."""
+ register_read_tools(mcp)
+ register_create_update_tools(mcp)
+ register_workflow_tools(mcp)
diff --git a/plane_mcp/journey/tools/create_update.py b/plane_mcp/journey/tools/create_update.py
new file mode 100644
index 0000000..02a4abf
--- /dev/null
+++ b/plane_mcp/journey/tools/create_update.py
@@ -0,0 +1,265 @@
+"""Create and Update tools for Journey Endpoint."""
+
+from fastmcp import FastMCP
+from plane_mcp.journey.cache import get_cached_project_slugs_docstring
+from plane_mcp.client import get_plane_client_context
+from plane_mcp.resolver import EntityResolver
+from plane_mcp.journey.base import JourneyBase, mcp_error_boundary
+from plane_mcp.journey.lod import LODProfile
+from plane_mcp.sanitize import sanitize_html
+from plane.models.work_items import CreateWorkItem, UpdateWorkItem
+from plane.models.labels import CreateLabel
+from plane.models.cycles import CreateCycle
+from plane.errors.errors import HttpError
+import logging
+
+logger = logging.getLogger(__name__)
+
+MAX_NEW_LABELS_PER_REQUEST = 3
+
+
+class CreateUpdateJourney(JourneyBase):
+ def _resolve_or_create_labels(self, project_id: str, label_names: list[str]) -> list[str]:
+ client, workspace_slug = get_plane_client_context()
+ existing = client.labels.list(workspace_slug=workspace_slug, project_id=project_id).results
+ name_to_id = {l.name.lower(): l.id for l in existing if l.name}
+
+ new_labels_needed = [n for n in label_names if n.lower() not in name_to_id]
+ if len(new_labels_needed) > MAX_NEW_LABELS_PER_REQUEST:
+ raise ValueError(
+ f"Too many new labels requested ({len(new_labels_needed)}). "
+ f"Maximum {MAX_NEW_LABELS_PER_REQUEST} new labels can be auto-created per request. "
+ f"Unknown labels: {new_labels_needed}"
+ )
+
+ label_ids = []
+ for name in label_names:
+ key = name.lower()
+ if key in name_to_id:
+ label_ids.append(name_to_id[key])
+ else:
+ new_label = client.labels.create(
+ workspace_slug=workspace_slug,
+ project_id=project_id,
+ data=CreateLabel(name=name, color="#000000")
+ )
+ label_ids.append(new_label.id)
+ name_to_id[key] = new_label.id
+ return label_ids
+
+ def _resolve_or_create_cycle(self, project_id: str, cycle_name: str) -> str | None:
+ client, workspace_slug = get_plane_client_context()
+ existing = client.cycles.list(workspace_slug=workspace_slug, project_id=project_id).results
+ for c in existing:
+ if c.name and c.name.lower() == cycle_name.lower():
+ return c.id
+
+ # Create missing cycle
+ me = client.users.get_me()
+ user_id = me.id if hasattr(me, "id") else None
+
+ import datetime
+ start_dt = datetime.date.today().isoformat()
+ end_dt = (datetime.date.today() + datetime.timedelta(days=14)).isoformat()
+ try:
+ new_cycle = client.cycles.create(
+ workspace_slug=workspace_slug,
+ project_id=project_id,
+ data=CreateCycle(name=cycle_name, project_id=project_id, owned_by=user_id, start_date=start_dt, end_date=end_dt)
+ )
+ return new_cycle.id
+ except HttpError as e:
+ if getattr(e, "status_code", getattr(getattr(e, "response", None), "status_code", None)) == 400:
+ logger.warning(f"Cycles appear to be disabled for project {project_id}. Skipping cycle creation.")
+ return None
+ raise
+
+ def create_ticket(
+ self,
+ title: str,
+ project_slug: str,
+ description: str | None = None,
+ state_name: str | None = None,
+ labels: list[str] | None = None,
+ cycle_name: str | None = None
+ ) -> dict:
+
+ if project_slug.lower() == 'help':
+ from plane_mcp.journey.cache import get_cached_workspace_context
+ return {"status": "help", "message": "Here are the valid options for this workspace.", "options": get_cached_workspace_context(0)}
+
+ project_id = self.resolver.resolve_project(project_slug)
+ client, workspace_slug = get_plane_client_context()
+
+ state_id = self.resolver.resolve_state(project_slug, state_name) if state_name else None
+ label_ids = self._resolve_or_create_labels(project_id, labels) if labels else None
+
+ cycle_id = None
+ if cycle_name:
+ cycle_id = self._resolve_or_create_cycle(project_id, cycle_name)
+
+ data = CreateWorkItem(
+ name=title,
+ description_html=sanitize_html(description),
+ state=state_id,
+ labels=label_ids
+ )
+
+ new_ticket = client.work_items.create(
+ workspace_slug=workspace_slug,
+ project_id=project_id,
+ data=data
+ )
+
+ if cycle_id and new_ticket.id:
+ client.cycles.add_work_items(
+ workspace_slug=workspace_slug,
+ project_id=project_id,
+ cycle_id=cycle_id,
+ issue_ids=[new_ticket.id]
+ )
+
+ return {"key": f"{project_slug}-{new_ticket.sequence_id}"}
+
+ def update_ticket(
+ self,
+ ticket_id: str,
+ new_title: str | None = None,
+ append_text: str | None = None,
+ append_after_snippet: str | None = None,
+ replace_text: str | None = None,
+ replace_target_snippet: str | None = None,
+ comment: str | None = None
+ ) -> dict:
+ work_item_id = self.resolver.resolve_ticket(ticket_id)
+ project_identifier, _ = self.parse_ticket_id(ticket_id)
+ project_id = self.resolver.resolve_project(project_identifier)
+
+ client, workspace_slug = get_plane_client_context()
+
+ # Retrieve using internal _get to bypass Pydantic ValidationError when
+ # the API returns label UUIDs as strings instead of Label objects
+ current = client.work_items._get(
+ f"{workspace_slug}/projects/{project_id}/work-items/{work_item_id}"
+ )
+
+ title_changed = False
+ final_title = current.get("name", "")
+ if new_title and new_title != current.get("name"):
+ final_title = new_title
+ title_changed = True
+
+ desc_changed = False
+ final_desc = current.get("description_html") or ""
+
+ if replace_text is not None:
+ if not replace_target_snippet:
+ return {"status": "error", "message": "You must provide 'replace_target_snippet' to specify exactly which text to replace."}
+
+ occurrences = final_desc.count(replace_target_snippet)
+ if occurrences == 0:
+ return {"status": "error", "message": f"The snippet '{replace_target_snippet}' was not found in the description. Ensure you matched the exact text, spaces, and casing."}
+ if occurrences > 1:
+ return {"status": "error", "message": f"The snippet '{replace_target_snippet}' matched multiple times. Please provide a longer, more specific snippet to uniquely identify the text to replace."}
+
+ final_desc = final_desc.replace(replace_target_snippet, replace_text)
+ desc_changed = True
+
+ if append_text is not None:
+ if append_after_snippet:
+ occurrences = final_desc.count(append_after_snippet)
+ if occurrences == 0:
+ return {"status": "error", "message": f"The snippet '{append_after_snippet}' was not found in the description. Ensure you matched the exact text."}
+ if occurrences > 1:
+ return {"status": "error", "message": f"The snippet '{append_after_snippet}' matched multiple times. Please provide a longer snippet."}
+
+ final_desc = final_desc.replace(append_after_snippet, f"{append_after_snippet}
{append_text}")
+ else:
+ if final_desc:
+ final_desc = f"{final_desc}
{append_text}"
+ else:
+ final_desc = append_text
+ desc_changed = True
+
+ if title_changed or desc_changed:
+ data = UpdateWorkItem(
+ name=final_title,
+ description_html=sanitize_html(final_desc)
+ )
+
+ client.work_items.update(
+ workspace_slug=workspace_slug,
+ project_id=project_id,
+ work_item_id=work_item_id,
+ data=data
+ )
+
+ if comment:
+ from plane.models.work_items import CreateWorkItemComment
+ safe_comment = sanitize_html(f"{comment}
")
+ client.work_items.comments.create(
+ workspace_slug=workspace_slug,
+ project_id=project_id,
+ work_item_id=work_item_id,
+ data=CreateWorkItemComment(comment_html=safe_comment)
+ )
+
+ if not title_changed and not desc_changed and not comment:
+ return {"status": "warning", "message": "No changes were provided to update_ticket."}
+
+ return {"key": ticket_id, "status": "success", "message": "Ticket updated successfully."}
+
+def register_create_update_tools(mcp: FastMCP) -> None:
+ def create_ticket(
+ title: str,
+ project_slug: str,
+ description: str | None = None,
+ labels: list[str] | None = None,
+ cycle_name: str | None = None,
+ ) -> dict:
+ client, workspace_slug = get_plane_client_context()
+ resolver = EntityResolver(client, workspace_slug)
+ journey = CreateUpdateJourney(resolver)
+ return journey.create_ticket(title=title, project_slug=project_slug, description=description, labels=labels, cycle_name=cycle_name)
+
+ create_ticket.__doc__ = """
+ Create a new ticket with automatic resolution of labels and cycles.
+ Missing labels or cycles will be automatically created.
+ The ticket is automatically placed in the default Backlog state.
+
+ Args:
+ title: Title of the ticket.
+ project_slug: The Plane project identifier (e.g., 'PLANE'). To discover valid project slugs, call this tool with project_slug='help'.
+ description: Optional detailed markdown description of the ticket.
+ labels: List of label names.
+ cycle_name: Name of the cycle to add this ticket to.
+ """
+ create_ticket = mcp.tool()(mcp_error_boundary(create_ticket))
+
+ @mcp.tool()
+ @mcp_error_boundary
+ def update_ticket(
+ ticket_id: str,
+ new_title: str | None = None,
+ append_text: str | None = None,
+ append_after_snippet: str | None = None,
+ replace_text: str | None = None,
+ replace_target_snippet: str | None = None,
+ comment: str | None = None
+ ) -> dict:
+ """
+ Update a ticket's title, description, or add a comment. Features smart targeting to avoid JSON escaping errors.
+
+ Args:
+ ticket_id: The globally unique, human-readable identifier (e.g., ENG-123).
+ new_title: Completely replaces the ticket title.
+ append_text: Text to append to the description.
+ append_after_snippet: If populated, 'append_text' will be inserted immediately after this exact snippet. If blank, 'append_text' is added to the very end of the description.
+ replace_text: The new text that will replace a snippet.
+ replace_target_snippet: The exact existing text to replace. Required if 'replace_text' is used.
+ comment: Adds a new comment to the ticket thread.
+ """
+ client, workspace_slug = get_plane_client_context()
+ resolver = EntityResolver(client, workspace_slug)
+ journey = CreateUpdateJourney(resolver)
+ return journey.update_ticket(ticket_id, new_title, append_text, append_after_snippet, replace_text, replace_target_snippet, comment)
diff --git a/plane_mcp/journey/tools/read.py b/plane_mcp/journey/tools/read.py
new file mode 100644
index 0000000..b35083b
--- /dev/null
+++ b/plane_mcp/journey/tools/read.py
@@ -0,0 +1,246 @@
+"""Read tools for Journey Endpoint."""
+
+from typing import Literal
+from fastmcp import FastMCP
+from plane_mcp.journey.cache import get_cached_project_slugs_docstring
+from plane_mcp.client import get_plane_client_context
+from plane_mcp.resolver import EntityResolver
+from plane_mcp.journey.base import JourneyBase, mcp_error_boundary
+from plane_mcp.journey.lod import LODProfile
+from plane.models.query_params import RetrieveQueryParams
+
+class ReadJourney(JourneyBase):
+ def search_tickets(
+ self,
+ project_slug: str,
+ query: str | None = None,
+ labels: list[str] | None = None,
+ priority: list[str] | None = None,
+ states: list[str] | None = None,
+ assignees: list[str] | None = None,
+ limit: int = 50,
+ cursor: str | None = None,
+ lod: Literal["summary", "standard", "full"] = "standard"
+ ) -> dict:
+ import json
+
+
+ if project_slug.lower() == 'help':
+ from plane_mcp.journey.cache import get_cached_workspace_context
+ opts = get_cached_workspace_context(0).copy()
+ opts.pop("all_states", None)
+ opts.pop("all_labels", None)
+ return {"status": "help", "message": "Here are the valid options for this workspace.", "options": opts}
+
+ project_id = self.resolver.resolve_project(project_slug)
+ client, workspace_slug = get_plane_client_context()
+
+ query_params = {
+ "per_page": 100, # Fetch max per page for faster deep searching
+ "expand": "assignees,labels,state"
+ }
+
+ if cursor:
+ query_params["cursor"] = cursor
+ if priority:
+ query_params["priority"] = ",".join([p.lower() for p in priority])
+
+ if states:
+ state_ids = [self.resolver.resolve_state(project_slug, s) for s in states]
+ query_params["state"] = ",".join(state_ids)
+
+ if labels:
+ unresolved_labels = []
+ try:
+ existing_labels = client.labels.list(workspace_slug=workspace_slug, project_id=project_id).results
+ name_to_id = {l.name.lower(): l.id for l in existing_labels if l.name}
+ label_ids = []
+ for name in labels:
+ if name.lower() in name_to_id:
+ label_ids.append(name_to_id[name.lower()])
+ else:
+ unresolved_labels.append(name)
+ if label_ids:
+ query_params["labels"] = ",".join(label_ids)
+ except Exception as e:
+ import logging
+ logging.getLogger(__name__).warning(f"Failed to resolve labels: {e}")
+
+ if assignees:
+ assignee_ids = []
+ for a in assignees:
+ if a.lower() == "me":
+ try:
+ me = client.users.get_me()
+ if hasattr(me, "id"):
+ assignee_ids.append(me.id)
+ except Exception:
+ pass
+ if assignee_ids:
+ query_params["assignees"] = ",".join(assignee_ids)
+
+ from plane.models.work_items import PaginatedWorkItemResponse
+
+ matched_results = []
+ current_cursor = cursor
+ next_cursor_to_return = None
+
+ query_lower = query.lower() if query else None
+
+ max_pages_to_fetch = 5
+ pages_fetched = 0
+
+ if limit <= 0:
+ return {"results": [], "next_cursor": None, "prev_cursor": None}
+
+ # Loop to pull pages and deep-search until we hit the requested limit or run out of pages
+ while len(matched_results) < limit and pages_fetched < max_pages_to_fetch:
+ if current_cursor:
+ query_params["cursor"] = current_cursor
+
+ response = client.work_items._get(
+ f"{workspace_slug}/projects/{project_id}/work-items",
+ params=query_params
+ )
+ pages_fetched += 1
+
+ paginated = PaginatedWorkItemResponse.model_validate(response)
+
+ for item in paginated.results:
+ if query_lower:
+ # Deep JSON string match
+ item_json = json.dumps(item.model_dump(), default=str).lower()
+ if query_lower in item_json:
+ matched_results.append(item)
+ else:
+ matched_results.append(item)
+
+ if len(matched_results) >= limit:
+ break
+
+ # If we don't have a next page, or we've hit our limit, break the loop
+ if not hasattr(paginated, "next_cursor") or not paginated.next_cursor:
+ break
+
+ current_cursor = paginated.next_cursor
+ next_cursor_to_return = current_cursor
+
+ try:
+ profile = LODProfile(lod)
+ except ValueError:
+ profile = LODProfile.STANDARD
+
+ transformed_results = self.apply_lod(matched_results, profile=profile, project_identifier=project_slug)
+
+ result = {
+ "results": transformed_results,
+ "next_cursor": next_cursor_to_return,
+ "prev_cursor": paginated.prev_cursor if hasattr(paginated, "prev_cursor") else None
+ }
+ if labels and unresolved_labels:
+ result["warnings"] = [f"Label not found and filter was skipped: {', '.join(unresolved_labels)}"]
+ return result
+
+ def read_ticket(self, ticket_id: str, lod: Literal["summary", "standard", "full"] = "standard", comments: bool = False) -> dict:
+ work_item_id = self.resolver.resolve_ticket(ticket_id)
+ project_identifier, issue_sequence = self.parse_ticket_id(ticket_id)
+ project_id = self.resolver.resolve_project(project_identifier)
+
+ client, workspace_slug = get_plane_client_context()
+
+ params = RetrieveQueryParams(expand="assignees,labels,state")
+
+ result = client.work_items.retrieve(
+ workspace_slug=workspace_slug,
+ project_id=project_id,
+ work_item_id=work_item_id,
+ params=params
+ )
+
+ try:
+ profile = LODProfile(lod)
+ except ValueError:
+ profile = LODProfile.STANDARD
+
+ transformed = self.apply_lod(result, profile=profile, project_identifier=project_identifier)
+
+ if comments:
+ try:
+ comments_resp = client.work_items.comments.list(
+ workspace_slug=workspace_slug,
+ project_id=project_id,
+ work_item_id=work_item_id
+ )
+ formatted_comments = []
+ for c in comments_resp.results:
+ date_str = str(c.created_at)[:10] if hasattr(c, 'created_at') and c.created_at else "YYYY-MM-DD"
+
+ username = "user"
+ # Try to extract the best available identifier
+ if hasattr(c, 'actor_detail') and c.actor_detail:
+ username = getattr(c.actor_detail, 'display_name', getattr(c.actor_detail, 'username', username))
+ elif hasattr(c, 'actor') and c.actor:
+ username = str(c.actor)
+
+ text = getattr(c, 'comment_stripped', '') or getattr(c, 'comment_html', '')
+ formatted_comments.append(f"\n{date_str}-@{username}:\n{text}")
+
+ if formatted_comments:
+ transformed["comments"] = "".join(formatted_comments)
+ except Exception as e:
+ import logging
+ logging.getLogger(__name__).warning(f"Failed to fetch comments for {ticket_id}: {e}")
+
+ return transformed
+
+
+def register_read_tools(mcp: FastMCP) -> None:
+
+ def search_tickets(
+ project_slug: str,
+ query: str | None = None,
+ labels: list[str] | None = None,
+ priority: list[str] | None = None,
+ states: list[str] | None = None,
+ assignees: list[str] | None = None,
+ limit: int = 50,
+ cursor: str | None = None,
+ lod: Literal["summary", "standard", "full"] = "standard"
+ ) -> dict:
+ client, workspace_slug = get_plane_client_context()
+ resolver = EntityResolver(client, workspace_slug)
+ journey = ReadJourney(resolver)
+ return journey.search_tickets(project_slug, query, labels, priority, states, assignees, limit, cursor, lod)
+
+ search_tickets.__doc__ = """
+ Search for issues. You can use standard filters or a text query.
+ If the desired result is not in the current page, call again with the provided next_cursor.
+
+ Args:
+ project_slug: The Plane project identifier (e.g., 'PLANE' or 'TEST'). To discover valid project slugs, states, and labels, call this tool with project_slug='help'.
+ query: Free-form text search query.
+ labels: List of label names to filter by (e.g., ['bug', 'feature']).
+ priority: List of priorities to filter by (e.g., ['urgent', 'high', 'medium', 'low', 'none']).
+ states: List of state names to filter by (e.g., ['In Progress', 'Backlog', 'Done']).
+ assignees: List of usernames or 'me' to filter by.
+ limit: Maximum number of results to return (max 100). Default is 50.
+ cursor: Pagination cursor for getting the next set of results.
+ lod: Level of Detail profile ("summary", "standard", or "full"). Default is "standard".
+ """
+ search_tickets = mcp.tool()(mcp_error_boundary(search_tickets))
+
+ @mcp.tool()
+ @mcp_error_boundary
+ def read_ticket(ticket_id: str, lod: Literal["summary", "standard", "full"] = "standard", comments: bool = False) -> dict:
+ """
+ Read the details of a single ticket.
+
+ Args:
+ ticket_id: The globally unique, human-readable identifier (e.g., ENG-123). The system automatically resolves the project and issue routing from this prefix.
+ lod: Level of Detail profile ("summary", "standard", or "full"). Default is "standard".
+ comments: If true, fetches and appends the ticket's comments to the result.
+ """
+ client, workspace_slug = get_plane_client_context()
+ resolver = EntityResolver(client, workspace_slug)
+ journey = ReadJourney(resolver)
+ return journey.read_ticket(ticket_id, lod, comments)
diff --git a/plane_mcp/journey/tools/workflow.py b/plane_mcp/journey/tools/workflow.py
new file mode 100644
index 0000000..47a3bfe
--- /dev/null
+++ b/plane_mcp/journey/tools/workflow.py
@@ -0,0 +1,205 @@
+"""Workflow tools for Journey Endpoint."""
+
+from collections import defaultdict
+from fastmcp import FastMCP
+from plane_mcp.journey.cache import get_cached_project_slugs_docstring
+from plane.errors.errors import HttpError
+from plane_mcp.client import get_plane_client_context
+from plane_mcp.resolver import EntityResolver
+from plane_mcp.journey.base import JourneyBase, mcp_error_boundary
+from plane_mcp.journey.lod import LODProfile
+from plane_mcp.sanitize import sanitize_html
+from plane.models.work_items import UpdateWorkItem, CreateWorkItemComment
+from plane.models.cycles import CreateCycle
+from datetime import datetime, timedelta
+import logging
+
+logger = logging.getLogger(__name__)
+
+class WorkflowJourney(JourneyBase):
+ def _resolve_or_create_cycle(self, project_id: str, cycle_name: str) -> str | None:
+ client, workspace_slug = get_plane_client_context()
+ try:
+ existing = client.cycles.list(workspace_slug=workspace_slug, project_id=project_id).results
+ for c in existing:
+ if c.name and c.name.lower() == cycle_name.lower():
+ return c.id
+
+ me = client.users.get_me()
+ user_id = me.id if hasattr(me, "id") else None
+
+ start_date = datetime.now()
+ end_date = start_date + timedelta(days=14)
+
+ new_cycle = client.cycles.create(
+ workspace_slug=workspace_slug,
+ project_id=project_id,
+ data=CreateCycle(
+ name=cycle_name,
+ project_id=project_id,
+ owned_by=user_id,
+ start_date=start_date.strftime("%Y-%m-%d"),
+ end_date=end_date.strftime("%Y-%m-%d")
+ )
+ )
+ return new_cycle.id
+ except HttpError as e:
+ # If the project has the Cycles module disabled, the API returns 400 Bad Request.
+ # We catch this so the rest of the workflow (like status transitions) can still proceed.
+ if getattr(e, "status_code", getattr(getattr(e, "response", None), "status_code", None)) == 400:
+ logger.warning(f"Cycles appear to be disabled for project {project_id}. Skipping cycle creation.")
+ return None
+ raise
+
+ def transition_ticket(self, ticket_id: str, state_name: str) -> dict:
+ work_item_id = self.resolver.resolve_ticket(ticket_id)
+ project_identifier, _ = self.parse_ticket_id(ticket_id)
+ project_id = self.resolver.resolve_project(project_identifier)
+ state_id = self.resolver.resolve_state(project_identifier, state_name)
+
+ client, workspace_slug = get_plane_client_context()
+
+ updated = client.work_items.update(
+ workspace_slug=workspace_slug,
+ project_id=project_id,
+ work_item_id=work_item_id,
+ data=UpdateWorkItem(state=state_id)
+ )
+
+ return self.apply_lod(updated, profile=LODProfile.SUMMARY, project_identifier=project_identifier)
+
+ def begin_work(self, ticket_ids: list[str], cycle_name: str) -> dict:
+ """Batch operation to add multiple tickets to a cycle and potentially transition them."""
+ client, workspace_slug = get_plane_client_context()
+
+ # Group tickets by project
+ project_to_tickets = defaultdict(list)
+ for t_id in ticket_ids:
+ proj_id_str, _ = self.parse_ticket_id(t_id)
+ w_id = self.resolver.resolve_ticket(t_id)
+ project_to_tickets[proj_id_str].append(w_id)
+
+ results = {}
+ for proj_identifier, w_ids in project_to_tickets.items():
+ proj_id = self.resolver.resolve_project(proj_identifier)
+ cycle_id = self._resolve_or_create_cycle(proj_id, cycle_name)
+
+ msg = f"Processed {len(w_ids)} tickets."
+
+ if cycle_id:
+ # Add all tickets in this project to the cycle
+ client.cycles.add_work_items(
+ workspace_slug=workspace_slug,
+ project_id=proj_id,
+ cycle_id=cycle_id,
+ issue_ids=w_ids
+ )
+ msg += f" Added to cycle '{cycle_name}'."
+ else:
+ msg += " Note: Cycles are disabled for this project, skipped cycle assignment."
+
+ # Optionally transition them to In Progress if such state exists
+ try:
+ state_id = self.resolver.resolve_state(proj_identifier, "In Progress")
+ for w_id in w_ids:
+ client.work_items.update(
+ workspace_slug=workspace_slug,
+ project_id=proj_id,
+ work_item_id=w_id,
+ data=UpdateWorkItem(state=state_id)
+ )
+ except ValueError:
+ # "In Progress" state not found, skip automatic transition
+ pass
+
+ results[proj_identifier] = msg
+
+ return {"status": "success", "details": results}
+
+ def complete_work(self, ticket_id: str, comment: str) -> dict:
+ work_item_id = self.resolver.resolve_ticket(ticket_id)
+ project_identifier, _ = self.parse_ticket_id(ticket_id)
+ project_id = self.resolver.resolve_project(project_identifier)
+
+ client, workspace_slug = get_plane_client_context()
+
+ # Add comment
+ safe_comment = sanitize_html(f"{comment}
")
+ client.work_items.comments.create(
+ workspace_slug=workspace_slug,
+ project_id=project_id,
+ work_item_id=work_item_id,
+ data=CreateWorkItemComment(comment_html=safe_comment)
+ )
+
+ # Try to transition to Done or Completed
+ state_id = None
+ for state_name in ["Done", "Completed"]:
+ try:
+ state_id = self.resolver.resolve_state(project_identifier, state_name)
+ break
+ except ValueError:
+ continue
+
+ if state_id:
+ updated = client.work_items.update(
+ workspace_slug=workspace_slug,
+ project_id=project_id,
+ work_item_id=work_item_id,
+ data=UpdateWorkItem(state=state_id)
+ )
+
+ return self.apply_lod(updated, profile=LODProfile.SUMMARY, project_identifier=project_identifier)
+
+ return {"status": "partial", "message": "Comment added, but no 'Done' or 'Completed' state found. Call transition_ticket explicitly to close this ticket."}
+
+
+def register_workflow_tools(mcp: FastMCP) -> None:
+ def transition_ticket(ticket_id: str, state_name: str) -> dict:
+ client, workspace_slug = get_plane_client_context()
+ resolver = EntityResolver(client, workspace_slug)
+ journey = WorkflowJourney(resolver)
+ return journey.transition_ticket(ticket_id, state_name)
+
+ transition_ticket.__doc__ = """
+ Transition a ticket to a new state.
+ Use this primitive for granular edge-case routing, such as moving a ticket to Canceled, Duplicate, or custom review states.
+
+ Args:
+ ticket_id: The globally unique, human-readable identifier (e.g., ENG-123).
+ state_name: The name of the state to transition to (e.g. 'In Progress').
+ """
+ transition_ticket = mcp.tool()(mcp_error_boundary(transition_ticket))
+
+ @mcp.tool()
+ @mcp_error_boundary
+ def begin_work(ticket_ids: list[str], cycle_name: str) -> dict:
+ """
+ Add multiple tickets to a cycle (creating it if missing) and attempt to transition them to 'In Progress'.
+ Supports batch operations across multiple tickets and potentially multiple projects.
+ Use this macro as your primary method for standard workflow progression (starting work).
+
+ Args:
+ ticket_ids: List of globally unique, human-readable identifiers (e.g. ['ENG-123', 'ENG-124']). The system automatically resolves the project and issue routing from these prefixes.
+ cycle_name: The name of the cycle to add tickets to. If you are unsure, make your best logical guess.
+ """
+ client, workspace_slug = get_plane_client_context()
+ resolver = EntityResolver(client, workspace_slug)
+ journey = WorkflowJourney(resolver)
+ return journey.begin_work(ticket_ids, cycle_name)
+
+ @mcp.tool()
+ @mcp_error_boundary
+ def complete_work(ticket_id: str, comment: str) -> dict:
+ """
+ Add a completion comment to a ticket and attempt to transition it to a 'Done' or 'Completed' state.
+ Use this macro as your primary method for standard workflow progression (finishing work).
+
+ Args:
+ ticket_id: The globally unique, human-readable identifier (e.g., ENG-123). The system automatically resolves the project and issue routing from this prefix; no separate project context is needed.
+ comment: The text to add as a comment.
+ """
+ client, workspace_slug = get_plane_client_context()
+ resolver = EntityResolver(client, workspace_slug)
+ journey = WorkflowJourney(resolver)
+ return journey.complete_work(ticket_id, comment)
diff --git a/plane_mcp/resolver.py b/plane_mcp/resolver.py
new file mode 100644
index 0000000..5ff1c8d
--- /dev/null
+++ b/plane_mcp/resolver.py
@@ -0,0 +1,160 @@
+"""Stateless EntityResolver for mapping human-readable identifiers to Plane UUIDs."""
+
+from typing import Dict, List, Optional
+from plane import PlaneClient
+from plane.models.query_params import PaginatedQueryParams
+from plane.errors.errors import HttpError
+
+
+class EntityResolutionError(ValueError):
+ """Exception raised when an entity cannot be resolved, containing actionable options."""
+ def __init__(self, message: str, available_options: Optional[List[str]] = None):
+ super().__init__(message)
+ self.available_options = available_options or []
+
+
+import time
+
+# Global caches to prevent N+1 queries across tool invocations in the same process
+_GLOBAL_PROJECT_CACHE: Dict[str, str] = {}
+_GLOBAL_STATE_CACHE: Dict[str, Dict[str, str]] = {}
+_GLOBAL_WORK_ITEM_CACHE: Dict[str, str] = {}
+_CACHE_LAST_UPDATED: Dict[str, float] = {"projects": 0.0, "states": 0.0, "work_items": 0.0}
+_CACHE_TTL_SECONDS = 300 # 5 minutes
+
+class EntityResolver:
+ """
+ Stateless resolver for mapping human-readable identifiers to Plane UUIDs.
+ Implements global caching to prevent N+1 API calls and returns Actionable Errors
+ when an entity cannot be found.
+ """
+
+ def __init__(self, client: PlaneClient, workspace_slug: str):
+ self.client = client
+ self.workspace_slug = workspace_slug
+
+ # We now use the global caches
+ self._project_cache = _GLOBAL_PROJECT_CACHE
+ self._state_cache = _GLOBAL_STATE_CACHE
+ self._work_item_cache = _GLOBAL_WORK_ITEM_CACHE
+
+ def resolve_project(self, identifier_or_slug: str) -> str:
+ """
+ Resolve a project identifier (e.g. 'ENG') or slug to its UUID.
+ """
+ key = identifier_or_slug.upper()
+ if key in self._project_cache and (time.time() - _CACHE_LAST_UPDATED["projects"] < _CACHE_TTL_SECONDS):
+ return self._project_cache[key]
+
+ # Fetch all projects to cache and find
+ try:
+ projects_resp = self.client.projects.list(
+ workspace_slug=self.workspace_slug,
+ params=PaginatedQueryParams(per_page=100)
+ )
+ available = []
+ for p in projects_resp.results:
+ self._project_cache[p.identifier.upper()] = p.id
+ self._project_cache[p.name.upper()] = p.id
+ available.append(p.identifier)
+ if hasattr(p, 'slug') and p.slug:
+ self._project_cache[p.slug.upper()] = p.id
+
+ _CACHE_LAST_UPDATED["projects"] = time.time()
+
+ if key in self._project_cache:
+ return self._project_cache[key]
+
+ raise EntityResolutionError(
+ f"Project '{identifier_or_slug}' not found. Available projects: {', '.join(available)}",
+ available_options=available
+ )
+ except Exception as e:
+ if isinstance(e, EntityResolutionError):
+ raise
+ raise RuntimeError(f"Failed to fetch projects: {str(e)}")
+
+ def resolve_state(self, project_identifier: str, state_name: str) -> str:
+ """
+ Resolve a state name (e.g. 'In Progress') to its UUID for a specific project.
+ """
+ project_id = self.resolve_project(project_identifier)
+ key = state_name.lower()
+
+ if project_id in self._state_cache and key in self._state_cache[project_id] and (time.time() - _CACHE_LAST_UPDATED["states"] < _CACHE_TTL_SECONDS):
+ return self._state_cache[project_id][key]
+
+ try:
+ # We don't have the states in cache for this project, fetch them
+ states_resp = self.client.states.list(
+ workspace_slug=self.workspace_slug,
+ project_id=project_id
+ )
+
+ if project_id not in self._state_cache:
+ self._state_cache[project_id] = {}
+
+ available = []
+ for s in states_resp.results:
+ self._state_cache[project_id][s.name.lower()] = s.id
+ available.append(s.name)
+
+ _CACHE_LAST_UPDATED["states"] = time.time()
+
+ if key in self._state_cache[project_id]:
+ return self._state_cache[project_id][key]
+
+ raise EntityResolutionError(
+ f"State '{state_name}' not found for project '{project_identifier}'. Available states: {', '.join(available)}",
+ available_options=available
+ )
+ except Exception as e:
+ if isinstance(e, EntityResolutionError):
+ raise
+ raise RuntimeError(f"Failed to fetch states for project {project_identifier}: {str(e)}")
+
+ def resolve_ticket(self, ticket_id: str) -> str:
+ """
+ Resolve a ticket ID (e.g. 'ENG-123') to its work_item UUID.
+ """
+ key = ticket_id.upper()
+ if key in self._work_item_cache and (time.time() - _CACHE_LAST_UPDATED["work_items"] < _CACHE_TTL_SECONDS):
+ return self._work_item_cache[key]
+
+ parts = key.split('-')
+ if len(parts) != 2:
+ raise ValueError(f"Invalid ticket ID format: '{ticket_id}'. Expected format like 'ENG-123'.")
+
+ project_identifier = parts[0]
+ try:
+ issue_sequence = int(parts[1])
+ except ValueError:
+ raise ValueError(f"Invalid ticket sequence in '{ticket_id}'. Must be an integer (e.g., 123).")
+
+ # Resolve project to ensure the project exists and is valid
+ self.resolve_project(project_identifier)
+
+ try:
+ # Bypass Pydantic validation by using internal _get, as the WorkItemDetail
+ # model currently has an issue with assignees parsing (list of str vs UserLite).
+ response = self.client.work_items._get(
+ f"{self.workspace_slug}/work-items/{project_identifier}-{issue_sequence}"
+ )
+ work_item_id = response.get('id')
+ if not work_item_id:
+ raise EntityResolutionError(f"Ticket '{ticket_id}' not found.")
+
+ self._work_item_cache[key] = work_item_id
+ _CACHE_LAST_UPDATED["work_items"] = time.time()
+ return work_item_id
+ except HttpError as e:
+ if getattr(e, "status_code", None) == 404:
+ raise EntityResolutionError(
+ f"Ticket '{ticket_id}' not found.",
+ available_options=[]
+ )
+ raise RuntimeError(f"Failed to retrieve ticket '{ticket_id}': {str(e)}")
+ except Exception as e:
+ if isinstance(e, EntityResolutionError):
+ raise
+ raise RuntimeError(f"Failed to retrieve ticket '{ticket_id}': {str(e)}")
diff --git a/plane_mcp/sanitize.py b/plane_mcp/sanitize.py
new file mode 100644
index 0000000..05526ce
--- /dev/null
+++ b/plane_mcp/sanitize.py
@@ -0,0 +1,44 @@
+"""HTML sanitization utilities for Plane MCP Server.
+
+Provides safe HTML cleaning to prevent stored XSS attacks when accepting
+user-provided HTML content (descriptions, comments) before sending to the
+Plane API.
+"""
+
+import nh3
+
+ALLOWED_TAGS = {
+ "p", "br", "strong", "b", "em", "i", "u", "s", "del",
+ "h1", "h2", "h3", "h4", "h5", "h6",
+ "ul", "ol", "li",
+ "blockquote", "pre", "code",
+ "a", "img",
+ "table", "thead", "tbody", "tr", "th", "td",
+ "hr", "span", "div", "sub", "sup",
+}
+
+ALLOWED_ATTRIBUTES = {
+ "a": {"href", "title", "target"},
+ "img": {"src", "alt", "title", "width", "height"},
+ "td": {"colspan", "rowspan"},
+ "th": {"colspan", "rowspan"},
+ "span": {"class"},
+ "div": {"class"},
+ "code": {"class"},
+ "pre": {"class"},
+}
+
+
+def sanitize_html(html: str | None) -> str | None:
+ if html is None:
+ return None
+ if not html:
+ return html
+
+ return nh3.clean(
+ html,
+ tags=ALLOWED_TAGS,
+ attributes=ALLOWED_ATTRIBUTES,
+ link_rel="noopener noreferrer",
+ url_schemes={"http", "https", "mailto"},
+ )
diff --git a/plane_mcp/tools/generated_core.py b/plane_mcp/tools/generated_core.py
new file mode 100644
index 0000000..11e9e73
--- /dev/null
+++ b/plane_mcp/tools/generated_core.py
@@ -0,0 +1,276 @@
+"""Generated tools for Plane MCP Server."""
+
+import uuid
+from typing import Any
+
+import requests
+from fastmcp import FastMCP
+
+from plane_mcp.client import get_plane_client_context
+
+
+def register_core_generated_tools(mcp: FastMCP) -> None:
+ """Register generated core tools."""
+
+ @mcp.tool()
+ def get_workspace() -> dict[str, Any]:
+ """
+ Get workspace details returning raw JSON.
+
+ Args:
+ workspace_slug: Workspace slug (injected by context)
+
+ Returns:
+ Raw JSON response from Plane API
+ """
+ client, workspace_slug = get_plane_client_context()
+
+ url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/".format(workspace_slug=workspace_slug)
+
+ headers = {
+ "Content-Type": "application/json",
+ "Accept": "application/json",
+ }
+ if client.config.access_token:
+ headers["Authorization"] = f"Bearer {client.config.access_token}"
+ elif client.config.api_key:
+ headers["x-api-key"] = client.config.api_key
+
+ response = requests.request("GET", url, headers=headers, timeout=client.config.timeout)
+ response.raise_for_status()
+ return response.json()
+
+ @mcp.tool()
+ def update_workspace(data: dict[str, Any]) -> dict[str, Any]:
+ """
+ Update workspace details returning raw JSON.
+
+ Args:
+ workspace_slug: Workspace slug (injected by context)
+ data: JSON payload
+
+ Returns:
+ Raw JSON response from Plane API
+ """
+ client, workspace_slug = get_plane_client_context()
+
+ url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/".format(workspace_slug=workspace_slug)
+
+ headers = {
+ "Content-Type": "application/json",
+ "Accept": "application/json",
+ }
+ if client.config.access_token:
+ headers["Authorization"] = f"Bearer {client.config.access_token}"
+ elif client.config.api_key:
+ headers["x-api-key"] = client.config.api_key
+
+ response = requests.request("PATCH", url, headers=headers, json=data, timeout=client.config.timeout)
+ response.raise_for_status()
+ return response.json()
+
+ @mcp.tool()
+ def list_projects() -> dict[str, Any]:
+ """
+ List all projects in a workspace returning raw JSON.
+
+ Args:
+ workspace_slug: Workspace slug (injected by context)
+
+ Returns:
+ Raw JSON response from Plane API
+ """
+ client, workspace_slug = get_plane_client_context()
+
+ url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/".format(workspace_slug=workspace_slug)
+
+ headers = {
+ "Content-Type": "application/json",
+ "Accept": "application/json",
+ }
+ if client.config.access_token:
+ headers["Authorization"] = f"Bearer {client.config.access_token}"
+ elif client.config.api_key:
+ headers["x-api-key"] = client.config.api_key
+
+ response = requests.request("GET", url, headers=headers, timeout=client.config.timeout)
+ response.raise_for_status()
+ return response.json()
+
+ @mcp.tool()
+ def create_project(data: dict[str, Any]) -> dict[str, Any]:
+ """
+ Create a project returning raw JSON.
+
+ Args:
+ workspace_slug: Workspace slug (injected by context)
+ data: JSON payload
+
+ Returns:
+ Raw JSON response from Plane API
+ """
+ client, workspace_slug = get_plane_client_context()
+
+ url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/".format(workspace_slug=workspace_slug)
+
+ headers = {
+ "Content-Type": "application/json",
+ "Accept": "application/json",
+ }
+ if client.config.access_token:
+ headers["Authorization"] = f"Bearer {client.config.access_token}"
+ elif client.config.api_key:
+ headers["x-api-key"] = client.config.api_key
+
+ response = requests.request("POST", url, headers=headers, json=data, timeout=client.config.timeout)
+ response.raise_for_status()
+ return response.json()
+
+ @mcp.tool()
+ def get_project(project_id: uuid.UUID) -> dict[str, Any]:
+ """
+ Get a specific project returning raw JSON.
+
+ Args:
+ workspace_slug: Workspace slug (injected by context)
+ project_id: UUID of the project
+
+ Returns:
+ Raw JSON response from Plane API
+ """
+ client, workspace_slug = get_plane_client_context()
+
+ url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/{project_id}/".format(workspace_slug=workspace_slug, project_id=str(project_id))
+
+ headers = {
+ "Content-Type": "application/json",
+ "Accept": "application/json",
+ }
+ if client.config.access_token:
+ headers["Authorization"] = f"Bearer {client.config.access_token}"
+ elif client.config.api_key:
+ headers["x-api-key"] = client.config.api_key
+
+ response = requests.request("GET", url, headers=headers, timeout=client.config.timeout)
+ response.raise_for_status()
+ return response.json()
+
+ @mcp.tool()
+ def list_issues(project_id: uuid.UUID) -> dict[str, Any]:
+ """
+ List all issues in a project returning raw JSON.
+
+ Args:
+ workspace_slug: Workspace slug (injected by context)
+ project_id: UUID of the project
+
+ Returns:
+ Raw JSON response from Plane API
+ """
+ client, workspace_slug = get_plane_client_context()
+
+ url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/{project_id}/issues/".format(workspace_slug=workspace_slug, project_id=str(project_id))
+
+ headers = {
+ "Content-Type": "application/json",
+ "Accept": "application/json",
+ }
+ if client.config.access_token:
+ headers["Authorization"] = f"Bearer {client.config.access_token}"
+ elif client.config.api_key:
+ headers["x-api-key"] = client.config.api_key
+
+ response = requests.request("GET", url, headers=headers, timeout=client.config.timeout)
+ response.raise_for_status()
+ return response.json()
+
+ @mcp.tool()
+ def create_issue_raw(project_id: uuid.UUID, data: dict[str, Any]) -> dict[str, Any]:
+ """
+ Create an issue returning raw JSON.
+
+ Args:
+ workspace_slug: Workspace slug (injected by context)
+ project_id: UUID of the project
+ data: JSON payload
+
+ Returns:
+ Raw JSON response from Plane API
+ """
+ client, workspace_slug = get_plane_client_context()
+
+ url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/{project_id}/issues/".format(workspace_slug=workspace_slug, project_id=str(project_id))
+
+ headers = {
+ "Content-Type": "application/json",
+ "Accept": "application/json",
+ }
+ if client.config.access_token:
+ headers["Authorization"] = f"Bearer {client.config.access_token}"
+ elif client.config.api_key:
+ headers["x-api-key"] = client.config.api_key
+
+ response = requests.request("POST", url, headers=headers, json=data, timeout=client.config.timeout)
+ response.raise_for_status()
+ return response.json()
+
+ @mcp.tool()
+ def update_issue_raw(project_id: uuid.UUID, issue_id: uuid.UUID, data: dict[str, Any]) -> dict[str, Any]:
+ """
+ Update an issue returning raw JSON.
+
+ Args:
+ workspace_slug: Workspace slug (injected by context)
+ project_id: UUID of the project
+ issue_id: UUID of the issue
+ data: JSON payload
+
+ Returns:
+ Raw JSON response from Plane API
+ """
+ client, workspace_slug = get_plane_client_context()
+
+ url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/{project_id}/issues/{issue_id}/".format(workspace_slug=workspace_slug, project_id=str(project_id), issue_id=str(issue_id))
+
+ headers = {
+ "Content-Type": "application/json",
+ "Accept": "application/json",
+ }
+ if client.config.access_token:
+ headers["Authorization"] = f"Bearer {client.config.access_token}"
+ elif client.config.api_key:
+ headers["x-api-key"] = client.config.api_key
+
+ response = requests.request("PATCH", url, headers=headers, json=data, timeout=client.config.timeout)
+ response.raise_for_status()
+ return response.json()
+
+ @mcp.tool()
+ def delete_issue(project_id: uuid.UUID, issue_id: uuid.UUID) -> dict[str, Any]:
+ """
+ Delete an issue returning raw JSON.
+
+ Args:
+ workspace_slug: Workspace slug (injected by context)
+ project_id: UUID of the project
+ issue_id: UUID of the issue
+
+ Returns:
+ Raw JSON response from Plane API
+ """
+ client, workspace_slug = get_plane_client_context()
+
+ url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/{project_id}/issues/{issue_id}/".format(workspace_slug=workspace_slug, project_id=str(project_id), issue_id=str(issue_id))
+
+ headers = {
+ "Content-Type": "application/json",
+ "Accept": "application/json",
+ }
+ if client.config.access_token:
+ headers["Authorization"] = f"Bearer {client.config.access_token}"
+ elif client.config.api_key:
+ headers["x-api-key"] = client.config.api_key
+
+ response = requests.request("DELETE", url, headers=headers, timeout=client.config.timeout)
+ response.raise_for_status()
+ return {"status": "deleted"} if response.status_code == 204 else response.json()
diff --git a/plane_mcp/tools/generated_metadata.py b/plane_mcp/tools/generated_metadata.py
new file mode 100644
index 0000000..6c2346b
--- /dev/null
+++ b/plane_mcp/tools/generated_metadata.py
@@ -0,0 +1,341 @@
+"""Generated tools for Plane MCP Server."""
+
+import uuid
+from typing import Any
+
+import requests
+from fastmcp import FastMCP
+
+from plane_mcp.client import get_plane_client_context
+
+
+def register_metadata_generated_tools(mcp: FastMCP) -> None:
+ """Register generated metadata tools."""
+
+ @mcp.tool()
+ def list_states(project_id: uuid.UUID) -> dict[str, Any]:
+ """
+ List all states in a project returning raw JSON.
+
+ Args:
+ workspace_slug: Workspace slug (injected by context)
+ project_id: UUID of the project
+
+ Returns:
+ Raw JSON response from Plane API
+ """
+ client, workspace_slug = get_plane_client_context()
+
+ url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/{project_id}/states/".format(workspace_slug=workspace_slug, project_id=str(project_id))
+
+ headers = {
+ "Content-Type": "application/json",
+ "Accept": "application/json",
+ }
+ if client.config.access_token:
+ headers["Authorization"] = f"Bearer {client.config.access_token}"
+ elif client.config.api_key:
+ headers["x-api-key"] = client.config.api_key
+
+ response = requests.request("GET", url, headers=headers, timeout=client.config.timeout)
+ response.raise_for_status()
+ return response.json()
+
+ @mcp.tool()
+ def create_state(project_id: uuid.UUID, data: dict[str, Any]) -> dict[str, Any]:
+ """
+ Create a state returning raw JSON.
+
+ Args:
+ workspace_slug: Workspace slug (injected by context)
+ project_id: UUID of the project
+ data: JSON payload
+
+ Returns:
+ Raw JSON response from Plane API
+ """
+ client, workspace_slug = get_plane_client_context()
+
+ url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/{project_id}/states/".format(workspace_slug=workspace_slug, project_id=str(project_id))
+
+ headers = {
+ "Content-Type": "application/json",
+ "Accept": "application/json",
+ }
+ if client.config.access_token:
+ headers["Authorization"] = f"Bearer {client.config.access_token}"
+ elif client.config.api_key:
+ headers["x-api-key"] = client.config.api_key
+
+ response = requests.request("POST", url, headers=headers, json=data, timeout=client.config.timeout)
+ response.raise_for_status()
+ return response.json()
+
+ @mcp.tool()
+ def update_state(project_id: uuid.UUID, state_id: uuid.UUID, data: dict[str, Any]) -> dict[str, Any]:
+ """
+ Update a state returning raw JSON.
+
+ Args:
+ workspace_slug: Workspace slug (injected by context)
+ project_id: UUID of the project
+ state_id: UUID of the state
+ data: JSON payload
+
+ Returns:
+ Raw JSON response from Plane API
+ """
+ client, workspace_slug = get_plane_client_context()
+
+ url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/{project_id}/states/{state_id}/".format(workspace_slug=workspace_slug, project_id=str(project_id), state_id=str(state_id))
+
+ headers = {
+ "Content-Type": "application/json",
+ "Accept": "application/json",
+ }
+ if client.config.access_token:
+ headers["Authorization"] = f"Bearer {client.config.access_token}"
+ elif client.config.api_key:
+ headers["x-api-key"] = client.config.api_key
+
+ response = requests.request("PATCH", url, headers=headers, json=data, timeout=client.config.timeout)
+ response.raise_for_status()
+ return response.json()
+
+ @mcp.tool()
+ def list_labels(project_id: uuid.UUID) -> dict[str, Any]:
+ """
+ List all labels in a project returning raw JSON.
+
+ Args:
+ workspace_slug: Workspace slug (injected by context)
+ project_id: UUID of the project
+
+ Returns:
+ Raw JSON response from Plane API
+ """
+ client, workspace_slug = get_plane_client_context()
+
+ url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/{project_id}/labels/".format(workspace_slug=workspace_slug, project_id=str(project_id))
+
+ headers = {
+ "Content-Type": "application/json",
+ "Accept": "application/json",
+ }
+ if client.config.access_token:
+ headers["Authorization"] = f"Bearer {client.config.access_token}"
+ elif client.config.api_key:
+ headers["x-api-key"] = client.config.api_key
+
+ response = requests.request("GET", url, headers=headers, timeout=client.config.timeout)
+ response.raise_for_status()
+ return response.json()
+
+ @mcp.tool()
+ def create_label(project_id: uuid.UUID, data: dict[str, Any]) -> dict[str, Any]:
+ """
+ Create a label returning raw JSON.
+
+ Args:
+ workspace_slug: Workspace slug (injected by context)
+ project_id: UUID of the project
+ data: JSON payload
+
+ Returns:
+ Raw JSON response from Plane API
+ """
+ client, workspace_slug = get_plane_client_context()
+
+ url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/{project_id}/labels/".format(workspace_slug=workspace_slug, project_id=str(project_id))
+
+ headers = {
+ "Content-Type": "application/json",
+ "Accept": "application/json",
+ }
+ if client.config.access_token:
+ headers["Authorization"] = f"Bearer {client.config.access_token}"
+ elif client.config.api_key:
+ headers["x-api-key"] = client.config.api_key
+
+ response = requests.request("POST", url, headers=headers, json=data, timeout=client.config.timeout)
+ response.raise_for_status()
+ return response.json()
+
+ @mcp.tool()
+ def delete_label(project_id: uuid.UUID, label_id: uuid.UUID) -> dict[str, Any]:
+ """
+ Delete a label returning raw JSON.
+
+ Args:
+ workspace_slug: Workspace slug (injected by context)
+ project_id: UUID of the project
+ label_id: UUID of the label
+
+ Returns:
+ Raw JSON response from Plane API
+ """
+ client, workspace_slug = get_plane_client_context()
+
+ url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/{project_id}/labels/{label_id}/".format(workspace_slug=workspace_slug, project_id=str(project_id), label_id=str(label_id))
+
+ headers = {
+ "Content-Type": "application/json",
+ "Accept": "application/json",
+ }
+ if client.config.access_token:
+ headers["Authorization"] = f"Bearer {client.config.access_token}"
+ elif client.config.api_key:
+ headers["x-api-key"] = client.config.api_key
+
+ response = requests.request("DELETE", url, headers=headers, timeout=client.config.timeout)
+ response.raise_for_status()
+ return {"status": "deleted"} if response.status_code == 204 else response.json()
+
+ @mcp.tool()
+ def list_cycles(project_id: uuid.UUID) -> dict[str, Any]:
+ """
+ List all cycles in a project returning raw JSON.
+
+ Args:
+ workspace_slug: Workspace slug (injected by context)
+ project_id: UUID of the project
+
+ Returns:
+ Raw JSON response from Plane API
+ """
+ client, workspace_slug = get_plane_client_context()
+
+ url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/{project_id}/cycles/".format(workspace_slug=workspace_slug, project_id=str(project_id))
+
+ headers = {
+ "Content-Type": "application/json",
+ "Accept": "application/json",
+ }
+ if client.config.access_token:
+ headers["Authorization"] = f"Bearer {client.config.access_token}"
+ elif client.config.api_key:
+ headers["x-api-key"] = client.config.api_key
+
+ response = requests.request("GET", url, headers=headers, timeout=client.config.timeout)
+ response.raise_for_status()
+ return response.json()
+
+ @mcp.tool()
+ def create_cycle(project_id: uuid.UUID, data: dict[str, Any]) -> dict[str, Any]:
+ """
+ Create a cycle returning raw JSON.
+
+ Args:
+ workspace_slug: Workspace slug (injected by context)
+ project_id: UUID of the project
+ data: JSON payload
+
+ Returns:
+ Raw JSON response from Plane API
+ """
+ client, workspace_slug = get_plane_client_context()
+
+ url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/{project_id}/cycles/".format(workspace_slug=workspace_slug, project_id=str(project_id))
+
+ headers = {
+ "Content-Type": "application/json",
+ "Accept": "application/json",
+ }
+ if client.config.access_token:
+ headers["Authorization"] = f"Bearer {client.config.access_token}"
+ elif client.config.api_key:
+ headers["x-api-key"] = client.config.api_key
+
+ response = requests.request("POST", url, headers=headers, json=data, timeout=client.config.timeout)
+ response.raise_for_status()
+ return response.json()
+
+ @mcp.tool()
+ def update_cycle(project_id: uuid.UUID, cycle_id: uuid.UUID, data: dict[str, Any]) -> dict[str, Any]:
+ """
+ Update a cycle returning raw JSON.
+
+ Args:
+ workspace_slug: Workspace slug (injected by context)
+ project_id: UUID of the project
+ cycle_id: UUID of the cycle
+ data: JSON payload
+
+ Returns:
+ Raw JSON response from Plane API
+ """
+ client, workspace_slug = get_plane_client_context()
+
+ url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/{project_id}/cycles/{cycle_id}/".format(workspace_slug=workspace_slug, project_id=str(project_id), cycle_id=str(cycle_id))
+
+ headers = {
+ "Content-Type": "application/json",
+ "Accept": "application/json",
+ }
+ if client.config.access_token:
+ headers["Authorization"] = f"Bearer {client.config.access_token}"
+ elif client.config.api_key:
+ headers["x-api-key"] = client.config.api_key
+
+ response = requests.request("PATCH", url, headers=headers, json=data, timeout=client.config.timeout)
+ response.raise_for_status()
+ return response.json()
+
+ @mcp.tool()
+ def list_modules(project_id: uuid.UUID) -> dict[str, Any]:
+ """
+ List all modules in a project returning raw JSON.
+
+ Args:
+ workspace_slug: Workspace slug (injected by context)
+ project_id: UUID of the project
+
+ Returns:
+ Raw JSON response from Plane API
+ """
+ client, workspace_slug = get_plane_client_context()
+
+ url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/{project_id}/modules/".format(workspace_slug=workspace_slug, project_id=str(project_id))
+
+ headers = {
+ "Content-Type": "application/json",
+ "Accept": "application/json",
+ }
+ if client.config.access_token:
+ headers["Authorization"] = f"Bearer {client.config.access_token}"
+ elif client.config.api_key:
+ headers["x-api-key"] = client.config.api_key
+
+ response = requests.request("GET", url, headers=headers, timeout=client.config.timeout)
+ response.raise_for_status()
+ return response.json()
+
+ @mcp.tool()
+ def create_module(project_id: uuid.UUID, data: dict[str, Any]) -> dict[str, Any]:
+ """
+ Create a module returning raw JSON.
+
+ Args:
+ workspace_slug: Workspace slug (injected by context)
+ project_id: UUID of the project
+ data: JSON payload
+
+ Returns:
+ Raw JSON response from Plane API
+ """
+ client, workspace_slug = get_plane_client_context()
+
+ url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/{project_id}/modules/".format(workspace_slug=workspace_slug, project_id=str(project_id))
+
+ headers = {
+ "Content-Type": "application/json",
+ "Accept": "application/json",
+ }
+ if client.config.access_token:
+ headers["Authorization"] = f"Bearer {client.config.access_token}"
+ elif client.config.api_key:
+ headers["x-api-key"] = client.config.api_key
+
+ response = requests.request("POST", url, headers=headers, json=data, timeout=client.config.timeout)
+ response.raise_for_status()
+ return response.json()
diff --git a/plane_mcp/tools/journeys.py b/plane_mcp/tools/journeys.py
new file mode 100644
index 0000000..83b294a
--- /dev/null
+++ b/plane_mcp/tools/journeys.py
@@ -0,0 +1,314 @@
+"""AI Journey tools for Plane MCP Server. Designed for semantic compression and intent-based operations."""
+
+import re
+
+from fastmcp import FastMCP
+from plane.models.query_params import PaginatedQueryParams, RetrieveQueryParams
+from plane.models.work_items import CreateWorkItem, CreateWorkItemComment, UpdateWorkItem
+
+from plane_mcp.client import get_plane_client_context
+
+
+class EntityResolver:
+ """Helper to resolve human-readable identifiers to Plane UUIDs and minify responses."""
+ def __init__(self, client, workspace_slug):
+ self.client = client
+ self.workspace_slug = workspace_slug
+ self._project_cache = {}
+ self._state_cache = {} # project_id -> { group: state_id }
+ self._me = None
+
+ def get_me_id(self):
+ if not self._me:
+ self._me = self.client.users.get_me()
+ return self._me.id
+
+ def get_project_id(self, identifier: str) -> str:
+ identifier = identifier.upper()
+ if identifier in self._project_cache:
+ return self._project_cache[identifier]
+
+ projects_resp = self.client.projects.list(workspace_slug=self.workspace_slug, params=PaginatedQueryParams(per_page=100))
+ for p in projects_resp.results:
+ self._project_cache[p.identifier.upper()] = p.id
+ if p.identifier.upper() == identifier:
+ return p.id
+ raise ValueError(f"Project with identifier '{identifier}' not found in workspace '{self.workspace_slug}'.")
+
+ def get_state_id(self, project_id: str, group: str) -> str:
+ group = group.lower()
+ if project_id not in self._state_cache:
+ self._state_cache[project_id] = {}
+ states_resp = self.client.states.list(workspace_slug=self.workspace_slug, project_id=project_id)
+ for s in states_resp.results:
+ if s.group not in self._state_cache[project_id]:
+ self._state_cache[project_id][s.group] = s.id
+
+ if group not in self._state_cache[project_id]:
+ raise ValueError(f"State group '{group}' not found in project.")
+ return self._state_cache[project_id][group]
+
+ def parse_issue_id(self, ticket_id: str):
+ match = re.match(r"^([A-Z0-9]+)-(\d+)$", ticket_id, re.IGNORECASE)
+ if not match:
+ raise ValueError(f"Invalid ticket format: {ticket_id}. Expected format like 'PLANE-123'")
+ return match.group(1).upper(), int(match.group(2))
+
+ def get_issue_uuid(self, project_identifier: str, issue_sequence: int):
+ try:
+ item = self.client.work_items.retrieve_by_identifier(
+ workspace_slug=self.workspace_slug,
+ project_identifier=project_identifier,
+ issue_identifier=issue_sequence,
+ params=RetrieveQueryParams(expand="assignees,state")
+ )
+ return item.id, item
+ except Exception as e:
+ raise ValueError(f"Could not find ticket {project_identifier}-{issue_sequence}: {str(e)}")
+
+ def minify_issue(self, item: dict, project_identifier: str) -> dict:
+ """Compress a work item to save AI tokens."""
+ if hasattr(item, 'model_dump'):
+ item = item.model_dump()
+ elif hasattr(item, 'dict'):
+ item = item.dict()
+ elif hasattr(item, '__dict__'):
+ item = item.__dict__
+
+ assignees = []
+ if item.get("assignees"):
+ for a in item.get("assignees"):
+ if isinstance(a, dict):
+ assignees.append(a.get("display_name", a.get("id")))
+ else:
+ assignees.append(str(a))
+
+ return {
+ "ticket_id": f"{project_identifier}-{item.get('sequence_id')}",
+ "title": item.get("name"),
+ "state": item.get("state_detail", {}).get("group") if isinstance(item.get("state_detail"), dict) else item.get("state"),
+ "priority": item.get("priority"),
+ "assignees": assignees
+ }
+
+def register_ai_journeys(mcp: FastMCP) -> None:
+ """Register AI intent-based tools."""
+
+ @mcp.tool()
+ def create_ticket(
+ title: str,
+ description: str,
+ project_slug: str = "PLANE",
+ priority: str = "none",
+ is_epic: bool = False,
+ child_tickets: list[dict] | None = None
+ ) -> dict:
+ """
+ Create a new ticket in Plane.
+
+ Args:
+ title: The title of the ticket.
+ description: Detailed description of the ticket.
+ project_slug: The identifier of the project (e.g., 'PLANE', 'TEST'). Defaults to 'PLANE'.
+ priority: Ticket priority ('urgent', 'high', 'medium', 'low', 'none'). Defaults to 'none'.
+ is_epic: Set to True if this ticket is an Epic (a large feature with children).
+ child_tickets: Optional list of dicts [{'title': '...', 'description': '...'}] to create as children.
+
+ Returns:
+ A token-optimized summary of the created ticket (and its children).
+ """
+ client, workspace_slug = get_plane_client_context()
+ resolver = EntityResolver(client, workspace_slug)
+
+ project_id = resolver.get_project_id(project_slug)
+ state_id = resolver.get_state_id(project_id, "backlog")
+
+ data = CreateWorkItem(
+ name=title,
+ description_stripped=description,
+ priority=priority if priority != "none" else None,
+ state=state_id
+ )
+
+ result = client.work_items.create(workspace_slug=workspace_slug, project_id=project_id, data=data)
+ parent_minified = resolver.minify_issue(result, project_slug)
+
+ if child_tickets:
+ parent_id = result.id
+ children_results = []
+ for child in child_tickets:
+ child_data = CreateWorkItem(
+ name=child.get('title', 'Untitled Child'),
+ description_stripped=child.get('description', ''),
+ state=state_id,
+ parent=parent_id
+ )
+ child_res = client.work_items.create(workspace_slug=workspace_slug, project_id=project_id, data=child_data)
+ children_results.append(resolver.minify_issue(child_res, project_slug))
+ parent_minified['children'] = children_results
+
+ return parent_minified
+
+ @mcp.tool()
+ def update_ticket(
+ ticket_id: str,
+ title: str | None = None,
+ description: str | None = None,
+ priority: str | None = None
+ ) -> dict:
+ """
+ Update an existing ticket's standard fields.
+
+ Args:
+ ticket_id: The human-readable ID, e.g., 'PLANE-123'.
+ title: New title (optional).
+ description: New description (optional).
+ priority: New priority ('urgent', 'high', 'medium', 'low', 'none') (optional).
+ """
+ client, workspace_slug = get_plane_client_context()
+ resolver = EntityResolver(client, workspace_slug)
+
+ proj_identifier, issue_seq = resolver.parse_issue_id(ticket_id)
+ project_id = resolver.get_project_id(proj_identifier)
+ issue_uuid, _ = resolver.get_issue_uuid(proj_identifier, issue_seq)
+
+ data_dict = {}
+ if title is not None:
+ data_dict['name'] = title
+ if description is not None:
+ data_dict['description_stripped'] = description
+ if priority is not None:
+ data_dict['priority'] = priority
+ update_model = UpdateWorkItem(**data_dict)
+
+ result = client.work_items.update(
+ workspace_slug=workspace_slug,
+ project_id=project_id,
+ work_item_id=issue_uuid,
+ data=update_model
+ )
+ return resolver.minify_issue(result, proj_identifier)
+
+ @mcp.tool()
+ def begin_work(ticket_id: str) -> dict:
+ """
+ Begin work on a ticket. This automatically:
+ 1. Assigns the ticket to you.
+ 2. Transitions the ticket to the 'In Progress' (started) state.
+
+ Args:
+ ticket_id: The human-readable ID, e.g., 'PLANE-123'.
+ """
+ client, workspace_slug = get_plane_client_context()
+ resolver = EntityResolver(client, workspace_slug)
+
+ proj_identifier, issue_seq = resolver.parse_issue_id(ticket_id)
+ project_id = resolver.get_project_id(proj_identifier)
+ issue_uuid, existing_item = resolver.get_issue_uuid(proj_identifier, issue_seq)
+
+ me_id = resolver.get_me_id()
+ started_state_id = resolver.get_state_id(project_id, "started")
+
+ # Merge existing assignees with 'me' to avoid overwriting
+ existing_assignees = []
+ if hasattr(existing_item, 'assignees') and existing_item.assignees:
+ for a in existing_item.assignees:
+ if isinstance(a, dict):
+ existing_assignees.append(a.get('id'))
+ elif hasattr(a, 'id'):
+ existing_assignees.append(a.id)
+ if me_id not in existing_assignees:
+ existing_assignees.append(me_id)
+
+ data = UpdateWorkItem(
+ state=started_state_id,
+ assignees=existing_assignees
+ )
+
+ result = client.work_items.update(
+ workspace_slug=workspace_slug,
+ project_id=project_id,
+ work_item_id=issue_uuid,
+ data=data
+ )
+ return resolver.minify_issue(result, proj_identifier)
+
+ @mcp.tool()
+ def complete_work(ticket_id: str, comment: str | None = None) -> dict:
+ """
+ Complete work on a ticket. This automatically:
+ 1. Transitions the ticket to the 'Done' (completed) state.
+ 2. Optionally adds a comment to the ticket.
+
+ Args:
+ ticket_id: The human-readable ID, e.g., 'PLANE-123'.
+ comment: A comment explaining the resolution (optional).
+ """
+ client, workspace_slug = get_plane_client_context()
+ resolver = EntityResolver(client, workspace_slug)
+
+ proj_identifier, issue_seq = resolver.parse_issue_id(ticket_id)
+ project_id = resolver.get_project_id(proj_identifier)
+ issue_uuid, _ = resolver.get_issue_uuid(proj_identifier, issue_seq)
+
+ completed_state_id = resolver.get_state_id(project_id, "completed")
+
+ data = UpdateWorkItem(state=completed_state_id)
+ result = client.work_items.update(
+ workspace_slug=workspace_slug,
+ project_id=project_id,
+ work_item_id=issue_uuid,
+ data=data
+ )
+
+ if comment:
+ # Plane client comments
+ comment_data = CreateWorkItemComment(comment_stripped=comment)
+ client.work_items.comments.create(
+ workspace_slug=workspace_slug,
+ project_id=project_id,
+ work_item_id=issue_uuid,
+ data=comment_data
+ )
+
+ return resolver.minify_issue(result, proj_identifier)
+
+ @mcp.tool()
+ def read_ticket(ticket_id: str) -> dict:
+ """
+ Read a ticket's current status and details.
+
+ Args:
+ ticket_id: The human-readable ID, e.g., 'PLANE-123'.
+ """
+ client, workspace_slug = get_plane_client_context()
+ resolver = EntityResolver(client, workspace_slug)
+
+ proj_identifier, issue_seq = resolver.parse_issue_id(ticket_id)
+ _, existing_item = resolver.get_issue_uuid(proj_identifier, issue_seq)
+
+ return resolver.minify_issue(existing_item, proj_identifier)
+
+ @mcp.tool()
+ def search_tickets(query: str, project_slug: str = "PLANE") -> list[dict]:
+ """
+ Search for tickets using natural language query.
+
+ Args:
+ query: The text to search for.
+ project_slug: The project to search within (e.g., 'PLANE').
+ """
+ client, workspace_slug = get_plane_client_context()
+ resolver = EntityResolver(client, workspace_slug)
+
+ project_id = resolver.get_project_id(project_slug)
+ results = client.work_items.search(workspace_slug=workspace_slug, query=query, params=RetrieveQueryParams())
+
+ minified = []
+ if hasattr(results, 'issues'):
+ for item in results.issues:
+ if hasattr(item, 'project') and item.project == project_id:
+ minified.append(resolver.minify_issue(item, project_slug))
+ elif isinstance(item, dict) and item.get('project') == project_id:
+ minified.append(resolver.minify_issue(item, project_slug))
+ return minified
diff --git a/pyproject.toml b/pyproject.toml
index e51fd4d..848749e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -18,6 +18,8 @@ dependencies = [
"mcp==1.26.0",
"PyJWT>=2.12.0",
"authlib>=1.6.9",
+ "markdownify>=0.14.1",
+ "nh3>=0.2.17",
]
[project.optional-dependencies]
diff --git a/tests/conftest.py b/tests/conftest.py
index 58439a1..f8ab74a 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1 +1,14 @@
"""Pytest configuration for Plane MCP Server tests."""
+
+import pytest
+import plane_mcp.resolver
+
+@pytest.fixture(autouse=True)
+def clear_resolver_caches():
+ """Clear global caches before each test to prevent test state leakage."""
+ plane_mcp.resolver._GLOBAL_PROJECT_CACHE.clear()
+ plane_mcp.resolver._GLOBAL_STATE_CACHE.clear()
+ plane_mcp.resolver._GLOBAL_WORK_ITEM_CACHE.clear()
+ plane_mcp.resolver._CACHE_LAST_UPDATED["projects"] = 0.0
+ plane_mcp.resolver._CACHE_LAST_UPDATED["states"] = 0.0
+ plane_mcp.resolver._CACHE_LAST_UPDATED["work_items"] = 0.0
diff --git a/tests/test_generated_tools.py b/tests/test_generated_tools.py
new file mode 100644
index 0000000..0239373
--- /dev/null
+++ b/tests/test_generated_tools.py
@@ -0,0 +1,20 @@
+import ast
+import os
+
+def check_file_for_uuid(filepath):
+ with open(filepath, "r") as f:
+ tree = ast.parse(f.read())
+
+ for node in ast.walk(tree):
+ if isinstance(node, ast.FunctionDef) and node.name in ('get_project', 'list_issues', 'create_issue_raw', 'update_issue_raw', 'delete_issue', 'list_states', 'create_state'):
+ for arg in node.args.args:
+ if arg.arg.endswith('_id'):
+ assert ast.unparse(arg.annotation) == "uuid.UUID", f"{arg.arg} in {node.name} is not UUID!"
+
+def test_generated_core_type_hints():
+ """Verify that the auto-generated core tools require UUIDs and proper types."""
+ check_file_for_uuid(os.path.join(os.path.dirname(__file__), "../plane_mcp/tools/generated_core.py"))
+
+def test_generated_metadata_type_hints():
+ """Verify that the auto-generated metadata tools require UUIDs."""
+ check_file_for_uuid(os.path.join(os.path.dirname(__file__), "../plane_mcp/tools/generated_metadata.py"))
diff --git a/tests/test_input_validation.py b/tests/test_input_validation.py
new file mode 100644
index 0000000..1d749b7
--- /dev/null
+++ b/tests/test_input_validation.py
@@ -0,0 +1,242 @@
+"""Tests for input validation and data safety sprint.
+
+Covers:
+- PLANE-27: HTML sanitization strips XSS vectors while preserving safe formatting
+- PLANE-29: Label auto-creation is bounded (max 3 new labels per request)
+- PLANE-38: update_ticket retrieves without expand to avoid label ValidationError
+"""
+
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from plane_mcp.sanitize import sanitize_html
+
+
+class TestHTMLSanitization:
+ """PLANE-27: Sanitize HTML inputs to prevent stored XSS."""
+
+ def test_strips_script_tags(self):
+ result = sanitize_html("Hello
")
+ assert "">')
+ assert "data:" not in result
+
+ def test_strips_style_attribute(self):
+ result = sanitize_html('content
')
+ assert "style=" not in result
+ assert "content" in result
+
+ def test_complex_xss_payload(self):
+ payload = """Normal text
+
+
"}
+
+ with patch("plane_mcp.journey.tools.create_update.get_plane_client_context") as mock_ctx:
+ mock_client = MagicMock()
+ mock_client.work_items._get.return_value = mock_work_item
+ mock_ctx.return_value = (mock_client, "test-ws")
+
+ journey.update_ticket("TEST-1", new_title="New Title")
+
+ # Verify _get was called
+ mock_client.work_items._get.assert_called_once()
+
+ def test_update_ticket_sanitizes_description(self):
+ """Verify descriptions are sanitized before sending to API."""
+ from plane_mcp.journey.tools.create_update import CreateUpdateJourney
+
+ mock_resolver = MagicMock()
+ mock_resolver.resolve_ticket.return_value = "work-item-uuid"
+ mock_resolver.resolve_project.return_value = "project-uuid"
+
+ journey = CreateUpdateJourney(mock_resolver)
+
+ mock_work_item = {
+ "id": "ticket-1",
+ "name": "Title",
+ "description_html": ""
+ }
+
+ with patch("plane_mcp.journey.tools.create_update.get_plane_client_context") as mock_ctx:
+ mock_client = MagicMock()
+ mock_client.work_items._get.return_value = mock_work_item
+ mock_ctx.return_value = (mock_client, "test-ws")
+
+ journey.update_ticket("TEST-1", append_text='safe text')
+
+ update_call = mock_client.work_items.update.call_args
+ data = update_call.kwargs["data"]
+ assert "