diff --git a/.gitignore b/.gitignore index 874636bb..4165a9fd 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,9 @@ -.idea/ \ No newline at end of file +.idea/ +__pycache__/ +*.pyc +*.pyo +*.egg-info/ +.venv/ +dist/ +build/ diff --git a/mcp/README.md b/mcp/README.md new file mode 100644 index 00000000..6e6e1ac0 --- /dev/null +++ b/mcp/README.md @@ -0,0 +1,111 @@ +# figshare-mcp + +An MCP (Model Context Protocol) server that exposes the [Figshare API v2](https://docs.figshare.com) as tools for Claude and other MCP-compatible AI assistants. + +## What it does + +Provides 9 semantic tools for interacting with any Figshare instance: + +| Tool | Description | +|------|-------------| +| `search_articles` | Search public or private articles | +| `get_article` | Get article details, files, and version history | +| `manage_article` | Create or update a draft article (never publishes) | +| `search_collections` | Search public or private collections | +| `get_collection` | Get collection details and its articles | +| `manage_collection` | Create or update a collection | +| `get_projects` | List or inspect a project | +| `get_account_info` | Fetch profile, available licenses, and subject categories | +| `manage_embargo` | Get, set, or remove an embargo on an article | + +**This MCP never publishes or deletes articles/collections.** Destructive and publish operations must be done via the Figshare web interface. + +## Requirements + +- Python 3.11+ +- Claude Desktop, Claude Code, or any other MCP-compatible client + +## Installation + +Clone the repo and install the package: + +```bash +git clone https://github.com/digital-science/figshare-user-documentation.git +cd figshare-user-documentation/mcp +pip install -e . +``` + +## Configuration + +### 1. Get a personal access token + +In Figshare, go to **Account → Applications → Create personal token**. + +You only need a token for private/authenticated operations. Public search and read work without a token. + +### 2. Add to Claude Desktop + +Edit `~/Library/Application Support/Claude/claude_desktop_config.json`: + +```json +{ + "mcpServers": { + "figshare": { + "command": "python3", + "args": ["-m", "figshare_mcp.server"], + "env": { + "FIGSHARE_TOKEN": "your-token-here", + "FIGSHARE_BASE_URL": "https://api.figshare.com/v2" + } + } + } +} +``` + +For an institutional instance, replace `FIGSHARE_BASE_URL` with your institution's API base URL. + +### 3. Add to Claude Code + +```bash +claude mcp add figshare \ + -e FIGSHARE_TOKEN=your-token-here \ + -e FIGSHARE_BASE_URL=https://api.figshare.com/v2 \ + -- python3 -m figshare_mcp.server +``` + +## Usage examples + +Once connected, you can ask Claude things like: + +- *"Search for articles about climate change published after 2022"* +- *"Get the details and files for article 12345678"* +- *"Create a new draft article titled 'My Dataset' with tags 'climate' and 'ocean'"* +- *"Show me my private articles"* +- *"What embargo options does my institution have?"* +- *"Set an embargo on article 123 until 2026-06-01"* + +## Running tests + +Integration tests run against a local Figshare instance. + +```bash +# Public tests only (no token needed, uses figshare.com): +pytest tests/ -v -m "not requires_token and not write" + +# All tests against a local instance: +FIGSHARE_TOKEN=xxx \ +FIGSHARE_BASE_URL=http://localhost:8080/v2 \ +pytest tests/ -v + +# Skip write tests (read-only against local instance): +FIGSHARE_TOKEN=xxx \ +FIGSHARE_BASE_URL=http://localhost:8080/v2 \ +pytest tests/ -v -m "not write" +``` + +## Environment variables + +| Variable | Required | Default | Description | +|----------|----------|---------|-------------| +| `FIGSHARE_TOKEN` | For private/write ops | — | Personal access token | +| `FIGSHARE_BASE_URL` | No | `https://api.figshare.com/v2` | API base URL | diff --git a/mcp/figshare_mcp/__init__.py b/mcp/figshare_mcp/__init__.py new file mode 100644 index 00000000..eba56fbd --- /dev/null +++ b/mcp/figshare_mcp/__init__.py @@ -0,0 +1,3 @@ +"""Figshare MCP server — exposes Figshare API v2 as MCP tools.""" + +__version__ = "0.1.0" diff --git a/mcp/figshare_mcp/client.py b/mcp/figshare_mcp/client.py new file mode 100644 index 00000000..a51fc0e6 --- /dev/null +++ b/mcp/figshare_mcp/client.py @@ -0,0 +1,135 @@ +""" +HTTP client for Figshare API v2. + +Reads configuration from environment variables: + FIGSHARE_TOKEN — personal access token (required for private/account endpoints) + FIGSHARE_BASE_URL — API base URL (default: https://api.figshare.com/v2) +""" + +import os +from typing import Any + +import httpx + +DEFAULT_BASE_URL = "https://api.figshare.com/v2" +MAX_PAGE_SIZE = 50 +MAX_TOTAL_RESULTS = 1000 + +# Human-readable error messages keyed by HTTP status code. +_ERROR_MESSAGES: dict[int, str] = { + 400: "Bad request: {detail}", + 401: "Authentication failed — check your FIGSHARE_TOKEN environment variable", + 403: "Access denied: your token does not have permission for this operation", + 404: "Resource not found", + 409: "Conflict: {detail}", + 422: "Validation error: {detail}", + 429: "Rate limit exceeded — please try again later", + 500: "Figshare API server error", + 502: "Figshare API is temporarily unavailable (502)", + 503: "Figshare API is temporarily unavailable (503)", +} + + +def _build_error_message(status_code: int, response_body: dict | None) -> str: + """Produce a human-readable error string from an HTTP error response.""" + template = _ERROR_MESSAGES.get(status_code, f"Unexpected error (HTTP {status_code})") + # Try to extract a detail message from the response body. + detail = "" + if response_body: + detail = ( + response_body.get("message") + or response_body.get("detail") + or response_body.get("error") + or str(response_body) + ) + return template.format(detail=detail) if "{detail}" in template else template + + +class FigshareClient: + """Thin async wrapper around httpx for Figshare API v2.""" + + def __init__(self) -> None: + raw_url = os.getenv("FIGSHARE_BASE_URL", DEFAULT_BASE_URL).rstrip("/") + self.base_url = self._normalize_base_url(raw_url) + token = os.getenv("FIGSHARE_TOKEN", "") + # Auth header only set when a token is present — public endpoints work without it. + self._headers: dict[str, str] = {"Content-Type": "application/json"} + if token: + self._headers["Authorization"] = f"token {token}" + + @staticmethod + def _normalize_base_url(url: str) -> str: + """Upgrade http:// to https:// for non-local URLs. + + Remote Figshare instances redirect http→https, but the 301 Location header + sometimes omits the 'api.' subdomain, resulting in silent 404s. Upgrading + the scheme upfront avoids the redirect entirely. + """ + if url.startswith("http://") and not any( + url.startswith(f"http://{h}") for h in ("localhost", "127.0.0.1", "0.0.0.0") + ): + return "https://" + url[len("http://"):] + return url + + @property + def has_token(self) -> bool: + return "Authorization" in self._headers + + async def _request( + self, + method: str, + path: str, + params: dict | None = None, + json: dict | None = None, + ) -> Any: + """Execute an HTTP request and return the parsed JSON body. + + Raises RuntimeError with a human-readable message on non-2xx responses. + """ + url = f"{self.base_url}{path}" + # Strip None values from query params so we don't send ?foo=None. + clean_params = {k: v for k, v in (params or {}).items() if v is not None} + + async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as http: + response = await http.request( + method=method, + url=url, + headers=self._headers, + params=clean_params or None, + json=json, + ) + + if response.is_success: + # Handle empty bodies (204 No Content, 205 Reset Content, or any 2xx with no body). + if not response.content: + return {} + return response.json() + + # Parse error body if available. + body = None + try: + body = response.json() + except Exception: + pass + + raise RuntimeError(_build_error_message(response.status_code, body)) + + async def get(self, path: str, params: dict | None = None) -> Any: + return await self._request("GET", path, params=params) + + async def post(self, path: str, json: dict | None = None, params: dict | None = None) -> Any: + return await self._request("POST", path, params=params, json=json) + + async def put(self, path: str, json: dict | None = None) -> Any: + return await self._request("PUT", path, json=json) + + async def patch(self, path: str, json: dict | None = None) -> Any: + return await self._request("PATCH", path, json=json) + + async def delete(self, path: str) -> Any: + return await self._request("DELETE", path) + + +def clamp_page_size(page_size: int) -> int: + """Clamp page_size to the allowed range [1, MAX_PAGE_SIZE].""" + return max(1, min(page_size, MAX_PAGE_SIZE)) diff --git a/mcp/figshare_mcp/server.py b/mcp/figshare_mcp/server.py new file mode 100644 index 00000000..ee7a3a61 --- /dev/null +++ b/mcp/figshare_mcp/server.py @@ -0,0 +1,49 @@ +""" +Figshare MCP server entry point. + +Exposes 9 semantic tools over the MCP stdio transport: + search_articles — search public or private articles + get_article — get article details, files, and versions + manage_article — create or update a draft article (no publish) + search_collections — search public or private collections + get_collection — get collection details and its articles + manage_collection — create or update a collection (no publish) + get_projects — list or get details of a project + get_account_info — profile, licenses, categories, embargo options + manage_embargo — get/set/remove embargo on an article + +Configuration via environment variables: + FIGSHARE_TOKEN — personal access token (required for private/write operations) + FIGSHARE_BASE_URL — API base URL (default: https://api.figshare.com/v2) +""" + +from mcp.server.fastmcp import FastMCP + +from figshare_mcp.tools.account import get_account_info +from figshare_mcp.tools.articles import get_article, manage_article, search_articles +from figshare_mcp.tools.collections import get_collection, manage_collection, search_collections +from figshare_mcp.tools.embargo import manage_embargo +from figshare_mcp.tools.projects import get_projects + +# Create the MCP server instance. +mcp = FastMCP("figshare") + +# Register all tools — FastMCP reads the function signature and docstring automatically. +mcp.tool()(search_articles) +mcp.tool()(get_article) +mcp.tool()(manage_article) +mcp.tool()(search_collections) +mcp.tool()(get_collection) +mcp.tool()(manage_collection) +mcp.tool()(get_projects) +mcp.tool()(get_account_info) +mcp.tool()(manage_embargo) + + +def main() -> None: + """Start the MCP server using stdio transport.""" + mcp.run(transport="stdio") + + +if __name__ == "__main__": + main() diff --git a/mcp/figshare_mcp/tools/__init__.py b/mcp/figshare_mcp/tools/__init__.py new file mode 100644 index 00000000..bca3cb59 --- /dev/null +++ b/mcp/figshare_mcp/tools/__init__.py @@ -0,0 +1 @@ +"""Figshare MCP tool modules.""" diff --git a/mcp/figshare_mcp/tools/account.py b/mcp/figshare_mcp/tools/account.py new file mode 100644 index 00000000..26b6db1b --- /dev/null +++ b/mcp/figshare_mcp/tools/account.py @@ -0,0 +1,69 @@ +""" +MCP tool for Figshare account information. + +Tools: + get_account_info — retrieve profile, licenses, categories, and institution embargo options +""" + +import json + +from figshare_mcp.client import FigshareClient + + +async def get_account_info( + include_profile: bool = True, + include_licenses: bool = True, + include_categories: bool = True, + include_embargo_options: bool = False, +) -> str: + """Retrieve Figshare account metadata: user profile, available licenses, categories, and embargo options. + + Use this tool to look up valid license IDs or category IDs before creating/updating articles. + + Args: + include_profile: Include the authenticated user's profile details (requires token). + include_licenses: Include the list of available licenses and their IDs. + include_categories: Include the Figshare taxonomy of subject categories. + include_embargo_options: Include the institution-level embargo configuration (requires token). + + Returns: + JSON string with the requested account information sections. + """ + client = FigshareClient() + result = {} + errors = {} + + if include_profile: + if not client.has_token: + errors["profile"] = "FIGSHARE_TOKEN is required to fetch profile" + else: + try: + result["profile"] = await client.get("/account") + except RuntimeError as exc: + errors["profile"] = str(exc) + + if include_licenses: + try: + result["licenses"] = await client.get("/licenses") + except RuntimeError as exc: + errors["licenses"] = str(exc) + + if include_categories: + try: + result["categories"] = await client.get("/categories") + except RuntimeError as exc: + errors["categories"] = str(exc) + + if include_embargo_options: + if not client.has_token: + errors["embargo_options"] = "FIGSHARE_TOKEN is required to fetch embargo options" + else: + try: + result["embargo_options"] = await client.get("/account/institution/embargo_options") + except RuntimeError as exc: + errors["embargo_options"] = str(exc) + + if errors: + result["errors"] = errors + + return json.dumps(result, default=str) diff --git a/mcp/figshare_mcp/tools/articles.py b/mcp/figshare_mcp/tools/articles.py new file mode 100644 index 00000000..a430d58c --- /dev/null +++ b/mcp/figshare_mcp/tools/articles.py @@ -0,0 +1,264 @@ +""" +MCP tools for Figshare articles. + +Tools: + search_articles — search public or private articles + get_article — retrieve article details, files, and versions + manage_article — create or update a draft article +""" + +import json +from typing import Any + +from figshare_mcp.client import FigshareClient, clamp_page_size + + +def _format_article(article: dict) -> dict: + """Return a trimmed article dict with only the most useful fields.""" + return { + "id": article.get("id"), + "title": article.get("title"), + "doi": article.get("doi"), + "url": article.get("url_public_html") or article.get("url"), + "published_date": article.get("published_date"), + "modified_date": article.get("modified_date"), + "status": article.get("status"), + "defined_type_name": article.get("defined_type_name"), + "authors": [a.get("full_name") for a in article.get("authors", [])], + "tags": article.get("tags", []), + "categories": [c.get("title") for c in article.get("categories", [])], + "files_count": len(article.get("files", [])), + } + + +async def search_articles( + query: str = "", + private: bool = False, + page: int = 1, + page_size: int = 10, + order: str = "published_date", + order_direction: str = "desc", + institution: int | None = None, + published_since: str | None = None, + modified_since: str | None = None, + group: int | None = None, + item_type: int | None = None, +) -> str: + """Search Figshare articles. + + For public articles use private=False (no token needed). + For private/draft articles owned by the authenticated user use private=True (token required). + + Args: + query: Free-text search string. + private: If True, search within the authenticated user's own articles (requires token). + page: Page number (starts at 1). + page_size: Results per page (1–50, default 10). + order: Sort field — e.g. published_date, modified_date, views, citations. + order_direction: "asc" or "desc". + institution: Filter by institution ID. + published_since: ISO 8601 date string (e.g. "2023-01-01"). + modified_since: ISO 8601 date string. + group: Filter by group ID. + item_type: Figshare item type ID (e.g. 1=figure, 3=dataset, 9=software). + + Returns: + JSON string with matching articles and pagination metadata. + """ + client = FigshareClient() + page_size = clamp_page_size(page_size) + + if private: + if not client.has_token: + return json.dumps({"error": "FIGSHARE_TOKEN is required to search private articles"}) + # Private search uses POST with a body. + body: dict[str, Any] = { + "page": page, + "page_size": page_size, + "order_direction": order_direction, + } + if query: + body["search_for"] = query + if institution is not None: + body["institution"] = institution + if published_since: + body["published_since"] = published_since + if modified_since: + body["modified_since"] = modified_since + if group is not None: + body["group"] = group + try: + results = await client.post("/account/articles/search", json=body) + except RuntimeError as exc: + return json.dumps({"error": str(exc)}) + else: + # Public search uses POST /articles/search. + body = { + "page": page, + "page_size": page_size, + "order_direction": order_direction, + } + if query: + body["search_for"] = query + if institution is not None: + body["institution"] = institution + if published_since: + body["published_since"] = published_since + if modified_since: + body["modified_since"] = modified_since + if group is not None: + body["group"] = group + if item_type is not None: + body["item_type"] = item_type + try: + results = await client.post("/articles/search", json=body) + except RuntimeError as exc: + return json.dumps({"error": str(exc)}) + + articles = [_format_article(a) for a in (results if isinstance(results, list) else [])] + return json.dumps( + { + "articles": articles, + "count": len(articles), + "page": page, + "page_size": page_size, + "has_more": len(articles) == page_size, + "note": "Use page+1 to fetch the next page if has_more is true. Hard cap: 1000 total results.", + }, + default=str, + ) + + +async def get_article( + article_id: int, + include_files: bool = True, + include_versions: bool = True, + private: bool = False, +) -> str: + """Get full details for a single Figshare article. + + Args: + article_id: Numeric Figshare article ID. + include_files: Also fetch the list of files attached to this article. + include_versions: Also fetch the list of published versions. + private: If True, fetch from the authenticated user's private articles (requires token). + + Returns: + JSON string with article details, optionally including files and versions. + """ + client = FigshareClient() + + if private and not client.has_token: + return json.dumps({"error": "FIGSHARE_TOKEN is required to fetch private article details"}) + + base_path = f"/account/articles/{article_id}" if private else f"/articles/{article_id}" + + try: + article = await client.get(base_path) + except RuntimeError as exc: + return json.dumps({"error": str(exc)}) + + result: dict[str, Any] = {"article": article} + + if include_files: + try: + files = await client.get(f"/articles/{article_id}/files") + result["files"] = files + except RuntimeError as exc: + result["files_error"] = str(exc) + + if include_versions: + try: + versions = await client.get(f"/articles/{article_id}/versions") + result["versions"] = versions + except RuntimeError as exc: + result["versions_error"] = str(exc) + + return json.dumps(result, default=str) + + +async def manage_article( + action: str, + article_id: int | None = None, + title: str | None = None, + description: str | None = None, + tags: list[str] | None = None, + categories: list[int] | None = None, + authors: list[dict] | None = None, + license: int | None = None, + defined_type: str | None = None, + doi: str | None = None, + funding: str | None = None, + group_id: int | None = None, +) -> str: + """Create or update a draft Figshare article. Requires authentication token. + + This tool never publishes — it only creates/edits drafts. + To publish, go to the Figshare web interface. + + Args: + action: "create" to create a new draft, "update" to edit an existing one. + article_id: Required when action is "update". The article to update. + title: Article title. Required when action is "create". + description: Article description / abstract (HTML or plain text). + tags: List of keyword tags. + categories: List of category IDs. + authors: List of author objects. Each can be {"name": "..."} or {"id": 123}. + license: License ID (use get_account_info to list available licenses). + defined_type: Item type string — e.g. "dataset", "figure", "software", "paper". + doi: Custom DOI (leave blank to let Figshare assign one). + funding: Funding acknowledgement string. + group_id: Group ID to associate the article with. + + Returns: + JSON string with the created/updated article details or an error message. + """ + client = FigshareClient() + + if not client.has_token: + return json.dumps({"error": "FIGSHARE_TOKEN is required to create or update articles"}) + + if action not in ("create", "update"): + return json.dumps({"error": f"Invalid action '{action}'. Use 'create' or 'update'."}) + + if action == "create" and not title: + return json.dumps({"error": "title is required when action is 'create'"}) + + if action == "update" and article_id is None: + return json.dumps({"error": "article_id is required when action is 'update'"}) + + # Build the request body with only the provided (non-None) fields. + body: dict[str, Any] = {} + if title is not None: + body["title"] = title + if description is not None: + body["description"] = description + if tags is not None: + body["tags"] = tags + if categories is not None: + body["categories"] = categories + if authors is not None: + body["authors"] = authors + if license is not None: + body["license"] = license + if defined_type is not None: + body["defined_type"] = defined_type + if doi is not None: + body["doi"] = doi + if funding is not None: + body["funding"] = funding + if group_id is not None: + body["group_id"] = group_id + + try: + if action == "create": + result = await client.post("/account/articles", json=body) + return json.dumps({"success": True, "action": "created", "article": result}, default=str) + else: + # PUT replaces all fields; PATCH updates only provided fields — we use PUT here. + await client.put(f"/account/articles/{article_id}", json=body) + # PUT returns 205 with no body; fetch the updated article to confirm. + updated = await client.get(f"/account/articles/{article_id}") + return json.dumps({"success": True, "action": "updated", "article": updated}, default=str) + except RuntimeError as exc: + return json.dumps({"error": str(exc)}) diff --git a/mcp/figshare_mcp/tools/collections.py b/mcp/figshare_mcp/tools/collections.py new file mode 100644 index 00000000..ff3f7ee8 --- /dev/null +++ b/mcp/figshare_mcp/tools/collections.py @@ -0,0 +1,226 @@ +""" +MCP tools for Figshare collections. + +Tools: + search_collections — search public or private collections + get_collection — retrieve collection details and its articles + manage_collection — create or update a draft collection +""" + +import json +from typing import Any + +from figshare_mcp.client import FigshareClient, clamp_page_size + + +async def search_collections( + query: str = "", + private: bool = False, + page: int = 1, + page_size: int = 10, + order: str = "published_date", + order_direction: str = "desc", + institution: int | None = None, + published_since: str | None = None, + modified_since: str | None = None, + group: int | None = None, +) -> str: + """Search Figshare collections. + + Args: + query: Free-text search string. + private: If True, search the authenticated user's own collections (requires token). + page: Page number (starts at 1). + page_size: Results per page (1–50, default 10). + order: Sort field — e.g. published_date, modified_date, views. + order_direction: "asc" or "desc". + institution: Filter by institution ID. + published_since: ISO 8601 date string (e.g. "2023-01-01"). + modified_since: ISO 8601 date string. + group: Filter by group ID. + + Returns: + JSON string with matching collections and pagination metadata. + """ + client = FigshareClient() + page_size = clamp_page_size(page_size) + + body: dict[str, Any] = { + "page": page, + "page_size": page_size, + "order_direction": order_direction, + } + if query: + body["search_for"] = query + if institution is not None: + body["institution"] = institution + if published_since: + body["published_since"] = published_since + if modified_since: + body["modified_since"] = modified_since + if group is not None: + body["group"] = group + + if private: + if not client.has_token: + return json.dumps({"error": "FIGSHARE_TOKEN is required to search private collections"}) + # Private collections: use GET with query params (no search POST for account collections). + params = { + "page": page, + "page_size": page_size, + "order": order, + "order_direction": order_direction, + } + try: + results = await client.get("/account/collections", params=params) + except RuntimeError as exc: + return json.dumps({"error": str(exc)}) + else: + try: + results = await client.post("/collections/search", json=body) + except RuntimeError as exc: + return json.dumps({"error": str(exc)}) + + collections = results if isinstance(results, list) else [] + return json.dumps( + { + "collections": collections, + "count": len(collections), + "page": page, + "page_size": page_size, + "has_more": len(collections) == page_size, + "note": "Use page+1 to fetch the next page if has_more is true.", + }, + default=str, + ) + + +async def get_collection( + collection_id: int, + include_articles: bool = True, + articles_page: int = 1, + articles_page_size: int = 10, + private: bool = False, +) -> str: + """Get full details for a single Figshare collection, optionally including its articles. + + Args: + collection_id: Numeric Figshare collection ID. + include_articles: Also fetch the first page of articles in this collection. + articles_page: Page number for articles listing (starts at 1). + articles_page_size: Articles per page (1–50, default 10). + private: If True, fetch from the authenticated user's private collections (requires token). + + Returns: + JSON string with collection details and optionally its articles. + """ + client = FigshareClient() + + if private and not client.has_token: + return json.dumps({"error": "FIGSHARE_TOKEN is required to fetch private collection details"}) + + base_path = ( + f"/account/collections/{collection_id}" if private else f"/collections/{collection_id}" + ) + + try: + collection = await client.get(base_path) + except RuntimeError as exc: + return json.dumps({"error": str(exc)}) + + result: dict[str, Any] = {"collection": collection} + + if include_articles: + articles_page_size = clamp_page_size(articles_page_size) + try: + articles = await client.get( + f"/collections/{collection_id}/articles", + params={"page": articles_page, "page_size": articles_page_size}, + ) + result["articles"] = articles + result["articles_page"] = articles_page + result["articles_has_more"] = len(articles) == articles_page_size + except RuntimeError as exc: + result["articles_error"] = str(exc) + + return json.dumps(result, default=str) + + +async def manage_collection( + action: str, + collection_id: int | None = None, + title: str | None = None, + description: str | None = None, + articles: list[int] | None = None, + tags: list[str] | None = None, + categories: list[int] | None = None, + authors: list[dict] | None = None, + doi: str | None = None, + group_id: int | None = None, + funding: str | None = None, +) -> str: + """Create or update a Figshare collection. Requires authentication token. + + This tool never publishes — it only creates/edits drafts. + + Args: + action: "create" to create a new collection, "update" to edit an existing one. + collection_id: Required when action is "update". + title: Collection title. Required when action is "create". + description: Collection description (HTML or plain text). + articles: List of article IDs to include in the collection. + tags: List of keyword tags. + categories: List of category IDs. + authors: List of author objects. Each can be {"name": "..."} or {"id": 123}. + doi: Custom DOI. + group_id: Group ID to associate the collection with. + funding: Funding acknowledgement string. + + Returns: + JSON string with the created/updated collection details or an error message. + """ + client = FigshareClient() + + if not client.has_token: + return json.dumps({"error": "FIGSHARE_TOKEN is required to create or update collections"}) + + if action not in ("create", "update"): + return json.dumps({"error": f"Invalid action '{action}'. Use 'create' or 'update'."}) + + if action == "create" and not title: + return json.dumps({"error": "title is required when action is 'create'"}) + + if action == "update" and collection_id is None: + return json.dumps({"error": "collection_id is required when action is 'update'"}) + + # Build body with only non-None fields. + body: dict[str, Any] = {} + if title is not None: + body["title"] = title + if description is not None: + body["description"] = description + if articles is not None: + body["articles"] = articles + if tags is not None: + body["tags"] = tags + if categories is not None: + body["categories"] = categories + if authors is not None: + body["authors"] = authors + if doi is not None: + body["doi"] = doi + if group_id is not None: + body["group_id"] = group_id + if funding is not None: + body["funding"] = funding + + try: + if action == "create": + result = await client.post("/account/collections", json=body) + return json.dumps({"success": True, "action": "created", "collection": result}, default=str) + else: + await client.put(f"/account/collections/{collection_id}", json=body) + updated = await client.get(f"/account/collections/{collection_id}") + return json.dumps({"success": True, "action": "updated", "collection": updated}, default=str) + except RuntimeError as exc: + return json.dumps({"error": str(exc)}) diff --git a/mcp/figshare_mcp/tools/embargo.py b/mcp/figshare_mcp/tools/embargo.py new file mode 100644 index 00000000..76e578cc --- /dev/null +++ b/mcp/figshare_mcp/tools/embargo.py @@ -0,0 +1,124 @@ +""" +MCP tool for Figshare article embargo management. + +Tools: + manage_embargo — get, set, or remove embargo on an article +""" + +import json +from typing import Any + +from figshare_mcp.client import FigshareClient + + +async def manage_embargo( + action: str, + article_id: int | None = None, + is_embargoed: bool | None = None, + embargo_date: str | None = None, + embargo_type: str | None = None, + embargo_title: str | None = None, + embargo_reason: str | None = None, + include_institution_options: bool = False, +) -> str: + """Get, set, or remove embargo on a Figshare article. Requires authentication token. + + Embargo controls public access to an article's files until a specified date. + + Actions: + "get" — retrieve the current embargo status for an article + "set" — apply or update an embargo on an article + "remove" — lift (delete) the embargo, making the article publicly accessible immediately + "options" — list available embargo types for your institution (no article_id needed) + + Args: + action: One of "get", "set", "remove", "options". + article_id: The article to act on. Required for get/set/remove. + is_embargoed: Whether to enable the embargo. Required for "set". + embargo_date: Embargo expiry date in ISO 8601 format (e.g. "2025-12-31"). Required for "set". + embargo_type: Embargo type string (e.g. "file", "article"). Required for "set". + Use action="options" to see valid types for your institution. + embargo_title: Optional human-readable title for the embargo notice. + embargo_reason: Optional explanation shown to users who try to access embargoed content. + include_institution_options: For action="get", also fetch institution-level embargo options. + + Returns: + JSON string with the embargo details or a confirmation of the action taken. + """ + client = FigshareClient() + + if not client.has_token: + return json.dumps({"error": "FIGSHARE_TOKEN is required for all embargo operations"}) + + if action not in ("get", "set", "remove", "options"): + return json.dumps({"error": f"Invalid action '{action}'. Use 'get', 'set', 'remove', or 'options'."}) + + # Fetch institution-level embargo options (no article needed). + if action == "options": + try: + options = await client.get("/account/institution/embargo_options") + return json.dumps({"embargo_options": options}, default=str) + except RuntimeError as exc: + return json.dumps({"error": str(exc)}) + + # All other actions require an article_id. + if article_id is None: + return json.dumps({"error": f"article_id is required for action '{action}'"}) + + if action == "get": + try: + embargo = await client.get(f"/account/articles/{article_id}/embargo") + result: dict[str, Any] = {"article_id": article_id, "embargo": embargo} + if include_institution_options: + try: + result["institution_options"] = await client.get( + "/account/institution/embargo_options" + ) + except RuntimeError as exc: + result["institution_options_error"] = str(exc) + return json.dumps(result, default=str) + except RuntimeError as exc: + return json.dumps({"error": str(exc)}) + + if action == "set": + if is_embargoed is None: + return json.dumps({"error": "is_embargoed is required for action 'set'"}) + if not embargo_date: + return json.dumps({"error": "embargo_date is required for action 'set' (ISO 8601, e.g. '2025-12-31')"}) + if not embargo_type: + return json.dumps({"error": "embargo_type is required for action 'set'. Use action='options' to see valid types."}) + + body: dict[str, Any] = { + "is_embargoed": is_embargoed, + "embargo_date": embargo_date, + "embargo_type": embargo_type, + } + if embargo_title is not None: + body["embargo_title"] = embargo_title + if embargo_reason is not None: + body["embargo_reason"] = embargo_reason + + try: + await client.put(f"/account/articles/{article_id}/embargo", json=body) + # Fetch the updated embargo to confirm. + updated = await client.get(f"/account/articles/{article_id}/embargo") + return json.dumps( + {"success": True, "action": "embargo_set", "article_id": article_id, "embargo": updated}, + default=str, + ) + except RuntimeError as exc: + return json.dumps({"error": str(exc)}) + + # action == "remove" + try: + await client.delete(f"/account/articles/{article_id}/embargo") + return json.dumps( + { + "success": True, + "action": "embargo_removed", + "article_id": article_id, + "note": "The embargo has been lifted. The article is now publicly accessible.", + } + ) + except RuntimeError as exc: + return json.dumps({"error": str(exc)}) diff --git a/mcp/figshare_mcp/tools/projects.py b/mcp/figshare_mcp/tools/projects.py new file mode 100644 index 00000000..b7218781 --- /dev/null +++ b/mcp/figshare_mcp/tools/projects.py @@ -0,0 +1,103 @@ +""" +MCP tool for Figshare projects. + +Tools: + get_projects — list or retrieve a specific project (public or private) +""" + +import json +from typing import Any + +from figshare_mcp.client import FigshareClient, clamp_page_size + + +async def get_projects( + project_id: int | None = None, + private: bool = False, + page: int = 1, + page_size: int = 10, + order: str = "published_date", + order_direction: str = "desc", + institution: int | None = None, + group: int | None = None, + storage: str | None = None, + roles: str | None = None, +) -> str: + """List Figshare projects or get details of a specific project. + + Args: + project_id: If provided, returns details for that specific project. + If omitted, returns a paginated list of projects. + private: If True, lists/fetches from the authenticated user's own projects (requires token). + page: Page number (starts at 1). Used only when project_id is not provided. + page_size: Results per page (1–50, default 10). + order: Sort field — e.g. published_date, modified_date. + order_direction: "asc" or "desc". + institution: Filter by institution ID (public listing only). + group: Filter by group ID (public listing only). + storage: Filter by storage type ("individual" or "group") — private only. + roles: Filter by role ("viewer", "collaborator", "owner") — private only. + + Returns: + JSON string with project(s) details and pagination metadata. + """ + client = FigshareClient() + + if private and not client.has_token: + return json.dumps({"error": "FIGSHARE_TOKEN is required to access private projects"}) + + # Fetch a single project by ID. + if project_id is not None: + path = f"/account/projects/{project_id}" if private else f"/projects/{project_id}" + try: + project = await client.get(path) + return json.dumps({"project": project}, default=str) + except RuntimeError as exc: + return json.dumps({"error": str(exc)}) + + # List projects. + page_size = clamp_page_size(page_size) + + if private: + params: dict[str, Any] = { + "page": page, + "page_size": page_size, + "order": order, + "order_direction": order_direction, + } + if storage: + params["storage"] = storage + if roles: + params["roles"] = roles + try: + results = await client.get("/account/projects", params=params) + except RuntimeError as exc: + return json.dumps({"error": str(exc)}) + else: + params = { + "page": page, + "page_size": page_size, + "order": order, + "order_direction": order_direction, + } + if institution is not None: + params["institution"] = institution + if group is not None: + params["group"] = group + try: + results = await client.get("/projects", params=params) + except RuntimeError as exc: + return json.dumps({"error": str(exc)}) + + projects = results if isinstance(results, list) else [] + return json.dumps( + { + "projects": projects, + "count": len(projects), + "page": page, + "page_size": page_size, + "has_more": len(projects) == page_size, + "note": "Use page+1 to fetch the next page if has_more is true.", + }, + default=str, + ) diff --git a/mcp/pyproject.toml b/mcp/pyproject.toml new file mode 100644 index 00000000..5f842334 --- /dev/null +++ b/mcp/pyproject.toml @@ -0,0 +1,28 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "figshare-mcp" +version = "0.1.0" +description = "MCP server for the Figshare API" +requires-python = ">=3.11" +dependencies = [ + "mcp[cli]>=1.0.0", + "httpx>=0.27.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.0.0", + "pytest-asyncio>=0.23.0", +] + +[project.scripts] +figshare-mcp = "figshare_mcp.server:main" + +[tool.hatch.build.targets.wheel] +packages = ["figshare_mcp"] + +[tool.pytest.ini_options] +asyncio_mode = "auto" diff --git a/mcp/tests/__init__.py b/mcp/tests/__init__.py new file mode 100644 index 00000000..8e08d8ea --- /dev/null +++ b/mcp/tests/__init__.py @@ -0,0 +1 @@ +"""Integration tests for figshare-mcp — require a running local Figshare instance.""" diff --git a/mcp/tests/conftest.py b/mcp/tests/conftest.py new file mode 100644 index 00000000..fb8f1cc3 --- /dev/null +++ b/mcp/tests/conftest.py @@ -0,0 +1,37 @@ +""" +Pytest configuration for figshare-mcp integration tests. + +Required environment variables before running: + FIGSHARE_TOKEN — personal access token for a test account on the local instance + FIGSHARE_BASE_URL — e.g. http://localhost:8080/v2 + +Run with: + FIGSHARE_TOKEN=xxx FIGSHARE_BASE_URL=http://localhost:8080/v2 pytest mcp/tests/ -v +""" + +import os + +import pytest + + +def pytest_configure(config): + """Register custom markers.""" + config.addinivalue_line("markers", "requires_token: test requires FIGSHARE_TOKEN to be set") + config.addinivalue_line("markers", "write: test creates or modifies data on the instance") + + +def pytest_runtest_setup(item): + """Skip token-required tests if no token is configured.""" + if item.get_closest_marker("requires_token"): + if not os.getenv("FIGSHARE_TOKEN"): + pytest.skip("FIGSHARE_TOKEN not set — skipping authenticated test") + + +@pytest.fixture(scope="session") +def base_url() -> str: + return os.getenv("FIGSHARE_BASE_URL", "https://api.figshare.com/v2") + + +@pytest.fixture(scope="session") +def has_token() -> bool: + return bool(os.getenv("FIGSHARE_TOKEN")) diff --git a/mcp/tests/test_account.py b/mcp/tests/test_account.py new file mode 100644 index 00000000..a410ee25 --- /dev/null +++ b/mcp/tests/test_account.py @@ -0,0 +1,77 @@ +"""Integration tests for account info tools.""" + +import json + +import pytest + +from figshare_mcp.tools.account import get_account_info + + +class TestGetAccountInfo: + async def test_licenses_always_accessible(self): + result = json.loads( + await get_account_info( + include_profile=False, + include_licenses=True, + include_categories=False, + ) + ) + assert "licenses" in result + assert isinstance(result["licenses"], list) + assert len(result["licenses"]) > 0 + + async def test_categories_always_accessible(self): + result = json.loads( + await get_account_info( + include_profile=False, + include_licenses=False, + include_categories=True, + ) + ) + assert "categories" in result + assert isinstance(result["categories"], list) + + async def test_profile_without_token_returns_error(self, monkeypatch): + monkeypatch.delenv("FIGSHARE_TOKEN", raising=False) + result = json.loads( + await get_account_info( + include_profile=True, + include_licenses=False, + include_categories=False, + ) + ) + # Profile fetch should fail, others should succeed or be absent. + assert "errors" in result + assert "FIGSHARE_TOKEN" in result["errors"].get("profile", "") + + +@pytest.mark.requires_token +class TestGetAccountInfoAuthenticated: + async def test_profile_with_token(self): + result = json.loads( + await get_account_info( + include_profile=True, + include_licenses=False, + include_categories=False, + ) + ) + assert "profile" in result + assert "errors" not in result or "profile" not in result.get("errors", {}) + + async def test_all_sections(self): + result = json.loads(await get_account_info()) + assert "profile" in result + assert "licenses" in result + assert "categories" in result + + async def test_embargo_options_with_token(self): + result = json.loads( + await get_account_info( + include_profile=False, + include_licenses=False, + include_categories=False, + include_embargo_options=True, + ) + ) + # May be empty list if institution has no custom options — should not be an error. + assert "embargo_options" in result or "errors" in result diff --git a/mcp/tests/test_articles.py b/mcp/tests/test_articles.py new file mode 100644 index 00000000..6e55567a --- /dev/null +++ b/mcp/tests/test_articles.py @@ -0,0 +1,147 @@ +"""Integration tests for article tools.""" + +import json +import os + +import pytest + +from figshare_mcp.tools.articles import get_article, manage_article, search_articles + + +class TestSearchArticles: + """Public article search — no token required.""" + + async def test_basic_search_returns_results(self): + result = json.loads(await search_articles(query="data")) + assert "articles" in result + assert isinstance(result["articles"], list) + assert "count" in result + assert "has_more" in result + + async def test_empty_query_returns_results(self): + result = json.loads(await search_articles()) + assert "articles" in result + + async def test_page_size_is_clamped_to_50(self): + result = json.loads(await search_articles(page_size=999)) + # We asked for 999 but should get at most 50 per page. + assert result["page_size"] == 50 + + async def test_page_size_minimum_is_1(self): + result = json.loads(await search_articles(page_size=0)) + assert result["page_size"] == 1 + + async def test_pagination_metadata(self): + page1 = json.loads(await search_articles(page=1, page_size=3)) + assert page1["page"] == 1 + assert page1["page_size"] == 3 + + async def test_order_direction_desc(self): + result = json.loads(await search_articles(order_direction="desc", page_size=5)) + assert "articles" in result + + async def test_private_search_without_token_returns_error(self, monkeypatch): + monkeypatch.delenv("FIGSHARE_TOKEN", raising=False) + result = json.loads(await search_articles(private=True)) + assert "error" in result + assert "FIGSHARE_TOKEN" in result["error"] + + +@pytest.mark.requires_token +class TestSearchArticlesPrivate: + """Private article search — requires FIGSHARE_TOKEN.""" + + async def test_private_search_returns_results(self): + result = json.loads(await search_articles(private=True)) + assert "articles" in result + assert "error" not in result + + +class TestGetArticle: + """Public article retrieval — no token required for public articles.""" + + async def test_get_nonexistent_article_returns_error(self): + result = json.loads(await get_article(article_id=999999999)) + assert "error" in result + assert "not found" in result["error"].lower() + + async def test_get_article_without_files_or_versions(self): + # Fetch first available article to get a real ID. + search = json.loads(await search_articles(page_size=1)) + if not search["articles"]: + pytest.skip("No public articles available on this instance") + + article_id = search["articles"][0]["id"] + result = json.loads( + await get_article(article_id=article_id, include_files=False, include_versions=False) + ) + assert "article" in result + assert "files" not in result + assert "versions" not in result + + async def test_get_article_with_files_and_versions(self): + search = json.loads(await search_articles(page_size=1)) + if not search["articles"]: + pytest.skip("No public articles available on this instance") + + article_id = search["articles"][0]["id"] + result = json.loads(await get_article(article_id=article_id)) + assert "article" in result + assert "files" in result + assert "versions" in result + + +@pytest.mark.requires_token +@pytest.mark.write +class TestManageArticle: + """Article create/update — requires FIGSHARE_TOKEN, creates data.""" + + async def test_create_article_without_title_returns_error(self): + result = json.loads(await manage_article(action="create")) + assert "error" in result + assert "title" in result["error"] + + async def test_create_article_invalid_action_returns_error(self): + result = json.loads(await manage_article(action="publish", title="Test")) + assert "error" in result + assert "Invalid action" in result["error"] + + async def test_update_article_without_id_returns_error(self): + result = json.loads(await manage_article(action="update", title="Test")) + assert "error" in result + assert "article_id" in result["error"] + + async def test_create_and_update_draft_article(self): + # Create a draft. + create_result = json.loads( + await manage_article( + action="create", + title="MCP Integration Test Article", + description="Created by figshare-mcp integration test — safe to delete.", + tags=["mcp-test", "integration-test"], + ) + ) + assert create_result.get("success") is True, f"Create failed: {create_result}" + assert "article" in create_result + + article_id = ( + create_result["article"].get("id") + or create_result["article"].get("entity_id") + ) + assert article_id, "No article ID returned after create" + + # Update the draft. + update_result = json.loads( + await manage_article( + action="update", + article_id=article_id, + title="MCP Integration Test Article (updated)", + ) + ) + assert update_result.get("success") is True, f"Update failed: {update_result}" + + async def test_create_article_without_token_returns_error(self, monkeypatch): + monkeypatch.delenv("FIGSHARE_TOKEN", raising=False) + result = json.loads(await manage_article(action="create", title="Test")) + assert "error" in result + assert "FIGSHARE_TOKEN" in result["error"] diff --git a/mcp/tests/test_collections.py b/mcp/tests/test_collections.py new file mode 100644 index 00000000..257b5b34 --- /dev/null +++ b/mcp/tests/test_collections.py @@ -0,0 +1,98 @@ +"""Integration tests for collection tools.""" + +import json + +import pytest + +from figshare_mcp.tools.collections import get_collection, manage_collection, search_collections + + +class TestSearchCollections: + async def test_basic_search_returns_results(self): + result = json.loads(await search_collections()) + assert "collections" in result + assert isinstance(result["collections"], list) + + async def test_page_size_clamped(self): + result = json.loads(await search_collections(page_size=999)) + assert result["page_size"] == 50 + + async def test_private_without_token_returns_error(self, monkeypatch): + monkeypatch.delenv("FIGSHARE_TOKEN", raising=False) + result = json.loads(await search_collections(private=True)) + assert "error" in result + assert "FIGSHARE_TOKEN" in result["error"] + + +@pytest.mark.requires_token +class TestSearchCollectionsPrivate: + async def test_private_collections(self): + result = json.loads(await search_collections(private=True)) + assert "collections" in result + assert "error" not in result + + +class TestGetCollection: + async def test_nonexistent_collection_returns_error(self): + result = json.loads(await get_collection(collection_id=999999999)) + assert "error" in result + + async def test_get_collection_with_articles(self): + search = json.loads(await search_collections(page_size=1)) + if not search["collections"]: + pytest.skip("No public collections available on this instance") + + cid = search["collections"][0]["id"] + result = json.loads(await get_collection(collection_id=cid)) + assert "collection" in result + assert "articles" in result + + async def test_get_collection_without_articles(self): + search = json.loads(await search_collections(page_size=1)) + if not search["collections"]: + pytest.skip("No public collections available on this instance") + + cid = search["collections"][0]["id"] + result = json.loads(await get_collection(collection_id=cid, include_articles=False)) + assert "collection" in result + assert "articles" not in result + + +@pytest.mark.requires_token +@pytest.mark.write +class TestManageCollection: + async def test_create_without_title_returns_error(self): + result = json.loads(await manage_collection(action="create")) + assert "error" in result + assert "title" in result["error"] + + async def test_update_without_id_returns_error(self): + result = json.loads(await manage_collection(action="update")) + assert "error" in result + assert "collection_id" in result["error"] + + async def test_create_and_update_collection(self): + create_result = json.loads( + await manage_collection( + action="create", + title="MCP Integration Test Collection", + description="Created by figshare-mcp integration test — safe to delete.", + tags=["mcp-test"], + ) + ) + assert create_result.get("success") is True, f"Create failed: {create_result}" + + cid = ( + create_result["collection"].get("id") + or create_result["collection"].get("entity_id") + ) + assert cid + + update_result = json.loads( + await manage_collection( + action="update", + collection_id=cid, + title="MCP Integration Test Collection (updated)", + ) + ) + assert update_result.get("success") is True, f"Update failed: {update_result}" diff --git a/mcp/tests/test_embargo.py b/mcp/tests/test_embargo.py new file mode 100644 index 00000000..c79c0e38 --- /dev/null +++ b/mcp/tests/test_embargo.py @@ -0,0 +1,120 @@ +"""Integration tests for embargo management tool.""" + +import json + +import pytest + +from figshare_mcp.tools.embargo import manage_embargo + + +class TestManageEmbargoValidation: + """Validation tests — do not require a token or live instance.""" + + async def test_no_token_returns_error(self, monkeypatch): + monkeypatch.delenv("FIGSHARE_TOKEN", raising=False) + result = json.loads(await manage_embargo(action="get", article_id=1)) + assert "error" in result + assert "FIGSHARE_TOKEN" in result["error"] + + async def test_invalid_action_returns_error(self, monkeypatch): + monkeypatch.setenv("FIGSHARE_TOKEN", "fake-token") + result = json.loads(await manage_embargo(action="delete", article_id=1)) + assert "error" in result + assert "Invalid action" in result["error"] + + async def test_set_without_is_embargoed_returns_error(self, monkeypatch): + monkeypatch.setenv("FIGSHARE_TOKEN", "fake-token") + result = json.loads( + await manage_embargo(action="set", article_id=1, embargo_date="2025-12-31", embargo_type="file") + ) + assert "error" in result + assert "is_embargoed" in result["error"] + + async def test_set_without_embargo_date_returns_error(self, monkeypatch): + monkeypatch.setenv("FIGSHARE_TOKEN", "fake-token") + result = json.loads( + await manage_embargo(action="set", article_id=1, is_embargoed=True, embargo_type="file") + ) + assert "error" in result + assert "embargo_date" in result["error"] + + async def test_set_without_embargo_type_returns_error(self, monkeypatch): + monkeypatch.setenv("FIGSHARE_TOKEN", "fake-token") + result = json.loads( + await manage_embargo(action="set", article_id=1, is_embargoed=True, embargo_date="2025-12-31") + ) + assert "error" in result + assert "embargo_type" in result["error"] + + async def test_get_without_article_id_returns_error(self, monkeypatch): + monkeypatch.setenv("FIGSHARE_TOKEN", "fake-token") + result = json.loads(await manage_embargo(action="get")) + assert "error" in result + assert "article_id" in result["error"] + + +@pytest.mark.requires_token +class TestManageEmbargoLive: + """Live embargo tests against the local Figshare instance.""" + + async def test_get_embargo_options(self): + result = json.loads(await manage_embargo(action="options")) + # Personal accounts (not belonging to an institution) get a 400 from this endpoint. + # Both are valid outcomes — we just verify no unexpected exception is raised. + assert "embargo_options" in result or "error" in result + + async def test_get_nonexistent_article_embargo_returns_error(self): + result = json.loads(await manage_embargo(action="get", article_id=999999999)) + assert "error" in result + + +@pytest.mark.requires_token +@pytest.mark.write +class TestManageEmbargoWriteLive: + """End-to-end embargo lifecycle — creates a draft article, applies and removes embargo.""" + + async def test_embargo_lifecycle(self): + from figshare_mcp.tools.articles import manage_article + + # 1. Create a test draft article. + create_result = json.loads( + await manage_article( + action="create", + title="MCP Embargo Lifecycle Test", + description="Created by figshare-mcp embargo test — safe to delete.", + ) + ) + assert create_result.get("success"), f"Article create failed: {create_result}" + article_id = ( + create_result["article"].get("id") + or create_result["article"].get("entity_id") + ) + assert article_id, "No article ID returned" + + # 2. Fetch embargo options to get a valid type (may not be available for personal accounts). + options_result = json.loads(await manage_embargo(action="options")) + embargo_options = options_result.get("embargo_options", []) + embargo_type = embargo_options[0].get("type", "file") if embargo_options else "file" + + # 3. Set embargo — use a date within the API's 25-year limit. + set_result = json.loads( + await manage_embargo( + action="set", + article_id=article_id, + is_embargoed=True, + embargo_date="2030-12-31", + embargo_type=embargo_type, + embargo_title="Test embargo", + embargo_reason="Integration test", + ) + ) + assert set_result.get("success"), f"Embargo set failed: {set_result}" + assert set_result["embargo"].get("is_embargoed") is True + + # 4. Get embargo to verify. + get_result = json.loads(await manage_embargo(action="get", article_id=article_id)) + assert get_result["embargo"].get("is_embargoed") is True + + # 5. Remove embargo. + remove_result = json.loads(await manage_embargo(action="remove", article_id=article_id)) + assert remove_result.get("success"), f"Embargo remove failed: {remove_result}" diff --git a/mcp/tests/test_projects.py b/mcp/tests/test_projects.py new file mode 100644 index 00000000..51f5c78b --- /dev/null +++ b/mcp/tests/test_projects.py @@ -0,0 +1,46 @@ +"""Integration tests for project tools.""" + +import json + +import pytest + +from figshare_mcp.tools.projects import get_projects + + +class TestGetProjects: + async def test_list_public_projects(self): + result = json.loads(await get_projects()) + assert "projects" in result + assert isinstance(result["projects"], list) + + async def test_page_size_clamped(self): + result = json.loads(await get_projects(page_size=999)) + assert result["page_size"] == 50 + + async def test_nonexistent_project_returns_error(self): + result = json.loads(await get_projects(project_id=999999999)) + assert "error" in result + + async def test_private_without_token_returns_error(self, monkeypatch): + monkeypatch.delenv("FIGSHARE_TOKEN", raising=False) + result = json.loads(await get_projects(private=True)) + assert "error" in result + assert "FIGSHARE_TOKEN" in result["error"] + + async def test_get_specific_public_project(self): + listing = json.loads(await get_projects(page_size=1)) + if not listing["projects"]: + pytest.skip("No public projects available on this instance") + + pid = listing["projects"][0]["id"] + result = json.loads(await get_projects(project_id=pid)) + assert "project" in result + assert result["project"]["id"] == pid + + +@pytest.mark.requires_token +class TestGetProjectsPrivate: + async def test_list_private_projects(self): + result = json.loads(await get_projects(private=True)) + assert "projects" in result + assert "error" not in result