diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index f48ea1b..e5e6407 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -10,7 +10,7 @@ permissions: contents: read jobs: - test: + unit-tests: runs-on: ubuntu-latest steps: - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 @@ -20,5 +20,42 @@ jobs: pixi-version: latest environments: dev - - name: Run tests + - name: Run unit tests run: pixi run -e dev test + + integration-tests: + runs-on: ubuntu-latest + services: + minio: + image: bitnamilegacy/minio:latest@sha256:b3d51900e846b92f7503ca6be07d2e8c56ebb6a13a60bc71b8777c716c074bcf + ports: + - 9000:9000 + env: + MINIO_ROOT_USER: minioadmin + MINIO_ROOT_PASSWORD: minioadmin + MINIO_DEFAULT_BUCKETS: josh-test-bucket:public + MINIO_SCHEME: http + options: >- + --health-cmd "curl -f http://localhost:9000/minio/health/ready || curl -f http://localhost:9000/minio/health/live" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + + steps: + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + with: + persist-credentials: false + + - uses: prefix-dev/setup-pixi@a0af7a228712d6121d37aba47adf55c1332c9c2e # v0.9.4 + with: + pixi-version: latest + environments: dev + + - name: Verify MinIO is ready + run: curl -f http://localhost:9000/minio/health/ready + + - name: Download Josh JAR + run: pixi run get-jars + + - name: Run integration tests + run: pixi run -e dev test-integration diff --git a/BATCH_INTEGRATION.md b/BATCH_INTEGRATION.md new file mode 100644 index 0000000..b38fdb0 --- /dev/null +++ b/BATCH_INTEGRATION.md @@ -0,0 +1,344 @@ +# Plan: Batch Remote Execution for joshpy + +Tracking issue: [joshpy#31](https://github.com/SchmidtDSE/joshpy/issues/31) +Companion Java plan: [josh#374](https://github.com/SchmidtDSE/josh/issues/374) +Dependency: [josh#406](https://github.com/SchmidtDSE/josh/issues/406) — `pollBatch` CLI command for async job status polling + +## Context + +joshsim (Java) has added `batchRemote` — a parallel execution path using MinIO staging and target profiles instead of HTTP streaming. PRs 1-7 are merged on the Java side ([josh#374](https://github.com/SchmidtDSE/josh/issues/374)). joshpy needs to wrap these new capabilities and provide efficient Python-level orchestration for parameter sweeps. + +**Immediate motivation:** A production run has 5 of 6 replicate CSVs sitting in MinIO (the 6th OOM'd). The run is registered in the local RunRegistry with a label. We need a way to recover those results NOW — look up the run by label, discover the `minio://` export paths, read the CSVs directly into DuckDB via S3, and load them into the registry. This drives the PR ordering: result ingestion first, then the rest of the batch infrastructure. + +**Access model (Model A):** MinIO/S3 CSVs are the source of truth. The local `.duckdb` is a materialized cache that any machine can rebuild by re-ingesting from S3. DuckDB reads CSVs directly from S3 via `httpfs` — no download, no local disk needed for the CSV data. This supports future access patterns: browser WASM reading S3, serverless aggregators attaching `.duckdb`, multi-machine access. + +**State ownership:** josh is stateless/ephemeral — it dispatches jobs and can check their status, but holds no long-running state. joshpy owns all state via RunRegistry (what was run, parameters, label, job ID). When joshpy dispatches a `--no-wait` batch job, it stores the `batch_job_id` in `job_runs.metadata`. To poll, joshpy calls josh's `pollBatch` CLI command ([josh#406](https://github.com/SchmidtDSE/josh/issues/406)) which knows HOW to check status for each target type (MinIO status file, K8s Job API, etc.). joshpy doesn't know or care about the polling mechanism internals — it just gets back "running" / "complete" / "error". + +**Key design decisions:** +- `batchRemote` has no `--data` flags — files stage to/from MinIO. First positional arg can be a `.josh` file OR a directory. The caller stages data; the worker pulls via `stageFromMinio`. +- Auto-pull results from MinIO after jobs complete, with opt-out for fire-and-forget. Plus a generic "ingest CSVs after the fact" code path that works for both batch remote AND local OOM recovery (DRY). +- Target config system is SHARED between josh and joshpy — joshpy reads AND creates `~/.josh/targets/.json`. +- MinIO cred resolution hierarchy (mirrors joshsim's `HierarchyConfig`): CLI flags > profile JSON > env vars (`MINIO_ENDPOINT`, `MINIO_ACCESS_KEY`, `MINIO_SECRET_KEY`, `MINIO_BUCKET`). Secrets don't need to live in profile JSON. +- K8s targets have a separate `pod_minio_endpoint` — the in-cluster MinIO endpoint pods use, which may differ from the outer `minio_endpoint` used for host-side staging. +- For sweeps: stage shared data (.josh, .jshd) to MinIO ONCE, then per-job stage only the unique .jshc config. joshpy orchestrates staging directly (not via `batchRemote`). +- Dev JARs are outdated — implement against spec, test when updated. + +--- + +## PR Plan + +``` +PR1 (S3-native ingest) → PR2 (target profiles) → PR3 (CLI wrappers) → PR4 (sweep integration) → PR5 (shared staging optimization) → PR6 (polish) +``` + +### Regression gates (every PR) +- `pixi run pytest` passes +- Existing `runRemote` path completely untouched + +--- + +### PR 1: Result Recovery — S3-native `ingest_results()` + +**Solves the immediate need.** Enables recovering results from MinIO into the registry by label. DuckDB reads CSVs directly from S3 via httpfs — no download, no local disk needed for the CSV data. Also provides `download=True` fallback via `stageFromMinio` for users who want local copies. + +#### New utility: `configure_s3()` in `joshpy/registry.py` (or `joshpy/s3.py`) + +Reusable DuckDB S3/MinIO connection setup — the foundation for all future S3 access (serverless aggregators, WASM, multi-machine): + +```python +def configure_s3(conn, endpoint: str, access_key: str, secret_key: str, url_style: str = "path") -> None: + """Configure DuckDB connection for S3/MinIO access via httpfs.""" + conn.execute("INSTALL httpfs; LOAD httpfs;") + conn.execute(f""" + CREATE OR REPLACE SECRET ( + TYPE s3, + KEY_ID '{access_key}', + SECRET '{secret_key}', + ENDPOINT '{endpoint}', + URL_STYLE '{url_style}', + USE_SSL true + ) + """) +``` + +S3 credentials resolve via hierarchy: explicit args > env vars (`MINIO_ENDPOINT`, `MINIO_ACCESS_KEY`, `MINIO_SECRET_KEY`). The function takes explicit args; the caller (`ingest_results`) handles the env var fallback. + +#### Modify `CellDataLoader.load_csv()` in `joshpy/cell_data.py` + +Accept `str` (S3 URL) in addition to `Path`: +```python +def load_csv(self, csv_path: Path | str, run_id: str, run_hash: str, ...) -> int: + if isinstance(csv_path, str) and csv_path.startswith("s3://"): + csv_path_str = csv_path # S3 URL — pass directly to read_csv_auto + else: + csv_path_str = str(Path(csv_path).resolve()) # local path (existing behavior) + # ... rest unchanged — read_csv_auto handles both +``` + +The existing `read_csv_auto()` call works with S3 URLs natively once httpfs is loaded. + +#### New function: `ingest_results()` in `joshpy/sweep.py` + +The core recovery function. Works by label (not by `JobSet`): + +```python +def ingest_results( + cli: JoshCLI, + registry: RunRegistry, + label_or_hash: str, + *, + export_type: str = "patch", + download: bool = False, # if True, download via stageFromMinio instead of S3 read + output_dir: Path | None = None, # download destination (only used when download=True) + minio_bucket: str | None = None, # override bucket (else from ExportFileInfo.host) + quiet: bool = False, +) -> int: +``` + +**Flow:** +1. `registry._resolve_label_or_hash(label_or_hash)` -> `run_hash` +2. `registry.get_config_by_hash(run_hash)` -> `ConfigInfo` (josh_path, josh_content, parameters, label) +3. `registry.get_session(config.session_id)` -> `SessionInfo` (simulation, total_replicates) +4. Get josh source on disk: if `config.josh_path` exists use it; otherwise write `config.josh_content` to a temp file +5. `cli.inspect_exports(script, simulation)` -> `ExportPaths` +6. Get `ExportFileInfo` for `export_type` -> check `info.protocol` +7. If `protocol == "minio"` and NOT `download`: + - Configure S3 on registry connection: `configure_s3(registry.conn, endpoint, access_key, secret_key)` + - Translate `minio://bucket/path` to `s3://bucket/path` for DuckDB +8. If `protocol == "minio"` and `download`: + - Call `cli.stage_from_minio(...)` to download locally (fallback path) +9. `registry._resolve_run_id_for_hash(run_hash)` -> `run_id` for the latest execution +10. For each replicate 0..`total_replicates-1`: + - Build template vars: `{simulation, replicate, **config.parameters, label: config.label}` + - Resolve path template -> concrete path + - If minio (no download): remap to `s3://bucket/resolved_path` + - If minio (download): remap to `output_dir / filename` + - Load via `CellDataLoader.load_csv(csv_path_or_url, run_id, run_hash)` + - If file/object doesn't exist -> skip gracefully (the OOM'd replicate), print which one +11. Return total rows loaded + +#### Also in this PR: `StageFromMinioConfig` + `stage_from_minio()` for `download=True` path + +```python +@dataclass(frozen=True) +class StageFromMinioConfig: + output_dir: Path + prefix: str + minio_endpoint: str | None = None + minio_access_key: str | None = None + minio_secret_key: str | None = None + minio_bucket: str | None = None +``` + +Plus `JoshCLI.stage_from_minio()` method wrapping `stageFromMinio --output-dir=... --prefix=... [--minio-* options]`. + +#### New method: `SweepManager.ingest()` in `joshpy/sweep.py` + +```python +def ingest(self, export_type="patch", download=False, output_dir=None, quiet=False) -> int: + label = getattr(self, '_label', None) or self.job_set.jobs[0].run_hash + return ingest_results(self.cli, self.registry, label, export_type=export_type, + download=download, output_dir=output_dir, quiet=quiet) +``` + +#### Exports: `joshpy/__init__.py` +- Add `StageFromMinioConfig`, `configure_s3` to CLI exports +- Add `ingest_results` to sweep exports + +#### Tests +- `tests/test_cli.py`: `StageFromMinioConfig` defaults, `stage_from_minio()` arg building +- `tests/test_sweep.py`: `ingest_results` with mocked registry + mocked DuckDB (S3 URL construction, download fallback, missing replicate skip, josh_content temp file fallback) + +#### User-facing example (pixi task in josh-models) + +```toml +recover = { cmd = "python scripts/recover.py", env = { JOSH_LABEL = "{{ LABEL }}" }, args = [{ arg = "LABEL" }], description = "Recover results from MinIO: pixi run recover