diff --git a/README.md b/README.md index 3ca6457..8d06277 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,7 @@ ## News +- [2026/03] Supports [Hermes Agent](https://github.com/NousResearch/hermes-agent) as a native context engine plugin — [guide](docs/guides/hermes.md) - [2026/03] Supports [OpenClaw](https://openclaw.ai) — [guide](docs/guides/openclaw.md) | [benchmark](docs/benchmarks/openclaw.md) - [2026/03] Supports cloud APIs (OpenAI, Anthropic, MiniMax) — [cache sync](docs/guides/cache_sync.md) - [2026/03] ContextPilot now can run on **macOS / Apple Silicon** via [llama.cpp](docs/guides/mac_llama_cpp.md). @@ -30,7 +31,7 @@ Long-context workloads (RAG, memory chat, tool-augmented agents) prepend many co ContextPilot sits between context assembly and inference to maximize prefix reuse and remove duplicates: 1. **Higher throughput & cache hits** — boosts prefill throughput and prefix cache hit ratio via context reuse. -2. **Drop-in solutions** — supports [OpenClaw](https://openclaw.ai) ([guide](docs/guides/openclaw.md)), [PageIndex](https://github.com/VectifyAI/PageIndex), [Mem0](https://github.com/mem0ai/mem0), [LMCache](https://github.com/LMCache/LMCache), [vLLM](https://github.com/vllm-project/vllm), [SGLang](https://github.com/sgl-project/sglang), [llama.cpp](docs/guides/mac_llama_cpp.md), and cloud APIs (OpenAI, Anthropic). +2. **Drop-in solutions** — supports [OpenClaw](https://openclaw.ai) ([guide](docs/guides/openclaw.md)), [Hermes Agent](https://github.com/NousResearch/hermes-agent) ([guide](docs/guides/hermes.md)), [PageIndex](https://github.com/VectifyAI/PageIndex), [Mem0](https://github.com/mem0ai/mem0), [LMCache](https://github.com/LMCache/LMCache), [vLLM](https://github.com/vllm-project/vllm), [SGLang](https://github.com/sgl-project/sglang), [llama.cpp](docs/guides/mac_llama_cpp.md), and cloud APIs (OpenAI, Anthropic). 3. **No compromise in reasoning quality** — can even improve with extremely long contexts. 4. 
**Widely tested** — validated across diverse RAG and agentic workloads. @@ -120,6 +121,19 @@ Then set OpenClaw's base URL to `http://localhost:8765/v1`. See the [full OpenCl --- +### Hermes Agent + +Native plugin — zero Hermes source changes, zero external dependencies: + +```bash +hermes plugins install EfficientContext/ContextPilot +hermes plugins # select: Context Engine → contextpilot +``` + +Typical savings: **40–50% input tokens** on agentic workloads with repeated file reads. See the [Hermes integration guide](docs/guides/hermes.md) for details. + +--- + ### vLLM / SGLang **From PyPI** — the vLLM and SGLang hooks are installed automatically: diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..44b4214 --- /dev/null +++ b/__init__.py @@ -0,0 +1,763 @@ +"""ContextPilot context engine plugin for Hermes Agent. + +Install: hermes plugins install /ContextPilot +Enable: hermes plugins → Context Engine → contextpilot +""" + +import copy +import hashlib +import importlib +import importlib.util as _ilu +import json +import logging +import os +import subprocess +import sys +from pathlib import Path +from typing import Any, Dict, List, Tuple + +logger = logging.getLogger("contextpilot.hermes_plugin") + +_REPO_ROOT = Path(__file__).resolve().parent +if str(_REPO_ROOT) not in sys.path: + sys.path.insert(0, str(_REPO_ROOT)) + +try: + from agent.context_engine import ContextEngine + + _HERMES_AVAILABLE = True +except ImportError: + ContextEngine = object + _HERMES_AVAILABLE = False + +def _load_submodule(name: str, file_path: Path): + """Load a .py file directly, bypassing contextpilot/__init__.py.""" + spec = _ilu.spec_from_file_location(name, str(file_path)) + if spec is None or spec.loader is None: + raise ImportError(f"Cannot load {file_path}") + mod = _ilu.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod + +dedup_chat_completions = None +dedup_responses_api = None +DedupResult = None +get_format_handler = None +InterceptConfig = 
None +_CONTEXTPILOT_AVAILABLE = False +_CONTEXTPILOT_IMPORT_ERROR = None + +_has_reorder = None +_intercept_index = None +_hermes_sanitizer_patched = False +_bootstrap_attempted = False + + +def _import_contextpilot_submodules(): + global dedup_chat_completions + global dedup_responses_api + global DedupResult + global get_format_handler + global InterceptConfig + global _CONTEXTPILOT_AVAILABLE + global _CONTEXTPILOT_IMPORT_ERROR + + try: + _cp_root = _REPO_ROOT / "contextpilot" + _dedup_mod = _load_submodule( + "_contextpilot_block_dedup", _cp_root / "dedup" / "block_dedup.py" + ) + _parser_mod = _load_submodule( + "_contextpilot_intercept_parser", _cp_root / "server" / "intercept_parser.py" + ) + + dedup_chat_completions = _dedup_mod.dedup_chat_completions + dedup_responses_api = _dedup_mod.dedup_responses_api + DedupResult = _dedup_mod.DedupResult + get_format_handler = _parser_mod.get_format_handler + InterceptConfig = _parser_mod.InterceptConfig + _CONTEXTPILOT_AVAILABLE = True + _CONTEXTPILOT_IMPORT_ERROR = None + return True + except Exception as e: + dedup_chat_completions = None + dedup_responses_api = None + DedupResult = None + get_format_handler = None + InterceptConfig = None + _CONTEXTPILOT_AVAILABLE = False + _CONTEXTPILOT_IMPORT_ERROR = e + logger.debug("[ContextPilot] Could not import submodules: %s", e) + return False + + +def _bootstrap_contextpilot_install(): + global _bootstrap_attempted + if _bootstrap_attempted or os.environ.get("CONTEXTPILOT_PLUGIN_BOOTSTRAP") == "1": + return False + if not (_REPO_ROOT / "pyproject.toml").exists(): + return False + + _bootstrap_attempted = True + logger.info("[ContextPilot] Installing plugin package into Hermes environment") + env = os.environ.copy() + env["CONTEXTPILOT_PLUGIN_BOOTSTRAP"] = "1" + + def _run(cmd: List[str]): + return subprocess.run( + cmd, + capture_output=True, + text=True, + env=env, + timeout=300, + ) + + try: + result = _run([sys.executable, "-m", "pip", "install", "-e", 
str(_REPO_ROOT)]) + except Exception as e: + logger.warning("[ContextPilot] Self-install failed: %s", e) + return False + + if result.returncode != 0 and "No module named pip" in ((result.stderr or "") + (result.stdout or "")): + logger.info("[ContextPilot] pip missing in Hermes environment, bootstrapping with ensurepip") + try: + ensurepip_result = _run([sys.executable, "-m", "ensurepip", "--upgrade"]) + except Exception as e: + logger.warning("[ContextPilot] ensurepip failed: %s", e) + return False + if ensurepip_result.returncode != 0: + stderr = (ensurepip_result.stderr or "").strip() + stdout = (ensurepip_result.stdout or "").strip() + detail = stderr or stdout or f"exit code {ensurepip_result.returncode}" + logger.warning("[ContextPilot] ensurepip failed: %s", detail) + return False + result = _run([sys.executable, "-m", "pip", "install", "-e", str(_REPO_ROOT)]) + + if result.returncode != 0: + stderr = (result.stderr or "").strip() + stdout = (result.stdout or "").strip() + detail = stderr or stdout or f"exit code {result.returncode}" + logger.warning("[ContextPilot] Self-install failed: %s", detail) + return False + + importlib.invalidate_caches() + logger.info("[ContextPilot] Self-install completed") + return True + + +def _ensure_contextpilot_available(): + if _CONTEXTPILOT_AVAILABLE: + return True + if _import_contextpilot_submodules(): + return True + if _bootstrap_contextpilot_install() and _import_contextpilot_submodules(): + return True + return False + + +_import_contextpilot_submodules() + + +def _check_reorder(): + global _has_reorder + if _has_reorder is not None: + return _has_reorder + try: + from contextpilot.server.live_index import ContextPilot as _CP # noqa: F401 + + _has_reorder = True + except Exception as e: + if _bootstrap_contextpilot_install(): + try: + from contextpilot.server.live_index import ContextPilot as _CP # noqa: F401 + + _has_reorder = True + return _has_reorder + except Exception as retry_error: + e = retry_error + 
_has_reorder = False + logger.debug("[ContextPilot] Reorder unavailable, dedup-only mode: %s", e) + return _has_reorder + + +def _hash_text(text: str) -> str: + return hashlib.sha256(text.encode("utf-8", errors="replace")).hexdigest()[:16] + + +def _reorder_docs(docs: List[str], alpha: float = 0.001) -> List[str]: + global _intercept_index + if len(docs) < 2: + return docs + from contextpilot.server.live_index import ContextPilot as CP + + contexts = [docs] + if _intercept_index is None: + _intercept_index = CP(alpha=alpha, use_gpu=False, linkage_method="average") + _intercept_index.build_and_schedule(contexts=contexts) + return docs + result = _intercept_index.build_incremental(contexts=contexts) + reordered = result.get("reordered_contexts", [docs])[0] + doc_to_orig = {} + for i, doc in enumerate(docs): + doc_to_orig.setdefault(doc, []).append(i) + order = [] + used = set() + for doc in reordered: + for idx in doc_to_orig.get(doc, []): + if idx not in used: + order.append(idx) + used.add(idx) + break + return [docs[i] for i in order] + + +def _patch_hermes_sanitizer(): + global _hermes_sanitizer_patched + if _hermes_sanitizer_patched: + return + try: + import run_agent + except Exception as e: + logger.debug("[ContextPilot] Could not import run_agent for patching: %s", e) + return + + AIAgent = getattr(run_agent, "AIAgent", None) + if AIAgent is None: + return + + current = getattr(AIAgent, "_sanitize_api_messages", None) + if current is None: + return + if getattr(current, "_contextpilot_patched", False): + _hermes_sanitizer_patched = True + return + + original = current + + def _patched_sanitize_api_messages(self_or_messages, maybe_messages=None): + if maybe_messages is None: + agent = None + messages = self_or_messages + else: + agent = self_or_messages + messages = maybe_messages + + sanitized = original(messages) + if agent is None: + return sanitized + + engine = getattr(agent, "context_compressor", None) + optimize = getattr(engine, 
"optimize_api_messages", None) + if not callable(optimize): + return sanitized + + try: + optimized, _stats = optimize( + sanitized, + system_content=getattr(agent, "_cached_system_prompt", "") or "", + ) + except Exception as e: + logger.debug("[ContextPilot] Hermes sanitize hook failed: %s", e) + return sanitized + + return optimized if isinstance(optimized, list) else sanitized + + _patched_sanitize_api_messages._contextpilot_patched = True + _patched_sanitize_api_messages._contextpilot_original = original + AIAgent._sanitize_api_messages = _patched_sanitize_api_messages + _hermes_sanitizer_patched = True + logger.info("[ContextPilot] Installed Hermes API-message hook") + + +class ContextPilotEngine(ContextEngine): + @property + def name(self) -> str: + return "contextpilot" + + def __init__(self): + self._compressor = None + self._cached_messages: list = [] + self._cached_original_messages: list = [] + self._seen_doc_hashes: set = set() + self._single_doc_hashes: dict = {} + self._first_tool_result_done = False + self._system_processed = False + self._total_chars_saved = 0 + self._total_reordered = 0 + self._total_docs_deduped = 0 + self._optimize_count = 0 + self.threshold_percent = 0.75 + + @staticmethod + def is_available() -> bool: + return True + + def _ensure_compressor(self): + if self._compressor is not None: + return + from agent.context_compressor import ContextCompressor + + self._compressor = ContextCompressor( + model=getattr(self, "_model", ""), + base_url=getattr(self, "_base_url", ""), + api_key=getattr(self, "_api_key", ""), + provider=getattr(self, "_provider", ""), + config_context_length=getattr(self, "_config_context_length", None), + quiet_mode=True, + ) + self._sync_compressor_state() + + def _sync_compressor_state(self): + if self._compressor is None: + return + self.threshold_tokens = self._compressor.threshold_tokens + self.context_length = self._compressor.context_length + self.threshold_percent = self._compressor.threshold_percent + 
self.protect_first_n = self._compressor.protect_first_n + self.protect_last_n = self._compressor.protect_last_n + + def _activate_openai_hook(self): + """Patch OpenAI SDK calls that bypass Hermes' sanitizer path.""" + engine = self + + def _patch_chat_module(module): + for class_name, is_async in ( + ("Completions", False), + ("AsyncCompletions", True), + ): + cls = getattr(module, class_name, None) + if cls is None or getattr(cls, "_contextpilot_patched", False): + continue + original = cls.create + if is_async: + async def _patched(self, *args, _orig=original, **kwargs): + engine._intercept_chat_kwargs(kwargs) + return await _orig(self, *args, **kwargs) + else: + def _patched(self, *args, _orig=original, **kwargs): + engine._intercept_chat_kwargs(kwargs) + return _orig(self, *args, **kwargs) + + cls.create = _patched + cls._contextpilot_patched = True + logger.info("[ContextPilot] Patched OpenAI %s.create", class_name) + + def _patch_responses_module(module): + for class_name, is_async in ( + ("Responses", False), + ("AsyncResponses", True), + ): + cls = getattr(module, class_name, None) + if cls is None or getattr(cls, "_contextpilot_patched", False): + continue + original_create = getattr(cls, "create", None) + original_stream = getattr(cls, "stream", None) + if original_create is not None: + if is_async: + async def _patched_create(self, *args, _orig=original_create, **kwargs): + engine._intercept_responses_kwargs(kwargs) + return await _orig(self, *args, **kwargs) + else: + def _patched_create(self, *args, _orig=original_create, **kwargs): + engine._intercept_responses_kwargs(kwargs) + return _orig(self, *args, **kwargs) + cls.create = _patched_create + if original_stream is not None: + def _patched_stream(self, *args, _orig=original_stream, **kwargs): + engine._intercept_responses_kwargs(kwargs) + return _orig(self, *args, **kwargs) + cls.stream = _patched_stream + cls._contextpilot_patched = True + logger.info("[ContextPilot] Patched OpenAI %s", class_name) 
+ + for module_name, patcher in ( + ("openai.resources.chat.completions", _patch_chat_module), + ("openai.resources.responses.responses", _patch_responses_module), + ("openai.resources.responses", _patch_responses_module), + ): + try: + module = importlib.import_module(module_name) + except Exception: + continue + patcher(module) + + def _matches_cached_optimized_payload(self, api_messages: List[Dict[str, Any]]) -> bool: + if len(api_messages) != len(self._cached_messages): + return False + for current, cached in zip(api_messages, self._cached_messages): + current_h = _hash_text(json.dumps(current, sort_keys=True, default=str)) + cached_h = _hash_text(json.dumps(cached, sort_keys=True, default=str)) + if current_h != cached_h: + return False + return bool(api_messages) + + def _intercept_chat_kwargs(self, kwargs: Dict[str, Any]) -> None: + messages = kwargs.get("messages") + if not messages or not isinstance(messages, list): + return + if self._matches_cached_optimized_payload(messages): + return + try: + optimized, _stats = self.optimize_api_messages(messages, system_content="") + except Exception as e: + logger.debug("[ContextPilot] OpenAI chat hook failed: %s", e) + return + if isinstance(optimized, list): + kwargs["messages"] = optimized + + def _intercept_responses_kwargs(self, kwargs: Dict[str, Any]) -> None: + items = kwargs.get("input") + if not items or not isinstance(items, list) or not callable(dedup_responses_api): + return + system_content = kwargs.get("instructions") + if not isinstance(system_content, str): + system_content = None + try: + result = dedup_responses_api({"input": items}, system_content=system_content) + except Exception as e: + logger.debug("[ContextPilot] OpenAI Responses hook failed: %s", e) + return + if getattr(result, "chars_saved", 0) > 0: + self._total_chars_saved += result.chars_saved + logger.info( + "[ContextPilot] Responses hook: %d chars saved, %d blocks deduped", + result.chars_saved, + result.blocks_deduped, + ) + + def 
update_from_response(self, usage: Dict[str, Any]) -> None: + self._ensure_compressor() + self._compressor.update_from_response(usage) + self.last_prompt_tokens = self._compressor.last_prompt_tokens + self.last_completion_tokens = self._compressor.last_completion_tokens + + def should_compress(self, prompt_tokens: int = None) -> bool: + self._ensure_compressor() + return self._compressor.should_compress(prompt_tokens) + + def compress( + self, messages: List[Dict[str, Any]], current_tokens: int = None, **kwargs + ) -> List[Dict[str, Any]]: + self._ensure_compressor() + result = self._compressor.compress( + messages, current_tokens=current_tokens, **kwargs + ) + self.compression_count = self._compressor.compression_count + return result + + def optimize_api_messages( + self, + api_messages: List[Dict[str, Any]], + *, + system_content: str = "", + ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]: + self._optimize_count += 1 + if self._optimize_count == 1: + logger.info("[ContextPilot] Per-turn API optimizer active") + if self._matches_cached_optimized_payload(api_messages): + return api_messages, { + "chars_saved": 0, + "doc_chars_saved": 0, + "block_chars_saved": 0, + "blocks_deduped": 0, + "blocks_total": 0, + "docs_deduped": self._total_docs_deduped, + "system_blocks_matched": 0, + "cumulative_chars_saved": self._total_chars_saved, + } + has_reorder = _check_reorder() + turn_reordered = 0 + original_messages = copy.deepcopy(api_messages) + replayed_count = 0 + + # Step 1: Prefix replay + old_count = min(len(self._cached_original_messages), len(self._cached_messages)) + if old_count > 0 and len(api_messages) >= old_count: + prefix_ok = True + for i in range(old_count): + cached_h = _hash_text(json.dumps( + self._cached_original_messages[i], + sort_keys=True, + default=str, + )) + current_h = _hash_text( + json.dumps(api_messages[i], sort_keys=True, default=str) + ) + if cached_h != current_h: + prefix_ok = False + break + if prefix_ok: + api_messages[:old_count] = 
copy.deepcopy(self._cached_messages) + replayed_count = old_count + + # Step 2-4: Extract, reorder & dedup + # Extraction and dedup always run (pure Python, no numpy needed). + # Reordering only runs when has_reorder is True (requires numpy + live_index). + doc_chars_saved = 0 + if _CONTEXTPILOT_AVAILABLE: + try: + def _tool_chars(msgs): + return sum( + len(m.get("content", "") or "") + for m in msgs if isinstance(m, dict) and m.get("role") == "tool" + ) + + config = InterceptConfig( + enabled=True, + mode="auto", + tag="document", + separator="---", + alpha=0.001, + linkage_method="average", + scope="all", + ) + handler = get_format_handler("openai_chat") + body = {"messages": api_messages} + chars_before_extract = _tool_chars(body["messages"]) + multi = handler.extract_all(body, config) + + if has_reorder: + if multi.system_extraction and not self._system_processed: + extraction, sys_idx = multi.system_extraction + if len(extraction.documents) >= 2: + reordered = _reorder_docs(extraction.documents) + if reordered != extraction.documents: + handler.reconstruct_system( + body, extraction, reordered, sys_idx + ) + self._system_processed = True + + for extraction, location in multi.tool_extractions: + if location.msg_index < replayed_count: + continue + if len(extraction.documents) < 2: + continue + if not self._first_tool_result_done: + self._first_tool_result_done = True + if has_reorder: + reordered = _reorder_docs(extraction.documents) + else: + reordered = extraction.documents + for doc in extraction.documents: + self._seen_doc_hashes.add(_hash_text(doc)) + if has_reorder and reordered != extraction.documents: + handler.reconstruct_tool_result( + body, extraction, reordered, location + ) + turn_reordered += len(extraction.documents) + self._total_reordered += len(extraction.documents) + else: + new_docs = [] + deduped = 0 + for doc in extraction.documents: + h = _hash_text(doc) + if h in self._seen_doc_hashes: + deduped += 1 + else: + 
self._seen_doc_hashes.add(h) + new_docs.append(doc) + if deduped > 0: + self._total_docs_deduped += deduped + if not new_docs: + new_docs = [ + f"[All {deduped} documents identical to a previous tool result. " + f"Refer to the earlier result above.]" + ] + handler.reconstruct_tool_result( + body, extraction, new_docs, location + ) + + for single_doc, location in multi.single_doc_extractions: + if location.msg_index < replayed_count: + continue + if single_doc.content_hash in self._single_doc_hashes: + prev_id = self._single_doc_hashes[single_doc.content_hash] + if ( + single_doc.tool_call_id != prev_id + and handler.tool_call_present(body, prev_id) + ): + self._total_docs_deduped += 1 + handler.replace_single_doc( + body, + location, + ( + f"[Duplicate — identical to previous tool result ({prev_id}). " + f"Refer to the earlier result above.]" + ), + ) + else: + self._single_doc_hashes[single_doc.content_hash] = ( + single_doc.tool_call_id + ) + + api_messages = body["messages"] + doc_chars_saved = chars_before_extract - _tool_chars(api_messages) + except Exception as e: + logger.debug("[ContextPilot] Extract/reorder failed: %s", e) + + # Step 5: Block-level dedup + sys_content = None + for msg in api_messages: + if isinstance(msg, dict) and msg.get("role") == "system": + sc = msg.get("content", "") + if isinstance(sc, str): + sys_content = sc + break + + dedup_result: DedupResult = dedup_chat_completions( + {"messages": api_messages}, + system_content=sys_content, + ) + turn_chars_saved = doc_chars_saved + dedup_result.chars_saved + self._total_chars_saved += turn_chars_saved + + # Step 6: Cache for next turn + self._cached_messages = copy.deepcopy(api_messages) + self._cached_original_messages = original_messages + + if turn_chars_saved > 0: + logger.info( + "[ContextPilot] Turn %d: saved %d chars (~%d tokens) | cumulative: %d chars (~%d tokens)", + self._optimize_count, + turn_chars_saved, + turn_chars_saved // 4, + self._total_chars_saved, + 
self._total_chars_saved // 4, + ) + + return api_messages, { + "chars_saved": turn_chars_saved, + "doc_chars_saved": doc_chars_saved, + "block_chars_saved": dedup_result.chars_saved, + "blocks_deduped": dedup_result.blocks_deduped, + "blocks_total": dedup_result.blocks_total, + "docs_deduped": self._total_docs_deduped, + "system_blocks_matched": dedup_result.system_blocks_matched, + "cumulative_chars_saved": self._total_chars_saved, + } + + def on_context_compressed(self, old_count: int, new_count: int) -> None: + global _intercept_index + self._cached_messages.clear() + self._cached_original_messages.clear() + self._seen_doc_hashes.clear() + self._single_doc_hashes.clear() + self._first_tool_result_done = False + if _intercept_index is not None: + _intercept_index = None + + def on_session_start(self, session_id: str, **kwargs) -> None: + _patch_hermes_sanitizer() + self._model = kwargs.get("model", "") + self._base_url = "" + self._api_key = "" + self._provider = "" + self._config_context_length = kwargs.get("context_length", None) + self._ensure_compressor() + if self._compressor and hasattr(self._compressor, "on_session_start"): + self._compressor.on_session_start(session_id, **kwargs) + self._activate_openai_hook() + + def on_session_end(self, session_id: str, messages: List[Dict[str, Any]]) -> None: + if self._compressor and hasattr(self._compressor, "on_session_end"): + self._compressor.on_session_end(session_id, messages) + if self._total_chars_saved > 0: + logger.info( + "[ContextPilot] Session %s: %d turns, %d chars saved (~%d tokens)", + session_id, + self._optimize_count, + self._total_chars_saved, + self._total_chars_saved // 4, + ) + + def on_session_reset(self) -> None: + if hasattr(super(), "on_session_reset"): + super().on_session_reset() + if self._compressor: + self._compressor.on_session_reset() + self.on_context_compressed(0, 0) + self._total_chars_saved = 0 + self._total_reordered = 0 + self._total_docs_deduped = 0 + self._optimize_count = 0 + 
+ def update_model( + self, + model: str, + context_length: int, + base_url: str = "", + api_key: str = "", + provider: str = "", + **kwargs, + ) -> None: + self._model = model + self._base_url = base_url + self._api_key = api_key + self._provider = provider + if self._compressor: + self._compressor.update_model( + model=model, + context_length=context_length, + base_url=base_url, + api_key=api_key, + provider=provider, + **kwargs, + ) + self._sync_compressor_state() + else: + self.context_length = context_length + self.threshold_tokens = int(context_length * self.threshold_percent) + + def get_tool_schemas(self) -> List[Dict[str, Any]]: + schemas = [] + if self._compressor: + schemas.extend(self._compressor.get_tool_schemas()) + return schemas + + def handle_tool_call(self, name: str, args: Dict[str, Any], **kwargs) -> str: + if self._compressor: + return self._compressor.handle_tool_call(name, args, **kwargs) + return json.dumps({"error": f"Unknown tool: {name}"}) + + def get_status(self) -> Dict[str, Any]: + status = super().get_status() + status["engine"] = "contextpilot" + status["contextpilot_chars_saved"] = self._total_chars_saved + status["contextpilot_docs_reordered"] = self._total_reordered + return status + + +def _auto_set_context_engine(): + """Set context.engine to 'contextpilot' on first install only.""" + try: + from hermes_cli.config import load_config, save_config + config = load_config() + ctx = config.get("context", {}) + current = ctx.get("engine", "compressor") + if current == "contextpilot": + return # Already set + if current != "compressor": + return # User chose a different engine — don't override + if ctx.get("_contextpilot_offered"): + return # Offered before, user switched back to compressor — respect that + config.setdefault("context", {})["engine"] = "contextpilot" + config["context"]["_contextpilot_offered"] = True + save_config(config) + logger.info("[ContextPilot] Auto-configured as active context engine") + except Exception as e: 
+ logger.debug("[ContextPilot] Could not auto-set config: %s", e) + + +def register(ctx): + """Hermes plugin entry point — called by PluginManager.discover_and_load().""" + if not _HERMES_AVAILABLE: + return + if not _ensure_contextpilot_available(): + logger.warning( + "[ContextPilot] contextpilot package not importable after self-install: %s", + _CONTEXTPILOT_IMPORT_ERROR, + ) + return + _patch_hermes_sanitizer() + _auto_set_context_engine() + ctx.register_context_engine(ContextPilotEngine()) diff --git a/contextpilot/_openai_hook.py b/contextpilot/_openai_hook.py new file mode 100644 index 0000000..a64e684 --- /dev/null +++ b/contextpilot/_openai_hook.py @@ -0,0 +1,499 @@ +""" +OpenAI Client Monkey-Patch for ContextPilot. + +Patches the OpenAI Python SDK's chat.completions.create() with the full +ContextPilot intercept pipeline: + + 1. Restore cached prefix — replace old messages with previously modified + copies so KV cache prefix stays identical across turns + 2. Extract documents — from system prompt and tool results + 3. Reorder — documents for maximal prefix cache sharing + 4. Cross-turn dedup — single-doc tool results (repeated file reads) + 5. Block-level dedup — content-defined chunking within tool results + 6. Cache modified messages — for next turn's prefix replay + +Works with any OpenAI-SDK-based agent: Hermes, OpenHands, Aider, etc. 
+ +Activation: + CONTEXTPILOT=1 hermes chat + CONTEXTPILOT=1 python my_agent.py + +Manual activation: + import contextpilot._openai_hook +""" + +import copy +import hashlib +import importlib +import importlib.abc +import importlib.util +import json +import logging +import os +import sys +from dataclasses import dataclass, field as dc_field +from typing import Any, Dict, List + +logger = logging.getLogger("contextpilot.openai_hook") + +_ENABLED = os.environ.get("CONTEXTPILOT", "").lower() in ( + "1", + "true", + "yes", +) or os.environ.get("CONTEXTPILOT_DEDUP", "").lower() in ("1", "true", "yes") + + +# --------------------------------------------------------------------------- +# Per-session conversation state (mirrors http_server._InterceptConvState) +# --------------------------------------------------------------------------- + + +@dataclass +class _ConvState: + cached_messages: list = dc_field(default_factory=list) + cached_original_messages: list = dc_field(default_factory=list) + first_tool_result_done: bool = False + seen_doc_hashes: set = dc_field(default_factory=set) + single_doc_hashes: dict = dc_field(default_factory=dict) + system_processed: bool = False + last_message_count: int = 0 + + +_sessions: Dict[str, _ConvState] = {} +_MAX_SESSIONS = 64 + +_total_chars_saved = 0 +_total_reordered = 0 +_total_calls = 0 + +# Lazy-initialized reorder index (needs numpy) +_intercept_index = None +_has_reorder = None # None = not checked yet + + +def _hash_text(text: str) -> str: + return hashlib.sha256(text.encode("utf-8", errors="replace")).hexdigest()[:16] + + +def _session_fingerprint(messages: list) -> str: + parts = [] + for msg in messages[:5]: + if not isinstance(msg, dict): + continue + role = msg.get("role", "") + if role == "system": + parts.append(str(msg.get("content", ""))[:500]) + elif role == "user": + content = msg.get("content", "") + if isinstance(content, list): + content = "".join( + p.get("text", "") for p in content if isinstance(p, dict) + ) 
+ parts.append(str(content)[:500]) + break + if not parts: + return _hash_text(json.dumps(messages[:2], sort_keys=True, default=str)) + return _hash_text("\x00".join(parts)) + + +def _get_state(messages: list) -> _ConvState: + key = _session_fingerprint(messages) + msg_count = len(messages) + state = _sessions.get(key) + + if state is None: + state = _ConvState() + if len(_sessions) >= _MAX_SESSIONS: + oldest = next(iter(_sessions)) + del _sessions[oldest] + _sessions[key] = state + elif msg_count < state.last_message_count: + # Message count dropped → compaction happened, reset state + state = _ConvState() + _sessions[key] = state + + state.last_message_count = msg_count + return state + + +# --------------------------------------------------------------------------- +# Reorder support (graceful degradation if numpy unavailable) +# --------------------------------------------------------------------------- + + +def _check_reorder(): + global _has_reorder + if _has_reorder is not None: + return _has_reorder + try: + from contextpilot.server.live_index import ContextPilot as _CP # noqa: F401 + from contextpilot.server.intercept_parser import get_format_handler # noqa: F401 + + _has_reorder = True + except (ImportError, Exception): + _has_reorder = False + logger.info("[ContextPilot] Reorder unavailable (numpy?), dedup-only mode") + return _has_reorder + + +def _reorder_docs(docs: List[str], alpha: float = 0.001) -> List[str]: + global _intercept_index + if len(docs) < 2: + return docs + + from contextpilot.server.live_index import ContextPilot as CP + + contexts = [docs] + + if _intercept_index is None: + _intercept_index = CP(alpha=alpha, use_gpu=False, linkage_method="average") + _intercept_index.build_and_schedule(contexts=contexts) + return docs # First call → build only, no reorder + + result = _intercept_index.build_incremental(contexts=contexts) + reordered = result.get("reordered_contexts", [docs])[0] + + # Map back to original doc objects + doc_to_orig = {} 
+ for i, doc in enumerate(docs): + doc_to_orig.setdefault(doc, []).append(i) + order = [] + used = set() + for doc in reordered: + for idx in doc_to_orig.get(doc, []): + if idx not in used: + order.append(idx) + used.add(idx) + break + + return [docs[i] for i in order] + + +# --------------------------------------------------------------------------- +# Full intercept pipeline +# --------------------------------------------------------------------------- + + +def _optimize_messages(kwargs): + """Full ContextPilot pipeline: prefix replay → extract → reorder → dedup.""" + global _total_chars_saved, _total_reordered, _total_calls + + messages = kwargs.get("messages") + if not messages or not isinstance(messages, list): + return + + _total_calls += 1 + state = _get_state(messages) + has_reorder = _check_reorder() + chars_saved = 0 + docs_reordered = 0 + original_messages = copy.deepcopy(messages) + replayed_count = 0 + + # ── Step 1: Prefix replay (replace old msgs with cached modified copies) ── + old_count = min(len(state.cached_original_messages), len(state.cached_messages)) + if old_count > 0 and len(messages) >= old_count: + prefix_ok = True + for i in range(old_count): + cached_h = _hash_text( + json.dumps( + state.cached_original_messages[i], + sort_keys=True, + default=str, + ) + ) + current_h = _hash_text(json.dumps(messages[i], sort_keys=True, default=str)) + if cached_h != current_h: + prefix_ok = False + break + if prefix_ok: + messages[:old_count] = copy.deepcopy(state.cached_messages) + kwargs["messages"] = messages + replayed_count = old_count + + # ── Step 2-4: Extract & reorder (if intercept_parser available) ── + if has_reorder: + try: + from contextpilot.server.intercept_parser import ( + get_format_handler, + InterceptConfig, + ) + + config = InterceptConfig( + enabled=True, + mode="auto", + tag="document", + separator="---", + alpha=0.001, + linkage_method="average", + scope="all", + ) + handler = get_format_handler("openai_chat") + body = 
{"messages": messages} + multi = handler.extract_all(body, config) + + # Reorder system docs (first turn only) + if multi.system_extraction and not state.system_processed: + extraction, sys_idx = multi.system_extraction + if len(extraction.documents) >= 2: + reordered = _reorder_docs(extraction.documents) + if reordered != extraction.documents: + handler.reconstruct_system(body, extraction, reordered, sys_idx) + docs_reordered += len(extraction.documents) + state.system_processed = True + + # Reorder/dedup tool results + for extraction, location in multi.tool_extractions: + if location.msg_index < replayed_count: + continue + if len(extraction.documents) < 2: + continue + + if not state.first_tool_result_done: + state.first_tool_result_done = True + reordered = _reorder_docs(extraction.documents) + for doc in extraction.documents: + state.seen_doc_hashes.add(_hash_text(doc)) + if reordered != extraction.documents: + handler.reconstruct_tool_result( + body, extraction, reordered, location + ) + docs_reordered += len(extraction.documents) + else: + # Dedup against previously seen docs + new_docs = [] + deduped = 0 + for doc in extraction.documents: + h = _hash_text(doc) + if h in state.seen_doc_hashes: + deduped += 1 + else: + state.seen_doc_hashes.add(h) + new_docs.append(doc) + if deduped > 0: + if not new_docs: + orig_chars = len(extraction.original_content) + new_docs = [ + f"[All {deduped} documents identical to a " + f"previous tool result ({orig_chars} chars). 
" + f"Refer to the earlier result above.]" + ] + handler.reconstruct_tool_result( + body, extraction, new_docs, location + ) + + # Cross-turn single-doc dedup + for single_doc, location in multi.single_doc_extractions: + if location.msg_index < replayed_count: + continue + if single_doc.content_hash in state.single_doc_hashes: + prev_id = state.single_doc_hashes[single_doc.content_hash] + if single_doc.tool_call_id != prev_id and handler.tool_call_present( + body, prev_id + ): + hint = ( + f"[Duplicate content — identical to a previous " + f"tool result ({prev_id}). " + f"Refer to the earlier result above.]" + ) + handler.replace_single_doc(body, location, hint) + else: + state.single_doc_hashes[single_doc.content_hash] = ( + single_doc.tool_call_id + ) + + # Sync messages back (handler mutates body["messages"] in-place) + kwargs["messages"] = body["messages"] + messages = kwargs["messages"] + + except Exception as e: + logger.debug("[ContextPilot] Extract/reorder failed: %s", e) + + # ── Step 5: Block-level dedup ── + from contextpilot.dedup import dedup_chat_completions + + system_content = None + for msg in messages: + if isinstance(msg, dict) and msg.get("role") == "system": + sc = msg.get("content", "") + if isinstance(sc, str): + system_content = sc + break + + dedup_result = dedup_chat_completions( + {"messages": messages}, + system_content=system_content, + ) + chars_saved += dedup_result.chars_saved + + # ── Step 6: Cache for next turn ── + state.cached_messages = copy.deepcopy(messages) + state.cached_original_messages = original_messages + + _total_chars_saved += chars_saved + _total_reordered += docs_reordered + + if chars_saved > 0 or docs_reordered > 0: + logger.info( + "[ContextPilot] Call #%d: %d chars saved, %d blocks deduped, " + "%d docs reordered (cumulative: %d chars ≈ %d tokens)", + _total_calls, + chars_saved, + dedup_result.blocks_deduped, + docs_reordered, + _total_chars_saved, + _total_chars_saved // 4, + ) + + +def 
_optimize_responses(kwargs): + """Apply block-level dedup to OpenAI Responses API input items.""" + global _total_chars_saved, _total_calls + + items = kwargs.get("input") + if not items or not isinstance(items, list): + return + + _total_calls += 1 + from contextpilot.dedup import dedup_responses_api + + system_content = kwargs.get("instructions") + if not isinstance(system_content, str): + system_content = None + + dedup_result = dedup_responses_api( + {"input": items}, + system_content=system_content, + ) + _total_chars_saved += dedup_result.chars_saved + if dedup_result.chars_saved > 0: + logger.info( + "[ContextPilot] Responses call #%d: %d chars saved, %d blocks deduped " + "(cumulative: %d chars ≈ %d tokens)", + _total_calls, + dedup_result.chars_saved, + dedup_result.blocks_deduped, + _total_chars_saved, + _total_chars_saved // 4, + ) + + +# --------------------------------------------------------------------------- +# OpenAI SDK monkey-patch (identical pattern to _sglang_hook / _vllm_hook) +# --------------------------------------------------------------------------- + + +def _apply_openai_patches(module): + Completions = getattr(module, "Completions", None) + AsyncCompletions = getattr(module, "AsyncCompletions", None) + Responses = getattr(module, "Responses", None) + AsyncResponses = getattr(module, "AsyncResponses", None) + + if Completions and not getattr(Completions, "_contextpilot_patched", False): + _orig_create = Completions.create + + def _patched_create(self, *args, **kwargs): + _optimize_messages(kwargs) + return _orig_create(self, *args, **kwargs) + + Completions.create = _patched_create + Completions._contextpilot_patched = True + logger.info("[ContextPilot] Patched openai Completions.create") + + if AsyncCompletions and not getattr( + AsyncCompletions, "_contextpilot_patched", False + ): + _orig_async_create = AsyncCompletions.create + + async def _patched_async_create(self, *args, **kwargs): + _optimize_messages(kwargs) + return await 
_orig_async_create(self, *args, **kwargs) + + AsyncCompletions.create = _patched_async_create + AsyncCompletions._contextpilot_patched = True + logger.info("[ContextPilot] Patched openai AsyncCompletions.create") + + if Responses and not getattr(Responses, "_contextpilot_patched", False): + _orig_responses_create = Responses.create + _orig_responses_stream = getattr(Responses, "stream", None) + + def _patched_responses_create(self, *args, **kwargs): + _optimize_responses(kwargs) + return _orig_responses_create(self, *args, **kwargs) + + Responses.create = _patched_responses_create + if _orig_responses_stream is not None: + def _patched_responses_stream(self, *args, **kwargs): + _optimize_responses(kwargs) + return _orig_responses_stream(self, *args, **kwargs) + + Responses.stream = _patched_responses_stream + Responses._contextpilot_patched = True + logger.info("[ContextPilot] Patched openai Responses") + + if AsyncResponses and not getattr(AsyncResponses, "_contextpilot_patched", False): + _orig_async_responses_create = AsyncResponses.create + _orig_async_responses_stream = getattr(AsyncResponses, "stream", None) + + async def _patched_async_responses_create(self, *args, **kwargs): + _optimize_responses(kwargs) + return await _orig_async_responses_create(self, *args, **kwargs) + + AsyncResponses.create = _patched_async_responses_create + if _orig_async_responses_stream is not None: + def _patched_async_responses_stream(self, *args, **kwargs): + _optimize_responses(kwargs) + return _orig_async_responses_stream(self, *args, **kwargs) + + AsyncResponses.stream = _patched_async_responses_stream + AsyncResponses._contextpilot_patched = True + logger.info("[ContextPilot] Patched openai AsyncResponses") + + +if _ENABLED: + + class _PatchingLoader(importlib.abc.Loader): + def __init__(self, original_loader): + self._original = original_loader + + def create_module(self, spec): + if hasattr(self._original, "create_module"): + return self._original.create_module(spec) + 
return None + + def exec_module(self, module): + self._original.exec_module(module) + _apply_openai_patches(module) + + class _OpenAIImportHook(importlib.abc.MetaPathFinder): + _targets = { + "openai.resources.chat.completions", + "openai.resources.responses.responses", + } + _done = set() + + def find_spec(self, fullname, path, target=None): + if fullname not in self._targets or fullname in self._done: + return None + self._done.add(fullname) + sys.meta_path.remove(self) + try: + real_spec = importlib.util.find_spec(fullname) + finally: + sys.meta_path.insert(0, self) + if real_spec is None: + return None + real_spec.loader = _PatchingLoader(real_spec.loader) + return real_spec + + sys.meta_path.insert(0, _OpenAIImportHook()) + logger.debug("[ContextPilot] OpenAI import hook registered (CONTEXTPILOT=1)") + + # Patch eagerly if openai was already imported before us + for _module_name in ( + "openai.resources.chat.completions", + "openai.resources.responses.responses", + ): + _already_loaded = sys.modules.get(_module_name) + if _already_loaded is not None: + _apply_openai_patches(_already_loaded) diff --git a/contextpilot/dedup/block_dedup.py b/contextpilot/dedup/block_dedup.py index 826966f..073ee37 100644 --- a/contextpilot/dedup/block_dedup.py +++ b/contextpilot/dedup/block_dedup.py @@ -6,13 +6,16 @@ logger = logging.getLogger(__name__) -MIN_BLOCK_CHARS = 80 -MIN_CONTENT_CHARS = 500 +MIN_BLOCK_CHARS = 40 +MIN_CONTENT_CHARS = 200 CHUNK_MODULUS = 13 -CHUNK_MIN_LINES = 5 +CHUNK_MIN_LINES = 3 CHUNK_MAX_LINES = 40 +# Matches line-number prefixes like " 1|", " 42|", " 100|" etc. +_LINE_NUM_PREFIX_RE = re.compile(r"^\s*\d+\|") + @dataclass class DedupResult: @@ -50,6 +53,11 @@ def _build_tool_name_map_responses(items: list) -> Dict[str, str]: return mapping +def _strip_line_prefix(line: str) -> str: + """Strip line-number prefixes (e.g. 
' 1|') for normalization.""" + return _LINE_NUM_PREFIX_RE.sub("", line) + + def _content_defined_chunking( text: str, chunk_modulus: int = CHUNK_MODULUS ) -> List[str]: @@ -62,8 +70,12 @@ def _content_defined_chunking( for line in lines: current.append(line) + # Strip line-number prefixes before hashing for boundary detection + # so the same source line produces the same boundary decision + # regardless of its position in different files. + normalized_line = _strip_line_prefix(line).strip() line_hash = int.from_bytes( - hashlib.md5(line.strip().encode("utf-8", errors="replace")).digest()[:4], + hashlib.md5(normalized_line.encode("utf-8", errors="replace")).digest()[:4], "little", ) is_boundary = ( @@ -83,7 +95,13 @@ def _content_defined_chunking( def _hash_block(block: str) -> str: - normalized = block.strip() + """Hash a block for dedup comparison. + + Strips line-number prefixes before hashing so identical source code + at different line offsets produces the same hash. + """ + lines = block.strip().split("\n") + normalized = "\n".join(_strip_line_prefix(line) for line in lines) return hashlib.sha256(normalized.encode("utf-8", errors="replace")).hexdigest()[:20] @@ -177,6 +195,43 @@ def _prescan_system_blocks( return pre_seen +def _extract_text_for_dedup(content: str) -> Tuple[Optional[str], Optional[str]]: + """Extract the dedupable text from a tool result. + + Many tools return JSON like {"content": "...", "path": "..."}. + The content field is the actual multi-line text we want to dedup. + + Returns (text_to_dedup, json_key) or (None, None) if not JSON-wrapped. 
+ """ + stripped = content.strip() + if not stripped.startswith("{"): + return None, None + try: + import json as _json + obj = _json.loads(stripped) + if not isinstance(obj, dict): + return None, None + # Look for the primary text field + for key in ("content", "output", "result", "text", "stdout"): + val = obj.get(key) + if isinstance(val, str) and len(val) >= MIN_CONTENT_CHARS: + return val, key + except (ValueError, TypeError): + pass + return None, None + + +def _rebuild_json_content(original: str, key: str, new_text: str) -> str: + """Replace the text field in a JSON tool result with deduped version.""" + import json as _json + try: + obj = _json.loads(original) + obj[key] = new_text + return _json.dumps(obj, ensure_ascii=False) + except (ValueError, TypeError): + return original + + def dedup_chat_completions( body: dict, min_block_chars: int = MIN_BLOCK_CHARS, @@ -204,8 +259,12 @@ def dedup_chat_completions( tc_id = msg.get("tool_call_id", "") fn_name = tool_names.get(tc_id, msg.get("name", "")) or "tool" + # Extract text from JSON-wrapped tool results for proper chunking + extracted_text, json_key = _extract_text_for_dedup(content) + dedup_target = extracted_text if extracted_text else content + new_content = _dedup_text( - content, + dedup_target, seen_blocks, idx, fn_name, @@ -215,9 +274,15 @@ def dedup_chat_completions( pre_seen=pre_seen, ) if new_content is not None: - original_len = len(content) - msg["content"] = new_content - new_len = len(new_content) + if json_key and extracted_text: + # Rebuild the JSON with shortened content field + original_len = len(content) + msg["content"] = _rebuild_json_content(content, json_key, new_content) + new_len = len(msg["content"]) + else: + original_len = len(content) + msg["content"] = new_content + new_len = len(new_content) result.chars_before += original_len result.chars_after += new_len result.chars_saved += original_len - new_len @@ -345,8 +410,11 @@ def dedup_responses_api( call_id = item.get("call_id", 
"") fn_name = fn_names.get(call_id, call_id) or "tool" + extracted_text, json_key = _extract_text_for_dedup(output) + dedup_target = extracted_text if extracted_text else output + new_output = _dedup_text( - output, + dedup_target, seen_blocks, idx, fn_name, @@ -356,9 +424,14 @@ def dedup_responses_api( pre_seen=pre_seen, ) if new_output is not None: - original_len = len(output) - item["output"] = new_output - new_len = len(new_output) + if json_key and extracted_text: + original_len = len(output) + item["output"] = _rebuild_json_content(output, json_key, new_output) + new_len = len(item["output"]) + else: + original_len = len(output) + item["output"] = new_output + new_len = len(new_output) result.chars_before += original_len result.chars_after += new_len result.chars_saved += original_len - new_len diff --git a/contextpilot/retriever/__init__.py b/contextpilot/retriever/__init__.py index c7efc76..e3ae711 100644 --- a/contextpilot/retriever/__init__.py +++ b/contextpilot/retriever/__init__.py @@ -1,4 +1,9 @@ -from .bm25 import BM25Retriever +try: + from .bm25 import BM25Retriever + BM25_AVAILABLE = True +except ImportError: + BM25Retriever = None + BM25_AVAILABLE = False # FAISS is optional - only import if available try: @@ -26,6 +31,7 @@ __all__ = [ "BM25Retriever", + "BM25_AVAILABLE", "FAISSRetriever", "FAISS_AVAILABLE", "Mem0Retriever", @@ -33,4 +39,4 @@ "MEM0_AVAILABLE", "PageIndexRetriever", "PAGEINDEX_AVAILABLE", -] \ No newline at end of file +] diff --git a/contextpilot/server/intercept_parser.py b/contextpilot/server/intercept_parser.py index 9dfa19e..d2c2848 100644 --- a/contextpilot/server/intercept_parser.py +++ b/contextpilot/server/intercept_parser.py @@ -31,8 +31,8 @@ _SEPARATOR_PATTERNS = ["---", "==="] # Minimum content length to track a single-doc tool_result for dedup. -# Skips tiny results like "ok", error messages, short status outputs. -_SINGLE_DOC_MIN_CHARS = 200 +# Skips tiny results like "ok", short error messages. 
+_SINGLE_DOC_MIN_CHARS = 100 @dataclass diff --git a/contextpilot_hook.pth b/contextpilot_hook.pth index 0a03894..18e24c1 100644 --- a/contextpilot_hook.pth +++ b/contextpilot_hook.pth @@ -1,2 +1,3 @@ import contextpilot._sglang_hook import contextpilot._vllm_hook +import contextpilot._openai_hook diff --git a/docs/guides/hermes.md b/docs/guides/hermes.md new file mode 100644 index 0000000..d13f293 --- /dev/null +++ b/docs/guides/hermes.md @@ -0,0 +1,106 @@ +# ContextPilot + Hermes Agent Integration Guide + +## Overview + +ContextPilot integrates with [Hermes Agent](https://github.com/NousResearch/hermes-agent) as a native context engine plugin — zero changes to Hermes source code required. It intercepts every LLM call, reorders documents for prefix cache sharing, deduplicates repeated tool results, and deduplicates content blocks across turns. + +Typical savings: **40–50% input tokens** on agentic workloads with repeated file reads. + +## Installation + +```bash +hermes plugins install EfficientContext/ContextPilot +``` + +This clones ContextPilot into `~/.hermes/plugins/ContextPilot/`. Hermes's plugin system discovers the `plugin.yaml` manifest and loads the context engine via the standard `register(ctx)` entry point. + +## Activation + +After installing, enable the plugin in the Hermes plugins menu: + +```bash +hermes plugins +``` + +Navigate to **General Plugins** → toggle **contextpilot** to enabled. + +Restart Hermes. On startup you'll see: + +``` +Plugin 'contextpilot' registered context engine: contextpilot +``` + +> **Note:** The context engine TUI submenu may show "contextpilot (not found)" — this is cosmetic. The engine is fully functional. 
+
+## What it does
+
+Every API call, before messages are sent to the LLM, ContextPilot runs a six-step pipeline:
+
+| Step | Operation | Benefit |
+|------|-----------|---------|
+| 1 | Prefix replay | KV cache prefix stays identical across turns |
+| 2 | Extract documents | Parse tool results into document arrays |
+| 3 | Reorder | Cluster similar documents for prefix sharing |
+| 4 | Cross-turn dedup | Replace repeated file reads with a pointer |
+| 5 | Block-level dedup | Content-defined chunking within tool results |
+| 6 | Cache | Store modified messages for next turn's prefix replay |
+
+Steps 2–4 require `numpy` (already a Hermes dependency). If numpy is unavailable, ContextPilot falls back to dedup-only mode.
+
+## Verifying it works
+
+After a session with repeated tool calls (e.g. reading the same file twice), check the Hermes log:
+
+```
+[ContextPilot] Turn 14: saved 4408 chars (~1102 tokens) | cumulative: 19574 chars (~4893 tokens)
+```
+
+Or query the engine status programmatically:
+
+```python
+from hermes_cli.plugins import get_plugin_manager
+engine = get_plugin_manager()._context_engine
+print(engine.get_status())
+# {'engine': 'contextpilot', 'contextpilot_chars_saved': 18420, ...}
+```
+
+## Disabling
+
+```bash
+hermes plugins disable contextpilot
+```
+
+Or reset the context engine to the built-in compressor:
+
+```yaml
+context:
+  engine: compressor
+```
+
+## Uninstalling
+
+```bash
+hermes plugins remove contextpilot
+```
+
+## How it differs from Hermes's built-in ContextCompressor
+
+Hermes ships with `ContextCompressor`, a threshold-based LLM-summarization engine. 
ContextPilot wraps and extends it:
+
+| | Built-in compressor | ContextPilot |
+|---|---|---|
+| Trigger | Token threshold (75% of context) | Every API call |
+| Approach | Lossy LLM summarization | Lossless dedup + reorder |
+| Cache-friendly | No | Yes — preserves prefix for KV cache |
+| Cost | One summarization LLM call per compression | Zero extra LLM calls |
+| Fallback | N/A | Delegates to built-in compressor when compression is actually needed |
+
+ContextPilot runs *before* the threshold-based compressor, reducing how often the expensive summarization path is hit.
+
+## Troubleshooting
+
+**Plugin not discovered after install.** Check `~/.hermes/plugins/ContextPilot/plugin.yaml` exists and contains `type: context_engine`. Run `hermes plugins list` to confirm.
+
+**No token savings logged.** Dedup only fires when the LLM reads the same file content more than once in a session. On first reads, content is indexed but not deduplicated.
+
+**Reorder unavailable / numpy missing.** Reorder requires numpy. If numpy is missing, ContextPilot does not raise — it logs `[ContextPilot] Reorder unavailable (numpy?), dedup-only mode` and continues in dedup-only mode.
diff --git a/plugin.yaml b/plugin.yaml
new file mode 100644
index 0000000..544361d
--- /dev/null
+++ b/plugin.yaml
@@ -0,0 +1,4 @@
+name: contextpilot
+type: context_engine
+version: "0.1.0"
+description: "Per-turn context optimization — prefix alignment, document reordering, and block-level deduplication for KV cache sharing."
diff --git a/tests/test_hermes_plugin_patch.py b/tests/test_hermes_plugin_patch.py new file mode 100644 index 0000000..9a372d8 --- /dev/null +++ b/tests/test_hermes_plugin_patch.py @@ -0,0 +1,186 @@ +import importlib.util +from types import SimpleNamespace +import sys +import types +from pathlib import Path + + +def _load_plugin_module(monkeypatch): + agent_pkg = types.ModuleType("agent") + context_engine_mod = types.ModuleType("agent.context_engine") + context_compressor_mod = types.ModuleType("agent.context_compressor") + + class FakeContextEngine: + last_prompt_tokens = 0 + last_completion_tokens = 0 + last_total_tokens = 0 + threshold_tokens = 0 + context_length = 0 + compression_count = 0 + threshold_percent = 0.75 + protect_first_n = 3 + protect_last_n = 6 + + def get_status(self): + return {} + + def on_session_reset(self): + return None + + class FakeContextCompressor(FakeContextEngine): + def __init__(self, **kwargs): + self.threshold_tokens = 0 + self.context_length = kwargs.get("config_context_length") or 0 + self.threshold_percent = 0.75 + self.protect_first_n = 3 + self.protect_last_n = 6 + self.compression_count = 0 + + def update_from_response(self, usage): + self.last_prompt_tokens = usage.get("prompt_tokens", 0) + self.last_completion_tokens = usage.get("completion_tokens", 0) + self.last_total_tokens = usage.get("total_tokens", 0) + + def should_compress(self, prompt_tokens=None): + return False + + def compress(self, messages, current_tokens=None, **kwargs): + return messages + + def on_session_start(self, session_id, **kwargs): + return None + + def update_model(self, **kwargs): + self.context_length = kwargs.get("context_length", self.context_length) + self.threshold_tokens = int(self.context_length * self.threshold_percent) + + def get_tool_schemas(self): + return [] + + def handle_tool_call(self, name, args, **kwargs): + return "{}" + + context_engine_mod.ContextEngine = FakeContextEngine + context_compressor_mod.ContextCompressor = 
FakeContextCompressor + agent_pkg.context_engine = context_engine_mod + agent_pkg.context_compressor = context_compressor_mod + + monkeypatch.setitem(sys.modules, "agent", agent_pkg) + monkeypatch.setitem(sys.modules, "agent.context_engine", context_engine_mod) + monkeypatch.setitem(sys.modules, "agent.context_compressor", context_compressor_mod) + + run_agent_mod = types.ModuleType("run_agent") + + class FakeAIAgent: + @staticmethod + def _sanitize_api_messages(messages): + return messages + [{"role": "tool", "content": "sanitized"}] + + run_agent_mod.AIAgent = FakeAIAgent + monkeypatch.setitem(sys.modules, "run_agent", run_agent_mod) + + module_path = Path(__file__).resolve().parents[1] / "__init__.py" + spec = importlib.util.spec_from_file_location("contextpilot_hermes_plugin_test", module_path) + module = importlib.util.module_from_spec(spec) + assert spec is not None and spec.loader is not None + spec.loader.exec_module(module) + return module, run_agent_mod + + +def test_patch_routes_instance_sanitization_through_contextpilot(monkeypatch): + module, run_agent_mod = _load_plugin_module(monkeypatch) + module._patch_hermes_sanitizer() + + engine = module.ContextPilotEngine() + calls = [] + + def optimize(messages, **kwargs): + calls.append((messages, kwargs)) + return messages + [{"role": "assistant", "content": "optimized"}], {"chars_saved": 1} + + engine.optimize_api_messages = optimize + + agent = run_agent_mod.AIAgent() + agent.context_compressor = engine + agent._cached_system_prompt = "system prompt" + + out = agent._sanitize_api_messages([{"role": "user", "content": "hello"}]) + + assert out == [ + {"role": "user", "content": "hello"}, + {"role": "tool", "content": "sanitized"}, + {"role": "assistant", "content": "optimized"}, + ] + assert calls == [ + ( + [ + {"role": "user", "content": "hello"}, + {"role": "tool", "content": "sanitized"}, + ], + {"system_content": "system prompt"}, + ) + ] + + +def 
test_patch_preserves_class_level_sanitizer_usage(monkeypatch): + module, run_agent_mod = _load_plugin_module(monkeypatch) + module._patch_hermes_sanitizer() + + out = run_agent_mod.AIAgent._sanitize_api_messages([{"role": "user", "content": "hello"}]) + + assert out == [ + {"role": "user", "content": "hello"}, + {"role": "tool", "content": "sanitized"}, + ] + + +def test_prefix_replay_matches_original_and_reuses_optimized_prefix(monkeypatch): + module, _ = _load_plugin_module(monkeypatch) + monkeypatch.setattr(module, "_check_reorder", lambda: False) + monkeypatch.setattr(module, "_CONTEXTPILOT_AVAILABLE", False) + + calls = [] + + def dedup(body, **kwargs): + messages = body["messages"] + calls.append([m.copy() for m in messages]) + saved = 0 + for msg in messages: + if msg.get("role") == "tool" and msg.get("content") == "FULL TOOL RESULT": + msg["content"] = "DEDUPED TOOL RESULT" + saved += len("FULL TOOL RESULT") - len("DEDUPED TOOL RESULT") + return SimpleNamespace( + chars_saved=saved, + blocks_deduped=1 if saved else 0, + blocks_total=1, + system_blocks_matched=0, + ) + + monkeypatch.setattr(module, "dedup_chat_completions", dedup) + + engine = module.ContextPilotEngine() + first = [ + {"role": "user", "content": "read file"}, + {"role": "tool", "tool_call_id": "call_1", "content": "FULL TOOL RESULT"}, + ] + first_out, _ = engine.optimize_api_messages(first) + + assert first_out[1]["content"] == "DEDUPED TOOL RESULT" + assert engine._cached_original_messages[1]["content"] == "FULL TOOL RESULT" + assert engine._cached_messages[1]["content"] == "DEDUPED TOOL RESULT" + + already_optimized = {"messages": [m.copy() for m in first_out]} + engine._intercept_chat_kwargs(already_optimized) + assert engine._cached_original_messages[1]["content"] == "FULL TOOL RESULT" + assert already_optimized["messages"][1]["content"] == "DEDUPED TOOL RESULT" + + second = [ + {"role": "user", "content": "read file"}, + {"role": "tool", "tool_call_id": "call_1", "content": "FULL TOOL 
RESULT"}, + {"role": "user", "content": "now summarize it"}, + ] + second_out, _ = engine.optimize_api_messages(second) + + assert second_out[1]["content"] == "DEDUPED TOOL RESULT" + assert second_out[2]["content"] == "now summarize it" + assert calls[-1][1]["content"] == "DEDUPED TOOL RESULT"