diff --git a/.gitignore b/.gitignore index 3052861..59c8b07 100644 --- a/.gitignore +++ b/.gitignore @@ -43,3 +43,9 @@ exports/worlds/*.csv # Auto-created models (from create_model tool) models/_created_*.nlogox + +# Claude Code session scratch (worktrees, session state) +.claude/ + +# COMSES download cache — live data, not source +models/comses/ diff --git a/CHANGELOG.md b/CHANGELOG.md index 57d80b2..e41de62 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,32 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Added — CoMSES Net integration + +- 5 new tools for exploring the CoMSES Net computational model library: + `search_comses`, `get_comses_model`, `download_comses_model`, + `open_comses_model`, `read_comses_files`. +- 1 new prompt: `explore_comses` — NetLogo-first, source-introspection, + never fabricates commands, stops-and-asks on runtime errors. +- Safe download pipeline: HEAD screen + mid-stream byte cap + (`COMSES_MAX_DOWNLOAD_MB`, default 50), zip-member path-traversal + validation, zip-bomb guard, atomic temp-to-final extract with + `.comses_complete` marker, race reconciliation. +- `"latest"` version resolution with snapshot semantics — resolved to + a concrete version before any cache path is computed; the resolved + version is returned so follow-up reads stay pinned. +- `read_comses_files` returns a precise contract with per-file + `{content, full_size, returned_size, truncated}`, priority ordering + (ODD → NetLogo → other code → md/txt), byte cap with line-boundary + truncation, UTF-8 decoding with `errors="replace"`, zero-match case + handled explicitly. +- `httpx>=0.27` dependency. +- 44 new tests covering retry matrix, zip-slip, zip-bomb, marker, + race-orphan, NetLogo-file selection rule, ODD discovery, cache + reuse, latest resolution, truncation, extension filters, prompt rules. + ## [0.1.0] - 2025-02-23 ### Added diff --git a/README.md b/README.md index da63969..03a5768 100644 --- a/README.md +++ b/README.md @@ -59,8 +59,27 @@ By default, a real NetLogo window opens so you can watch your simulations run li | `save_model(name, code)` | Save model to file | | `export_world()` | Export full world state to CSV | | `list_models()` | List model files in models directory | +| `search_comses(query)` | Search the CoMSES Net model library | +| `get_comses_model(uuid)` | Fetch metadata + citation text for one COMSES model | +| `download_comses_model(uuid)` | Safely download + extract a COMSES archive | +| `open_comses_model(uuid)` | Download (or reuse cache) and load NetLogo models | +| `read_comses_files(uuid)` | Read ODD / source contents from a downloaded model | -Plus 3 resources (primitives reference, programming guide, model source) and 3 prompts (`analyze_model`, `create_abm`, `parameter_sweep`). +Plus 3 resources (primitives reference, programming guide, model source) and 4 prompts (`analyze_model`, `create_abm`, `parameter_sweep`, `explore_comses`). + +### CoMSES Net integration + +NetLogo MCP can search and safely fetch any model from the [CoMSES Net computational model library](https://www.comses.net/) — the largest peer-reviewed ABM repository. 
NetLogo models load automatically; Python / R / Julia models are identified and cached locally so you can inspect their source and ODD documentation from any MCP client, including clients with no filesystem tools. + +Try it with the `explore_comses` prompt or just ask: *"Find me a predator-prey ABM on COMSES and run a short baseline."* + +Safety properties (applied to every download): +- Archives streamed with a hard byte cap (`COMSES_MAX_DOWNLOAD_MB`, default 50 MB) enforced mid-stream, not just via HEAD. +- Every zip member is path-traversal-validated before extraction. +- Zip-bomb refusal on uncompressed-size overflow. +- Extraction is atomic: downloads land in a temp dir first, then move to the cache only on success. +- Cache directories are trusted only when they carry the `.comses_complete` marker. +- `"latest"` is resolved to a concrete version before any cache path is computed; the resolved version is returned to the AI so follow-up reads stay pinned to the same slot. ## Prerequisites @@ -163,6 +182,7 @@ JAVA_HOME=C:/Program Files/Eclipse Adoptium/jdk-25.0.2.10-hotspot | `NETLOGO_MODELS_DIR` | No | Directory for model files (defaults to `./models`) | | `NETLOGO_GUI` | No | `"true"` (default) for live GUI window, `"false"` for headless | | `NETLOGO_EXPORTS_DIR` | No | Directory for exported views/worlds (defaults to `./exports`) | +| `COMSES_MAX_DOWNLOAD_MB` | No | Max CoMSES archive size in MB (default 50). Enforced mid-stream. | ## Client Setup diff --git a/docs/DEVELOPMENT.md b/docs/DEVELOPMENT.md index 631e5c3..2e0d57c 100644 --- a/docs/DEVELOPMENT.md +++ b/docs/DEVELOPMENT.md @@ -16,9 +16,10 @@ NetLogo_MCP/ ├── src/ │ └── netlogo_mcp/ │ ├── server.py # FastMCP app, stdout protection, lifespan -│ ├── tools.py # All 12 tools +│ ├── tools.py # All 17 tools (12 NetLogo + 5 CoMSES) +│ ├── comses.py # CoMSES Net API client + safe zip extract │ ├── resources.py # 3 resources (docs + model source) -│ ├── prompts.py # 3 prompts (analyze, create, sweep) +│ ├── prompts.py # 4 prompts (analyze, create, sweep, explore_comses) │ ├── config.py # Environment variable loading │ ├── py.typed # PEP 561 type marker │ └── data/ @@ -30,7 +31,9 @@ NetLogo_MCP/ ├── conftest.py # Mock fixtures (no JVM needed) ├── test_server.py ├── test_tools.py - └── test_resources.py + ├── test_comses.py # CoMSES integration: API, zip safety, tools, prompt + ├── test_resources.py + └── fixtures/comses/ # Captured JSON fixtures for CoMSES tests ``` ## Tech Stack diff --git a/pyproject.toml b/pyproject.toml index d1bdc8b..c6da991 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ dependencies = [ "pandas", "numpy", "python-dotenv", + "httpx>=0.27", ] [project.optional-dependencies] diff --git a/src/netlogo_mcp/comses.py b/src/netlogo_mcp/comses.py new file mode 100644 index 0000000..0df8466 --- /dev/null +++ b/src/netlogo_mcp/comses.py @@ -0,0 +1,793 @@ +"""CoMSES Net API client + safe archive handling. + +Pure HTTP + filesystem logic. No MCP coupling. Easy to unit test with +`httpx.MockTransport`. + +Covered here: +- `ComsesClient`: async HTTP client for search, metadata, HEAD, and stream download. +- Retry policy: only on 502/503/504/network errors; never on 4xx. +- Safe zip extraction: path validation, zip-bomb guard, atomic temp-to-final move. +- Completion marker: `.comses_complete` written on success; cache only trusted if present. +- `resolve_latest`: turns "latest" into a concrete version string before any cache path. +- Language detection from `codemeta.json` with extension fallback. 
+- Deterministic NetLogo-file selection rule (Section 4.4 of COMSES_PLAN.md). + +Everything that can be called from a tool is async. +""" + +from __future__ import annotations + +import asyncio +import json +import os +import shutil +import zipfile +from dataclasses import dataclass +from pathlib import Path, PurePosixPath + +import httpx + +BASE_URL = "https://www.comses.net" + +# ── Retry / network policy ──────────────────────────────────────────────────── + +RETRY_STATUSES = frozenset({502, 503, 504}) +MAX_RETRIES = 2 +RETRY_BACKOFF_SECONDS = (1.0, 2.0) # len must equal MAX_RETRIES +DEFAULT_TIMEOUT = httpx.Timeout(30.0, connect=10.0) + +# Retriable network exception types (from httpx). +_RETRIABLE_EXCS: tuple[type[Exception], ...] = ( + httpx.ConnectError, + httpx.ReadError, + httpx.ReadTimeout, + httpx.ConnectTimeout, + httpx.WriteError, +) + + +class ComsesError(Exception): + """Any COMSES client or extraction failure.""" + + +class ComsesHTTPError(ComsesError): + """HTTP-level error against the COMSES API (non-retriable 4xx, or retries exhausted).""" + + def __init__(self, message: str, status: int | None = None) -> None: + super().__init__(message) + self.status = status + + +class ComsesSafetyError(ComsesError): + """A zip member failed safety validation (traversal, absolute path, zip bomb).""" + + +# ── Client ──────────────────────────────────────────────────────────────────── + + +@dataclass +class StreamResult: + """Result of a streamed download to disk.""" + + path: Path + bytes_written: int + content_type: str + + +class ComsesClient: + """Async client for the CoMSES Net HTTP API. + + Intentionally thin. Parsing / shape-shifting is done by callers, not here. + The only intelligence this class owns is: + - retry rules (per `_request_with_retry`) + - stream-time byte cap (per `stream_download`) + - the right `Accept`/query-string dance for JSON endpoints + """ + + BASE_URL: str = BASE_URL + + def __init__( + self, + client: httpx.AsyncClient | None = None, + *, + base_url: str | None = None, + ) -> None: + self._base_url = (base_url or self.BASE_URL).rstrip("/") + self._owned = client is None + self._client = client or httpx.AsyncClient( + timeout=DEFAULT_TIMEOUT, + follow_redirects=True, + headers={ + "User-Agent": "NetLogo-MCP/0.1 (+https://github.com/Razee4315/NetLogo-MCP)" + }, + ) + + async def aclose(self) -> None: + if self._owned: + await self._client.aclose() + + async def __aenter__(self) -> ComsesClient: + return self + + async def __aexit__(self, *exc_info: object) -> None: + await self.aclose() + + # ── Retry core ─────────────────────────────────────────────────────────── + + async def _request_with_retry( + self, + method: str, + url: str, + *, + max_retries: int = MAX_RETRIES, + **kwargs: object, + ) -> httpx.Response: + """Execute an HTTP request with the documented retry policy. + + Retriable: + - HTTP 502 / 503 / 504 + - httpx.ConnectError / ReadError / ReadTimeout / ConnectTimeout / WriteError + + Not retriable: + - Any 4xx (client-side / missing data — retry won't change it) + - Any 2xx (even with unexpected Content-Type — caller decides) + - Any other 5xx (500 etc. 
— caller decides) + """ + last_exc: Exception | None = None + for attempt in range(max_retries + 1): + try: + resp = await self._client.request(method, url, **kwargs) # type: ignore[arg-type] + except _RETRIABLE_EXCS as exc: + last_exc = exc + if attempt >= max_retries: + raise ComsesHTTPError( + f"Network error after {max_retries + 1} attempts: {exc!r}" + ) from exc + await asyncio.sleep(RETRY_BACKOFF_SECONDS[attempt]) + continue + + if resp.status_code in RETRY_STATUSES and attempt < max_retries: + await asyncio.sleep(RETRY_BACKOFF_SECONDS[attempt]) + continue + + return resp + + # Defensive: loop exited without returning or raising. + raise ComsesHTTPError( + f"Exhausted retries: {last_exc!r}" if last_exc else "Exhausted retries" + ) + + # ── JSON endpoints ─────────────────────────────────────────────────────── + + async def search(self, query: str = "", page: int = 1) -> dict: + """Paginated codebase search. `query=""` browses everything.""" + params: dict[str, str | int] = {"format": "json", "page": page} + if query: + params["query"] = query + resp = await self._request_with_retry( + "GET", f"{self._base_url}/codebases/", params=params + ) + return _json_or_raise(resp, "search") + + async def get_codebase(self, identifier: str) -> dict: + resp = await self._request_with_retry( + "GET", + f"{self._base_url}/codebases/{identifier}/", + params={"format": "json"}, + ) + return _json_or_raise(resp, "get_codebase") + + async def get_release(self, identifier: str, version: str) -> dict: + resp = await self._request_with_retry( + "GET", + f"{self._base_url}/codebases/{identifier}/releases/{version}/", + params={"format": "json"}, + ) + return _json_or_raise(resp, "get_release") + + async def resolve_latest(self, identifier: str, version: str) -> str: + """Turn "latest" into a concrete version string; passthrough otherwise. + + Snapshot semantics: this resolution is pinned for the current call. + If the author publishes a newer version mid-download, the in-flight + operation stays on the version resolved here; the next caller asking + for "latest" will resolve again and may get a newer one. + """ + if version != "latest": + return version + data = await self.get_codebase(identifier) + latest = data.get("latestVersionNumber") + if not latest: + raise ComsesError( + f"Codebase {identifier} has no latestVersionNumber — " + "cannot resolve 'latest'." + ) + return str(latest) + + # ── Download (HEAD + streamed body) ────────────────────────────────────── + + def _download_url(self, identifier: str, version: str) -> str: + """Build a UUID-based download URL (fallback path). + + Real COMSES requires the numeric codebase ID (from the release's + `absoluteUrl`) for /download/ to return a zip — UUID 404s. Prefer + `resolve_download_url` over this when you have release metadata. + """ + return f"{self._base_url}/codebases/{identifier}/releases/{version}/download/" + + async def resolve_download_url(self, identifier: str, version: str) -> str: + """Return the correct /download/ URL for a release. + + COMSES accepts the UUID for metadata endpoints but requires the + numeric codebase id for the download endpoint. Fetch the release + once and build the URL from its `absoluteUrl` (e.g. + `/codebases/5445/releases/2.0.0/`). + """ + rel = await self.get_release(identifier, version) + abs_url = rel.get("absoluteUrl") + if isinstance(abs_url, str) and abs_url.startswith("/"): + return f"{self._base_url}{abs_url.rstrip('/')}/download/" + # Fallback to UUID path if the field is missing — may 404. 
+ return self._download_url(identifier, version) + + async def head_download( + self, identifier: str, version: str, *, url: str | None = None + ) -> tuple[int | None, str]: + """Return (content_length_or_None, content_type). + + `Content-Length` is advisory — some responses omit it. The real cap is + enforced at stream time. Pass `url` to skip URL resolution when the + caller already knows the correct /download/ path. + """ + target = url or self._download_url(identifier, version) + resp = await self._request_with_retry("HEAD", target) + if resp.status_code >= 400: + raise ComsesHTTPError( + f"HEAD download failed: HTTP {resp.status_code}", + status=resp.status_code, + ) + cl = resp.headers.get("Content-Length") + try: + length = int(cl) if cl is not None else None + except ValueError: + length = None + return length, resp.headers.get("Content-Type", "") + + async def stream_download( + self, + identifier: str, + version: str, + dest: Path, + *, + max_bytes: int, + url: str | None = None, + ) -> StreamResult: + """Stream the archive to `dest`, aborting if it exceeds `max_bytes`. + + Returns a `StreamResult`. Raises `ComsesHTTPError` if the response is + non-zip, non-2xx, or the stream exceeds the cap. On abort, the partial + file is deleted. Pass `url` to override the default UUID-based path + (real COMSES requires the numeric codebase id in /download/). + """ + dest.parent.mkdir(parents=True, exist_ok=True) + if url is None: + url = self._download_url(identifier, version) + + # Streaming uses the underlying client directly. We still apply the + # retry policy for the initial response; an interrupted in-flight + # stream is *not* retried (partial bytes already written). + last_exc: Exception | None = None + for attempt in range(MAX_RETRIES + 1): + try: + async with self._client.stream("GET", url) as resp: + if resp.status_code in RETRY_STATUSES and attempt < MAX_RETRIES: + await asyncio.sleep(RETRY_BACKOFF_SECONDS[attempt]) + continue + if resp.status_code >= 400: + raise ComsesHTTPError( + f"Download failed: HTTP {resp.status_code}", + status=resp.status_code, + ) + + content_type = resp.headers.get("Content-Type", "") + if "zip" not in content_type.lower(): + # COMSES sometimes returns an HTML interstitial instead + # of a zip. Treat that as a hard failure — retry won't + # fix a policy-level interstitial. + raise ComsesHTTPError( + f"Expected zip, got Content-Type={content_type!r}. " + "You may need to download this archive manually." + ) + + written = 0 + try: + with dest.open("wb") as fh: + async for chunk in resp.aiter_bytes(): + if not chunk: + continue + written += len(chunk) + if written > max_bytes: + # Abort the stream and delete partial file. + raise ComsesHTTPError( + f"Download exceeded cap ({max_bytes} bytes) " + f"at {written} bytes — aborted." + ) + fh.write(chunk) + except Exception: + # Any failure mid-stream: clean up the partial file. 
+ try: + dest.unlink(missing_ok=True) + except OSError: + pass + raise + + return StreamResult( + path=dest, bytes_written=written, content_type=content_type + ) + except _RETRIABLE_EXCS as exc: + last_exc = exc + if attempt >= MAX_RETRIES: + raise ComsesHTTPError( + f"Network error streaming download: {exc!r}" + ) from exc + await asyncio.sleep(RETRY_BACKOFF_SECONDS[attempt]) + continue + + raise ComsesHTTPError( + f"Exhausted retries on stream download: {last_exc!r}" + if last_exc + else "Exhausted retries on stream download" + ) + + +def _json_or_raise(resp: httpx.Response, op: str) -> dict: + if resp.status_code >= 400: + raise ComsesHTTPError(f"{op}: HTTP {resp.status_code}", status=resp.status_code) + ct = resp.headers.get("Content-Type", "") + if "json" not in ct.lower(): + raise ComsesHTTPError(f"{op}: expected JSON, got Content-Type={ct!r}") + try: + data: dict = resp.json() + except json.JSONDecodeError as exc: + raise ComsesHTTPError(f"{op}: response was not valid JSON: {exc}") from exc + return data + + +# ── Zip safety + atomic extract ─────────────────────────────────────────────── + +COMPLETION_MARKER = ".comses_complete" + + +def is_cache_trusted(cache_dir: Path) -> bool: + """A cache directory is trusted only if its `.comses_complete` marker is present.""" + return cache_dir.is_dir() and (cache_dir / COMPLETION_MARKER).is_file() + + +def _member_is_safe(target_root: Path, member: str) -> bool: + """Return True iff this zip member is safe to extract under `target_root`. + + Rejects traversal (`..`), absolute paths, Windows drive letters. + """ + # Normalize separators. Zips use forward slash by spec, but be defensive. + name = member.replace("\\", "/").strip() + if not name: + return False + # Absolute / rooted paths, Windows drives, explicit ".." segments. + if name.startswith("/") or name.startswith("\\"): + return False + if len(name) >= 2 and name[1] == ":": + return False + parts = PurePosixPath(name).parts + if any(p == ".." for p in parts): + return False + + candidate = (target_root / name).resolve() + root_resolved = target_root.resolve() + try: + candidate.relative_to(root_resolved) + except ValueError: + return False + return True + + +def validate_zip_members(zf: zipfile.ZipFile, target_root: Path) -> None: + """Raise `ComsesSafetyError` if any member would escape `target_root`.""" + for info in zf.infolist(): + if not _member_is_safe(target_root, info.filename): + raise ComsesSafetyError(f"Unsafe zip member rejected: {info.filename!r}") + + +def check_zip_bomb(zf: zipfile.ZipFile, max_bytes: int) -> int: + """Sum uncompressed sizes. Raise if total > 2 × max_bytes. + + The 2× margin is a cheap guard against high-ratio zip bombs where the + compressed bytes were under the download cap but the expanded tree is + massive. Returns the computed total for logging. + """ + total = sum(info.file_size for info in zf.infolist()) + if total > 2 * max_bytes: + raise ComsesSafetyError( + f"Archive would expand to {total} bytes, " + f"more than 2 × download cap ({max_bytes}). Refusing." + ) + return total + + +def safe_extract_zip( + archive_path: Path, + final_dir: Path, + *, + max_bytes: int, + tmp_root: Path | None = None, +) -> Path: + """Safely extract `archive_path` to `final_dir`. + + Pipeline: + 1. Open zip, validate every member, check zip-bomb threshold. + 2. Extract to a sibling temp directory. + 3. Atomically `shutil.move` temp → `final_dir`. + 4. Write `.comses_complete` marker. 
+ + Race-safe: if `final_dir` appeared between steps 2 and 3 (another writer), + we check its marker — if marked we discard our temp and keep theirs; if + unmarked we assume orphan, wipe it, and retry the move once. + """ + tmp_root = tmp_root or final_dir.parent / ".tmp" + tmp_root.mkdir(parents=True, exist_ok=True) + + # Unique-ish tmp dir; pid keeps it obvious which process owned it. + tmp_dir = tmp_root / f"extract-{os.getpid()}-{final_dir.name}" + if tmp_dir.exists(): + shutil.rmtree(tmp_dir) + tmp_dir.mkdir(parents=True) + + try: + with zipfile.ZipFile(archive_path) as zf: + validate_zip_members(zf, tmp_dir) + check_zip_bomb(zf, max_bytes) + zf.extractall(tmp_dir) + + _move_or_reconcile(tmp_dir, final_dir) + + (final_dir / COMPLETION_MARKER).write_text("ok\n", encoding="utf-8") + return final_dir + except Exception: + # Leave tmp_dir for debugging? No — silent cleanup avoids littering cache. + if tmp_dir.exists(): + shutil.rmtree(tmp_dir, ignore_errors=True) + raise + finally: + # If tmp_dir still exists (e.g. move succeeded but rename consumed it), + # this is a no-op. + if tmp_dir.exists(): + shutil.rmtree(tmp_dir, ignore_errors=True) + + +def _move_or_reconcile(tmp_dir: Path, final_dir: Path) -> None: + """Move `tmp_dir` to `final_dir`, handling the race with a peer writer. + + See Section 4.6 cache trust table. Retries the move at most once. + """ + if not final_dir.exists(): + final_dir.parent.mkdir(parents=True, exist_ok=True) + shutil.move(str(tmp_dir), str(final_dir)) + return + + # Final dir already there. Check marker. + if (final_dir / COMPLETION_MARKER).is_file(): + # Peer writer finished; keep theirs, discard ours. + shutil.rmtree(tmp_dir, ignore_errors=True) + return + + # Unmarked orphan — wipe and retry once. + shutil.rmtree(final_dir, ignore_errors=True) + shutil.move(str(tmp_dir), str(final_dir)) + + +# ── Post-extract inspection ─────────────────────────────────────────────────── + + +# Language hints derived from codemeta.json or extension fallback. +_EXT_TO_LANGUAGE = { + ".nlogo": "NetLogo", + ".nlogox": "NetLogo", + ".py": "Python", + ".r": "R", + ".R": "R", + ".jl": "Julia", + ".java": "Java", + ".m": "MATLAB", + ".cs": "C#", + ".cpp": "C++", + ".c": "C", +} + + +def detect_language(extracted_dir: Path) -> str: + """Best-effort language detection from codemeta.json, then extension scan.""" + codemeta = extracted_dir / "codemeta.json" + if codemeta.is_file(): + try: + data = json.loads(codemeta.read_text(encoding="utf-8", errors="replace")) + except (OSError, json.JSONDecodeError): + data = None + if isinstance(data, dict): + lang = data.get("programmingLanguage") + # codemeta allows string or object or list + if isinstance(lang, str) and lang.strip(): + return lang.strip() + if isinstance(lang, dict): + name = lang.get("name") + if isinstance(name, str) and name.strip(): + return name.strip() + if isinstance(lang, list) and lang: + first = lang[0] + if isinstance(first, str): + return first + if isinstance(first, dict): + first_name = first.get("name") + if isinstance(first_name, str): + return first_name + + # Fallback: whichever recognized extension appears most under code/ or root. 
+ counts: dict[str, int] = {} + for p in extracted_dir.rglob("*"): + if p.is_file(): + lang = _EXT_TO_LANGUAGE.get(p.suffix) + if lang: + counts[lang] = counts.get(lang, 0) + 1 + if not counts: + return "Unknown" + return max(counts.items(), key=lambda kv: kv[1])[0] + + +def find_netlogo_files(extracted_dir: Path) -> list[Path]: + """All `.nlogo` / `.nlogox` files in the extracted archive, sorted.""" + found = [ + p + for p in extracted_dir.rglob("*") + if p.is_file() and p.suffix in (".nlogo", ".nlogox") + ] + # Stable alphabetical order by relative path. + found.sort(key=lambda p: p.relative_to(extracted_dir).as_posix()) + return found + + +def select_netlogo_file(netlogo_files: list[Path], extracted_dir: Path) -> Path | None: + """Deterministic NetLogo file selection (Section 4.4 of COMSES_PLAN). + + 1. Exactly one candidate → use it. + 2. Prefer files under `code/`. + 3. Prefer `.nlogox` over `.nlogo`. + 4. Lex-largest relative path as a pure tie-breaker (not semver-smart). + """ + if not netlogo_files: + return None + if len(netlogo_files) == 1: + return netlogo_files[0] + + def rel_parts(p: Path) -> tuple[str, ...]: + return p.relative_to(extracted_dir).parts + + # Step 2: prefer under code/. + under_code = [ + p for p in netlogo_files if rel_parts(p) and rel_parts(p)[0] == "code" + ] + candidates = under_code if under_code else netlogo_files + + # Step 3: prefer .nlogox. + nlogox = [p for p in candidates if p.suffix == ".nlogox"] + if nlogox: + candidates = nlogox + + # Step 4: lex-largest relative path. + candidates.sort(key=lambda p: p.relative_to(extracted_dir).as_posix()) + return candidates[-1] + + +_ODD_NAME_PATTERNS = ("ODD", "odd", "documentation", "README", "readme") +_ODD_TEXT_EXTS = (".md", ".txt") +_ODD_BINARY_EXTS = (".pdf", ".docx", ".odt", ".doc") + + +def _find_odd_by_exts(extracted_dir: Path, exts: tuple[str, ...]) -> Path | None: + """Shared scanner for `find_odd_doc` / `find_odd_doc_binary`.""" + docs_dir = extracted_dir / "docs" + search_roots = [docs_dir, extracted_dir] if docs_dir.is_dir() else [extracted_dir] + for root in search_roots: + if not root.is_dir(): + continue + for p in sorted(root.iterdir(), key=lambda x: x.name): + if not p.is_file(): + continue + if p.suffix.lower() not in exts: + continue + name = p.name + for pat in _ODD_NAME_PATTERNS: + if name.startswith(pat): + return p + # Second pass: match anywhere in the filename (e.g. "FNNR ABM - ODD.pdf"). + for root in search_roots: + if not root.is_dir(): + continue + for p in sorted(root.iterdir(), key=lambda x: x.name): + if not p.is_file() or p.suffix.lower() not in exts: + continue + lower = p.name.lower() + if any(pat.lower() in lower for pat in _ODD_NAME_PATTERNS): + return p + return None + + +def find_odd_doc(extracted_dir: Path) -> Path | None: + """First matching text ODD / documentation file, None if none found.""" + return _find_odd_by_exts(extracted_dir, _ODD_TEXT_EXTS) + + +def find_odd_doc_binary(extracted_dir: Path) -> Path | None: + """First matching ODD / documentation file in a binary format (PDF/DOCX/etc.). + + Useful so the tool can tell the AI "an ODD exists but is a PDF — the + user will need to open it in a viewer." The file itself is NOT read + by read_comses_files (v1 scope limit). + """ + return _find_odd_by_exts(extracted_dir, _ODD_BINARY_EXTS) + + +@dataclass +class DownloadOutcome: + """Result of a high-level download_release call. + + Consumed by both `download_comses_model` and `open_comses_model` tools. 
+ """ + + identifier: str + resolved_version: str + extracted_path: Path + cached: bool + language: str + netlogo_files: list[Path] + selected_netlogo_file: Path | None + code_files: list[Path] + odd_doc: Path | None + odd_doc_binary: Path | None + license_name: str | None + title: str | None + + +async def download_release( + client: ComsesClient, + identifier: str, + version: str, + *, + cache_root: Path, + max_bytes: int, +) -> DownloadOutcome: + """End-to-end: resolve, check cache, HEAD, stream, safely extract, inspect. + + Idempotent on cached state: if `cache_root/{uuid}/{concrete_version}/` + has `.comses_complete`, no network round-trips to /download/ happen and + `cached=True` is returned. + + Raises `ComsesError` (or a subclass) on any failure. On size/safety + failure, no cache directory is created. + """ + resolved = await client.resolve_latest(identifier, version) + + final_dir = cache_root / identifier / resolved + if is_cache_trusted(final_dir): + return _inspect_extracted( + identifier=identifier, + resolved_version=resolved, + extracted_path=final_dir, + cached=True, + title=None, + license_name=None, + ) + + # Fetch release + codebase metadata for title/license + the correct + # download URL. `submittedPackage` is NOT a reliable signal — real COMSES + # returns null for it on most releases even when /download/ serves a real + # zip. And the /download/ endpoint 404s on UUID paths; it requires the + # numeric codebase id from the release's `absoluteUrl`. Rely on the + # release response for both. + title: str | None = None + license_name: str | None = None + download_url: str | None = None + try: + rel = await client.get_release(identifier, resolved) + license_name = (rel.get("license") or {}).get("name") + abs_url = rel.get("absoluteUrl") + if isinstance(abs_url, str) and abs_url.startswith("/"): + download_url = f"{client._base_url}{abs_url.rstrip('/')}/download/" + except ComsesError: + pass + try: + cb = await client.get_codebase(identifier) + title = cb.get("title") + except ComsesError: + pass + + # HEAD screen: refuse obvious oversize before streaming. + try: + content_length, _ = await client.head_download( + identifier, resolved, url=download_url + ) + except ComsesHTTPError: + content_length = None # HEAD may not be supported; defer to stream cap. + if content_length is not None and content_length > max_bytes: + raise ComsesHTTPError( + f"Archive is {content_length} bytes, exceeds cap of {max_bytes}. " + "Increase max_mb or COMSES_MAX_DOWNLOAD_MB if you really want it." + ) + + # Stream to a unique temp file; then safe-extract. 
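+    # Resulting cache layout on success (sketch of the paths built below):
+    #   cache_root/.tmp/{identifier}-{resolved}-{pid}.zip     temp archive, removed in the finally block
+    #   cache_root/{identifier}/{resolved}/...                extracted tree
+    #   cache_root/{identifier}/{resolved}/.comses_complete   trust marker written by safe_extract_zip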
+ tmp_root = cache_root / ".tmp" + tmp_root.mkdir(parents=True, exist_ok=True) + tmp_zip = tmp_root / f"{identifier}-{resolved}-{os.getpid()}.zip" + try: + await client.stream_download( + identifier, resolved, tmp_zip, max_bytes=max_bytes, url=download_url + ) + safe_extract_zip( + tmp_zip, + final_dir, + max_bytes=max_bytes, + tmp_root=tmp_root, + ) + finally: + try: + tmp_zip.unlink(missing_ok=True) + except OSError: + pass + + return _inspect_extracted( + identifier=identifier, + resolved_version=resolved, + extracted_path=final_dir, + cached=False, + title=title, + license_name=license_name, + ) + + +def _inspect_extracted( + *, + identifier: str, + resolved_version: str, + extracted_path: Path, + cached: bool, + title: str | None, + license_name: str | None, +) -> DownloadOutcome: + """Walk the extracted directory and build a DownloadOutcome.""" + language = detect_language(extracted_path) + netlogo = find_netlogo_files(extracted_path) + selected = select_netlogo_file(netlogo, extracted_path) + code = find_code_files(extracted_path) + odd = find_odd_doc(extracted_path) + odd_bin = find_odd_doc_binary(extracted_path) + return DownloadOutcome( + identifier=identifier, + resolved_version=resolved_version, + extracted_path=extracted_path, + cached=cached, + language=language, + netlogo_files=netlogo, + selected_netlogo_file=selected, + code_files=code, + odd_doc=odd, + odd_doc_binary=odd_bin, + license_name=license_name, + title=title, + ) + + +def find_code_files(extracted_dir: Path) -> list[Path]: + """Plausible source files (one level deep under code/, or same at root).""" + code_exts = (".nlogo", ".nlogox", ".py", ".r", ".R", ".jl", ".java") + code_dir = extracted_dir / "code" + root = code_dir if code_dir.is_dir() else extracted_dir + found = [p for p in root.rglob("*") if p.is_file() and p.suffix in code_exts] + found.sort(key=lambda p: p.relative_to(extracted_dir).as_posix()) + return found diff --git a/src/netlogo_mcp/config.py b/src/netlogo_mcp/config.py index 9d4b2d4..1e7e607 100644 --- a/src/netlogo_mcp/config.py +++ b/src/netlogo_mcp/config.py @@ -69,6 +69,26 @@ def get_gui_mode() -> bool: return val not in ("false", "0", "no") +def get_comses_max_download_mb() -> float: + """Max size of a COMSES archive download, in megabytes. + + Enforced at stream time (per-byte), not just via HEAD. Default 50 MB. + """ + val = os.environ.get("COMSES_MAX_DOWNLOAD_MB", "50") + try: + return max(1.0, float(val)) + except ValueError: + return 50.0 + + +def get_comses_cache_dir() -> Path: + """Directory for downloaded/extracted COMSES archives.""" + models_dir = get_models_dir() + p = models_dir / "comses" + p.mkdir(parents=True, exist_ok=True) + return p + + def get_exports_dir() -> Path: """Return the directory where exported images and worlds are saved.""" val = os.environ.get( diff --git a/src/netlogo_mcp/prompts.py b/src/netlogo_mcp/prompts.py index 3018362..65165f8 100644 --- a/src/netlogo_mcp/prompts.py +++ b/src/netlogo_mcp/prompts.py @@ -80,6 +80,73 @@ def create_abm( ] +@mcp.prompt() +def explore_comses(topic: str) -> list[Message]: + """Search CoMSES Net for a topic, pick the best NetLogo match, open it + safely, and run a short baseline simulation — without ever fabricating + NetLogo commands. + + Args: + topic: Free-text description of what you want to model + (e.g. "rumor spreading", "predator-prey", "urban traffic"). + """ + return [ + Message( + role="user", + content=( + f"Find an agent-based model matching '{topic}' on CoMSES Net " + "and run a short baseline. 
Follow these rules exactly.\n\n" + '1. Call `search_comses(query="' + topic + '")`. Pick a top ' + "match preferring peer-reviewed + NetLogo results when " + "available, but show me other strong candidates too if there " + "aren't enough NetLogo ones.\n" + "2. Call `get_comses_model(identifier=)` and " + "present: title, authors, license, description, and the " + "`citation_text` — researchers always need this.\n" + "3. Call `open_comses_model(identifier=)`. **Capture " + "`resolved_version` from the JSON response** and reuse it for " + "every subsequent `read_comses_files` call. Never pass " + '"latest" again in this flow — the version could change ' + "between calls and you'd inspect a different cache slot.\n" + '4. If the model is NetLogo (status = "loaded_netlogo"):\n' + " a. Call `read_comses_files(identifier=, " + 'version=, extensions=[".nlogo", ' + '".nlogox"])` — always include BOTH extensions, since the ' + "archive may have picked a .nlogox variant.\n" + " b. Scan the source for `to ` procedure names and " + "`to-report ` reporters. Do NOT assume `setup` / `go` " + "exist — read what's actually defined.\n" + " c. **Stop-and-ask fallback:** if no procedure resembles " + "setup/initialize/start, OR no candidate reporters exist, " + "stop after loading and ask me which procedure to run. Do " + "not force-run commands the model does not define.\n" + ' d. Otherwise: call `command("")`. If ' + "that call errors (model wants parameters, files, a " + "different invocation order, etc.), **stop and ask me** — " + "do NOT guess alternates. Otherwise call " + "`run_simulation(ticks=100, reporters=[])` then " + "`export_view`.\n" + " e. Call `read_comses_files(identifier=, " + 'version=, extensions=[".md", ".txt"], ' + "max_total_bytes=50000)` to read the ODD / README and " + "summarize what the model simulates.\n" + "5. If the model is NOT NetLogo (status = " + '"not_runnable_in_netlogo"):\n' + " a. Call `read_comses_files(identifier=, " + 'version=, extensions=[".md", ".txt"])` ' + "for the ODD doc.\n" + " b. State the language clearly, show the citation, " + "summarize the ODD findings, and stop. Do NOT auto-translate " + "to NetLogo. If I explicitly ask you to translate later, you " + "may attempt it with the source — but be honest about " + "simplifications.\n\n" + "Be concise. Show tool results you act on, skip raw JSON I " + "don't need." + ), + ) + ] + + @mcp.prompt() def parameter_sweep( parameter: str, diff --git a/src/netlogo_mcp/tools.py b/src/netlogo_mcp/tools.py index b09df63..65b351e 100644 --- a/src/netlogo_mcp/tools.py +++ b/src/netlogo_mcp/tools.py @@ -13,7 +13,13 @@ from fastmcp.exceptions import ToolError from fastmcp.utilities.types import Image -from .config import get_exports_dir, get_models_dir +from . import comses as _comses +from .config import ( + get_comses_cache_dir, + get_comses_max_download_mb, + get_exports_dir, + get_models_dir, +) from .server import mcp # ── Helpers ────────────────────────────────────────────────────────────────── @@ -448,3 +454,615 @@ async def export_world(ctx: Context) -> str: raise _wrap_netlogo_error(e) from e return f"World exported to {export_path}" + + +# ── CoMSES Net integration ────────────────────────────────────────────────── +# +# Five tools + one prompt that let an AI client browse, inspect, download, and +# open models from https://www.comses.net (the Network for Computational +# Modeling in Social and Ecological Sciences). See docs/COMSES_PLAN.md for the +# full spec. 
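+#
+# Typical call sequence (sketch only; argument values come from earlier tool
+# responses rather than literal strings to copy):
+#
+#   1. search_comses(query="predator-prey")    -> pick a result's identifier
+#   2. get_comses_model(identifier=...)        -> title, license, citation_text
+#   3. open_comses_model(identifier=...)       -> capture resolved_version
+#   4. read_comses_files(identifier=..., version=...)   -> pass the captured
+#      resolved_version, never "latest", so the same cache slot is inspected.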
+ + +def _compact_search_result(entry: dict) -> dict: + """Pull the fields an LLM actually needs out of a COMSES search result.""" + authors = [] + for c in entry.get("allContributors") or []: + user = (c or {}).get("user") or {} + name = ( + user.get("name") + or (c.get("givenName", "") + " " + c.get("familyName", "")).strip() + or c.get("name") + or "" + ) + if name: + authors.append(name) + + # Language: search results don't include releaseLanguages, so we fall back + # to a text heuristic over title + description + tags. This is surfaced as + # a hint only — the authoritative language lives on get_comses_model. + language = _language_from_releases(entry.get("releases") or []) + if not language: + language = _language_hint_from_text(entry) + + return { + "identifier": entry.get("identifier"), + "title": entry.get("title"), + "description": ( + entry.get("summarizedDescription") or entry.get("description") or "" + )[:500], + "authors": authors, + "latestVersion": entry.get("latestVersionNumber"), + "tags": [ + t.get("name") if isinstance(t, dict) else t for t in entry.get("tags") or [] + ], + "language": language, + "isPeerReviewed": entry.get("peerReviewed", False), + "downloads": entry.get("downloadCount", 0), + "doi": entry.get("doi"), + "live": entry.get("live"), + } + + +def _language_from_releases(releases: list) -> str | None: + """Pick a language name from a release detail, if the field is populated. + + Real COMSES search results do NOT include `releaseLanguages` on nested + releases — that data is only on `/releases/{version}/?format=json`. + This helper still handles the full shape for detail responses and + mocked tests; callers that only have search results should also try + `_language_hint_from_text`. + """ + if not releases: + return None + target = None + for rel in releases: + if (rel or {}).get("latestVersion"): + target = rel + break + if target is None: + target = releases[-1] + langs = (target or {}).get("releaseLanguages") or [] + for lang in langs: + pl = (lang or {}).get("programmingLanguage") or {} + name = pl.get("name") or (lang or {}).get("name") + if name: + return str(name) + tags = (target or {}).get("programmingLanguageTags") or [] + if tags: + first = tags[0] + return first.get("name") if isinstance(first, dict) else str(first) + return None + + +# Keyword → display name, scanned case-insensitively against title / +# description / tags. Keep multi-word and ambiguous keywords out of this +# list so we don't mis-tag ecology models as "R". +_LANGUAGE_TEXT_HINTS: tuple[tuple[str, str], ...] = ( + ("netlogo", "NetLogo"), + ("mesa", "Python"), + ("repast", "Repast"), + ("python", "Python"), + ("julia", "Julia"), + ("matlab", "MATLAB"), + ("gama platform", "GAMA"), + ("gama-platform", "GAMA"), +) + + +def _language_hint_from_text(entry: dict) -> str | None: + """Cheap heuristic: scan title + description + tags for a known language.""" + parts: list[str] = [ + str(entry.get("title") or ""), + str(entry.get("summarizedDescription") or ""), + str(entry.get("description") or ""), + ] + for t in entry.get("tags") or []: + parts.append(str(t.get("name") if isinstance(t, dict) else t)) + haystack = " ".join(parts).lower() + for needle, label in _LANGUAGE_TEXT_HINTS: + if needle in haystack: + return label + return None + + +@mcp.tool() +async def search_comses( + ctx: Context, + query: str = "", + page: int = 1, +) -> str: + """Search the CoMSES Net computational model library. + + Args: + query: Free-text search across title, description, authors, tags. 
+ Leave empty to browse all models. + page: 1-indexed page number (10 results per page). + + Returns JSON with `count`, `page`, `numPages`, and a `results` list of + compact entries (`identifier`, `title`, `description`, `authors`, + `latestVersion`, `tags`, `language`, `isPeerReviewed`, `downloads`, `doi`, + `live`). + + The `language` field (inferred from the latest release) lets the AI decide + whether this is a NetLogo model it can load directly or a Python/R/etc. + model that needs a different runtime. Results are NOT filtered by + language — that's an AI/user decision. + """ + if page < 1: + raise ToolError("page must be >= 1") + try: + async with _comses.ComsesClient() as client: + raw = await client.search(query=query, page=page) + except _comses.ComsesError as e: + raise ToolError(f"COMSES search failed: {e}") from e + + compact = { + "count": raw.get("count", 0), + "page": raw.get("currentPage", page), + "numPages": raw.get("numPages", 1), + "numResults": raw.get("numResults", len(raw.get("results") or [])), + "results": [_compact_search_result(e) for e in (raw.get("results") or [])], + } + return json.dumps(compact, indent=2) + + +def _compact_codebase_detail(data: dict) -> dict: + """Shape the codebase detail endpoint into the AI-friendly payload.""" + authors: list[dict] = [] + for c in data.get("allContributors") or []: + user = (c or {}).get("user") or {} + name = ( + user.get("name") + or (c.get("givenName", "") + " " + c.get("familyName", "")).strip() + or "" + ) + authors.append( + { + "name": name, + "affiliation": user.get("institutionName") or c.get("affiliation"), + "orcid": user.get("orcid") or c.get("orcid"), + } + ) + + releases = [] + for rel in data.get("releases") or []: + releases.append( + { + "versionNumber": rel.get("versionNumber"), + "live": rel.get("live"), + "downloadable": bool(rel.get("submittedPackage")), + "firstPublishedAt": rel.get("firstPublishedAt"), + "lastPublishedOn": rel.get("lastPublishedOn"), + "language": _language_from_releases([rel]), + "license": (rel.get("license") or {}).get("name"), + "doi": rel.get("doi"), + } + ) + + return { + "identifier": data.get("identifier"), + "title": data.get("title"), + "description": data.get("description") or "", + "summarizedDescription": data.get("summarizedDescription") or "", + "authors": authors, + "tags": [ + t.get("name") if isinstance(t, dict) else t for t in data.get("tags") or [] + ], + "releases": releases, + "latestVersion": data.get("latestVersionNumber"), + "doi": data.get("doi"), + "repositoryUrl": data.get("repositoryUrl"), + "downloadCount": data.get("downloadCount", 0), + "peerReviewed": data.get("peerReviewed", False), + "citation_text": data.get("citationText") or "", + } + + +@mcp.tool() +async def get_comses_model(ctx: Context, identifier: str) -> str: + """Get detailed metadata for a specific CoMSES model. + + Args: + identifier: Full UUID from `search_comses` results. + + Returns JSON with title, description, all authors (name/affiliation/ORCID), + all releases (version, language, license, downloadable flag), tags, DOI, + repository URL, download counts, and a ready-to-use `citation_text` + researchers can paste into papers. + + The `citation_text` is pulled from the latest release; call this before + downloading to present the user with author/license/citation info. 
+ """ + if not identifier: + raise ToolError("identifier is required") + try: + async with _comses.ComsesClient() as client: + data = await client.get_codebase(identifier) + # Citation lives on the release, not the codebase. Pull it from + # the latest release if there is one. + latest = data.get("latestVersionNumber") + if latest: + try: + rel = await client.get_release(identifier, str(latest)) + if rel.get("citationText"): + data["citationText"] = rel["citationText"] + # Annotate the matching release in-place so detail shape + # stays consistent without a second request. + for r in data.get("releases") or []: + if r.get("versionNumber") == latest: + r["submittedPackage"] = rel.get("submittedPackage") + r["live"] = rel.get("live", r.get("live")) + r["license"] = rel.get("license", r.get("license")) + r["doi"] = rel.get("doi", r.get("doi")) + except _comses.ComsesError: + # Non-fatal — fall back to what get_codebase returned. + pass + except _comses.ComsesError as e: + raise ToolError(f"COMSES get_comses_model failed: {e}") from e + + return json.dumps(_compact_codebase_detail(data), indent=2) + + +def _outcome_to_payload(outcome: _comses.DownloadOutcome) -> dict: + """Shape a DownloadOutcome into the JSON the MCP tool returns.""" + extracted = outcome.extracted_path + rel = lambda p: p.relative_to(extracted).as_posix() if p else None # noqa: E731 + return { + "identifier": outcome.identifier, + "resolved_version": outcome.resolved_version, + "extracted_path": str(extracted).replace("\\", "/"), + "cached": outcome.cached, + "language": outcome.language, + "title": outcome.title, + "license": outcome.license_name, + "all_netlogo_files": [rel(p) for p in outcome.netlogo_files], + "loaded_netlogo_file": rel(outcome.selected_netlogo_file), + "code_files": [rel(p) for p in outcome.code_files], + "odd_doc": rel(outcome.odd_doc), + # Non-text ODD (PDF/DOCX) — read_comses_files cannot read it, but + # the AI should surface the path so the user can open it manually. + "odd_doc_binary": rel(outcome.odd_doc_binary), + } + + +@mcp.tool() +async def download_comses_model( + ctx: Context, + identifier: str, + version: str = "latest", + max_mb: float = 0.0, +) -> str: + """Download and safely extract a COMSES model archive. + + Standalone "fetch but don't open" tool. Most AI flows should use + `open_comses_model` instead — it subsumes this tool and also loads + NetLogo models into the workspace. + + Safety guarantees: + - `version="latest"` is resolved to a concrete version BEFORE any cache + path is computed. Cache dirs are named by the resolved version. + - HEAD request screens oversize archives before streaming. + - Stream enforces the byte cap mid-download; overruns abort and delete + the partial file. + - Zip members are validated against path traversal before extraction. + - Uncompressed total is checked against 2 × cap to reject zip bombs. + - Extract happens in a temp directory; only a successful extract is + moved atomically into the cache. A `.comses_complete` marker is + written on success; future calls only trust cached dirs with the marker. + + Args: + identifier: Full model UUID (from `search_comses`). + version: Version string (e.g. "1.2.0") or "latest". + max_mb: Size cap in MB. Pass 0 or omit to use + the `COMSES_MAX_DOWNLOAD_MB` env var (default 50 MB). + + Returns JSON with: `identifier`, `resolved_version`, `extracted_path`, + `cached`, `language`, `title`, `license`, `all_netlogo_files`, + `loaded_netlogo_file` (deterministic pick per plan Section 4.4), + `code_files`, and `odd_doc`. 
+ """ + if not identifier: + raise ToolError("identifier is required") + cap_mb = max_mb if max_mb and max_mb > 0 else get_comses_max_download_mb() + max_bytes = int(cap_mb * 1024 * 1024) + cache_root = get_comses_cache_dir() + try: + async with _comses.ComsesClient() as client: + outcome = await _comses.download_release( + client, + identifier, + version, + cache_root=cache_root, + max_bytes=max_bytes, + ) + except _comses.ComsesSafetyError as e: + raise ToolError(f"COMSES archive rejected for safety reasons: {e}") from e + except _comses.ComsesError as e: + raise ToolError(f"COMSES download failed: {e}") from e + + return json.dumps(_outcome_to_payload(outcome), indent=2) + + +@mcp.tool() +async def open_comses_model( + ctx: Context, + identifier: str, + version: str = "latest", + max_mb: float = 0.0, +) -> str: + """Download (or reuse cache), then open a COMSES model ready to use. + + This is the **single entry point** most AI flows should call. + + Behavior: + - Resolves `"latest"` to a concrete version BEFORE any cache path is + computed. The returned `resolved_version` is what every follow-up + `read_comses_files` call MUST pass — never re-pass `"latest"` in the + same flow, or you risk inspecting a different cache slot than the + model you just loaded. + - If the cache for `(identifier, resolved_version)` is already complete + (has `.comses_complete`), skips download. + - Otherwise, downloads + extracts safely (same logic as + `download_comses_model`). + - If the model is NetLogo, picks one `.nlogo` / `.nlogox` per Section + 4.4 rules (exactly one → use it; else prefer `code/`; else prefer + `.nlogox`; else lex-largest relative path — a deterministic + tie-breaker, NOT semver-aware). + - If NetLogo, loads it into the workspace. + - If not NetLogo, returns structured info for manual follow-up. + + Returns JSON with: + - `status`: "loaded_netlogo", "not_runnable_in_netlogo", or "no_netlogo_file". + - `resolved_version`: concrete version string (never "latest"). + - `identifier`, `title`, `language`, `license`, `cached`. + - `extracted_path`: absolute path to cached model directory. + - `all_netlogo_files`: list of every NetLogo file found. + - `loaded_netlogo_file`: the one selected (if any). + - `code_files`: source files by extension. + - `odd_doc`: ODD / README path, if any. + - `message`: short text for the AI to show the user. + + Args: + identifier: Full model UUID. + version: Version string or "latest". + max_mb: Max download size in MB. Pass 0 or omit to use the env default. + """ + if not identifier: + raise ToolError("identifier is required") + cap_mb = max_mb if max_mb and max_mb > 0 else get_comses_max_download_mb() + max_bytes = int(cap_mb * 1024 * 1024) + cache_root = get_comses_cache_dir() + try: + async with _comses.ComsesClient() as client: + outcome = await _comses.download_release( + client, + identifier, + version, + cache_root=cache_root, + max_bytes=max_bytes, + ) + except _comses.ComsesSafetyError as e: + raise ToolError(f"COMSES archive rejected for safety reasons: {e}") from e + except _comses.ComsesError as e: + raise ToolError(f"COMSES open failed: {e}") from e + + payload = _outcome_to_payload(outcome) + language = outcome.language or "" + is_netlogo = language.lower() == "netlogo" or bool(outcome.selected_netlogo_file) + + if not is_netlogo: + payload["status"] = "not_runnable_in_netlogo" + payload["message"] = ( + f"This model is in {language or 'a non-NetLogo language'}, " + "not NetLogo. The source is saved locally at extracted_path. 
" + "Use read_comses_files to inspect it. Translating it to NetLogo " + "is possible but not automatic — ask explicitly if that's what " + "you want." + ) + return json.dumps(payload, indent=2) + + if outcome.selected_netlogo_file is None: + payload["status"] = "no_netlogo_file" + payload["message"] = ( + "Language looks like NetLogo but no .nlogo / .nlogox file was " + "found in the archive. Use read_comses_files to investigate." + ) + return json.dumps(payload, indent=2) + + # Load into the NetLogo workspace using the forward-slash path form that + # pynetlogo expects on Windows. + nl = _nl(ctx) + path_str = str(outcome.selected_netlogo_file.resolve()).replace("\\", "/") + try: + nl.load_model(path_str) + except Exception as e: + raise _wrap_netlogo_error(e) from e + + payload["status"] = "loaded_netlogo" + payload["message"] = ( + f"Loaded NetLogo model: {outcome.selected_netlogo_file.name} " + f"({'cached' if outcome.cached else 'downloaded'}). " + f"Pin resolved_version={outcome.resolved_version!r} for any " + "follow-up read_comses_files calls." + ) + return json.dumps(payload, indent=2) + + +# ── read_comses_files ──────────────────────────────────────────────────────── + + +_READ_DEFAULT_EXTS = ( + ".nlogo", + ".nlogox", + ".py", + ".r", + ".R", + ".java", + ".jl", + ".md", + ".txt", +) + + +def _priority_rank(rel_path: str) -> int: + """Plan Section 4.5 priority ordering. Lower = read earlier.""" + name = rel_path.lower() + # 1) ODD docs under docs/ or root README + if name.startswith("docs/odd") or "/odd" in name or name.startswith("odd"): + return 0 + if name.startswith("docs/documentation") or name.startswith("documentation"): + return 0 + if name.startswith("docs/readme") or name.startswith("readme"): + return 1 + # 2) NetLogo source + if name.endswith(".nlogo") or name.endswith(".nlogox"): + return 2 + # 3) Other code by extension + if name.endswith((".py", ".r", ".java", ".jl")): + return 3 + # 4) Other .md / .txt outside docs/ + if name.endswith((".md", ".txt")) and not name.startswith("docs/"): + return 4 + return 5 + + +@mcp.tool() +async def read_comses_files( + ctx: Context, + identifier: str, + version: str = "latest", + extensions: list[str] | None = None, + max_total_bytes: int = 50_000, +) -> str: + """Return text contents of source and documentation files from a + downloaded COMSES model. + + The model MUST already be downloaded by `open_comses_model` or + `download_comses_model`. If the cache is absent, this tool returns an + error telling the AI to call one of those first. + + The AI should pass the `resolved_version` it captured from + `open_comses_model` — not the literal string `"latest"` — or it risks + inspecting a different cache slot than the model it just loaded. + When `version="latest"` is passed, this tool calls the COMSES API to + resolve it (so it works standalone) and surfaces the concrete version + in the `resolved_version` field of the response. + + Behavior: + - Files are UTF-8 decoded with `errors="replace"` so binary junk never + aborts the call. Every file that matches `extensions` is returned as + a string (may contain replacement characters for non-text bytes). + - Files are included in priority order: ODD docs → NetLogo source → + other code → other .md/.txt → everything else matching extensions. + - Total body is capped at `max_total_bytes` (default 50 KB — sized to + fit in a single conversational-LLM tool response). 
When the cap is + hit mid-file, that file is truncated at a line boundary; subsequent + files are listed in `omitted_files` with reason `byte_cap_reached`. + For larger pulls, pass a higher value explicitly. + - Files matching no `extensions` filter are listed in `omitted_files` + with reason `extension_not_in_filter`. + + Args: + identifier: Full model UUID. + version: Concrete version (preferred) or "latest". Always surfaced + back in `resolved_version`. + extensions: List of file suffixes (with dot) to include. Defaults + to NetLogo + common ABM languages + .md + .txt. + max_total_bytes: Cap on total returned content. + + Returns JSON with: + - `resolved_version` + - `files`: {relpath: {content, full_size, returned_size, truncated}} + - `omitted_files`: [relpath, ...] + - `omitted_reason_by_file`: {relpath: reason} + - `total_returned_bytes` + - `any_truncated` + """ + if not identifier: + raise ToolError("identifier is required") + exts = tuple(extensions or _READ_DEFAULT_EXTS) + cache_root = get_comses_cache_dir() + + # Resolve version (may hit network only if "latest" was passed). + try: + async with _comses.ComsesClient() as client: + resolved = await client.resolve_latest(identifier, version) + except _comses.ComsesError as e: + raise ToolError(f"Could not resolve version {version!r}: {e}") from e + + cache_dir = cache_root / identifier / resolved + if not _comses.is_cache_trusted(cache_dir): + raise ToolError( + f"Cache for {identifier} version {resolved} is missing or " + "incomplete. Call open_comses_model or download_comses_model first." + ) + + # Collect every candidate file with its priority + path. + all_files: list[Path] = [p for p in cache_dir.rglob("*") if p.is_file()] + # Never return the completion marker. + all_files = [p for p in all_files if p.name != _comses.COMPLETION_MARKER] + + selected: list[tuple[int, str, Path]] = [] + omitted: dict[str, str] = {} + for p in all_files: + rel = p.relative_to(cache_dir).as_posix() + if p.suffix not in exts: + omitted[rel] = "extension_not_in_filter" + continue + selected.append((_priority_rank(rel), rel, p)) + selected.sort(key=lambda tup: (tup[0], tup[1])) + + files_out: dict[str, dict[str, object]] = {} + remaining = max_total_bytes + any_truncated = False + total_returned = 0 + + for _, rel, path in selected: + raw = path.read_bytes() + full_size = len(raw) + text = raw.decode("utf-8", errors="replace") + + if remaining <= 0: + omitted[rel] = "byte_cap_reached" + continue + + if full_size <= remaining: + files_out[rel] = { + "content": text, + "full_size": full_size, + "returned_size": full_size, + "truncated": False, + } + remaining -= full_size + total_returned += full_size + continue + + # Truncate to a line boundary within `remaining` bytes. 
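+        # Decoding with errors="replace" absorbs a multi-byte character that the
+        # byte cut may have split; if no newline falls inside the cut, the whole
+        # cut is kept rather than dropping the file.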
+ cut_bytes = raw[:remaining] + cut_text = cut_bytes.decode("utf-8", errors="replace") + last_nl = cut_text.rfind("\n") + if last_nl >= 0: + cut_text = cut_text[: last_nl + 1] + returned_size = len(cut_text.encode("utf-8", errors="replace")) + files_out[rel] = { + "content": cut_text, + "full_size": full_size, + "returned_size": returned_size, + "truncated": True, + } + any_truncated = True + total_returned += returned_size + remaining = 0 + + return json.dumps( + { + "resolved_version": resolved, + "files": files_out, + "omitted_files": sorted(omitted.keys()), + "omitted_reason_by_file": omitted, + "total_returned_bytes": total_returned, + "any_truncated": any_truncated, + }, + indent=2, + ) diff --git a/tests/fixtures/comses/codebase_detail.json b/tests/fixtures/comses/codebase_detail.json new file mode 100644 index 0000000..50d87e8 --- /dev/null +++ b/tests/fixtures/comses/codebase_detail.json @@ -0,0 +1,43 @@ +{ + "identifier": "aaaaaaaa-1111-4aaa-8aaa-111111111111", + "title": "Wolf Sheep Predation", + "description": "Classic predator-prey model adapted from the NetLogo models library. Wolves hunt sheep, sheep eat grass, grass regrows after a delay.", + "summarizedDescription": "Predators eat prey; prey eat grass; grass regrows.", + "allContributors": [ + { + "user": {"name": "Jane Author", "institutionName": "ASU", "orcid": "0000-0001-0000-0000"}, + "givenName": "Jane", + "familyName": "Author" + } + ], + "tags": [{"name": "predator-prey"}, {"name": "NetLogo"}], + "releases": [ + { + "versionNumber": "1.1.0", + "live": true, + "latestVersion": false, + "releaseLanguages": [ + {"programmingLanguage": {"id": 19, "name": "NetLogo"}} + ], + "license": {"name": "MIT", "url": "https://spdx.org/licenses/MIT"}, + "doi": null + }, + { + "versionNumber": "1.2.0", + "live": true, + "latestVersion": true, + "releaseLanguages": [ + {"programmingLanguage": {"id": 19, "name": "NetLogo"}} + ], + "license": {"name": "MIT", "url": "https://spdx.org/licenses/MIT"}, + "doi": "10.25937/example-one" + } + ], + "latestVersionNumber": "1.2.0", + "firstPublishedAt": "2021-01-01", + "lastPublishedOn": "2023-06-15", + "downloadCount": 482, + "peerReviewed": true, + "doi": "10.25937/example-one", + "repositoryUrl": "https://github.com/example/wolf-sheep" +} diff --git a/tests/fixtures/comses/release_detail.json b/tests/fixtures/comses/release_detail.json new file mode 100644 index 0000000..5f94661 --- /dev/null +++ b/tests/fixtures/comses/release_detail.json @@ -0,0 +1,19 @@ +{ + "versionNumber": "1.2.0", + "live": true, + "submittedPackage": "path/to/package.zip", + "license": {"name": "MIT", "url": "https://spdx.org/licenses/MIT"}, + "platforms": [{"name": "NetLogo"}], + "releaseLanguages": [ + {"programmingLanguage": {"id": 19, "name": "NetLogo"}} + ], + "programmingLanguageTags": [{"name": "NetLogo"}], + "os": "platform_independent", + "osDisplay": "Platform Independent", + "dependencies": null, + "citationText": "Author, J. (2023). Wolf Sheep Predation (Version 1.2.0). CoMSES Computational Model Library. 
https://doi.org/10.25937/example-one", + "outputDataUrl": null, + "doi": "10.25937/example-one", + "embargoEndDate": null, + "reviewStatus": "reviewed" +} diff --git a/tests/fixtures/comses/search_result.json b/tests/fixtures/comses/search_result.json new file mode 100644 index 0000000..07f14f5 --- /dev/null +++ b/tests/fixtures/comses/search_result.json @@ -0,0 +1,70 @@ +{ + "isFirstPage": true, + "isLastPage": false, + "currentPage": 1, + "numResults": 2, + "count": 54, + "numPages": 6, + "query": {"query": "predator-prey"}, + "results": [ + { + "identifier": "aaaaaaaa-1111-4aaa-8aaa-111111111111", + "title": "Wolf Sheep Predation", + "description": "Classic predator-prey model adapted from the NetLogo models library.", + "summarizedDescription": "Predators eat prey; prey eat grass; grass regrows.", + "allContributors": [ + { + "user": {"name": "Jane Author", "institutionName": "ASU", "orcid": "0000-0001-0000-0000"}, + "givenName": "Jane", + "familyName": "Author" + } + ], + "tags": [{"name": "predator-prey"}, {"name": "NetLogo"}], + "releases": [ + { + "versionNumber": "1.2.0", + "absoluteUrl": "/codebases/aaaaaaaa-1111-4aaa-8aaa-111111111111/releases/1.2.0/", + "live": true, + "latestVersion": true, + "releaseLanguages": [ + {"programmingLanguage": {"id": 19, "name": "NetLogo"}} + ] + } + ], + "latestVersionNumber": "1.2.0", + "firstPublishedAt": "2021-01-01", + "lastPublishedOn": "2023-06-15", + "downloadCount": 482, + "peerReviewed": true, + "doi": "10.25937/example-one", + "live": true + }, + { + "identifier": "bbbbbbbb-2222-4bbb-8bbb-222222222222", + "title": "Python Mesa SIR", + "description": "A simple SIR epidemic model in Python using Mesa.", + "summarizedDescription": "Susceptible-Infected-Recovered dynamics over a small-world graph.", + "allContributors": [ + {"user": {"name": "Ali Researcher"}, "givenName": "Ali", "familyName": "Researcher"} + ], + "tags": [{"name": "epidemic"}, {"name": "python"}, {"name": "mesa"}], + "releases": [ + { + "versionNumber": "0.3.0", + "live": true, + "latestVersion": true, + "releaseLanguages": [ + {"programmingLanguage": {"id": 100, "name": "Python"}} + ] + } + ], + "latestVersionNumber": "0.3.0", + "firstPublishedAt": "2022-02-02", + "lastPublishedOn": "2022-02-02", + "downloadCount": 61, + "peerReviewed": false, + "doi": null, + "live": true + } + ] +} diff --git a/tests/test_comses.py b/tests/test_comses.py new file mode 100644 index 0000000..50a2833 --- /dev/null +++ b/tests/test_comses.py @@ -0,0 +1,1077 @@ +"""Tests for the CoMSES Net integration — mocked HTTP, no network, no JVM.""" + +from __future__ import annotations + +import io +import json +import zipfile +from pathlib import Path +from unittest.mock import MagicMock + +import httpx +import pytest + +from netlogo_mcp import comses + +FIXTURES = Path(__file__).parent / "fixtures" / "comses" + + +# ── Fixture loaders ─────────────────────────────────────────────────────────── + + +def _fx(name: str) -> dict: + return json.loads((FIXTURES / name).read_text(encoding="utf-8")) + + +def _make_client(handler) -> comses.ComsesClient: + """Build a ComsesClient whose underlying httpx uses the given handler.""" + transport = httpx.MockTransport(handler) + http = httpx.AsyncClient( + transport=transport, base_url=comses.BASE_URL, follow_redirects=True + ) + return comses.ComsesClient(client=http) + + +# ── API client: search + metadata ───────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_search_builds_correct_url_and_returns_json(): + seen: 
list[httpx.Request] = [] + + def handler(request: httpx.Request) -> httpx.Response: + seen.append(request) + assert request.url.path == "/codebases/" + assert request.url.params.get("format") == "json" + assert request.url.params.get("query") == "predator-prey" + assert request.url.params.get("page") == "2" + return httpx.Response(200, json=_fx("search_result.json")) + + async with _make_client(handler) as client: + data = await client.search(query="predator-prey", page=2) + + assert data["count"] == 54 + assert len(seen) == 1 + + +@pytest.mark.asyncio +async def test_search_without_query_omits_query_param(): + def handler(request: httpx.Request) -> httpx.Response: + assert "query" not in dict(request.url.params) + return httpx.Response(200, json=_fx("search_result.json")) + + async with _make_client(handler) as client: + await client.search() + + +@pytest.mark.asyncio +async def test_get_codebase_returns_title_and_releases(): + def handler(request: httpx.Request) -> httpx.Response: + assert request.url.path.endswith("/aaaaaaaa-1111-4aaa-8aaa-111111111111/") + return httpx.Response(200, json=_fx("codebase_detail.json")) + + async with _make_client(handler) as client: + data = await client.get_codebase("aaaaaaaa-1111-4aaa-8aaa-111111111111") + assert data["title"] == "Wolf Sheep Predation" + assert len(data["releases"]) == 2 + + +@pytest.mark.asyncio +async def test_get_release_returns_citation_text(): + def handler(request: httpx.Request) -> httpx.Response: + assert "releases/1.2.0" in request.url.path + return httpx.Response(200, json=_fx("release_detail.json")) + + async with _make_client(handler) as client: + rel = await client.get_release("aaaaaaaa-1111-4aaa-8aaa-111111111111", "1.2.0") + assert "Wolf Sheep Predation" in rel["citationText"] + + +@pytest.mark.asyncio +async def test_resolve_latest_uses_latest_version_number(): + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(200, json=_fx("codebase_detail.json")) + + async with _make_client(handler) as client: + assert await client.resolve_latest("xxx", "latest") == "1.2.0" + + +@pytest.mark.asyncio +async def test_resolve_latest_passthrough_for_concrete_version(): + calls = 0 + + def handler(request: httpx.Request) -> httpx.Response: + nonlocal calls + calls += 1 + return httpx.Response(200, json=_fx("codebase_detail.json")) + + async with _make_client(handler) as client: + assert await client.resolve_latest("xxx", "1.0.0") == "1.0.0" + assert calls == 0, "Concrete version must not trigger a codebase fetch" + + +# ── Retry policy ────────────────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_retries_on_503_then_succeeds(monkeypatch): + monkeypatch.setattr(comses.asyncio, "sleep", _fast_sleep) + state = {"calls": 0} + + def handler(request: httpx.Request) -> httpx.Response: + state["calls"] += 1 + if state["calls"] < 2: + return httpx.Response(503) + return httpx.Response(200, json={"count": 0, "results": []}) + + async with _make_client(handler) as client: + await client.search() + assert state["calls"] == 2 + + +@pytest.mark.asyncio +async def test_does_not_retry_on_4xx(): + state = {"calls": 0} + + def handler(request: httpx.Request) -> httpx.Response: + state["calls"] += 1 + return httpx.Response(404) + + async with _make_client(handler) as client: + with pytest.raises(comses.ComsesHTTPError): + await client.search() + assert state["calls"] == 1 + + +@pytest.mark.asyncio +async def test_gives_up_after_max_retries(monkeypatch): + 
monkeypatch.setattr(comses.asyncio, "sleep", _fast_sleep) + state = {"calls": 0} + + def handler(request: httpx.Request) -> httpx.Response: + state["calls"] += 1 + return httpx.Response(503) + + async with _make_client(handler) as client: + with pytest.raises(comses.ComsesHTTPError): + await client.search() + assert state["calls"] == comses.MAX_RETRIES + 1 + + +@pytest.mark.asyncio +async def test_retries_on_network_error(monkeypatch): + monkeypatch.setattr(comses.asyncio, "sleep", _fast_sleep) + state = {"calls": 0} + + def handler(request: httpx.Request) -> httpx.Response: + state["calls"] += 1 + if state["calls"] < 2: + raise httpx.ConnectError("boom", request=request) + return httpx.Response(200, json={"count": 0, "results": []}) + + async with _make_client(handler) as client: + await client.search() + assert state["calls"] == 2 + + +@pytest.mark.asyncio +async def test_non_json_content_type_is_error(): + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response( + 200, + content=b"interstitial", + headers={"Content-Type": "text/html"}, + ) + + async with _make_client(handler) as client: + with pytest.raises(comses.ComsesHTTPError): + await client.search() + + +# ── MCP tools: search_comses + get_comses_model ─────────────────────────────── + + +@pytest.mark.asyncio +async def test_search_comses_tool_returns_compact_json(monkeypatch): + from netlogo_mcp import tools + + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(200, json=_fx("search_result.json")) + + _patch_client_factory(monkeypatch, handler) + + ctx = MagicMock() + raw = await tools.search_comses(ctx, query="predator-prey", page=1) + data = json.loads(raw) + + assert data["count"] == 54 + assert data["page"] == 1 + assert len(data["results"]) == 2 + first = data["results"][0] + assert first["identifier"] == "aaaaaaaa-1111-4aaa-8aaa-111111111111" + assert first["language"] == "NetLogo" + assert first["isPeerReviewed"] is True + assert first["downloads"] == 482 + assert "Jane Author" in first["authors"] + + +@pytest.mark.asyncio +async def test_search_comses_tool_rejects_invalid_page(): + from fastmcp.exceptions import ToolError + + from netlogo_mcp import tools + + ctx = MagicMock() + with pytest.raises(ToolError): + await tools.search_comses(ctx, query="x", page=0) + + +@pytest.mark.asyncio +async def test_get_comses_model_tool_includes_citation(monkeypatch): + from netlogo_mcp import tools + + def handler(request: httpx.Request) -> httpx.Response: + path = request.url.path + if "/releases/1.2.0" in path: + return httpx.Response(200, json=_fx("release_detail.json")) + return httpx.Response(200, json=_fx("codebase_detail.json")) + + _patch_client_factory(monkeypatch, handler) + + ctx = MagicMock() + raw = await tools.get_comses_model( + ctx, identifier="aaaaaaaa-1111-4aaa-8aaa-111111111111" + ) + data = json.loads(raw) + + assert data["title"] == "Wolf Sheep Predation" + assert data["citation_text"].startswith("Author, J.") + assert data["peerReviewed"] is True + # Both releases should come through, with languages and licenses. 
+ assert len(data["releases"]) == 2 + latest = [r for r in data["releases"] if r["versionNumber"] == "1.2.0"][0] + assert latest["language"] == "NetLogo" + assert latest["license"] == "MIT" + assert latest["downloadable"] is True + + +@pytest.mark.asyncio +async def test_get_comses_model_tool_survives_release_fetch_failure(monkeypatch): + """If the /releases// call fails, the codebase response still comes back.""" + from netlogo_mcp import tools + + def handler(request: httpx.Request) -> httpx.Response: + if "/releases/" in request.url.path: + return httpx.Response(500) + return httpx.Response(200, json=_fx("codebase_detail.json")) + + _patch_client_factory(monkeypatch, handler) + + ctx = MagicMock() + raw = await tools.get_comses_model( + ctx, identifier="aaaaaaaa-1111-4aaa-8aaa-111111111111" + ) + data = json.loads(raw) + # No citation (it lives on the release), but the codebase detail still surfaces. + assert data["title"] == "Wolf Sheep Predation" + assert data["citation_text"] == "" + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + + +async def _fast_sleep(_seconds: float) -> None: + """Replace asyncio.sleep so retry tests don't actually wait.""" + return None + + +def _patch_client_factory(monkeypatch, handler) -> None: + """Patch ComsesClient() in tools.py to use our MockTransport.""" + from netlogo_mcp import tools + + original = comses.ComsesClient + + def factory(*args, **kwargs): + transport = httpx.MockTransport(handler) + http = httpx.AsyncClient( + transport=transport, + base_url=comses.BASE_URL, + follow_redirects=True, + ) + return original(client=http) + + monkeypatch.setattr(tools._comses, "ComsesClient", factory) + + +# ── Zip safety + extraction ────────────────────────────────────────────────── + + +def _make_zip(entries: dict[str, bytes]) -> bytes: + """Build an in-memory zip with the given {path: bytes} entries.""" + buf = io.BytesIO() + with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf: + for name, data in entries.items(): + zf.writestr(name, data) + return buf.getvalue() + + +def test_rejects_zip_with_path_traversal(tmp_path): + """A zip with `../etc/passwd` must be rejected before extraction.""" + zip_bytes = _make_zip({"../etc/passwd": b"pwned"}) + archive = tmp_path / "evil.zip" + archive.write_bytes(zip_bytes) + + with pytest.raises(comses.ComsesSafetyError): + comses.safe_extract_zip(archive, tmp_path / "final", max_bytes=10_000_000) + assert not (tmp_path / "final").exists() + + +def test_rejects_zip_with_absolute_path_member(tmp_path): + # Drive letter path should be rejected too. + zip_bytes = _make_zip({"C:/windows/evil.txt": b"x"}) + archive = tmp_path / "evil.zip" + archive.write_bytes(zip_bytes) + with pytest.raises(comses.ComsesSafetyError): + comses.safe_extract_zip(archive, tmp_path / "final", max_bytes=10_000_000) + + +def test_rejects_zip_bomb_by_uncompressed_sum(tmp_path): + """Uncompressed total > 2 × cap triggers bomb rejection.""" + # One 500-byte file, cap 100 bytes → 500 > 200. 
+ zip_bytes = _make_zip({"big.bin": b"A" * 500}) + archive = tmp_path / "bomb.zip" + archive.write_bytes(zip_bytes) + + with pytest.raises(comses.ComsesSafetyError): + comses.safe_extract_zip(archive, tmp_path / "final", max_bytes=100) + + +def test_safe_extract_happy_path_writes_marker(tmp_path): + zip_bytes = _make_zip( + { + "codemeta.json": b'{"programmingLanguage": "NetLogo"}', + "code/model.nlogo": b"to setup\nend\n", + "docs/ODD.md": b"# ODD\n\nSome doc.\n", + } + ) + archive = tmp_path / "good.zip" + archive.write_bytes(zip_bytes) + final = tmp_path / "cache" / "xxx" / "1.0.0" + + out = comses.safe_extract_zip(archive, final, max_bytes=10_000_000) + assert out == final + assert (final / comses.COMPLETION_MARKER).is_file() + assert (final / "code" / "model.nlogo").read_text().startswith("to setup") + assert comses.is_cache_trusted(final) + + +def test_cleans_up_temp_on_extract_failure(tmp_path): + zip_bytes = _make_zip({"../escape.txt": b"bad"}) + archive = tmp_path / "evil.zip" + archive.write_bytes(zip_bytes) + final = tmp_path / "final" + + with pytest.raises(comses.ComsesSafetyError): + comses.safe_extract_zip(archive, final, max_bytes=10_000_000) + + # No tmp-*-final directory should linger. + tmp_root = final.parent / ".tmp" + if tmp_root.exists(): + leftover = list(tmp_root.iterdir()) + assert not leftover, f"Leftover temp dirs: {leftover}" + + +def test_is_cache_trusted_requires_marker(tmp_path): + final = tmp_path / "dir" + final.mkdir() + assert not comses.is_cache_trusted(final) + (final / comses.COMPLETION_MARKER).write_text("ok") + assert comses.is_cache_trusted(final) + + +def test_race_orphan_without_marker_is_wiped_and_retried(tmp_path): + """If final_dir exists but has no marker, safe_extract wipes and retries.""" + zip_bytes = _make_zip({"code/ok.nlogo": b"to setup\nend\n"}) + archive = tmp_path / "good.zip" + archive.write_bytes(zip_bytes) + final = tmp_path / "cache" / "xxx" / "1.0.0" + final.mkdir(parents=True) + # Leave a stale unmarked file — simulating a prior failed/interrupted writer. + (final / "stale.txt").write_text("leftover") + + comses.safe_extract_zip(archive, final, max_bytes=10_000_000) + assert (final / comses.COMPLETION_MARKER).is_file() + assert not (final / "stale.txt").exists(), ( + "Orphan should have been wiped before retry" + ) + + +def test_race_peer_writer_with_marker_is_respected(tmp_path): + zip_bytes = _make_zip({"code/ok.nlogo": b"to setup\nend\n"}) + archive = tmp_path / "good.zip" + archive.write_bytes(zip_bytes) + final = tmp_path / "cache" / "xxx" / "1.0.0" + final.mkdir(parents=True) + (final / "peer.txt").write_text("theirs") + (final / comses.COMPLETION_MARKER).write_text("ok") + + comses.safe_extract_zip(archive, final, max_bytes=10_000_000) + # Peer's files preserved. 
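+    # Counterpart to the orphan test above: a final dir that already carries the
+    # marker is treated as a completed peer write and must not be wiped.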
+ assert (final / "peer.txt").read_text() == "theirs" + assert (final / comses.COMPLETION_MARKER).is_file() + + +# ── Post-extract inspection ────────────────────────────────────────────────── + + +def test_detect_language_from_codemeta(tmp_path): + (tmp_path / "codemeta.json").write_text( + json.dumps({"programmingLanguage": {"name": "Python"}}) + ) + assert comses.detect_language(tmp_path) == "Python" + + +def test_detect_language_fallback_by_extension(tmp_path): + (tmp_path / "code").mkdir() + (tmp_path / "code" / "a.nlogo").write_text("x") + (tmp_path / "code" / "b.nlogo").write_text("x") + assert comses.detect_language(tmp_path) == "NetLogo" + + +def test_select_netlogo_file_prefers_code_dir_and_nlogox(tmp_path): + root = tmp_path + (root / "top.nlogo").write_text("x") + (root / "code").mkdir() + (root / "code" / "old.nlogo").write_text("x") + (root / "code" / "new.nlogox").write_text("x") + files = comses.find_netlogo_files(root) + selected = comses.select_netlogo_file(files, root) + assert selected is not None + assert selected.name == "new.nlogox" + + +def test_select_netlogo_file_lex_largest_tiebreaker(tmp_path): + root = tmp_path + (root / "code").mkdir() + # Two .nlogox files, lex ordering picks the "larger" one. + (root / "code" / "WolfSheep_2.0.nlogox").write_text("x") + (root / "code" / "WolfSheep_3.0.nlogox").write_text("x") + files = comses.find_netlogo_files(root) + selected = comses.select_netlogo_file(files, root) + assert selected is not None and selected.name == "WolfSheep_3.0.nlogox" + + +def test_select_netlogo_file_none_when_empty(tmp_path): + assert comses.select_netlogo_file([], tmp_path) is None + + +def test_find_odd_doc_prefers_odd_then_readme(tmp_path): + (tmp_path / "docs").mkdir() + (tmp_path / "docs" / "ODD.md").write_text("odd") + (tmp_path / "README.md").write_text("readme") + found = comses.find_odd_doc(tmp_path) + assert found is not None and found.name == "ODD.md" + + +def test_find_odd_doc_binary_picks_up_pdf_with_odd_in_name(tmp_path): + """Real-world case from FNNR-ABM smoke test — PDF ODD is the only doc.""" + (tmp_path / "docs").mkdir() + (tmp_path / "docs" / "FNNR ABM - ODD Protocol with References.pdf").write_bytes( + b"%PDF-1.4\n" + ) + # Text scanner finds nothing. + assert comses.find_odd_doc(tmp_path) is None + # Binary scanner finds the PDF. 
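+    # The open_comses_model tests further down assert that this binary fallback is
+    # surfaced as `odd_doc_binary` when no text ODD exists.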
+ found = comses.find_odd_doc_binary(tmp_path) + assert found is not None and found.suffix == ".pdf" + assert "ODD" in found.name + + +def test_find_odd_doc_binary_none_when_only_text_docs(tmp_path): + (tmp_path / "docs").mkdir() + (tmp_path / "docs" / "ODD.md").write_text("odd") + assert comses.find_odd_doc_binary(tmp_path) is None + + +# ── High-level download_release + MCP tool ─────────────────────────────────── + + +@pytest.mark.asyncio +async def test_download_release_uses_cache_when_marker_present(tmp_path): + """If the cache dir already has the marker, no HTTP call is made.""" + identifier = "abc" + version = "1.0.0" + cache_root = tmp_path / "cache" + final = cache_root / identifier / version + final.mkdir(parents=True) + (final / "code").mkdir() + (final / "code" / "model.nlogo").write_text("to setup\nend\n") + (final / comses.COMPLETION_MARKER).write_text("ok") + + def handler(request: httpx.Request) -> httpx.Response: + raise AssertionError( + f"No HTTP call expected when cache is warm; got {request.url}" + ) + + async with _make_client(handler) as client: + outcome = await comses.download_release( + client, identifier, version, cache_root=cache_root, max_bytes=10_000_000 + ) + + assert outcome.cached is True + assert outcome.resolved_version == version + assert outcome.extracted_path == final + assert outcome.selected_netlogo_file is not None + assert outcome.selected_netlogo_file.name == "model.nlogo" + + +@pytest.mark.asyncio +async def test_download_release_resolves_latest_and_extracts(tmp_path): + identifier = "aaaaaaaa-1111-4aaa-8aaa-111111111111" + cache_root = tmp_path / "cache" + archive_bytes = _make_zip( + { + "codemeta.json": b'{"programmingLanguage": "NetLogo"}', + "code/WolfSheep.nlogo": b"to setup\nend\nto-report population\n report 1\nend\n", + "docs/ODD.md": b"# ODD\n", + } + ) + + def handler(request: httpx.Request) -> httpx.Response: + p = request.url.path + if p.endswith(f"/codebases/{identifier}/") and request.method == "GET": + return httpx.Response(200, json=_fx("codebase_detail.json")) + if "/releases/1.2.0/download/" in p and request.method == "HEAD": + return httpx.Response( + 200, + headers={ + "Content-Length": str(len(archive_bytes)), + "Content-Type": "application/zip", + }, + ) + if "/releases/1.2.0/download/" in p and request.method == "GET": + return httpx.Response( + 200, + content=archive_bytes, + headers={"Content-Type": "application/zip"}, + ) + if "/releases/1.2.0/" in p: + return httpx.Response(200, json=_fx("release_detail.json")) + raise AssertionError(f"Unexpected request: {request.method} {p}") + + async with _make_client(handler) as client: + outcome = await comses.download_release( + client, + identifier, + "latest", + cache_root=cache_root, + max_bytes=10_000_000, + ) + + assert outcome.cached is False + assert outcome.resolved_version == "1.2.0" + assert "latest" not in str(outcome.extracted_path) + assert outcome.language == "NetLogo" + assert outcome.selected_netlogo_file is not None + assert outcome.selected_netlogo_file.name == "WolfSheep.nlogo" + assert outcome.odd_doc is not None and outcome.odd_doc.name == "ODD.md" + assert outcome.license_name == "MIT" + assert outcome.title == "Wolf Sheep Predation" + # Second call is a cache hit. 
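+    # With a concrete version and a trusted cache entry, no HTTP call should be
+    # needed at all; the stub handler below only satisfies client construction.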
+ async with _make_client( + lambda r: httpx.Response(200, json=_fx("codebase_detail.json")) + ) as client: + again = await comses.download_release( + client, identifier, "1.2.0", cache_root=cache_root, max_bytes=10_000_000 + ) + assert again.cached is True + + +@pytest.mark.asyncio +async def test_download_release_refuses_oversize_via_head(tmp_path): + identifier = "abc" + + def handler(request: httpx.Request) -> httpx.Response: + p = request.url.path + if request.method == "HEAD": + return httpx.Response( + 200, + headers={ + "Content-Length": str(100_000_000), + "Content-Type": "application/zip", + }, + ) + if "/releases/" in p: + return httpx.Response(200, json=_fx("release_detail.json")) + return httpx.Response(200, json=_fx("codebase_detail.json")) + + async with _make_client(handler) as client: + with pytest.raises(comses.ComsesHTTPError, match="exceeds cap"): + await comses.download_release( + client, + identifier, + "1.2.0", + cache_root=tmp_path / "cache", + max_bytes=1_000_000, + ) + + +@pytest.mark.asyncio +async def test_download_release_refuses_non_zip_response(tmp_path): + """An HTML interstitial response must fail hard, not corrupt the cache.""" + identifier = "abc" + + def handler(request: httpx.Request) -> httpx.Response: + p = request.url.path + if request.method == "HEAD": + return httpx.Response(200, headers={"Content-Type": "application/zip"}) + if "/releases/" in p and request.method == "GET" and "/download/" in p: + return httpx.Response( + 200, + content=b"login form", + headers={"Content-Type": "text/html"}, + ) + if "/releases/" in p: + return httpx.Response(200, json=_fx("release_detail.json")) + return httpx.Response(200, json=_fx("codebase_detail.json")) + + async with _make_client(handler) as client: + with pytest.raises(comses.ComsesHTTPError, match="zip"): + await comses.download_release( + client, + identifier, + "1.2.0", + cache_root=tmp_path / "cache", + max_bytes=10_000_000, + ) + # No cache dir should exist. + assert not (tmp_path / "cache" / identifier / "1.2.0").exists() + + +@pytest.mark.asyncio +async def test_download_release_tolerates_null_submitted_package(tmp_path): + """Real COMSES returns submittedPackage: null even when /download/ works. + + The authoritative gate is HEAD + stream content-type, not this field. 
+ """ + identifier = "abc" + empty_release = dict(_fx("release_detail.json")) + empty_release["submittedPackage"] = None + archive_bytes = _make_zip({"code/ok.nlogo": b"to setup\nend\n"}) + + def handler(request: httpx.Request) -> httpx.Response: + p = request.url.path + if request.method == "HEAD": + return httpx.Response(200, headers={"Content-Type": "application/zip"}) + if "/download/" in p: + return httpx.Response( + 200, + content=archive_bytes, + headers={"Content-Type": "application/zip"}, + ) + if "/releases/" in p: + return httpx.Response(200, json=empty_release) + return httpx.Response(200, json=_fx("codebase_detail.json")) + + async with _make_client(handler) as client: + outcome = await comses.download_release( + client, + identifier, + "1.2.0", + cache_root=tmp_path / "cache", + max_bytes=10_000_000, + ) + assert outcome.cached is False + assert outcome.selected_netlogo_file is not None + + +@pytest.mark.asyncio +async def test_download_comses_model_tool_returns_expected_shape(monkeypatch, tmp_path): + from netlogo_mcp import tools + + identifier = "aaaaaaaa-1111-4aaa-8aaa-111111111111" + archive_bytes = _make_zip( + { + "codemeta.json": b'{"programmingLanguage": "NetLogo"}', + "code/WolfSheep_3.0.nlogox": b"to setup\nend\n", + "code/WolfSheep_2.0.nlogo": b"to setup\nend\n", + "docs/ODD.md": b"# ODD", + } + ) + + def handler(request: httpx.Request) -> httpx.Response: + p = request.url.path + if request.method == "HEAD": + return httpx.Response(200, headers={"Content-Type": "application/zip"}) + if "/download/" in p: + return httpx.Response( + 200, content=archive_bytes, headers={"Content-Type": "application/zip"} + ) + if "/releases/" in p: + return httpx.Response(200, json=_fx("release_detail.json")) + return httpx.Response(200, json=_fx("codebase_detail.json")) + + _patch_client_factory(monkeypatch, handler) + monkeypatch.setattr(tools, "get_comses_cache_dir", lambda: tmp_path / "cache") + monkeypatch.setattr(tools, "get_comses_max_download_mb", lambda: 10.0) + + ctx = MagicMock() + raw = await tools.download_comses_model( + ctx, identifier=identifier, version="latest" + ) + data = json.loads(raw) + + assert data["resolved_version"] == "1.2.0" + assert data["language"] == "NetLogo" + assert data["cached"] is False + assert data["loaded_netlogo_file"] == "code/WolfSheep_3.0.nlogox" + assert sorted(data["all_netlogo_files"]) == [ + "code/WolfSheep_2.0.nlogo", + "code/WolfSheep_3.0.nlogox", + ] + assert data["odd_doc"] == "docs/ODD.md" + assert data["license"] == "MIT" + assert "latest" not in data["extracted_path"] + + +# ── open_comses_model tool ─────────────────────────────────────────────────── + + +def _prime_cache( + cache_root: Path, identifier: str, version: str, files: dict[str, bytes] +) -> Path: + """Write a fully-marked cache directory so read/open tools can skip download.""" + final = cache_root / identifier / version + final.mkdir(parents=True, exist_ok=True) + for rel, data in files.items(): + path = final / rel + path.parent.mkdir(parents=True, exist_ok=True) + path.write_bytes(data) + (final / comses.COMPLETION_MARKER).write_text("ok", encoding="utf-8") + return final + + +@pytest.mark.asyncio +async def test_open_comses_model_loads_netlogo_when_cached( + monkeypatch, tmp_path, mock_context, mock_nl +): + from netlogo_mcp import tools + + identifier = "abc" + version = "1.0.0" + cache_root = tmp_path / "cache" + _prime_cache( + cache_root, + identifier, + version, + { + "code/Wolf.nlogo": b"to setup\nend\nto go\nend\n", + "codemeta.json": 
b'{"programmingLanguage": "NetLogo"}', + "docs/ODD.md": b"# ODD", + }, + ) + + monkeypatch.setattr(tools, "get_comses_cache_dir", lambda: cache_root) + + # No HTTP should happen at all on a warm cache + concrete version. + def handler(request: httpx.Request) -> httpx.Response: + raise AssertionError(f"Unexpected request: {request.url}") + + _patch_client_factory(monkeypatch, handler) + + raw = await tools.open_comses_model( + mock_context, identifier=identifier, version=version + ) + data = json.loads(raw) + + assert data["status"] == "loaded_netlogo" + assert data["resolved_version"] == version + assert data["cached"] is True + assert data["loaded_netlogo_file"] == "code/Wolf.nlogo" + assert mock_nl._model_loaded is True + assert "Pin resolved_version" in data["message"] + + +@pytest.mark.asyncio +async def test_open_comses_model_non_netlogo_returns_structured_json( + monkeypatch, tmp_path, mock_context, mock_nl +): + from netlogo_mcp import tools + + identifier = "abc" + version = "1.0.0" + cache_root = tmp_path / "cache" + _prime_cache( + cache_root, + identifier, + version, + { + "code/model.py": b"# python\n", + "codemeta.json": b'{"programmingLanguage": {"name": "Python"}}', + # Simulate the FNNR-ABM real case: ODD is a PDF only. + "docs/FNNR ABM - ODD Protocol.pdf": b"%PDF-1.4\n", + }, + ) + + monkeypatch.setattr(tools, "get_comses_cache_dir", lambda: cache_root) + _patch_client_factory( + monkeypatch, lambda r: (_ for _ in ()).throw(AssertionError("no HTTP expected")) + ) + + raw = await tools.open_comses_model( + mock_context, identifier=identifier, version=version + ) + data = json.loads(raw) + # Binary ODD is surfaced so the AI can tell the user where to read it. + assert data["odd_doc"] is None + assert data["odd_doc_binary"] is not None + assert "ODD" in data["odd_doc_binary"] + assert data["odd_doc_binary"].endswith(".pdf") + + assert data["status"] == "not_runnable_in_netlogo" + assert data["language"] == "Python" + assert data["loaded_netlogo_file"] is None + assert mock_nl._model_loaded is False + assert "not automatic" in data["message"] + + +# ── read_comses_files tool ─────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_read_comses_files_priority_odd_first_then_nlogo( + monkeypatch, tmp_path, mock_context +): + from netlogo_mcp import tools + + identifier = "abc" + version = "1.0.0" + cache_root = tmp_path / "cache" + _prime_cache( + cache_root, + identifier, + version, + { + "docs/ODD.md": b"# ODD\nThis is the ODD doc.\n", + "code/Wolf.nlogo": b"to setup\nend\n", + "code/helper.py": b"# python helper\n", + }, + ) + monkeypatch.setattr(tools, "get_comses_cache_dir", lambda: cache_root) + + raw = await tools.read_comses_files( + mock_context, identifier=identifier, version=version + ) + data = json.loads(raw) + + assert data["resolved_version"] == version + keys = list(data["files"].keys()) + # ODD first, then .nlogo, then helper.py. + assert keys.index("docs/ODD.md") < keys.index("code/Wolf.nlogo") + assert keys.index("code/Wolf.nlogo") < keys.index("code/helper.py") + + +@pytest.mark.asyncio +async def test_read_comses_files_respects_byte_cap_and_truncates( + monkeypatch, tmp_path, mock_context +): + from netlogo_mcp import tools + + identifier = "abc" + version = "1.0.0" + cache_root = tmp_path / "cache" + # One ODD doc small, one big .nlogo that will be truncated. 
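+    # Expected arithmetic for this cap (illustrative): 500 - 6 bytes of ODD leaves
+    # 494 for big.nlogo; trimming back to the last newline returns 490 bytes
+    # (98 complete "line\n" lines), hence truncated=True and a trailing "\n".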
+ big = ("line\n" * 1000).encode("utf-8") # 5000 bytes + _prime_cache( + cache_root, + identifier, + version, + {"docs/ODD.md": b"short\n", "code/big.nlogo": big}, + ) + monkeypatch.setattr(tools, "get_comses_cache_dir", lambda: cache_root) + + raw = await tools.read_comses_files( + mock_context, identifier=identifier, version=version, max_total_bytes=500 + ) + data = json.loads(raw) + + odd = data["files"]["docs/ODD.md"] + big_entry = data["files"]["code/big.nlogo"] + assert odd["truncated"] is False + assert big_entry["truncated"] is True + assert big_entry["returned_size"] < big_entry["full_size"] + assert data["any_truncated"] is True + # Truncation must land on a line boundary. + assert big_entry["content"].endswith("\n") + + +@pytest.mark.asyncio +async def test_read_comses_files_omits_files_with_reasons( + monkeypatch, tmp_path, mock_context +): + from netlogo_mcp import tools + + identifier = "abc" + version = "1.0.0" + cache_root = tmp_path / "cache" + _prime_cache( + cache_root, + identifier, + version, + { + "data/input.csv": b"col\n1\n", + "code/Wolf.nlogo": b"to setup\nend\n", + }, + ) + monkeypatch.setattr(tools, "get_comses_cache_dir", lambda: cache_root) + + raw = await tools.read_comses_files( + mock_context, identifier=identifier, version=version + ) + data = json.loads(raw) + assert "data/input.csv" in data["omitted_reason_by_file"] + assert data["omitted_reason_by_file"]["data/input.csv"] == "extension_not_in_filter" + + +@pytest.mark.asyncio +async def test_read_comses_files_errors_when_cache_missing( + monkeypatch, tmp_path, mock_context +): + from fastmcp.exceptions import ToolError + + from netlogo_mcp import tools + + monkeypatch.setattr(tools, "get_comses_cache_dir", lambda: tmp_path / "cache") + with pytest.raises(ToolError, match="missing or incomplete"): + await tools.read_comses_files( + mock_context, identifier="unknown", version="1.0.0" + ) + + +@pytest.mark.asyncio +async def test_read_comses_files_zero_match_returns_empty_files( + monkeypatch, tmp_path, mock_context +): + from netlogo_mcp import tools + + identifier = "abc" + version = "1.0.0" + cache_root = tmp_path / "cache" + _prime_cache(cache_root, identifier, version, {"data/input.csv": b"x"}) + monkeypatch.setattr(tools, "get_comses_cache_dir", lambda: cache_root) + + raw = await tools.read_comses_files( + mock_context, + identifier=identifier, + version=version, + extensions=[".nlogo"], + ) + data = json.loads(raw) + assert data["files"] == {} + assert data["total_returned_bytes"] == 0 + assert data["any_truncated"] is False + assert "data/input.csv" in data["omitted_files"] + + +@pytest.mark.asyncio +async def test_read_comses_files_resolves_latest_via_http( + monkeypatch, tmp_path, mock_context +): + from netlogo_mcp import tools + + identifier = "aaaaaaaa-1111-4aaa-8aaa-111111111111" + cache_root = tmp_path / "cache" + # Prime cache at resolved 1.2.0 (from fixture). + _prime_cache( + cache_root, + identifier, + "1.2.0", + {"docs/ODD.md": b"# ODD\n"}, + ) + monkeypatch.setattr(tools, "get_comses_cache_dir", lambda: cache_root) + + def handler(request: httpx.Request) -> httpx.Response: + # Only get_codebase (for latest resolution) should be called. 
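+        # A release-detail or download request here would mean the warm 1.2.0
+        # cache was being ignored.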
+ assert request.url.path.endswith(f"/codebases/{identifier}/") + return httpx.Response(200, json=_fx("codebase_detail.json")) + + _patch_client_factory(monkeypatch, handler) + + raw = await tools.read_comses_files( + mock_context, identifier=identifier, version="latest" + ) + data = json.loads(raw) + assert data["resolved_version"] == "1.2.0" + assert "docs/ODD.md" in data["files"] + + +# ── explore_comses prompt ──────────────────────────────────────────────────── + + +def test_explore_comses_prompt_has_required_rules(): + """The prompt must encode the rules that keep the flow researcher-safe.""" + from netlogo_mcp.prompts import explore_comses + + msgs = explore_comses("rumor spreading") + assert len(msgs) == 1 + body = msgs[0].content.text + + # Topic is interpolated. + assert "rumor spreading" in body + # Pin-the-version rule. + assert "resolved_version" in body + assert 'Never pass "latest" again' in body or 'Never pass "latest"' in body + # Both extensions when reading NetLogo source. + assert ".nlogo" in body and ".nlogox" in body + # Runtime-error stop rule. + assert ( + "do NOT guess alternates" in body or "do not guess alternates" in body.lower() + ) + # Stop-and-ask fallback and no auto-translation. + assert "Stop-and-ask" in body or "stop-and-ask" in body.lower() + assert "Do NOT auto-translate" in body or "not auto-translate" in body.lower() + + +# ── Language hint heuristic (real COMSES search results omit releaseLanguages) + + +def test_language_hint_from_title_and_description(): + from netlogo_mcp.tools import _language_hint_from_text + + assert _language_hint_from_text({"title": "Wolf Sheep Netlogo Model"}) == "NetLogo" + assert ( + _language_hint_from_text({"description": "Implemented in Python using Mesa."}) + == "Python" + ) + assert _language_hint_from_text({"tags": [{"name": "Repast"}]}) == "Repast" + assert _language_hint_from_text({"title": "ecology of wolves"}) is None + + +@pytest.mark.asyncio +async def test_search_comses_falls_back_to_heuristic_when_release_langs_absent( + monkeypatch, +): + """Real COMSES search results don't include releaseLanguages. + + The compact response must still get a language when the text mentions one. + """ + from netlogo_mcp import tools + + stripped = json.loads(json.dumps(_fx("search_result.json"))) + # Strip releaseLanguages to simulate real API. + for r in stripped["results"]: + for rel in r.get("releases") or []: + rel["releaseLanguages"] = [] + + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(200, json=stripped) + + _patch_client_factory(monkeypatch, handler) + + ctx = MagicMock() + raw = await tools.search_comses(ctx, query="x", page=1) + data = json.loads(raw) + # "Wolf Sheep Predation" + "NetLogo" tag → picked up via heuristic. + assert data["results"][0]["language"] == "NetLogo" + # Second result has "Python" in tags + description. + assert data["results"][1]["language"] == "Python"
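+
+
+# Illustrative note: everything above is mocked, so this module can be run on its
+# own without a JVM, e.g. `pytest tests/test_comses.py -q`.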