-
Notifications
You must be signed in to change notification settings - Fork 23
feat: Add workflow support to MCP server #178
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
5266e26
c6e8bf5
ab4cdb4
b4b12f7
870a864
ef286c4
6fea82d
1b0d4da
9b5ed83
f48b673
4d1633c
0095946
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,6 +12,11 @@ | |
| create_glossary_category_assets, | ||
| create_glossary_assets, | ||
| create_glossary_term_assets, | ||
| get_workflow_package_names, | ||
| get_workflows_by_type, | ||
| get_workflow_by_id, | ||
| get_workflow_runs, | ||
| get_workflow_runs_by_status_and_time_range, | ||
| UpdatableAttribute, | ||
| CertificateStatus, | ||
| UpdatableAsset, | ||
|
|
@@ -905,6 +910,328 @@ def create_glossary_categories(categories) -> List[Dict[str, Any]]: | |
| return create_glossary_category_assets(categories) | ||
|
|
||
|
|
||
| @mcp.tool() | ||
| def get_workflow_package_names_tool() -> List[str]: | ||
| """ | ||
| Get the values of all available workflow packages. | ||
|
|
||
| This tool returns a list of all workflow package names that can be used | ||
| with other workflow tools. | ||
|
|
||
| Returns: | ||
| List[str]: List of workflow package names (e.g., ['atlan-snowflake', 'atlan-bigquery', ...]) | ||
|
|
||
| Examples: | ||
| # Get all workflow package names | ||
| packages = get_workflow_package_names_tool() | ||
| """ | ||
| return get_workflow_package_names() | ||
|
|
||
|
|
||
| @mcp.tool() | ||
| def get_workflows_by_type_tool(workflow_package_name: str, max_results: int = 10) -> Dict[str, Any]: | ||
| """ | ||
| Retrieve workflows (WorkflowTemplate) by workflow package name. | ||
|
|
||
| Naming Convention Note: The Argo/Kubernetes API uses terminology that can be confusing: | ||
| - "WorkflowTemplate" (kind="WorkflowTemplate") = workflow (template definition) | ||
| - "Workflow" (kind="Workflow") = workflow_run (executed instance/run) | ||
|
|
||
| IMPORTANT: This tool returns only basic metadata. It does NOT return full workflow | ||
| instructions, steps, or transformations. To get complete workflow details including all steps and | ||
| transformations, use get_workflow_by_id_tool instead. | ||
|
|
||
| Note: This tool only returns workflows (WorkflowTemplate), not workflow_runs. | ||
| To get workflow_run information, use functions that start with get_workflow_run. | ||
|
|
||
| When to use this tool: | ||
| - Use when you need to list multiple workflows by package type | ||
| - Use when you only need metadata (package info, certification status) without full workflow details | ||
| - Do NOT use if you need the complete workflow specification with all steps and transformations | ||
| - Do NOT use if you need workflow_run (execution) information | ||
|
|
||
| Args: | ||
| workflow_package_name (str): The workflow package name (e.g., 'atlan-snowflake', 'atlan-bigquery', 'SNOWFLAKE'). | ||
| Can be the full package name like 'atlan-snowflake' or the enum name like 'SNOWFLAKE'. | ||
| max_results (int, optional): Maximum number of workflows to return. Defaults to 10. | ||
|
|
||
| Returns: | ||
| Dict[str, Any]: Dictionary containing: | ||
| - workflows: List of workflow (WorkflowTemplate) dictionaries. Each workflow contains: | ||
| - template_name: Name of the template | ||
| - package_name, package_version: Package information | ||
| - source_system, source_category, workflow_type: Source information | ||
| - certified, verified: Certification status | ||
| - creator_email, creator_username: Creator information | ||
| - modifier_email, modifier_username: Last modifier information | ||
| - creation_timestamp: When template was created | ||
| - package_author, package_description, package_repository: Package metadata | ||
| - total: Total count of workflows found | ||
|
|
||
| Examples: | ||
| # Get Snowflake workflows | ||
| result = get_workflows_by_type_tool("atlan-snowflake") | ||
|
|
||
| # Get BigQuery workflows with custom limit | ||
| result = get_workflows_by_type_tool("atlan-bigquery", max_results=50) | ||
|
|
||
| # Get workflows using enum name | ||
| result = get_workflows_by_type_tool("SNOWFLAKE") | ||
| """ | ||
| return get_workflows_by_type(workflow_package_name=workflow_package_name, max_results=max_results) | ||
|
|
||
|
|
||
| @mcp.tool() | ||
| def get_workflow_by_id_tool(id: str) -> Dict[str, Any]: | ||
| """ | ||
| Retrieve a specific workflow (WorkflowTemplate) by its ID. | ||
|
|
||
| Naming Convention Note: The Argo/Kubernetes API uses terminology that can be confusing: | ||
| - "WorkflowTemplate" (kind="WorkflowTemplate") = workflow (template definition) | ||
| - "Workflow" (kind="Workflow") = workflow_run (executed instance/run) | ||
|
|
||
| IMPORTANT: This is the ONLY tool that returns full workflow instructions with all steps and transformations. | ||
| All other workflow tools (get_workflows_by_type_tool, get_workflow_runs_tool, etc.) return only metadata | ||
| and execution status, but NOT the complete workflow definition, steps, or transformation details. | ||
|
|
||
| This tool uses include_dag=True internally, which means it returns the complete workflow specification | ||
| including the workflow DAG (directed acyclic graph), all steps, transformations, and execution details. | ||
|
|
||
| Note: This tool only returns workflows (WorkflowTemplate), not workflow_runs. | ||
|
christopher-tin-atlan marked this conversation as resolved.
Outdated
|
||
| To get workflow_run information, use functions that start with get_workflow_run. | ||
|
|
||
| When to use this tool: | ||
| - Use when you need the COMPLETE workflow definition with all steps and transformations | ||
| - Use when you need to understand the full workflow execution flow | ||
| - Use when you need workflow_spec and workflow_steps data | ||
| - Do NOT use if you only need basic metadata (use other workflow tools instead) | ||
| - Do NOT use if you need workflow_run (execution) information | ||
|
|
||
| Args: | ||
| id (str): The unique identifier (ID) of the workflow (e.g., 'atlan-snowflake-miner-1714638976'). | ||
|
|
||
| Returns: | ||
| Dict[str, Any]: Dictionary containing: | ||
| - workflow: The workflow (WorkflowTemplate) dictionary with comprehensive metadata, or None if not found. | ||
| The workflow dictionary contains: | ||
| - template_name: Name of the template | ||
| - package_name, package_version: Package information | ||
| - source_system, source_category, workflow_type: Source information | ||
| - certified, verified: Certification status | ||
| - creator_email, creator_username: Creator information | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should not send all this information
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Noted I saw your request on removing PII data in later comments. I have removed created/modified user information. |
||
| - modifier_email, modifier_username: Last modifier information | ||
| - creation_timestamp: When template was created | ||
| - package_author, package_description, package_repository: Package metadata | ||
| - workflow_steps: Workflow steps (when include_dag=True) | ||
| - workflow_spec: Full workflow specification (when include_dag=True) | ||
| - error: None if no error occurred, otherwise the error message | ||
|
|
||
| Examples: | ||
| # Get a specific workflow by ID to see full workflow instructions | ||
| result = get_workflow_by_id_tool("atlan-snowflake-miner-1714638976") | ||
|
|
||
| # Access full workflow steps and transformations | ||
| workflow = result.get("workflow") | ||
| # The workflow object contains complete workflow definition with workflow_steps and workflow_spec | ||
| """ | ||
| return get_workflow_by_id(id=id) | ||
|
|
||
|
|
||
| @mcp.tool() | ||
| def get_workflow_runs_tool( | ||
| workflow_name: str, | ||
| workflow_phase: str, | ||
| from_: int = 0, | ||
| size: int = 100, | ||
| ) -> Dict[str, Any]: | ||
| """ | ||
| Retrieve all workflow_runs for a specific workflow and phase. | ||
|
|
||
| Naming Convention Note: The Argo/Kubernetes API uses terminology that can be confusing: | ||
|
christopher-tin-atlan marked this conversation as resolved.
Outdated
|
||
| - "WorkflowTemplate" (kind="WorkflowTemplate") = workflow (template definition) | ||
| - "Workflow" (kind="Workflow") = workflow_run (executed instance/run) | ||
|
|
||
| This tool returns workflow_run instances (kind="Workflow") that match the specified workflow name and phase. | ||
| Each workflow_run contains execution details, timing, resource usage, and status information. | ||
|
|
||
| IMPORTANT: This tool returns only metadata and execution status. It does NOT return full workflow | ||
| instructions, steps, or transformations. To get complete workflow details including all steps and | ||
| transformations, use get_workflow_by_id_tool instead. | ||
|
|
||
| When to use this tool: | ||
| - Use when you need to find all workflow_runs of a specific workflow by execution phase | ||
| - Use when you need execution metadata (status, timing, resource usage) for multiple workflow_runs | ||
| - Use when you need to filter workflow_runs by phase (Succeeded, Failed, Running, etc.) | ||
| - Do NOT use if you need the complete workflow specification (use get_workflow_by_id_tool instead) | ||
|
|
||
| Args: | ||
| workflow_name (str): Name of the workflow (template) as displayed in the UI (e.g., 'atlan-snowflake-miner-1714638976'). | ||
| This refers to the workflow name, not individual workflow_run IDs. | ||
| workflow_phase (str): Phase of the workflow_run. Common values: 'Succeeded', 'Running', 'Failed', 'Error', 'Pending'. | ||
| Case-insensitive matching is supported. | ||
| from_ (int, optional): Starting index of the search results for pagination. Defaults to 0. | ||
| size (int, optional): Maximum number of search results to return. Defaults to 100. | ||
|
|
||
| Returns: | ||
| Dict[str, Any]: Dictionary containing: | ||
| - runs: List of workflow_run dictionaries. Each workflow_run contains: | ||
| Run Metadata: | ||
| - run_id: Unique identifier for this workflow_run | ||
| - run_phase: Execution phase (Succeeded, Running, Failed, etc.) | ||
| - run_started_at: When the workflow_run started (ISO timestamp) | ||
| - run_finished_at: When the workflow_run finished (ISO timestamp, None if still running) | ||
| - run_estimated_duration: Estimated execution duration | ||
| - run_progress: Progress indicator (e.g., "1/3") | ||
| - run_cpu_usage: CPU resource usage duration | ||
| - run_memory_usage: Memory resource usage duration | ||
|
|
||
| Workflow Metadata: | ||
| - workflow_id: Reference to the workflow (template) used | ||
| - workflow_package_name: Package identifier | ||
| - workflow_cron_schedule: Cron schedule if scheduled | ||
| - workflow_cron_timezone: Timezone for cron schedule | ||
| - workflow_creator_id, workflow_creator_email, workflow_creator_username: Creator information | ||
|
christopher-tin-atlan marked this conversation as resolved.
|
||
| - workflow_modifier_id, workflow_modifier_email, workflow_modifier_username: Last modifier information | ||
| - workflow_creation_timestamp: When the workflow was created | ||
| - workflow_archiving_status: Archiving status | ||
|
christopher-tin-atlan marked this conversation as resolved.
|
||
| - total: Total count of workflow_runs matching the criteria | ||
| - error: None if no error occurred, otherwise the error message | ||
|
|
||
| Examples: | ||
| # Get succeeded workflow_runs | ||
| result = get_workflow_runs_tool("atlan-snowflake-miner-1714638976", "Succeeded") | ||
|
|
||
| # Get running workflow_runs | ||
| result = get_workflow_runs_tool("atlan-snowflake-miner-1714638976", "Running") | ||
|
|
||
| # Get failed workflow_runs with pagination | ||
| result = get_workflow_runs_tool("atlan-snowflake-miner-1714638976", "Failed", from_=0, size=50) | ||
|
|
||
| # Analyze workflow_run durations | ||
| runs = result.get("runs", []) | ||
| for run in runs: | ||
| if run.get("run_finished_at") and run.get("run_started_at"): | ||
| print(f"Workflow run {run['run_id']} took {run.get('run_estimated_duration')}") | ||
| """ | ||
| return get_workflow_runs( | ||
| workflow_name=workflow_name, | ||
| workflow_phase=workflow_phase, | ||
| from_=from_, | ||
| size=size, | ||
| ) | ||
|
|
||
|
|
||
| @mcp.tool() | ||
| def get_workflow_runs_by_status_and_time_range_tool( | ||
| status: List[str], | ||
| started_at: str | None = None, | ||
| finished_at: str | None = None, | ||
| from_: int = 0, | ||
| size: int = 100, | ||
| ) -> Dict[str, Any]: | ||
| """ | ||
| Retrieve workflow_runs based on their status and time range. | ||
|
|
||
| Naming Convention Note: The Argo/Kubernetes API uses terminology that can be confusing: | ||
| - "WorkflowTemplate" (kind="WorkflowTemplate") = workflow (template definition) | ||
| - "Workflow" (kind="Workflow") = workflow_run (executed instance/run) | ||
|
|
||
| This tool allows you to search for workflow_run instances (kind="Workflow") by filtering on their execution status | ||
| (e.g., Succeeded, Failed, Running) and optionally by time ranges. This is useful for | ||
| monitoring, debugging, and analyzing workflow execution patterns. | ||
|
|
||
| IMPORTANT: This tool returns only metadata and execution status. It does NOT return full workflow | ||
| instructions, steps, or transformations. To get complete workflow details including all steps and | ||
| transformations, use get_workflow_by_id_tool instead. | ||
|
|
||
| When to use this tool: | ||
| - Use when you need to find workflow_runs across multiple workflows filtered by status and time | ||
| - Use when monitoring workflow execution patterns or analyzing failures | ||
| - Use when you need execution metadata for workflow_runs within a specific time window | ||
| - Do NOT use if you need the complete workflow specification (use get_workflow_by_id_tool instead) | ||
|
|
||
| Args: | ||
| status (List[str]): List of workflow_run phases to filter by. Common values: | ||
| - 'Succeeded': Successfully completed workflow_runs | ||
| - 'Failed': Workflow_runs that failed | ||
| - 'Running': Currently executing workflow_runs | ||
| - 'Error': Workflow_runs that encountered errors | ||
| - 'Pending': Workflow_runs waiting to start | ||
| Case-insensitive matching is supported. Example: ['Succeeded', 'Failed'] | ||
| started_at (str, optional): Lower bound on 'status.startedAt' timestamp. Accepts: | ||
| - Relative time format: 'now-2h', 'now-24h', 'now-7d', 'now-30d' | ||
| - ISO 8601 format: '2024-01-01T00:00:00Z' | ||
| Filters workflow_runs that started at or after this time. | ||
| finished_at (str, optional): Lower bound on 'status.finishedAt' timestamp. Accepts: | ||
| - Relative time format: 'now-1h', 'now-12h' | ||
| - ISO 8601 format: '2024-01-01T00:00:00Z' | ||
| Filters workflow_runs that finished at or after this time. | ||
| from_ (int, optional): Starting index of the search results for pagination. Defaults to 0. | ||
| size (int, optional): Maximum number of search results to return. Defaults to 100. | ||
|
|
||
| Returns: | ||
| Dict[str, Any]: Dictionary containing: | ||
| - runs: List of workflow_run dictionaries matching the criteria. Each workflow_run contains: | ||
| Run Metadata: | ||
| - run_id: Unique identifier for this workflow_run | ||
| - run_phase: Execution phase (Succeeded, Running, Failed, etc.) | ||
| - run_started_at: When the workflow_run started (ISO timestamp) | ||
| - run_finished_at: When the workflow_run finished (ISO timestamp, None if still running) | ||
| - run_estimated_duration: Estimated execution duration | ||
| - run_progress: Progress indicator (e.g., "1/3") | ||
| - run_cpu_usage: CPU resource usage duration | ||
| - run_memory_usage: Memory resource usage duration | ||
|
|
||
| Workflow Metadata: | ||
| - workflow_id: Reference to the workflow (template) used | ||
| - workflow_package_name: Package identifier | ||
| - workflow_cron_schedule: Cron schedule if scheduled | ||
| - workflow_cron_timezone: Timezone for cron schedule | ||
| - workflow_creator_id, workflow_creator_email, workflow_creator_username: Creator information | ||
| - workflow_modifier_id, workflow_modifier_email, workflow_modifier_username: Last modifier information | ||
| - workflow_creation_timestamp: When the workflow was created | ||
| - workflow_archiving_status: Archiving status | ||
| - total: Total count of workflow_runs matching the criteria (may be larger than the returned list if paginated) | ||
| - error: None if no error occurred, otherwise the error message | ||
|
|
||
| Examples: | ||
| # Get succeeded workflow_runs from the last 2 hours | ||
| result = get_workflow_runs_by_status_and_time_range_tool( | ||
| status=["Succeeded"], | ||
| started_at="now-2h" | ||
| ) | ||
|
|
||
| # Get failed workflow_runs from the last 24 hours | ||
| result = get_workflow_runs_by_status_and_time_range_tool( | ||
| status=["Failed"], | ||
| started_at="now-24h" | ||
| ) | ||
|
|
||
| # Get multiple statuses with both time filters | ||
| result = get_workflow_runs_by_status_and_time_range_tool( | ||
| status=["Succeeded", "Failed"], | ||
| started_at="now-7d", | ||
| finished_at="now-1h" | ||
| ) | ||
|
|
||
| # Get running workflow_runs | ||
| result = get_workflow_runs_by_status_and_time_range_tool( | ||
| status=["Running"] | ||
| ) | ||
|
|
||
| # Analyze failure rates | ||
| failed_runs = [r for r in result.get("runs", []) if r.get("run_phase") == "Failed"] | ||
| print(f"Found {len(failed_runs)} failed workflow_runs in the time range") | ||
| """ | ||
| return get_workflow_runs_by_status_and_time_range( | ||
| status=status, | ||
| started_at=started_at, | ||
| finished_at=finished_at, | ||
| from_=from_, | ||
| size=size, | ||
| ) | ||
|
|
||
|
|
||
| def main(): | ||
| """Main entry point for the Atlan MCP Server.""" | ||
|
|
||
|
|
||

Uh oh!
There was an error while loading. Please reload this page.