diff --git a/perfkitbenchmarker/benchmark_spec.py b/perfkitbenchmarker/benchmark_spec.py index 26b5f1a69a..1574af534b 100644 --- a/perfkitbenchmarker/benchmark_spec.py +++ b/perfkitbenchmarker/benchmark_spec.py @@ -65,6 +65,7 @@ from perfkitbenchmarker.configs import vm_group_decoders from perfkitbenchmarker.resources import ai_agent_service from perfkitbenchmarker.resources import base_job +from perfkitbenchmarker.resources import agent_sandbox from perfkitbenchmarker.resources import example_resource from perfkitbenchmarker.resources import managed_ai_model from perfkitbenchmarker.resources.container_service import container_cluster @@ -202,6 +203,7 @@ def __init__( self.base_job = None self.edw_service = None self.edw_compute_resource = None + self.agent_sandbox = None self.example_resource = None self.multi_attach_disk = None self.nfs_service = None @@ -337,6 +339,7 @@ def ConstructResources(self): # Put registry first, as it can be needed by cluster. self.ConstructContainerRegistry() self.ConstructContainerCluster() + self.ConstructAgentSandbox() # dpb service needs to go first, because it adds some vms. 
def ConstructAgentSandbox(self):
  """Builds the agent sandbox resource on top of the container cluster.

  Does nothing when the benchmark config has no agent_sandbox section. When a
  sandbox object is produced it is also registered in self.resources.

  Raises:
    errors.Config.InvalidValue: If agent_sandbox is configured but no
      container_cluster has been constructed.
  """
  sandbox_config = self.config.agent_sandbox
  if sandbox_config is None:
    return
  cluster = self.container_cluster
  if cluster is None:
    raise errors.Config.InvalidValue(
        'agent_sandbox requires a container_cluster to be configured.'
    )
  self.agent_sandbox = agent_sandbox.GetAgentSandbox(sandbox_config, cluster)
  if not self.agent_sandbox:
    return
  self.resources.append(self.agent_sandbox)
jobs_setter from perfkitbenchmarker.resources import managed_ai_model_spec @@ -1488,6 +1489,10 @@ def _GetOptionDecoderConstructions(cls): 'tpu_groups': (_TpuGroupsDecoder, {'default': {}}), 'edw_compute_resource': (_EdwComputeResourceDecoder, {'default': None}), 'edw_service': (_EdwServiceDecoder, {'default': None}), + 'agent_sandbox': ( + agent_sandbox_spec.AgentSandboxConfigDecoder, + {'default': None, 'none_ok': True}, + ), 'example_resource': (_ExampleResourceDecoder, {'default': None}), 'base_job': (_BaseJobDecoder, {'default': None}), 'memory_store': (_MemoryStoreDecoder, {'default': None}), diff --git a/perfkitbenchmarker/data/agent_sandbox/gvisor-installer/daemonset.yaml b/perfkitbenchmarker/data/agent_sandbox/gvisor-installer/daemonset.yaml new file mode 100644 index 0000000000..63cce89b6c --- /dev/null +++ b/perfkitbenchmarker/data/agent_sandbox/gvisor-installer/daemonset.yaml @@ -0,0 +1,65 @@ +# Privileged DaemonSet that runs an init container to install runsc and the +# containerd-runsc shim onto the host, then sleeps as a pause container. +# +# The actual install logic comes from a ConfigMap named gvisor-installer-script +# (created by install_gvisor() in resources/kubernetes/k8s_agent_sandbox.py from +# data/agent_sandbox/gvisor-installer/install.sh before this DaemonSet is +# applied). The ConfigMap key is "install.sh", mounted at /scripts. +# +# nodeSelector and tolerations are injected at apply time (see +# _render_gvisor_daemonset in resources/kubernetes/k8s_agent_sandbox.py) so the +# DaemonSet targets the sandbox node pool via the pkb_nodepool label. 
# DaemonSet that installs gVisor (runsc + containerd shim) on each node it is
# scheduled to, using a privileged init container, then idles in a pause
# container.
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: gvisor-installer
  namespace: kube-system
  labels:
    app.kubernetes.io/name: gvisor-installer
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: gvisor-installer
  template:
    metadata:
      labels:
        app.kubernetes.io/name: gvisor-installer
    spec:
      # Host PID namespace is shared so install.sh can `nsenter -t 1` into the
      # host's namespaces (it runs `containerd config default` that way).
      hostPID: true
      initContainers:
        - name: install
          image: docker.io/library/ubuntu:24.04
          imagePullPolicy: IfNotPresent
          securityContext:
            privileged: true
          env:
            - name: GVISOR_VERSION
              # Pinned for benchmarking. Update in lockstep across all envs.
              # Verify available releases at https://gvisor.dev/docs/user_guide/install/
              value: "20260511"
          command: ["/bin/bash", "/scripts/install.sh"]
          volumeMounts:
            # Host root filesystem; install.sh writes under HOST=/host.
            - name: host
              mountPath: /host
            # install.sh, delivered through the ConfigMap volume below.
            - name: script
              mountPath: /scripts
              readOnly: true
      containers:
        # Pause container keeps the DaemonSet "Running" after install completes.
        - name: pause
          image: registry.k8s.io/pause:3.9
          resources:
            requests:
              cpu: 10m
              memory: 16Mi
            limits:
              cpu: 50m
              memory: 64Mi
      volumes:
        - name: host
          hostPath:
            path: /
        - name: script
          configMap:
            name: gvisor-installer-script
            defaultMode: 0755
+if [ -d "${HOST}/home/kubernetes" ]; then + INSTALL_DIR="${HOST}/home/kubernetes/bin" + NEEDS_PATH_DROPIN=1 +else + INSTALL_DIR="${HOST}/usr/local/bin" + NEEDS_PATH_DROPIN=0 +fi +mkdir -p "${INSTALL_DIR}" + +for bin in runsc containerd-shim-runsc-v1; do + TARGET="${INSTALL_DIR}/${bin}" + if [ ! -x "${TARGET}" ]; then + curl -fsSL "${URL}/${bin}" -o "${TARGET}.new" + chmod +x "${TARGET}.new" + mv "${TARGET}.new" "${TARGET}" + NEEDS_RESTART=1 + fi +done + +# On COS, /home/kubernetes/bin is not on systemd's default PATH; drop in a +# unit override for containerd so the shim is found. Not needed on non-COS +# nodes where /usr/local/bin is already on PATH. +if [ "${NEEDS_PATH_DROPIN}" -eq 1 ]; then + DROPIN_DIR="${HOST}/etc/systemd/system/containerd.service.d" + DROPIN="${DROPIN_DIR}/10-runsc-path.conf" + mkdir -p "${DROPIN_DIR}" + if [ ! -f "${DROPIN}" ]; then + cat > "${DROPIN}" <<'EOF' +[Service] +Environment="PATH=/home/kubernetes/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" +EOF + NEEDS_RESTART=1 + fi +fi + +# Register the runsc runtime with containerd. +CONFIG="${HOST}/etc/containerd/config.toml" +if [ ! -f "${CONFIG}" ]; then + mkdir -p "$(dirname "${CONFIG}")" + nsenter -t 1 -m -u -i -n -p -- containerd config default > "${CONFIG}" +fi +if ! grep -q 'io.containerd.runsc.v1' "${CONFIG}"; then + # containerd v2+ uses config version 3 where the CRI runtime plugin moved + # from io.containerd.grpc.v1.cri to io.containerd.cri.v1.runtime. + # Appending to the wrong section is silently ignored, leaving runsc + # unconfigured even though the binary is installed. 
def GetConfig(user_config: dict[str, Any]) -> dict[str, Any]:
  """Loads the benchmark config and merges user overrides.

  Args:
    user_config: User-supplied config overrides.

  Returns:
    The merged config, with container_cluster.cloud set from FLAGS.cloud.
  """
  config = configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME)
  config['container_cluster']['cloud'] = FLAGS.cloud
  return config


def _create_loadgen_configmap(namespace: str) -> None:
  """Creates or updates the load-runner-script ConfigMap from the loadgen source.

  Uses --dry-run=client -o yaml to render the ConfigMap (idempotent on
  re-runs), writes it to a temp file, and applies it.

  Args:
    namespace: Namespace the ConfigMap is rendered for.

  Raises:
    errors.Benchmarks.PrepareException: If the loadgen module's source file
      cannot be located on disk.
  """
  script_path = inspect.getsourcefile(agent_sandbox_loadgen)
  if script_path is None:
    # Without this check the literal string "None" would be handed to kubectl
    # as the --from-file path.
    raise errors.Benchmarks.PrepareException(
        'Cannot locate the source file of agent_sandbox_loadgen.'
    )
  yaml_out, _, _ = kubectl.RunKubectlCommand([
      'create',
      'configmap',
      _LOAD_RUNNER_CONFIGMAP,
      f'--from-file=load_runner.py={script_path}',
      '--namespace', namespace,
      '--dry-run=client',
      '-o', 'yaml',
  ])
  tmp_path = None
  try:
    # delete=False because kubectl reads the file after it is closed. The
    # write sits inside the try so a failed write cannot leak the file, and
    # the encoding is explicit so the result does not depend on the locale.
    with tempfile.NamedTemporaryFile(
        mode='w', suffix='.yaml', delete=False, encoding='utf-8'
    ) as tmp:
      tmp_path = tmp.name
      tmp.write(yaml_out)
    kubectl.RunKubectlCommand(['apply', '-f', tmp_path])
  finally:
    if tmp_path is not None:
      os.unlink(tmp_path)


def Prepare(benchmark_spec: Any) -> None:
  """Installs the controller, sandbox template, warm pool, and load runner RBAC.

  CRDs, RBAC, and gVisor are installed during provision. The controller stack
  is installed here so it can be re-applied against an existing cluster with
  `--run_stage=prepare` to iterate on controller settings without recreating
  the cluster.

  Args:
    benchmark_spec: The benchmark spec; its agent_sandbox attribute must be
      set.

  Raises:
    errors.Benchmarks.PrepareException: If no agent_sandbox is configured.
  """
  sandbox = benchmark_spec.agent_sandbox
  if sandbox is None:
    raise errors.Benchmarks.PrepareException(
        'agent_sandbox must be configured for this benchmark')
  # When this invocation did not run the provision stage, re-read the sandbox
  # spec from the current flags before installing.
  if stages.PROVISION not in FLAGS.run_stage:
    sandbox.RefreshSpecFromFlags()
  sandbox.InstallWorkload()

  namespace = sandbox.spec.namespace
  _create_loadgen_configmap(namespace)
  kubernetes_commands.ApplyManifest(
      _LOAD_RUNNER_RBAC_MANIFEST,
      namespace=namespace,
  )
def _parse_load_runner_output(
    logs_out: str, sentinel: str
) -> tuple[list[dict[str, Any]], Any]:
  """Extracts per-claim records and the summary from load runner pod logs.

  Only the text after the LAST occurrence of the sentinel is parsed, one JSON
  object per non-blank line.

  Args:
    logs_out: Full pod log text.
    sentinel: Marker string that precedes the JSONL results section.

  Returns:
    (record_dicts, peak_concurrency): the decoded non-summary objects in log
    order, and the summary's peak_concurrency (None if no summary line).

  Raises:
    ValueError: If the sentinel is missing or a result line is not a valid
      JSON object.
  """
  _, found, jsonl_section = logs_out.rpartition(sentinel)
  if not found:
    raise ValueError(f'No {sentinel!r} sentinel found in pod logs.')

  record_dicts = []
  peak_concurrency = None
  for raw_line in jsonl_section.splitlines():
    line = raw_line.strip()
    if not line:
      continue
    try:
      obj = json.loads(line)
    except json.JSONDecodeError as exc:
      raise ValueError(f'Malformed result line {line!r}: {exc}') from exc
    if not isinstance(obj, dict):
      raise ValueError(f'Result line is not a JSON object: {line!r}')
    if 'summary' in obj:
      try:
        peak_concurrency = obj['summary']['peak_concurrency']
      except (KeyError, TypeError) as exc:
        raise ValueError(f'Malformed summary line {line!r}.') from exc
    else:
      record_dicts.append(obj)
  return record_dicts, peak_concurrency


def Run(benchmark_spec: Any) -> list[sample.Sample]:
  """Runs the load generator Job inside the cluster and returns latency samples.

  Args:
    benchmark_spec: The benchmark spec; reads agent_sandbox and
      container_cluster.

  Returns:
    Samples built by agent_sandbox_metrics.build_samples from the per-claim
    records the Job printed.

  Raises:
    errors.Benchmarks.RunError: If no agent_sandbox is configured, the Job
      fails, or the pod logs carry no parseable results section.
  """
  sandbox = benchmark_spec.agent_sandbox
  if sandbox is None:
    raise errors.Benchmarks.RunError(
        'agent_sandbox must be configured for this benchmark')
  spec = sandbox.spec
  namespace = spec.namespace

  # An explicit total takes precedence over the duration flag.
  shape = agent_sandbox_loadgen.resolve_run_shape(
      qps=_QPS.value,
      duration=_DURATION.value if _TOTAL.value is None else None,
      total=_TOTAL.value)

  # Delete any previous job so kubectl apply works cleanly.
  kubectl.RunKubectlCommand(
      ['delete', 'job', _LOAD_RUNNER_JOB, '--namespace', namespace,
       '--ignore-not-found'],
  )

  # Render and apply the Job manifest.
  kubernetes_commands.ApplyManifest(
      _LOAD_RUNNER_JOB_MANIFEST,
      namespace=namespace,
      template_name=k8s_agent_sandbox.SANDBOX_NAME,
      qps=shape.qps,
      total=shape.total,
      duration=shape.duration,
      max_concurrent=_MAX_CONCURRENT.value,
      workload_duration=_WORKLOAD_DURATION.value,
      ready_timeout=_READY_TIMEOUT.value,
  )

  # Derive a generous wait timeout: submission window + per-claim timeout +
  # workload duration + margin.
  wait_timeout = int(
      shape.duration + _READY_TIMEOUT.value + _WORKLOAD_DURATION.value + 120
  )

  # Wait for the Job to complete OR fail (whichever comes first).
  condition = kubernetes_commands.WaitForResourceForMultiConditions(
      f'job/{_LOAD_RUNNER_JOB}',
      conditions=['condition=Complete', 'condition=Failed'],
      namespace=namespace,
      timeout=wait_timeout,
  )

  # Fetch logs regardless; we need them for results or the error message.
  pod_out, _, _ = kubectl.RunKubectlCommand([
      'get', 'pods',
      '--namespace', namespace,
      '-l', f'job-name={_LOAD_RUNNER_JOB}',
      '-o', 'jsonpath={.items[0].metadata.name}',
  ])
  pod_name = pod_out.strip()

  logs_out, _, _ = kubectl.RunKubectlCommand(
      ['logs', pod_name, '--namespace', namespace],
  )

  if condition == 'condition=Failed':
    raise errors.Benchmarks.RunError(
        f'Load runner Job {_LOAD_RUNNER_JOB!r} failed.\n'
        f'Pod logs:\n{logs_out}'
    )

  # Surface parse problems as RunError with the pod logs attached, instead of
  # a bare JSON/Type error.
  try:
    record_dicts, peak_concurrency = _parse_load_runner_output(
        logs_out, _RESULTS_SENTINEL
    )
  except ValueError as exc:
    raise errors.Benchmarks.RunError(
        f'{exc}\nPod logs:\n{logs_out}'
    ) from exc
  records = [
      agent_sandbox_loadgen.ClaimRecord(**obj) for obj in record_dicts
  ]

  if peak_concurrency is None:
    logging.warning('No summary line found; defaulting peak_concurrency to 0')
    peak_concurrency = 0

  metadata = {
      'target_qps': shape.qps,
      'duration': shape.duration,
      'total_claims': shape.total,
      'warmpool_replicas': spec.sandbox_warmpool.replicas,
      'runtime_class': spec.sandbox_template.runtime_class,
  }
  metadata.update(benchmark_spec.container_cluster.GetResourceMetadata())
  return agent_sandbox_metrics.build_samples(
      records, peak_concurrency, metadata)


def Cleanup(benchmark_spec: Any) -> None:
  """No-op: cluster teardown reclaims the agent sandbox stack."""
  del benchmark_spec
# Default connection pool size for kubernetes ApiClient instances. Callers that
# know their concurrency level should pass an explicit value instead.
_CONNECTION_POOL_MAXSIZE = 64

# 429 retry parameters for create().
_CREATE_MAX_RETRIES = 5
_CREATE_RETRY_AFTER_DEFAULT = 1.0  # seconds
_CREATE_RETRY_AFTER_CAP = 5.0  # seconds


@dataclasses.dataclass(frozen=True)
class RunShape:
  """A fully resolved load shape: rate, window, and total claim count."""

  qps: float
  duration: float
  total: int


def resolve_run_shape(
    qps: Optional[float] = None,
    duration: Optional[float] = None,
    total: Optional[int] = None,
) -> RunShape:
  """Resolves any two of qps, duration, total into a complete RunShape.

  Args:
    qps: Target claims per second.
    duration: Submission window in seconds.
    total: Total number of claims to submit.

  Returns:
    A RunShape with all three fields populated.

  Raises:
    ValueError: If fewer than two values are provided, or if the value that
      has to be divided by (qps or duration) is not positive.
  """
  if qps is not None and duration is not None and total is not None:
    # All three provided; respect them as-is.
    pass
  elif qps is not None and duration is not None:
    total = int(round(qps * duration))
  elif qps is not None and total is not None:
    # Guard the division: a zero rate used to surface as ZeroDivisionError.
    if qps <= 0:
      raise ValueError('qps must be positive to derive duration from total.')
    duration = total / qps
  elif duration is not None and total is not None:
    if duration <= 0:
      raise ValueError('duration must be positive to derive qps from total.')
    qps = total / duration
  else:
    raise ValueError('Provide at least two of qps, duration, total.')
  return RunShape(qps=float(qps), duration=float(duration), total=int(total))


CLAIM_API_GROUP = 'extensions.agents.x-k8s.io'
CLAIM_API_VERSION = 'v1beta1'
CLAIM_PLURAL = 'sandboxclaims'
CLAIM_KIND = 'SandboxClaim'


def _load_kube_config() -> None:
  """Loads kubeconfig for the current PKB run. Isolated for test mocking."""
  from kubernetes import config  # pylint: disable=import-error,no-name-in-module

  config.load_kube_config()


def _register_bearer_token_auth(cfg: Any) -> None:
  """Registers the exec-plugin bearer token under the key the client expects.

  kubernetes-client>=36 auth_settings() looks for the token under the
  'BearerToken' api_key, but load_kube_config() stores exec-plugin tokens
  under 'authorization'. Without this remap the Authorization header is never
  added to requests and every call goes out unauthenticated.

  Args:
    cfg: Client configuration object exposing api_key and api_key_prefix
      dicts; mutated in place. Left untouched when there is no
      'authorization' token or a 'BearerToken' entry already exists.
  """
  token = cfg.api_key.get('authorization', '')
  if token and 'BearerToken' not in cfg.api_key:
    # Store the bare token; the 'Bearer' scheme goes in api_key_prefix.
    if token.startswith('Bearer '):
      token = token[len('Bearer ') :]
    cfg.api_key['BearerToken'] = token
    cfg.api_key_prefix['BearerToken'] = 'Bearer'


def _new_api_client(
    connection_pool_maxsize: int = _CONNECTION_POOL_MAXSIZE,
) -> Any:
  """Builds a Kubernetes ApiClient with a sized pool and bearer-token auth.

  Every API client in this module needs the same two things, so they are
  applied once here rather than in each constructor:
    - connection_pool_maxsize, so concurrent requests don't serialize behind
      a small urllib3 pool.
    - the exec-plugin bearer-token remap (see _register_bearer_token_auth).

  Args:
    connection_pool_maxsize: Pool size set on the copied default
      configuration.

  Returns:
    A kubernetes client.ApiClient built from that configuration.
  """
  from kubernetes import client  # pylint: disable=import-error,no-name-in-module

  cfg = client.Configuration.get_default_copy()
  cfg.connection_pool_maxsize = connection_pool_maxsize
  _register_bearer_token_auth(cfg)
  return client.ApiClient(cfg)


def _make_custom_objects_api(
    connection_pool_maxsize: int = _CONNECTION_POOL_MAXSIZE,
) -> Any:
  """Builds a CustomObjectsApi client with a sized connection pool."""
  from kubernetes import client  # pylint: disable=import-error,no-name-in-module

  return client.CustomObjectsApi(_new_api_client(connection_pool_maxsize))


def _make_core_v1_api(
    connection_pool_maxsize: int = _CONNECTION_POOL_MAXSIZE,
) -> Any:
  """Builds a CoreV1Api client with a sized connection pool (for pod exec)."""
  from kubernetes import client  # pylint: disable=import-error,no-name-in-module

  return client.CoreV1Api(_new_api_client(connection_pool_maxsize))


@dataclasses.dataclass
class ClaimRecord:
  """Timing record for one SandboxClaim.

  All timestamps are floats taken from the same clock; optional fields stay
  None when the corresponding phase was never reached or recorded.
  """

  name: str
  requested_at: float
  ready_at: Optional[float] = None
  error: Optional[str] = None
  warm_served: Optional[bool] = None
  exec_started_at: Optional[float] = None
  exec_completed_at: Optional[float] = None
  released_at: Optional[float] = None

  @property
  def startup_time_s(self) -> Optional[float]:
    """ready_at - requested_at, or None if the claim never became ready."""
    if self.ready_at is None:
      return None
    return self.ready_at - self.requested_at

  @property
  def exec_duration_s(self) -> Optional[float]:
    """exec_completed_at - exec_started_at, or None if either is unset."""
    if self.exec_started_at is None or self.exec_completed_at is None:
      return None
    return self.exec_completed_at - self.exec_started_at

  @property
  def total_lifecycle_s(self) -> Optional[float]:
    """released_at - requested_at, or None if the claim was never released."""
    if self.released_at is None:
      return None
    return self.released_at - self.requested_at
def _busy_loop_code(workload_duration: float) -> str:
  """Returns Python source for a CPU spin that lasts ~workload_duration seconds.

  The generated program loops on time.monotonic() until the deadline passes
  and then prints the iteration count.
  """
  source_lines = [
      'import time\n',
      'start = time.monotonic()\n',
      'count = 0\n',
      f'while time.monotonic() - start < {workload_duration}:\n',
      ' _ = (count * 2) + (count ** 2)\n',
      ' count += 1\n',
      'print(f"iterations={count}")\n',
  ]
  return ''.join(source_lines)


class WorkloadExecutor:
  """Interface for running a workload inside a Ready sandbox.

  execute() is expected to block until the workload finishes.
  """

  def prepare(self, _claim_name: str, _status: dict[str, Any]) -> Any:
    """Hook invoked from the watch consumer as soon as a claim is Ready.

    Whatever is returned here is handed back to execute() as its `prepared`
    argument. The base implementation pre-fetches nothing and returns None.
    """
    return None

  def execute(
      self,
      _claim_name: str,
      _status: Optional[dict[str, Any]],
      _prepared: Any,
  ) -> None:
    """Runs the workload; subclasses must override."""
    raise NotImplementedError


class NoopExecutor(WorkloadExecutor):
  """Executor that does nothing (workload_duration == 0, immediate release)."""

  def execute(
      self,
      _claim_name: str,
      _status: Optional[dict[str, Any]],
      _prepared: Any,
  ) -> None:
    return None


class StreamExecExecutor(WorkloadExecutor):
  """Runs the busy-loop workload in the sandbox pod through the API server.

  Uses connect_get_namespaced_pod_exec, so it works from any host that has a
  kubeconfig.
  """

  def __init__(
      self,
      core_v1_api: Any,
      custom_objects_api: Any,
      namespace: str,
      workload_duration: float,
      sandbox_group: str = 'agents.x-k8s.io',
      sandbox_version: str = 'v1beta1',
      sandbox_plural: str = 'sandboxes',
  ) -> None:
    self._core = core_v1_api
    self._co = custom_objects_api
    self._namespace = namespace
    self._workload_duration = workload_duration
    self._sandbox_group = sandbox_group
    self._sandbox_version = sandbox_version
    self._sandbox_plural = sandbox_plural

  def _resolve_pod_name(self, status: dict[str, Any]) -> Optional[str]:
    """Maps a Ready claim's status to the name of the pod to exec into.

    status.sandbox.name names the Sandbox CR. The pod shares that name unless
    the Sandbox carries an agents.x-k8s.io/pod-name annotation (set when a
    warm-pool pod was adopted, per the controller's resolvePodName), in which
    case the annotation wins. Returns None when no sandbox is bound.
    """
    bound = status.get('sandbox') or {}
    sandbox_name = bound.get('name')
    if not sandbox_name:
      return None
    sandbox_cr = self._co.get_namespaced_custom_object(
        self._sandbox_group,
        self._sandbox_version,
        self._namespace,
        self._sandbox_plural,
        sandbox_name,
    )
    metadata = sandbox_cr.get('metadata') or {}
    annotations = metadata.get('annotations') or {}
    adopted_pod = annotations.get('agents.x-k8s.io/pod-name')
    return adopted_pod or sandbox_name

  def prepare(self, claim_name: str, status: dict[str, Any]) -> Optional[str]:
    """Resolves the pod name while the Sandbox CR is guaranteed to exist."""
    return self._resolve_pod_name(status)

  def execute(
      self,
      claim_name: str,
      _status: Optional[dict[str, Any]],
      prepared: Optional[str],
  ) -> None:
    """Execs the busy loop in the prepared pod and waits for it to end.

    Raises:
      RuntimeError: If prepare() did not yield a pod name.
    """
    from kubernetes.stream import stream  # pylint: disable=import-error,no-name-in-module

    if not prepared:
      raise RuntimeError(f'no sandbox pod bound for claim {claim_name}')
    # Same budget for the request and for draining the websocket.
    exec_timeout = self._workload_duration + 60
    exec_stream = stream(
        self._core.connect_get_namespaced_pod_exec,
        prepared,
        self._namespace,
        command=['python3', '-c', _busy_loop_code(self._workload_duration)],
        stderr=True,
        stdin=False,
        stdout=True,
        tty=False,
        _request_timeout=exec_timeout,
        _preload_content=False,
    )
    try:
      exec_stream.run_forever(timeout=exec_timeout)
    finally:
      exec_stream.close()
class ClaimDriver:
  """Creates, watches, and deletes SandboxClaim custom resources.

  Two CustomObjectsApi clients are built: one used for creates and the watch
  stream, and a separate one used only for deletes.
  """

  def __init__(
      self,
      namespace: str,
      template_name: str,
      warmpool_name: Optional[str] = None,
      group: str = CLAIM_API_GROUP,
      version: str = CLAIM_API_VERSION,
      plural: str = CLAIM_PLURAL,
      claim_ttl_seconds: Optional[int] = None,
      max_concurrent: int = 64,
      load_config: bool = True,
  ) -> None:
    """Initializes the driver and its API clients.

    Args:
      namespace: Namespace the claims are created in.
      template_name: Sandbox template name; stored, but not referenced when
        the claim body is built.
      warmpool_name: SandboxWarmPool the claims reference.
      group: API group of the claim resource.
      version: API version of the claim resource.
      plural: Plural resource name of the claim resource.
      claim_ttl_seconds: Optional ttlSecondsAfterFinished for each claim;
        None or 0 omits the field.
      max_concurrent: Expected request concurrency; the connection pools are
        sized to this plus 50.
      load_config: Whether to load kubeconfig before building the clients.
    """
    if load_config:
      _load_kube_config()
    pool_size = max_concurrent + 50
    self._api = _make_custom_objects_api(pool_size)
    self._delete_api = _make_custom_objects_api(pool_size)
    self._namespace = namespace
    self._template_name = template_name
    self._warmpool_name = warmpool_name
    self._group = group
    self._version = version
    self._plural = plural
    self._claim_ttl_seconds = claim_ttl_seconds or 0

  def _body(self, name: str) -> dict[str, Any]:
    """Builds the SandboxClaim manifest for the claim called `name`."""
    lifecycle: dict[str, Any] = {'shutdownPolicy': 'Delete'}
    if self._claim_ttl_seconds:
      lifecycle['ttlSecondsAfterFinished'] = self._claim_ttl_seconds
    # Post-#899 SandboxClaim API: a claim references a SandboxWarmPool
    # directly via warmPoolRef; the controller resolves the template through
    # the pool's sandboxTemplateRef. The old sandboxTemplateRef/warmpool
    # fields were removed upstream.
    # NOTE(review): with warmpool_name=None this emits warmPoolRef.name = None;
    # confirm whether callers always pass a warm pool.
    spec = {
        'warmPoolRef': {'name': self._warmpool_name},
        'lifecycle': lifecycle,
    }
    return {
        'apiVersion': f'{self._group}/{self._version}',
        'kind': CLAIM_KIND,
        'metadata': {'name': name},
        'spec': spec,
    }

  def create(
      self, name: str, sleeper: Callable[[float], None] = time.sleep
  ) -> None:
    """Creates a SandboxClaim, retrying on 429 up to _CREATE_MAX_RETRIES.

    Args:
      name: Claim name.
      sleeper: Callable(seconds) used to wait between retries (injectable for
        tests).

    Raises:
      ApiException: Re-raised for any non-429 status, or for a 429 once the
        retry budget is exhausted.
    """
    from kubernetes.client.rest import ApiException  # pylint: disable=import-error,no-name-in-module

    attempt = 0
    while True:
      try:
        self._api.create_namespaced_custom_object(
            self._group,
            self._version,
            self._namespace,
            self._plural,
            body=self._body(name),
        )
        return
      except ApiException as exc:
        if exc.status == 429 and attempt < _CREATE_MAX_RETRIES:
          attempt += 1
          retry_after = _CREATE_RETRY_AFTER_DEFAULT
          try:
            if exc.headers and exc.headers.get('Retry-After'):
              retry_after = float(exc.headers['Retry-After'])
          except (TypeError, ValueError):
            # Unparseable Retry-After: keep the default delay.
            pass
          # Cap the wait, and never hand a negative delay to the sleeper.
          retry_after = max(0.0, min(retry_after, _CREATE_RETRY_AFTER_CAP))
          logging.warning(
              'Claim %s create got 429; retry %d/%d after %.1fs',
              name,
              attempt,
              _CREATE_MAX_RETRIES,
              retry_after,
          )
          sleeper(retry_after)
        else:
          raise

  def is_ready(self, status: dict[str, Any]) -> bool:
    """Returns True iff status carries a Ready condition with status 'True'."""
    # `or []` also covers an explicit null conditions value, matching how
    # the 'sandbox' and 'status' fields are read elsewhere in this class.
    for condition in status.get('conditions') or []:
      if condition.get('type') == 'Ready' and condition.get('status') == 'True':
        return True
    return False

  def served_warm(self, status: dict[str, Any]) -> Optional[bool]:
    """Warm-served iff the bound sandbox name carries the warm pool prefix.

    Cold-provisioned sandboxes are named after the claim; warm-served ones
    are named after the warm pool. Returns None if no sandbox is bound yet.
    """
    sandbox = status.get('sandbox') or {}
    name = sandbox.get('name')
    if not name:
      return None
    return bool(self._warmpool_name) and name.startswith(self._warmpool_name)

  def stream_claim_events(
      self,
      stop_event: threading.Event,
      timeout_seconds: int = 60,
  ) -> Iterator[dict[str, Any]]:
    """Generates {'name': str, 'status': dict} dicts from a Watch stream.

    Re-establishes the watch if it times out, until stop_event is set.
    Resumes from the last seen resourceVersion so no events are missed across
    reconnects (BOOKMARK events advance the cursor without being yielded).
    Isolated as a method so tests can substitute a fake generator.

    Args:
      stop_event: threading.Event; when set the generator stops after the
        current watch iteration completes.
      timeout_seconds: Per-watch-call timeout forwarded to the Watch stream.

    Yields:
      {'name': str, 'status': dict} for every SandboxClaim event.
    """
    from kubernetes import watch as kwatch  # pylint: disable=import-error,no-name-in-module
    from kubernetes.client.rest import ApiException  # pylint: disable=import-error,no-name-in-module

    resource_version = None
    while not stop_event.is_set():
      w = kwatch.Watch()
      try:
        stream_kwargs = dict(
            timeout_seconds=timeout_seconds,
            allow_watch_bookmarks=True,
        )
        if resource_version is not None:
          stream_kwargs['resource_version'] = resource_version
        for _raw_event in w.stream(
            self._api.list_namespaced_custom_object,
            self._group,
            self._version,
            self._namespace,
            self._plural,
            **stream_kwargs,
        ):
          event: dict[str, Any] = cast(dict[str, Any], _raw_event)
          if stop_event.is_set():
            w.stop()
            return
          obj = event['object']
          # Advance the resume cursor on every event, bookmarks included.
          rv = (obj.get('metadata') or {}).get('resourceVersion')
          if rv:
            resource_version = rv
          if event.get('type') == 'BOOKMARK':
            continue
          if event.get('type') == 'ERROR':
            code = (
                obj.get('code')
                if isinstance(obj, dict)
                else getattr(obj, 'code', None)
            )
            logging.warning('Watch stream ERROR event (code=%s): %s', code, obj)
            # Drop the cursor so the next watch starts from scratch.
            resource_version = None
            w.stop()
            break
          name = obj['metadata']['name']
          status = obj.get('status', {}) or {}
          yield {'name': name, 'status': status}
      except ApiException as exc:
        if stop_event.is_set():
          return
        if exc.status == 410:
          logging.warning(
              'Watch resourceVersion too old (410 Gone); re-listing: %s', exc
          )
          resource_version = None
        else:
          logging.warning('Watch stream API error (will reconnect): %s', exc)
          # NOTE(review): back-off is applied on the non-410 path only;
          # confirm against the original layout.
          time.sleep(1)
      except Exception as exc:  # noqa: BLE001 watch reconnect on any error
        if stop_event.is_set():
          return
        logging.warning('Watch stream error (will reconnect): %s', exc)
        time.sleep(1)

  def delete(self, name: str) -> None:
    """Deletes the named claim through the dedicated delete client."""
    self._delete_api.delete_namespaced_custom_object(
        self._group, self._version, self._namespace, self._plural, name
    )

  @property
  def custom_objects_api(self) -> Any:
    """The CustomObjectsApi client used for creates and the watch stream."""
    return self._api
class LoadGenerator:
  """Submits claims at a target QPS; records readiness via a Watch stream.

  Architecture:
    - A background thread consumes stream_claim_events() and records ready_at
      timestamps in shared maps (no per-claim API polling).
    - A bounded ThreadPoolExecutor submits creates only (one API call per task,
      no blocking wait inside the task).
    - A second bounded pool (the "hold pool") runs the workload for each Ready
      claim and then deletes the claim.
    - After all creates are submitted, run() drains locally by polling the
      in-memory maps until all claims are accounted for or the drain deadline
      (ready_timeout + workload_duration after the last create) elapses.
    - Cleanup deletes all created-but-unreleased claims best-effort.

  All shared maps and counters are guarded by self._lock.
  """

  def __init__(
      self,
      driver: ClaimDriver,
      ready_timeout: float,
      max_concurrent: int,
      workload_executor: Optional[WorkloadExecutor] = None,
      workload_duration: float = 0,
  ) -> None:
    """Stores configuration; no API calls are made here.

    Args:
      driver: Object used to create/delete claims and stream their events.
      ready_timeout: Seconds added to workload_duration to form the drain
        window after the last create.
      max_concurrent: Worker count for both the create pool and the hold pool.
      workload_executor: Runs the per-claim workload; NoopExecutor when None.
      workload_duration: Seconds each workload is expected to hold a claim.
    """
    self._driver = driver
    self._ready_timeout = ready_timeout
    self._max_concurrent = max_concurrent
    self._workload_duration = workload_duration
    self._executor = workload_executor or NoopExecutor()
    self._lock = threading.Lock()
    self._in_flight = 0
    self._peak = 0
    self._clock = time.monotonic
    self._hold_pool = None

  @property
  def peak_concurrency(self):
    """Highest in-flight claim count observed during the latest run()."""
    return self._peak

  def _watch_consumer(
      self,
      stop_event,
      ready_at_map,
      warm_map,
      created_set,
      released,
      exec_started_map,
      exec_completed_map,
      released_at_map,
  ):
    """Background thread: consume watch events and record readiness.

    For the first Ready event of every tracked claim, records ready_at and the
    warm flag, prepares the workload, and hands the claim to the hold pool.
    Events for unknown claims and repeated Ready events are ignored.
    """
    for event in self._driver.stream_claim_events(stop_event):
      name = event['name']
      status = event['status']
      if not self._driver.is_ready(status):
        continue
      with self._lock:
        if name not in created_set or name in ready_at_map:
          continue  # Not ours, or already recorded.
        ready_at_map[name] = self._clock()
        warm_map[name] = self._driver.served_warm(status)
      try:
        prepared = self._executor.prepare(name, status)
      except Exception as exc:  # noqa: BLE001
        logging.warning(
            'Failed to prepare workload for claim %s: %s', name, exc
        )
        prepared = None
      hold_pool = self._hold_pool
      if hold_pool is None:
        continue
      try:
        hold_pool.submit(
            self._hold_task,
            name,
            prepared,
            exec_started_map,
            exec_completed_map,
            released_at_map,
            released,
        )
      except RuntimeError as exc:
        # The pool was shut down while this thread was still running (run()
        # only waits 2s for us). run()'s cleanup deletes the claim instead.
        logging.warning(
            'Hold pool unavailable for claim %s (left to cleanup): %s',
            name,
            exc,
        )

  def _hold_task(
      self,
      name,
      prepared,
      exec_started_map,
      exec_completed_map,
      released_at_map,
      released,
  ):
    """Worker thread: run workload then delete the claim.

    Exec timings are recorded as None when the workload raised, so a failed
    exec does not skew exec_duration. The claim is always deleted
    (best-effort) and marked released.
    """
    exec_started = self._clock()
    try:
      self._executor.execute(name, None, prepared)
    except Exception as exc:  # noqa: BLE001 workload failure must not abort the run
      logging.warning('Workload exec failed for claim %s: %s', name, exc)
      exec_started = None
    finally:
      exec_completed = self._clock() if exec_started is not None else None
      try:
        self._driver.delete(name)
      except Exception as exc:  # noqa: BLE001 best-effort release
        logging.warning('Failed to release claim %s: %s', name, exc)
      with self._lock:
        exec_started_map[name] = exec_started
        exec_completed_map[name] = exec_completed
        released_at_map[name] = self._clock()
        released.add(name)
        self._in_flight -= 1

  def run(
      self,
      shape: RunShape,
      clock: Callable[[], float] = time.monotonic,
      sleeper: Callable[[float], None] = time.sleep,
  ) -> list[ClaimRecord]:
    """Run the load shape, returning a list of ClaimRecord in submission order.

    Args:
      shape: RunShape describing qps, duration, and total claims.
      clock: Callable returning a float timestamp (injectable for tests).
      sleeper: Callable(seconds) for pacing sleeps (injectable for tests).

    Returns:
      List of ClaimRecord, one per submitted claim index, in order.
    """
    self._clock = clock
    # Counters are per-run so peak_concurrency never reports a previous run.
    with self._lock:
      self._in_flight = 0
      self._peak = 0

    requested_at_map = {}
    ready_at_map = {}
    warm_map = {}
    errors_map = {}
    created_set = set()
    released = set()
    exec_started_map = {}
    exec_completed_map = {}
    released_at_map = {}
    submitted_names = []

    stop_event = threading.Event()

    # Initialize hold_pool before starting watch_thread so that a Ready event
    # cannot race against self._hold_pool being None.
    hold_pool = futures.ThreadPoolExecutor(
        max_workers=self._max_concurrent, thread_name_prefix='claim-hold'
    )
    self._hold_pool = hold_pool

    watch_thread = threading.Thread(
        target=self._watch_consumer,
        args=(
            stop_event,
            ready_at_map,
            warm_map,
            created_set,
            released,
            exec_started_map,
            exec_completed_map,
            released_at_map,
        ),
        daemon=True,
        name='claim-watch-consumer',
    )
    watch_thread.start()

    def _create_task(name):
      # Register the claim BEFORE create(): the watch consumer only accepts
      # names found in created_set, and a Ready event can arrive before
      # create() returns. Registering afterwards would drop that event and
      # the claim would end up as a spurious Timeout. As a consequence the
      # in-flight count starts when the create request is issued.
      with self._lock:
        requested_at_map[name] = clock()
        created_set.add(name)
        self._in_flight += 1
        self._peak = max(self._peak, self._in_flight)
      try:
        self._driver.create(name)
      except Exception as exc:  # noqa: BLE001 record and continue
        logging.warning(
            'Claim %s create failed: %s: %s', name, type(exc).__name__, exc
        )
        with self._lock:
          errors_map[name] = type(exc).__name__
          if name not in ready_at_map:
            # Nothing was handed to the hold pool; undo the registration.
            created_set.discard(name)
            self._in_flight -= 1

    start = clock()
    last_create_time = start
    create_futs = []

    try:
      with futures.ThreadPoolExecutor(max_workers=self._max_concurrent) as pool:
        for idx in range(shape.total):
          name = f'claim-{idx}'
          submitted_names.append(name)
          deadline = start + (idx / shape.qps)
          now = clock()
          if now < deadline:
            sleeper(deadline - now)
          last_create_time = clock()
          create_futs.append(pool.submit(_create_task, name))
      # Wait for all create tasks to complete before starting drain.
      for fut in create_futs:
        fut.result()

      # Drain: wait until every created claim is accounted for, or until the
      # deadline elapses since the last create. "Accounted" means the claim
      # appears in released_at_map (its hold task finished) or in errors_map.
      # Each claim is counted once even if it is in both maps.
      drain_window = self._ready_timeout + self._workload_duration
      drain_deadline = last_create_time + drain_window
      while True:
        now = clock()
        with self._lock:
          accounted = sum(
              1
              for n in created_set
              if n in released_at_map or n in errors_map
          )
          outstanding = len(created_set) - accounted
        if outstanding <= 0:
          break
        if now >= drain_deadline:
          # Mark remaining created-but-never-Ready claims as Timeout.
          # Claims that already reached Ready must never be Timeout'd: their
          # hold finishes in the finally block below.
          with self._lock:
            for name in list(created_set):
              if (
                  name not in released
                  and name not in errors_map
                  and name not in ready_at_map
              ):
                errors_map[name] = 'Timeout'
          break
        sleeper(0.1)

    finally:
      # Always stop the watch thread and delete any created-but-not-released
      # claims. Runs on normal completion AND on KeyboardInterrupt/exception
      # so Ctrl-C or crashes do not leak SandboxClaims (and their pods).
      stop_event.set()
      # Join the watch thread first so any in-progress _watch_consumer
      # iteration finishes submitting hold tasks before the pool shuts down.
      watch_thread.join(timeout=2.0)
      if watch_thread.is_alive():
        logging.warning(
            'Watch consumer thread did not exit within 2s; '
            'proceeding with hold pool shutdown.'
        )
      # Wait for all in-flight hold tasks, then drop the pool reference.
      hold_pool.shutdown(wait=True)
      self._hold_pool = None

      with self._lock:
        stragglers = [n for n in created_set if n not in released]
      for name in stragglers:
        try:
          self._driver.delete(name)
        except Exception as exc:  # noqa: BLE001 best-effort cleanup
          logging.warning('Failed to delete claim %s: %s', name, exc)

    # Build ordered result list.
    records = []
    for name in submitted_names:
      released_at = released_at_map.get(name)
      error = errors_map.get(name)
      # A claim that completed its hold (released_at set) is a success.
      # If a stale 'Timeout' error was recorded before the hold finished,
      # drop it so released_at is authoritative. Other error types are kept.
      if released_at is not None and error == 'Timeout':
        error = None
      records.append(
          ClaimRecord(
              name=name,
              requested_at=requested_at_map.get(name, 0.0),
              ready_at=ready_at_map.get(name),
              error=error,
              warm_served=warm_map.get(name),
              exec_started_at=exec_started_map.get(name),
              exec_completed_at=exec_completed_map.get(name),
              released_at=released_at,
          )
      )
    return records


def main(argv: Optional[list[str]] = None) -> None:
  """Entry point when running as an in-pod script.

  Parses CLI arguments, runs the load generator, and writes one JSON line per
  ClaimRecord to --output followed by a summary line with peak_concurrency.

  Args:
    argv: Argument list to parse; None lets argparse read sys.argv.
  """
  parser = argparse.ArgumentParser(
      description='Agent sandbox in-pod load runner.')
  parser.add_argument('--namespace', required=True,
                      help='Kubernetes namespace.')
  parser.add_argument('--template-name', required=True,
                      help='SandboxWarmPool / SandboxTemplate name.')
  parser.add_argument('--qps', type=float, default=None,
                      help='Target claims per second.')
  parser.add_argument('--total', type=int, default=None,
                      help='Total number of claims to submit.')
  parser.add_argument('--duration', type=float, default=None,
                      help='Submission window in seconds.')
  parser.add_argument('--max-concurrent', type=int, required=True,
                      help='Maximum in-flight claims.')
  parser.add_argument('--workload-duration', type=float, default=0,
                      help='Seconds to hold each sandbox after Ready (0 = immediate release).')
  parser.add_argument('--ready-timeout', type=float, default=180,
                      help='Per-claim ready timeout in seconds.')
  parser.add_argument('--output', required=True,
                      help='Path to write JSONL results.')
  args = parser.parse_args(argv)

  logging.basicConfig(
      level=logging.INFO,
      format='%(asctime)s [%(levelname)s] %(message)s',
      stream=sys.stderr,
  )

  # Load in-cluster config with fallback to kubeconfig for local runs.
  from kubernetes import config as k8s_config  # pylint: disable=import-error,no-name-in-module
  try:
    k8s_config.load_incluster_config()
  except Exception as exc:  # noqa: BLE001 any failure means "not in a pod"
    logging.info('In-cluster config unavailable (%s); using kubeconfig.', exc)
    k8s_config.load_kube_config()

  # Apply bearer-token fix (needed for both in-cluster and kubeconfig paths
  # with kubernetes-client>=36).
  from kubernetes import client as k8s_client  # pylint: disable=import-error,no-name-in-module
  cfg = k8s_client.Configuration.get_default_copy()
  _register_bearer_token_auth(cfg)
  k8s_client.Configuration.set_default(cfg)

  shape = resolve_run_shape(
      qps=args.qps,
      duration=args.duration,
      total=args.total,
  )

  workload_duration = args.workload_duration
  if workload_duration > 0:
    pool_size = args.max_concurrent + 50
    executor = StreamExecExecutor(
        core_v1_api=_make_core_v1_api(pool_size),
        custom_objects_api=_make_custom_objects_api(pool_size),
        namespace=args.namespace,
        workload_duration=workload_duration,
    )
  else:
    executor = None

  driver = ClaimDriver(
      namespace=args.namespace,
      template_name=args.template_name,
      warmpool_name=args.template_name,
      max_concurrent=args.max_concurrent,
      load_config=False,
  )

  generator = LoadGenerator(
      driver=driver,
      ready_timeout=args.ready_timeout,
      max_concurrent=args.max_concurrent,
      workload_executor=executor,
      workload_duration=workload_duration,
  )

  records = generator.run(shape)
  peak_concurrency = generator.peak_concurrency

  with open(args.output, 'w', encoding='utf-8') as f:
    for rec in records:
      f.write(json.dumps(dataclasses.asdict(rec)) + '\n')
    f.write(json.dumps({'summary': {'peak_concurrency': peak_concurrency}}) + '\n')


if __name__ == '__main__':
  main()
_PERCENTILES = (50, 90, 95, 99)


def percentile(values: list[float], pct: float) -> float:
  """Linear-interpolated percentile. Returns 0.0 for an empty input.

  Args:
    values: Observations in any order; they are sorted internally.
    pct: Percentile to compute, in the closed range [0, 100].

  Returns:
    The interpolated value at the requested percentile, as a float.

  Raises:
    ValueError: If pct is outside [0, 100] (including NaN). Without this
      check a negative pct would index from the end of the list and silently
      return a wrong value.
  """
  if not 0 <= pct <= 100:
    raise ValueError(f'pct must be within [0, 100], got {pct!r}')
  if not values:
    return 0.0
  ordered = sorted(values)
  rank = (len(ordered) - 1) * (pct / 100.0)
  low = math.floor(rank)
  high = math.ceil(rank)
  if low == high:
    return float(ordered[low])
  return float(ordered[low] + (ordered[high] - ordered[low]) * (rank - low))


def _distribution_samples(prefix, values, metadata, always_emit=False):
  """Returns `<prefix>_p<N>` samples for _PERCENTILES plus `<prefix>_max`.

  An empty `values` yields no samples unless always_emit is set, in which case
  every sample carries 0.0.
  """
  if not values and not always_emit:
    return []
  emitted = [
      sample.Sample(
          f'{prefix}_p{pct}', percentile(values, pct), 'seconds', metadata
      )
      for pct in _PERCENTILES
  ]
  emitted.append(
      sample.Sample(
          f'{prefix}_max', max(values) if values else 0.0, 'seconds', metadata
      )
  )
  return emitted


def build_samples(
    records: list[Any],
    peak_concurrency: int,
    metadata: dict[str, Any],
) -> list[sample.Sample]:
  """Builds the sample list for one benchmark Run.

  A record counts as a success when it has no error and a ready_at timestamp.
  Startup-time samples are always emitted (0.0 when there are no successes);
  exec-duration and total-lifecycle samples are emitted only when at least one
  success carries that value.

  Args:
    records: List of ClaimRecord, one per submitted claim.
    peak_concurrency: Maximum in-flight claim count observed.
    metadata: Dict attached to every emitted sample.

  Returns:
    A list of sample.Sample.
  """
  successes = [r for r in records if r.error is None and r.ready_at is not None]
  errors = collections.Counter(r.error for r in records if r.error is not None)

  samples = _distribution_samples(
      'startup_time',
      [r.startup_time_s for r in successes],
      metadata,
      always_emit=True,
  )
  # getattr keeps this tolerant of records that lack the optional timings.
  for attr in ('exec_duration_s', 'total_lifecycle_s'):
    observed = [
        getattr(r, attr)
        for r in successes
        if getattr(r, attr, None) is not None
    ]
    samples.extend(_distribution_samples(attr, observed, metadata))

  request_times = [r.requested_at for r in records]
  ready_times = [r.ready_at for r in successes]
  # Submit QPS over the request window; 0.0 fallback when all requests
  # share a timestamp.
  submit_span = (max(request_times) - min(request_times)) if records else 0.0
  submit_qps = (len(records) / submit_span) if submit_span > 0 else 0.0
  # Completion QPS over the full experiment wall clock: first request to
  # last ready.
  completion_span = (
      (max(ready_times) - min(request_times)) if ready_times else 0.0
  )
  completion_qps = (
      (len(successes) / completion_span) if completion_span > 0 else 0.0
  )

  warm = [r for r in successes if r.warm_served]
  warm_fraction = (len(warm) / len(successes)) if successes else 0.0

  samples.extend([
      sample.Sample('submit_qps', submit_qps, 'count/sec', metadata),
      sample.Sample('completion_qps', completion_qps, 'count/sec', metadata),
      sample.Sample('peak_concurrency', peak_concurrency, 'count', metadata),
      sample.Sample(
          'warm_served_fraction', warm_fraction, 'fraction', metadata
      ),
      sample.Sample('success_count', len(successes), 'count', metadata),
      sample.Sample('error_count', sum(errors.values()), 'count', metadata),
  ])
  for error_type, count in errors.items():
    # Per-type samples get their own metadata copy tagged with the type.
    error_metadata = dict(metadata, error_type=error_type)
    samples.append(
        sample.Sample(
            f'error_count_{error_type}', count, 'count', error_metadata
        )
    )
  return samples
class BaseAgentSandbox(pkb_resource.BaseResource):
  """Common base for agent sandbox resources bound to a Kubernetes cluster.

  Concrete subclasses set SANDBOX_TYPE (listed in REQUIRED_ATTRS) and provide
  the install logic.
  """

  RESOURCE_TYPE = 'BaseAgentSandbox'
  REQUIRED_ATTRS = ['SANDBOX_TYPE']

  def __init__(self, spec, cluster):
    """Stores the spec and cluster.

    Args:
      spec: Decoded agent sandbox config spec.
      cluster: The cluster the sandbox is installed on; must not be None.

    Raises:
      errors.Resource.CreationError: If cluster is None.
    """
    super().__init__()
    self.spec, self.cluster = spec, cluster
    if cluster is not None:
      return
    raise errors.Resource.CreationError(
        'A kubernetes cluster is required for an agent sandbox resource.'
    )


def GetAgentSandbox(spec, cluster) -> Optional[BaseAgentSandbox]:
  """Instantiates the sandbox class registered for spec.type.

  Args:
    spec: Agent sandbox config spec, or a falsy value when none is configured.
    cluster: Cluster handed to the sandbox constructor.

  Returns:
    The sandbox resource, or None when spec is falsy.
  """
  if spec:
    sandbox_cls: type[BaseAgentSandbox] = pkb_resource.GetResourceClass(
        BaseAgentSandbox, SANDBOX_TYPE=spec.type
    )
    return sandbox_cls(spec, cluster)
  return None
DEFAULT_SANDBOX_TYPE = 'Kubernetes'


class BaseAgentSandboxConfigSpec(spec.BaseSpec):
  """Configuration spec for an agent sandbox.

  Attributes:
    type: Type of the agent sandbox (for example Kubernetes or a managed
      offering).
  """

  SPEC_TYPE = 'BaseAgentSandboxConfigSpec'
  SPEC_ATTRS = ['SANDBOX_TYPE']

  def __init__(self, component_full_name, flag_values=None, **kwargs):
    # Bare annotation only: declares the attribute for type checkers. The
    # value itself is populated by the base-class constructor.
    self.type: str
    super().__init__(component_full_name, flag_values=flag_values, **kwargs)

  @classmethod
  def _GetOptionDecoderConstructions(cls):
    """Extends the inherited decoder table with the `type` option."""
    constructions = super()._GetOptionDecoderConstructions()
    constructions['type'] = (
        option_decoders.StringDecoder,
        {'default': DEFAULT_SANDBOX_TYPE, 'none_ok': False},
    )
    return constructions


class AgentSandboxConfigDecoder(option_decoders.TypeVerifier):
  """Turns an agent sandbox config dict into the matching spec object."""

  def __init__(self, **kwargs):
    super().__init__((dict,), **kwargs)

  def Decode(self, value, component_full_name, flag_values):
    """Builds the spec class selected by value['type'].

    Falls back to DEFAULT_SANDBOX_TYPE when the dict has no 'type' key.

    Raises:
      errors.Config.UnrecognizedOption: If no spec class is found for the type.
    """
    super().Decode(value, component_full_name, flag_values)
    if 'type' in value:
      sandbox_type = value['type']
    else:
      sandbox_type = DEFAULT_SANDBOX_TYPE
    spec_cls = GetAgentSandboxConfigSpecClass(sandbox_type)
    if spec_cls is None:
      raise errors.Config.UnrecognizedOption(
          f'Unrecognized agent sandbox type: {sandbox_type}.'
      )
    option_name = self._GetOptionFullName(component_full_name)
    return spec_cls(option_name, flag_values=flag_values, **value)


def GetAgentSandboxConfigSpecClass(sandbox_type):
  """Looks up the config spec class registered for sandbox_type."""
  return spec.GetSpecClass(
      BaseAgentSandboxConfigSpec, SANDBOX_TYPE=sandbox_type
  )
+""" + +import os +import tempfile + +import yaml +from absl import flags +from absl import logging + +from perfkitbenchmarker import data +from perfkitbenchmarker import vm_util +from perfkitbenchmarker.resources import agent_sandbox +from perfkitbenchmarker.resources import agent_sandbox_spec +from perfkitbenchmarker.resources.kubernetes import k8s_agent_sandbox_spec +from perfkitbenchmarker.resources.container_service import kubectl +from perfkitbenchmarker.resources.container_service import kubernetes_commands + +FLAGS = flags.FLAGS + + +# Mapping from tuning dict keys to the controller CLI flag strings. +_TUNING_ARG_MAP = ( + ('claim_workers', '--sandbox-claim-concurrent-workers={}'), + ('sandbox_workers', '--sandbox-concurrent-workers={}'), + ('warmpool_workers', '--sandbox-warm-pool-concurrent-workers={}'), + ('warmpool_max_batch_size', '--sandbox-warm-pool-max-batch-size={}'), + ('kube_api_burst', '--kube-api-burst={}'), + ('kube_api_qps', '--kube-api-qps={}'), +) + +_DEFAULT_CPU_REQUEST = '500m' +_DEFAULT_CPU_LIMIT = '2' +_DEFAULT_MEMORY_REQUEST = '256Mi' +_DEFAULT_MEMORY_LIMIT = '1Gi' + +_GVISOR_DAEMONSET = 'agent_sandbox/gvisor-installer/daemonset.yaml' +_GVISOR_RUNTIMECLASS = 'agent_sandbox/gvisor-installer/runtimeclass.yaml' +_GVISOR_INSTALLER_SCRIPT = 'agent_sandbox/gvisor-installer/install.sh' +_GVISOR_CONFIGMAP_NAME = 'gvisor-installer-script' +_TEMPLATE_MANIFEST = 'agent_sandbox/sandbox-template.yaml.j2' +_WARMPOOL_MANIFEST = 'agent_sandbox/sandbox-warmpool.yaml.j2' + +_RELEASE_BASE = ( + 'https://raw.githubusercontent.com/kubernetes-sigs/agent-sandbox' +) + +_CRD_FILES = ( + 'crds/agents.x-k8s.io_sandboxes.yaml', + 'crds/extensions.agents.x-k8s.io_sandboxclaims.yaml', + 'crds/extensions.agents.x-k8s.io_sandboxtemplates.yaml', + 'crds/extensions.agents.x-k8s.io_sandboxwarmpools.yaml', +) + +_RBAC_FILES = ( + 'rbac.generated.yaml', + 'extensions-rbac.generated.yaml', + 'extensions.yaml', +) + +_CORE_FILE = 'controller.yaml' +_CONTROLLER_FILE = 
_CONTROLLER_FILE = 'extensions.controller.yaml'

SANDBOX_NAME = 'agent-sandbox'

# Config key of the nodepool the sandbox workload runs on (matches the
# `nodepools:` block in linux_benchmarks/agent_sandbox_benchmark.py).
_SANDBOX_NODEPOOL = 'sandbox'

# Taint fencing the sandbox nodepool. PKB does not yet apply nodepool taints to
# nodes (that wiring lands in PR #6741), so keep the canonical taint string here
# and derive the pod toleration from it. Keep this in lockstep with the
# `node_taints` entry in agent_sandbox_benchmark.py BENCHMARK_CONFIG.
# TODO(#6741): read this from cluster.nodepools[_SANDBOX_NODEPOOL].node_taints
# once NodepoolConfig carries node_taints, and delete this constant.
_SANDBOX_TAINT = 'sandbox.gke.io/runtime=runsc:NoSchedule'


def _crd_name(filename):
  """Derives a CRD resource name from its manifest filename.

  Example: 'crds/agents.x-k8s.io_sandboxes.yaml' -> 'sandboxes.agents.x-k8s.io'.
  """
  base = filename.split('/')[-1].removesuffix('.yaml')
  group, plural = base.rsplit('_', 1)
  return f'{plural}.{group}'


def _url(ref, filename):
  """Returns the raw upstream URL of a k8s/ manifest at the given git ref."""
  return f'{_RELEASE_BASE}/{ref}/k8s/{filename}'


def _apply_url(url):
  """Applies a manifest from a URL. Isolated for test mocking."""
  kubectl.RunKubectlCommand(['apply', '-f', url])


def _wait_warmpool_ready(warmpool_name, replicas, timeout=600):
  """Waits for the SandboxWarmPool's readyReplicas to equal the target."""
  kubernetes_commands.WaitForResource(
      f'sandboxwarmpool/{warmpool_name}',
      f'jsonpath={{.status.readyReplicas}}={replicas}',
      condition_type='',
      timeout=timeout,
  )


def _taint_to_toleration(taint):
  """Converts a 'key=value:Effect' or 'key:Effect' taint to a toleration dict.

  Parsing mirrors the planned EKS taint parser (PR #6744) but emits a
  Kubernetes toleration so the node taint and the pod toleration stay symmetric.

  Args:
    taint: Taint string. With '=value' the toleration uses operator Equal,
      otherwise Exists.

  Returns:
    Dict with key, operator, effect and (for Equal) value.

  Raises:
    ValueError: If the ':Effect' part is missing, or the key or effect is
      empty. An empty key or effect is rejected because the resulting
      toleration would act as a wildcard instead of matching one taint.
  """
  spec, sep_effect, effect = taint.rpartition(':')
  key, sep_val, value = spec.partition('=')
  if not sep_effect or not key or not effect:
    raise ValueError(
        f'Malformed taint, expected key[=value]:Effect: {taint!r}'
    )
  toleration = {'key': key}
  if sep_val:
    toleration['operator'] = 'Equal'
    toleration['value'] = value
  else:
    toleration['operator'] = 'Exists'
  toleration['effect'] = effect
  return toleration


def _sandbox_scheduling(nodepool_name):
  """Returns (node_selector, tolerations) targeting the sandbox nodepool.

  The selector rendezvous is the pkb_nodepool label PKB stamps on every node
  pool (GKE _AddNodeParamsToCmd, EKS _RenderNodeGroupJson), so there is no
  bespoke label to keep in sync. The toleration is derived from _SANDBOX_TAINT.
  """
  node_selector = {'pkb_nodepool': nodepool_name}
  tolerations = [_taint_to_toleration(_SANDBOX_TAINT)]
  return node_selector, tolerations


def _render_gvisor_daemonset(node_selector, tolerations):
  """Loads the installer DaemonSet and injects the sandbox scheduling target."""
  with open(
      data.ResourcePath(_GVISOR_DAEMONSET), encoding='utf-8'
  ) as manifest_file:
    manifest = yaml.safe_load(manifest_file)
  pod_spec = manifest['spec']['template']['spec']
  pod_spec['nodeSelector'] = node_selector
  pod_spec['tolerations'] = tolerations
  return yaml.dump(manifest, default_flow_style=False)


def _render_template_manifest(template_spec, node_selector, tolerations):
  """Renders the SandboxTemplate and injects the sandbox scheduling target.

  nodeSelector/tolerations are set in Python (not the .j2) so the scheduling
  target has a single source of truth. runtimeClassName stays in the template:
  it is runtime identity, not scheduling.
  """
  labels = template_spec.labels or {'sandbox': 'python-sandbox-bench'}
  rendered = vm_util.ReadAndRenderJinja2Template(
      _TEMPLATE_MANIFEST,
      trim_spaces=False,
      name=SANDBOX_NAME,
      runtime_class=template_spec.runtime_class,
      image=template_spec.image,
      cpu_request=template_spec.cpu_request,
      cpu_limit=template_spec.cpu_limit,
      memory_request=template_spec.memory_request,
      memory_limit=template_spec.memory_limit,
      labels=labels,
  )
  manifest = yaml.safe_load(rendered)
  pod_spec = manifest['spec']['podTemplate']['spec']
  pod_spec['nodeSelector'] = node_selector
  pod_spec['tolerations'] = tolerations
  return yaml.dump(manifest, default_flow_style=False)


def install_gvisor(node_selector, tolerations):
  """Installs gVisor onto cluster nodes via the installer DaemonSet.

  Creates the gvisor-installer-script ConfigMap (from install.sh) in
  kube-system before applying the DaemonSet. The DaemonSet mounts that
  ConfigMap at /scripts; the init container runs /scripts/install.sh.
  The ConfigMap must exist before the DaemonSet pods schedule. The DaemonSet's
  nodeSelector/tolerations are injected so it lands on the sandbox nodepool.
  """
  _create_installer_configmap()
  _apply_yaml(_render_gvisor_daemonset(node_selector, tolerations))
  kubernetes_commands.ApplyManifest(_GVISOR_RUNTIMECLASS)
  kubernetes_commands.WaitForRollout(
      'daemonset/gvisor-installer', namespace='kube-system'
  )


def _create_installer_configmap():
  """Creates or updates the gvisor-installer-script ConfigMap in kube-system.

  Uses --dry-run=client -o yaml to render the ConfigMap manifest without
  hitting the cluster, then applies it. A plain 'kubectl create configmap'
  would fail if the resource already exists; apply is idempotent on re-runs.
  """
  script_path = data.ResourcePath(_GVISOR_INSTALLER_SCRIPT)
  # Use --from-file=install.sh= so the ConfigMap has exactly one key
  # named install.sh, instead of pulling the whole directory (which would
  # also include daemonset.yaml and runtimeclass.yaml as keys).
  yaml_out, _, _ = kubectl.RunKubectlCommand([
      'create',
      'configmap',
      _GVISOR_CONFIGMAP_NAME,
      f'--from-file=install.sh={script_path}',
      '--namespace',
      'kube-system',
      '--dry-run=client',
      '-o',
      'yaml',
  ])
  _apply_yaml(yaml_out)


def _apply_yaml(yaml_str):
  """Writes yaml_str to a temp file and applies it with kubectl apply.

  The temp file is removed even if writing or applying fails.
  """
  tmpfile = tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False)
  try:
    with tmpfile:
      tmpfile.write(yaml_str)
    # The file is closed (flushed) here, before kubectl reads it.
    kubectl.RunKubectlCommand(['apply', '-f', tmpfile.name])
  finally:
    os.unlink(tmpfile.name)


def _is_leader_elect_flag(arg):
  """True for '--leader-elect' or '--leader-elect=<value>' only."""
  text = str(arg)
  return text == '--leader-elect' or text.startswith('--leader-elect=')


def _configure_controller_manifest(manifest_yaml, controller_image, tuning):
  """Injects image and all tuning into a controller Deployment manifest.

  Args:
    manifest_yaml: YAML text of a single Deployment document.
    controller_image: Image override for the first container; falsy keeps the
      manifest's image.
    tuning: Dict of tuning options (see install_controller).

  Returns:
    The modified manifest as a YAML string ready for kubectl apply.
  """
  manifest = yaml.safe_load(manifest_yaml)
  container = manifest['spec']['template']['spec']['containers'][0]

  # Image.
  if controller_image:
    container['image'] = controller_image

  # Resources (Burstable QoS; base manifest ships with nothing -> BestEffort).
  container['resources'] = {
      'requests': {
          'cpu': tuning.get('cpu_request', _DEFAULT_CPU_REQUEST),
          'memory': tuning.get('memory_request', _DEFAULT_MEMORY_REQUEST),
      },
      'limits': {
          'cpu': tuning.get('cpu_limit', _DEFAULT_CPU_LIMIT),
          'memory': tuning.get('memory_limit', _DEFAULT_MEMORY_LIMIT),
      },
  }

  # Leader election: drop every existing --leader-elect flag, wherever it
  # sits in the list, and put ours first so exactly one effective flag exists.
  le_flag = (
      '--leader-elect=true'
      if tuning.get('leader_elect')
      else '--leader-elect=false'
  )
  args = [
      arg
      for arg in (container.get('args') or [])
      if not _is_leader_elect_flag(arg)
  ]
  args.insert(0, le_flag)
  container['args'] = args

  # Controller tuning args (appended; order doesn't matter).
  for key, arg_fmt in _TUNING_ARG_MAP:
    value = tuning.get(key)
    if value is not None:
      args.append(arg_fmt.format(value))
  if tuning.get('enable_tracing'):
    args.append('--enable-tracing=true')

  # OTEL env vars, replacing any same-named entries already in the manifest.
  if tuning.get('enable_tracing') and tuning.get('otel_endpoint'):
    otel_env = [
        {
            'name': 'OTEL_EXPORTER_OTLP_ENDPOINT',
            'value': tuning['otel_endpoint'],
        },
        {'name': 'OTEL_EXPORTER_OTLP_INSECURE', 'value': 'true'},
    ]
    otel_names = {entry['name'] for entry in otel_env}
    kept = [
        entry
        for entry in (container.get('env') or [])
        if entry.get('name') not in otel_names
    ]
    container['env'] = kept + otel_env

  return yaml.dump(manifest, default_flow_style=False)


def install_crds_and_rbac(controller_ref):
  """Applies the agent-sandbox CRDs and cluster-scoped RBAC.

  Run in the provision stage. CRDs and RBAC are cluster-scoped, slow to
  establish, and unaffected by controller image or tuning changes, so they are
  not re-applied when the controller is reinstalled in the prepare stage.

  Args:
    controller_ref: Git ref (tag or SHA) for upstream raw asset URLs.
  """
  for filename in _CRD_FILES:
    _apply_url(_url(controller_ref, filename))
  # Wait only after all CRDs are submitted so they establish in parallel.
  for filename in _CRD_FILES:
    kubernetes_commands.WaitForResource(
        f'crd/{_crd_name(filename)}', 'established'
    )
  for filename in _RBAC_FILES:
    _apply_url(_url(controller_ref, filename))


def install_controller(
    controller_ref, controller_image, controller_tuning=None
):
  """Installs the controller core resources and Deployment from upstream.

  CRDs and RBAC must already be installed (see install_crds_and_rbac). Applies
  the non-Deployment resources from controller.yaml, then the image- and
  tuning-configured Deployment from extensions.controller.yaml, and waits for
  the rollout. Safe to re-run against an existing cluster: kubectl apply is
  idempotent and a changed image or args triggers a rolling update.

  Args:
    controller_ref: Git ref (tag or SHA) for upstream raw asset URLs.
    controller_image: Optional controller container image override. Ref-based
      manifests ship a ko:// placeholder image that is not pullable; callers
      must supply a real image when installing from a ref rather than a tagged
      release (which bundles a resolved image in the manifest).
    controller_tuning: Optional dict. Supported keys:
      claim_workers, sandbox_workers, warmpool_workers,
      warmpool_max_batch_size, kube_api_burst, kube_api_qps (each becomes a
      controller CLI flag when not None); enable_tracing, otel_endpoint;
      leader_elect (bool, default False); cpu_request, cpu_limit,
      memory_request, memory_limit (strings; module defaults when absent).
  """
  tuning = controller_tuning or {}
  # Apply everything in controller.yaml except the Deployment. The base
  # Deployment in that file has a placeholder image; skipping it here means
  # the controller Deployment is created exactly once, fully configured, below.
  raw_core, _, _ = vm_util.IssueCommand(
      ['curl', '-fsSL', _url(controller_ref, _CORE_FILE)]
  )
  non_deployment = [
      doc
      for doc in yaml.safe_load_all(raw_core)
      if doc and doc.get('kind') != 'Deployment'
  ]
  if non_deployment:
    _apply_yaml(yaml.dump_all(non_deployment))
  # Download extensions.controller.yaml, inject all configuration in memory,
  # and apply in one shot -- one rollout, correct config from the first apply.
  controller_url = _url(controller_ref, _CONTROLLER_FILE)
  raw_manifest, _, _ = vm_util.IssueCommand(['curl', '-fsSL', controller_url])
  configured = _configure_controller_manifest(
      raw_manifest, controller_image, tuning
  )
  logging.info('Applying controller deployment (image=%s)', controller_image)
  _apply_yaml(configured)
  kubernetes_commands.WaitForRollout(
      'deployment/agent-sandbox-controller', namespace='agent-sandbox-system'
  )


def apply_template(template_spec, node_selector, tolerations):
  """Applies the SandboxTemplate rendered from the template spec."""
  _apply_yaml(
      _render_template_manifest(template_spec, node_selector, tolerations)
  )


def install_warmpool(replicas):
  """Applies a SandboxWarmPool and waits for it to reach the target size.

  If replicas is 0, skips both the manifest apply and the readiness wait.
  Waiting for readyReplicas=0 is ill-defined (the controller may never update
  status on an empty pool), and applying a zero-replica warmpool is unnecessary
  for cold-start benchmarks.
  """
  if replicas == 0:
    logging.info('Warm pool replicas=0; skipping warm pool install.')
    return
  kubernetes_commands.ApplyManifest(
      _WARMPOOL_MANIFEST,
      name=SANDBOX_NAME,
      replicas=replicas,
  )
  _wait_warmpool_ready(SANDBOX_NAME, replicas)
+ """ + if replicas == 0: + logging.info('Warm pool replicas=0; skipping warm pool install.') + return + kubernetes_commands.ApplyManifest( + _WARMPOOL_MANIFEST, + name=SANDBOX_NAME, + replicas=replicas, + ) + _wait_warmpool_ready(SANDBOX_NAME, replicas) + + +class K8sAgentSandbox(agent_sandbox.BaseAgentSandbox): + """Installs the open-source kubernetes-sigs/agent-sandbox stack.""" + + SANDBOX_TYPE = agent_sandbox_spec.DEFAULT_SANDBOX_TYPE + + def _Create(self): + """Provision-stage install: gVisor, CRDs, and RBAC. + + The controller, sandbox template, and warm pool are installed in the + prepare stage (see InstallWorkload) so they can be re-applied against an + existing cluster without re-provisioning. + """ + self._InstallGvisor() + self._InstallCrdsAndRbac() + + def InstallWorkload(self): + """Prepare-stage install: controller, sandbox template, and warm pool. + + Idempotent: safe to re-run against an existing cluster to iterate on + controller settings (a changed image or tuning triggers a rolling update). + """ + self._InstallController() + self._ApplyTemplate() + self._InstallWarmpool() + + def RefreshSpecFromFlags(self): + """Rebuilds the prepare-stage sub-specs from current command-line flags. + + The benchmark spec is pickled during provision and unpickled without + re-applying flags, so on a `--run_stage=prepare` resume the controller, + template, and warm pool config would otherwise reflect the provision-time + flags. Rebuilding from the current flags lets controller settings be + iterated against an existing cluster. The rebuild starts from flag defaults, + so re-run with the full flag set used at provision plus whatever is being + changed. 
+ """ + self.spec.controller = k8s_agent_sandbox_spec.ControllerSpec( + 'agent_sandbox.controller', flag_values=FLAGS + ) + self.spec.sandbox_template = k8s_agent_sandbox_spec.SandboxTemplateSpec( + 'agent_sandbox.sandbox_template', flag_values=FLAGS + ) + self.spec.sandbox_warmpool = k8s_agent_sandbox_spec.SandboxWarmPoolSpec( + 'agent_sandbox.sandbox_warmpool', flag_values=FLAGS + ) + if 'agent_sandbox_namespace' in FLAGS and FLAGS['agent_sandbox_namespace'].present: + self.spec.namespace = FLAGS.agent_sandbox_namespace + if FLAGS['agent_sandbox_manifest_ref'].present: + self.spec.manifest_ref = FLAGS.agent_sandbox_manifest_ref + + def _Delete(self): + """No-op: the ephemeral cluster teardown reclaims the sandbox stack.""" + pass + + def _InstallGvisor(self): + node_selector, tolerations = self._SandboxScheduling() + install_gvisor(node_selector, tolerations) + + def _InstallCrdsAndRbac(self): + install_crds_and_rbac(self.spec.manifest_ref) + + def _InstallController(self): + install_controller( + controller_ref=self.spec.manifest_ref, + controller_image=self.spec.controller.image, + controller_tuning=self._BuildTuning(), + ) + + def _ApplyTemplate(self): + node_selector, tolerations = self._SandboxScheduling() + apply_template(self.spec.sandbox_template, node_selector, tolerations) + + def _SandboxScheduling(self): + """Resolves (node_selector, tolerations) from the sandbox nodepool.""" + nodepool = self.cluster.nodepools.get(_SANDBOX_NODEPOOL) + if nodepool is None: + raise ValueError( + f'Agent sandbox requires a nodepool named {_SANDBOX_NODEPOOL!r}; ' + 'add it to the benchmark container_cluster nodepools config.' 
+ ) + return _sandbox_scheduling(nodepool.name) + + def _InstallWarmpool(self): + install_warmpool(self.spec.sandbox_warmpool.replicas) + + def _BuildTuning(self): + """Builds the controller_tuning dict from the controller sub-spec.""" + c = self.spec.controller + tuning = { + 'enable_tracing': c.enable_tracing, + 'leader_elect': c.leader_elect, + 'cpu_request': c.cpu_request, + 'cpu_limit': c.cpu_limit, + 'memory_request': c.memory_request, + 'memory_limit': c.memory_limit, + } + for key in ( + 'claim_workers', 'sandbox_workers', 'warmpool_workers', + 'warmpool_max_batch_size', 'kube_api_burst', 'kube_api_qps', + 'otel_endpoint', + ): + value = getattr(c, key) + if value is not None: + tuning[key] = value + return tuning diff --git a/perfkitbenchmarker/resources/kubernetes/k8s_agent_sandbox_spec.py b/perfkitbenchmarker/resources/kubernetes/k8s_agent_sandbox_spec.py new file mode 100644 index 0000000000..f471c15e53 --- /dev/null +++ b/perfkitbenchmarker/resources/kubernetes/k8s_agent_sandbox_spec.py @@ -0,0 +1,224 @@ +# Copyright 2026 PerfKitBenchmarker Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
"""Spec for the Kubernetes agent sandbox."""

from absl import flags

from perfkitbenchmarker.configs import option_decoders
from perfkitbenchmarker.configs import spec
from perfkitbenchmarker.resources import agent_sandbox_spec

flags.DEFINE_string(
    'agent_sandbox_manifest_ref', None,
    'agent-sandbox release ref (tag or SHA) for CRD, RBAC, and controller '
    'manifests.')
flags.DEFINE_string(
    'agent_sandbox_runtime_class', None, 'RuntimeClass for sandbox pods.')
flags.DEFINE_integer(
    'agent_sandbox_warmpool_replicas', None,
    'SandboxWarmPool size to provision in Prepare.')

_DEFAULT_MANIFEST_REF = '98952fdc8b17523e1534db6626e9f9542b0792e5'
_DEFAULT_CONTROLLER_IMAGE = (
    'us-central1-docker.pkg.dev/k8s-staging-images/agent-sandbox/'
    'agent-sandbox-controller:v20260527-v0.4.6-31-gd43447b-main')
_DEFAULT_SANDBOX_IMAGE = (
    'registry.k8s.io/agent-sandbox/python-runtime-sandbox:v0.4.6')


class ControllerSpec(spec.BaseSpec):
  """Config for the agent-sandbox controller deployment.

  Attributes:
    image: Controller container image; defaults to _DEFAULT_CONTROLLER_IMAGE.
    claim_workers: Optional int; None when not configured.
    sandbox_workers: Optional int; None when not configured.
    warmpool_workers: Optional int; None when not configured.
    warmpool_max_batch_size: Optional int; None when not configured.
    kube_api_burst: Optional int; None when not configured.
    kube_api_qps: Optional int; None when not configured.
    enable_tracing: Bool, default False.
    otel_endpoint: Optional string; None when not configured.
    leader_elect: Bool, default False.
    cpu_request: String, default '500m'.
    cpu_limit: String, default '2'.
    memory_request: String, default '256Mi'.
    memory_limit: String, default '1Gi'.
  """

  def __init__(self, *args, **kwargs):
    # Bare annotations only declare the attributes for type checkers; the
    # values are assigned by the base class while decoding the config.
    self.image: str
    self.claim_workers: int | None
    self.sandbox_workers: int | None
    self.warmpool_workers: int | None
    self.warmpool_max_batch_size: int | None
    self.kube_api_burst: int | None
    self.kube_api_qps: int | None
    self.enable_tracing: bool
    self.otel_endpoint: str | None
    self.leader_elect: bool
    self.cpu_request: str
    self.cpu_limit: str
    self.memory_request: str
    self.memory_limit: str
    super().__init__(*args, **kwargs)

  @classmethod
  def _GetOptionDecoderConstructions(cls):
    """Returns the option name -> (decoder class, decoder kwargs) mapping."""
    result = super()._GetOptionDecoderConstructions()
    result.update({
        'image': (option_decoders.StringDecoder,
                  {'default': _DEFAULT_CONTROLLER_IMAGE}),
        'claim_workers': (option_decoders.IntDecoder,
                          {'default': None, 'none_ok': True}),
        'sandbox_workers': (option_decoders.IntDecoder,
                            {'default': None, 'none_ok': True}),
        'warmpool_workers': (option_decoders.IntDecoder,
                             {'default': None, 'none_ok': True}),
        'warmpool_max_batch_size': (option_decoders.IntDecoder,
                                    {'default': None, 'none_ok': True}),
        'kube_api_burst': (option_decoders.IntDecoder,
                           {'default': None, 'none_ok': True}),
        'kube_api_qps': (option_decoders.IntDecoder,
                         {'default': None, 'none_ok': True}),
        'enable_tracing': (option_decoders.BooleanDecoder, {'default': False}),
        'otel_endpoint': (option_decoders.StringDecoder,
                          {'default': None, 'none_ok': True}),
        'leader_elect': (option_decoders.BooleanDecoder, {'default': False}),
        'cpu_request': (option_decoders.StringDecoder, {'default': '500m'}),
        'cpu_limit': (option_decoders.StringDecoder, {'default': '2'}),
        'memory_request': (option_decoders.StringDecoder, {'default': '256Mi'}),
        'memory_limit': (option_decoders.StringDecoder, {'default': '1Gi'}),
    })
    return result


class SandboxTemplateSpec(spec.BaseSpec):
  """Config for the SandboxTemplate (models SandboxTemplateSpec).

  Fields rendered into the template: runtime_class, image, resources, labels.

  The --agent_sandbox_runtime_class flag, when present on the command line,
  overrides the configured runtime_class.
  """

  def __init__(self, *args, **kwargs):
    self.runtime_class: str
    self.image: str
    self.cpu_request: str
    self.cpu_limit: str
    self.memory_request: str
    self.memory_limit: str
    self.labels: dict | None
    super().__init__(*args, **kwargs)

  @classmethod
  def _GetOptionDecoderConstructions(cls):
    """Returns the option name -> (decoder class, decoder kwargs) mapping."""
    result = super()._GetOptionDecoderConstructions()
    result.update({
        'runtime_class': (option_decoders.StringDecoder, {'default': 'runsc'}),
        'image': (option_decoders.StringDecoder,
                  {'default': _DEFAULT_SANDBOX_IMAGE}),
        'cpu_request': (option_decoders.StringDecoder, {'default': '100m'}),
        'cpu_limit': (option_decoders.StringDecoder, {'default': '500m'}),
        'memory_request': (option_decoders.StringDecoder, {'default': '256Mi'}),
        'memory_limit': (option_decoders.StringDecoder, {'default': '1Gi'}),
        # TypeVerifier validates against an explicit tuple of types; labels is
        # declared as `dict | None`, so state that here instead of leaving the
        # accepted types unspecified.
        'labels': (option_decoders.TypeVerifier,
                   {'default': None, 'none_ok': True,
                    'valid_types': (dict,)}),
    })
    return result

  @classmethod
  def _ApplyFlags(cls, config_values, flag_values):
    """Overrides runtime_class from --agent_sandbox_runtime_class if given."""
    super()._ApplyFlags(config_values, flag_values)
    if flag_values['agent_sandbox_runtime_class'].present:
      config_values['runtime_class'] = flag_values.agent_sandbox_runtime_class


class SandboxWarmPoolSpec(spec.BaseSpec):
  """Config for the SandboxWarmPool (models SandboxWarmPoolSpec).

  Attributes:
    replicas: Non-negative int (decoder enforces min 0), default 0.
  """

  def __init__(self, *args, **kwargs):
    self.replicas: int
    super().__init__(*args, **kwargs)

  @classmethod
  def _GetOptionDecoderConstructions(cls):
    """Returns the option name -> (decoder class, decoder kwargs) mapping."""
    result = super()._GetOptionDecoderConstructions()
    result.update({
        'replicas': (option_decoders.IntDecoder, {'default': 0, 'min': 0}),
    })
    return result

  @classmethod
  def _ApplyFlags(cls, config_values, flag_values):
    """Overrides replicas from --agent_sandbox_warmpool_replicas if given."""
    super()._ApplyFlags(config_values, flag_values)
    if flag_values['agent_sandbox_warmpool_replicas'].present:
      config_values['replicas'] = flag_values.agent_sandbox_warmpool_replicas


class _SubSpecDecoder(option_decoders.TypeVerifier):
  """Shared logic for decoding a nested config block into a sub-spec.

  Subclasses only set SPEC_CLASS, the spec.BaseSpec subclass to instantiate
  from the block's key/value pairs.
  """

  SPEC_CLASS: type[spec.BaseSpec] | None = None

  def __init__(self, valid_types=(dict,), **kwargs):
    # A nested block must be a mapping because Decode expands it with **.
    # Defaulting valid_types here lets the decoder be registered with only
    # 'default'/'none_ok' kwargs, as K8sAgentSandboxConfigSpec does.
    super().__init__(valid_types=valid_types, **kwargs)

  def Decode(self, value, component_full_name, flag_values):
    """Verifies the block and builds the sub-spec.

    Args:
      value: The raw config block (a dict), or None.
      component_full_name: Name of the component owning this option.
      flag_values: Flag values forwarded to the sub-spec.

    Returns:
      An instance of SPEC_CLASS, or None when the block is explicitly null
      and the decoder was constructed with none_ok=True. The owning spec is
      expected to substitute defaults for None.
    """
    # Raises if value has the wrong type (or is None without none_ok).
    super().Decode(value, component_full_name, flag_values)
    if value is None:
      # An explicit `null` passed verification; `**None` would raise
      # TypeError, so hand None back and let the caller apply defaults.
      return None
    return self.SPEC_CLASS(
        self._GetOptionFullName(component_full_name),
        flag_values=flag_values, **value)


class _ControllerDecoder(_SubSpecDecoder):
  """Decodes the controller config block into a ControllerSpec."""

  SPEC_CLASS = ControllerSpec


class _SandboxTemplateDecoder(_SubSpecDecoder):
  """Decodes the sandbox_template config block into a SandboxTemplateSpec."""

  SPEC_CLASS = SandboxTemplateSpec


class _SandboxWarmPoolDecoder(_SubSpecDecoder):
  """Decodes the sandbox_warmpool config block into a SandboxWarmPoolSpec."""

  SPEC_CLASS = SandboxWarmPoolSpec


class K8sAgentSandboxConfigSpec(agent_sandbox_spec.BaseAgentSandboxConfigSpec):
  """Config spec for the Kubernetes agent sandbox.

  Attributes:
    manifest_ref: String, defaults to _DEFAULT_MANIFEST_REF; overridden by
      --agent_sandbox_manifest_ref when that flag is present.
    namespace: String, default 'default'.
    controller: ControllerSpec; built from defaults when the block is absent.
    sandbox_template: SandboxTemplateSpec; built from defaults when absent.
    sandbox_warmpool: SandboxWarmPoolSpec; built from defaults when absent.
  """

  SANDBOX_TYPE = agent_sandbox_spec.DEFAULT_SANDBOX_TYPE

  def __init__(self, component_full_name, flag_values=None, **kwargs):
    self.manifest_ref: str
    self.namespace: str
    self.controller: ControllerSpec
    self.sandbox_template: SandboxTemplateSpec
    self.sandbox_warmpool: SandboxWarmPoolSpec
    super().__init__(component_full_name, flag_values=flag_values, **kwargs)
    # Sub-spec blocks default to None; replace any missing one with a spec
    # built purely from decoder defaults (plus flag overrides).
    if self.controller is None:
      self.controller = ControllerSpec(
          f'{component_full_name}.controller', flag_values=flag_values)
    if self.sandbox_template is None:
      self.sandbox_template = SandboxTemplateSpec(
          f'{component_full_name}.sandbox_template',
          flag_values=flag_values)
    if self.sandbox_warmpool is None:
      self.sandbox_warmpool = SandboxWarmPoolSpec(
          f'{component_full_name}.sandbox_warmpool',
          flag_values=flag_values)

  @classmethod
  def _GetOptionDecoderConstructions(cls):
    """Returns the option name -> (decoder class, decoder kwargs) mapping."""
    result = super()._GetOptionDecoderConstructions()
    result.update({
        'manifest_ref': (option_decoders.StringDecoder,
                         {'default': _DEFAULT_MANIFEST_REF}),
        'namespace': (option_decoders.StringDecoder, {'default': 'default'}),
        'controller': (_ControllerDecoder, {'default': None, 'none_ok': True}),
        'sandbox_template': (_SandboxTemplateDecoder,
                             {'default': None, 'none_ok': True}),
        'sandbox_warmpool': (_SandboxWarmPoolDecoder,
                             {'default': None, 'none_ok': True}),
    })
    return result

  @classmethod
  def _ApplyFlags(cls, config_values, flag_values):
    """Overrides manifest_ref from --agent_sandbox_manifest_ref if given."""
    super()._ApplyFlags(config_values, flag_values)
    if flag_values['agent_sandbox_manifest_ref'].present:
      config_values['manifest_ref'] = flag_values.agent_sandbox_manifest_ref


# ---------------------------------------------------------------------------
# Patch metadata carried over verbatim from the original collapsed text: the
# header and license banner of the next file in this diff.
# diff --git a/tests/linux_benchmarks/agent_sandbox_loadgen_test.py b/tests/linux_benchmarks/agent_sandbox_loadgen_test.py
# new file mode 100644
# index 0000000000..30c4cb5e38
# --- /dev/null
# +++ b/tests/linux_benchmarks/agent_sandbox_loadgen_test.py
# @@ -0,0 +1,242 @@
# Copyright 2026 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for the agent_sandbox load generator."""

import threading
import unittest

from perfkitbenchmarker.linux_benchmarks import agent_sandbox_loadgen
from tests import pkb_common_test_case


class ResolveRunShapeTest(pkb_common_test_case.PkbCommonTestCase):
  """Covers resolve_run_shape for each combination of qps/duration/total."""

  def testQpsAndDurationDeriveTotal(self):
    resolved = agent_sandbox_loadgen.resolve_run_shape(qps=10.0, duration=5.0)
    self.assertEqual(resolved.qps, 10.0)
    self.assertEqual(resolved.duration, 5.0)
    self.assertEqual(resolved.total, 50)

  def testQpsAndTotalDeriveDuration(self):
    resolved = agent_sandbox_loadgen.resolve_run_shape(qps=10.0, total=100)
    self.assertEqual(resolved.total, 100)
    self.assertAlmostEqual(resolved.duration, 10.0)

  def testDurationAndTotalDeriveQps(self):
    resolved = agent_sandbox_loadgen.resolve_run_shape(duration=4.0, total=20)
    self.assertEqual(resolved.total, 20)
    self.assertEqual(resolved.duration, 4.0)
    self.assertAlmostEqual(resolved.qps, 5.0)

  def testAllThreeProvidedPassesThrough(self):
    # The three values are deliberately inconsistent (2 * 3 != 999); each must
    # come back exactly as given.
    resolved = agent_sandbox_loadgen.resolve_run_shape(
        qps=2.0, duration=3.0, total=999
    )
    self.assertEqual(resolved.qps, 2.0)
    self.assertEqual(resolved.duration, 3.0)
    self.assertEqual(resolved.total, 999)

  def testFewerThanTwoArgsRaises(self):
    with self.assertRaises(ValueError):
      agent_sandbox_loadgen.resolve_run_shape(qps=5.0)

  def testNoArgsRaises(self):
    with self.assertRaises(ValueError):
      agent_sandbox_loadgen.resolve_run_shape()

  def testTotalIsRoundedInt(self):
    # 3 qps for 2 seconds is exactly 6 claims.
    resolved = agent_sandbox_loadgen.resolve_run_shape(qps=3.0, duration=2.0)
    self.assertIsInstance(resolved.total, int)
    self.assertEqual(resolved.total, 6)

  def testQpsAndDurationRoundsTotal(self):
    # 10 qps for 0.33 seconds is 3.3 claims, which rounds to 3.
    resolved = agent_sandbox_loadgen.resolve_run_shape(qps=10.0, duration=0.33)
    self.assertIsInstance(resolved.total, int)
    self.assertEqual(resolved.total, 3)


class ClaimRecordTest(pkb_common_test_case.PkbCommonTestCase):
  """Covers the derived timing properties of ClaimRecord."""

  def testStartupTimeWhenReadyAtSet(self):
    claim = agent_sandbox_loadgen.ClaimRecord(
        name='c0', requested_at=100.0, ready_at=102.5
    )
    self.assertAlmostEqual(claim.startup_time_s, 2.5)

  def testStartupTimeIsNoneWhenReadyAtUnset(self):
    claim = agent_sandbox_loadgen.ClaimRecord(name='c0', requested_at=100.0)
    self.assertIsNone(claim.startup_time_s)

  def testExecDuration(self):
    claim = agent_sandbox_loadgen.ClaimRecord(
        name='c1',
        requested_at=100.0,
        exec_started_at=103.0,
        exec_completed_at=108.0,
    )
    self.assertAlmostEqual(claim.exec_duration_s, 5.0)

  def testExecDurationNoneWhenStartedAtUnset(self):
    claim = agent_sandbox_loadgen.ClaimRecord(
        name='c1', requested_at=100.0, exec_completed_at=108.0
    )
    self.assertIsNone(claim.exec_duration_s)

  def testExecDurationNoneWhenCompletedAtUnset(self):
    claim = agent_sandbox_loadgen.ClaimRecord(
        name='c1', requested_at=100.0, exec_started_at=103.0
    )
    self.assertIsNone(claim.exec_duration_s)

  def testTotalLifecycle(self):
    claim = agent_sandbox_loadgen.ClaimRecord(
        name='c2',
        requested_at=100.0,
        released_at=110.0,
    )
    self.assertAlmostEqual(claim.total_lifecycle_s, 10.0)

  def testTotalLifecycleNoneWhenReleasedAtUnset(self):
    claim = agent_sandbox_loadgen.ClaimRecord(name='c2', requested_at=100.0)
    self.assertIsNone(claim.total_lifecycle_s)

  def testAllTimingsTogether(self):
    claim = agent_sandbox_loadgen.ClaimRecord(
        name='c3',
        requested_at=100.0,
        ready_at=102.0,
        exec_started_at=103.0,
        exec_completed_at=107.0,
        released_at=109.0,
    )
    self.assertAlmostEqual(claim.startup_time_s, 2.0)
    self.assertAlmostEqual(claim.exec_duration_s, 4.0)
    self.assertAlmostEqual(claim.total_lifecycle_s, 9.0)


class LoadGeneratorRunTest(pkb_common_test_case.PkbCommonTestCase):
  """Exercises LoadGenerator.run() against the in-memory _FakeDriver.

  The fake driver behaves as follows:
    - create(): remembers the claim name so the event stream can report it.
    - stream_claim_events(): emits one Ready event per created claim and
      otherwise waits until the stop_event is set, then ends.
    - is_ready(): always True.
    - served_warm(): always False.
    - delete(): no-op.
  """

  def _make_driver(self, ready_at_delay=0.0):
    """Builds a fake ClaimDriver-like object."""
    return _FakeDriver(ready_at_delay=ready_at_delay)

  def testRunReturnOneRecordPerClaim(self):
    generator = agent_sandbox_loadgen.LoadGenerator(
        driver=self._make_driver(),
        ready_timeout=5.0,
        max_concurrent=4,
    )
    ticks = 0.0

    def fake_clock():
      # Deterministic clock: advances 10ms on every read.
      nonlocal ticks
      ticks += 0.01
      return ticks

    run_shape = agent_sandbox_loadgen.resolve_run_shape(qps=10.0, total=3)
    records = generator.run(
        run_shape, clock=fake_clock, sleeper=lambda _: None
    )

    self.assertLen(records, 3)
    self.assertEqual(
        [record.name for record in records], ['claim-0', 'claim-1', 'claim-2']
    )

  def testRunRecordsReadyAt(self):
    # ready_at is written by the watch-consumer thread, so this assertion only
    # holds if that thread sees the Ready event before the drain loop gives
    # up. With a fake clock and a no-op sleeper the main thread would reach
    # the logical drain deadline within microseconds of wall time, racing the
    # real watch thread (a test-only race). Running with run()'s defaults (the
    # real monotonic clock and real sleep) gives a genuine 5s drain window,
    # which the watch thread beats by milliseconds. The drain loop still
    # returns as soon as both claims are accounted for, so the test completes
    # in a fraction of a second.
    generator = agent_sandbox_loadgen.LoadGenerator(
        driver=self._make_driver(),
        ready_timeout=5.0,
        max_concurrent=4,
    )

    run_shape = agent_sandbox_loadgen.resolve_run_shape(qps=10.0, total=2)
    records = generator.run(run_shape)

    for record in records:
      self.assertIsNotNone(
          record.ready_at,
          msg=f'ready_at should be set for {record.name}',
      )
      self.assertIsNone(record.error)


class _FakeDriver:
  """Minimal in-memory fake for ClaimDriver.

  Provides the methods LoadGenerator calls:
    - create(name)
    - stream_claim_events(stop_event) -> iterator of {'name', 'status'}
    - is_ready(status) -> bool
    - served_warm(status) -> bool or None
    - delete(name)
  """

  _READY_STATUS = {'conditions': [{'type': 'Ready', 'status': 'True'}]}

  def __init__(self, ready_at_delay=0.0):
    # Stored for interface parity; no method of this fake reads it.
    self._ready_at_delay = ready_at_delay
    # Names of created claims, in creation order; guarded by _queue_cv.
    self._queue = []
    self._queue_cv = threading.Condition()

  def create(self, name):
    """Records a new claim and wakes any waiting event stream."""
    with self._queue_cv:
      self._queue.append(name)
      self._queue_cv.notify_all()

  def stream_claim_events(self, stop_event):
    """Yields a Ready event for every created claim until stop_event is set."""
    delivered = 0
    while not stop_event.is_set():
      with self._queue_cv:
        # Poll in 50ms slices so a stop request is noticed promptly even when
        # no new claim arrives.
        while not stop_event.is_set() and len(self._queue) <= delivered:
          self._queue_cv.wait(timeout=0.05)
        if stop_event.is_set():
          return
        pending = self._queue[delivered:]
        delivered += len(pending)
      # Emit outside the lock so create() is never blocked by the consumer.
      for claim_name in pending:
        yield {'name': claim_name, 'status': self._READY_STATUS}

  def is_ready(self, status):
    """Treats every status as ready."""
    return True

  def served_warm(self, status):
    """Reports every claim as not served from the warm pool."""
    return False

  def delete(self, name):
    """Does nothing; the fake holds no resources to release."""
    pass


if __name__ == '__main__':
  unittest.main()


# ---------------------------------------------------------------------------
# Patch metadata carried over verbatim from the original collapsed text: the
# header and license banner of the next file in this diff.
# diff --git a/tests/linux_benchmarks/agent_sandbox_metrics_test.py b/tests/linux_benchmarks/agent_sandbox_metrics_test.py
# new file mode 100644
# index 0000000000..c66277bd37
# --- /dev/null
# +++ b/tests/linux_benchmarks/agent_sandbox_metrics_test.py
# @@ -0,0 +1,241 @@
# Copyright 2026 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for the agent_sandbox metrics."""

import unittest

from perfkitbenchmarker.linux_benchmarks import agent_sandbox_loadgen
from perfkitbenchmarker.linux_benchmarks import agent_sandbox_metrics
from tests import pkb_common_test_case


def _record(
    name,
    requested_at,
    ready_at=None,
    error=None,
    warm_served=None,
    exec_started_at=None,
    exec_completed_at=None,
    released_at=None,
):
  """Builds a ClaimRecord, passing every argument through by keyword."""
  optional_fields = dict(
      ready_at=ready_at,
      error=error,
      warm_served=warm_served,
      exec_started_at=exec_started_at,
      exec_completed_at=exec_completed_at,
      released_at=released_at,
  )
  return agent_sandbox_loadgen.ClaimRecord(
      name=name, requested_at=requested_at, **optional_fields
  )


def _by_metric(samples):
  """Indexes samples by their metric name."""
  return {sample.metric: sample for sample in samples}


class PercentileTest(pkb_common_test_case.PkbCommonTestCase):
  """Covers agent_sandbox_metrics.percentile."""

  def testPercentileLinearInterpolation(self):
    # For [1,2,3,4] at p50 the rank is 3*0.5 = 1.5, halfway between index 1
    # (value 2) and index 2 (value 3): 2 + (3-2)*0.5 = 2.5.
    self.assertAlmostEqual(
        agent_sandbox_metrics.percentile([1.0, 2.0, 3.0, 4.0], 50), 2.5
    )

  def testPercentileExactBoundary(self):
    # For [1,2,3] at p100 the rank is exactly 2.0, i.e. the last value.
    self.assertAlmostEqual(
        agent_sandbox_metrics.percentile([1.0, 2.0, 3.0], 100), 3.0
    )

  def testPercentileEmptyReturnsZero(self):
    self.assertAlmostEqual(agent_sandbox_metrics.percentile([], 50), 0.0)

  def testPercentileSingleElement(self):
    self.assertAlmostEqual(agent_sandbox_metrics.percentile([7.0], 99), 7.0)


class BuildSamplesTest(pkb_common_test_case.PkbCommonTestCase):
  """Covers agent_sandbox_metrics.build_samples."""

  def _two_success_records(self):
    """Two successful claims with startup times 1.0s (cold) and 3.0s (warm)."""
    return [
        _record('c0', requested_at=100.0, ready_at=101.0, warm_served=False),
        _record('c1', requested_at=100.0, ready_at=103.0, warm_served=True),
    ]

  def testStartupPercentileMetricsPresent(self):
    samples = agent_sandbox_metrics.build_samples(
        self._two_success_records(),
        peak_concurrency=2,
        metadata={'target_qps': 10.0},
    )
    indexed = _by_metric(samples)
    for pct in (50, 90, 95, 99):
      self.assertIn(f'startup_time_p{pct}', indexed)
    self.assertIn('startup_time_max', indexed)

  def testStartupTimeValues(self):
    samples = agent_sandbox_metrics.build_samples(
        self._two_success_records(), peak_concurrency=2, metadata={}
    )
    indexed = _by_metric(samples)
    # Startup times are 1.0 and 3.0, so p50 = (1+3)/2 = 2.0.
    self.assertAlmostEqual(indexed['startup_time_p50'].value, 2.0)
    self.assertAlmostEqual(indexed['startup_time_max'].value, 3.0)

  def testStartupTimeUnits(self):
    samples = agent_sandbox_metrics.build_samples(
        self._two_success_records(), peak_concurrency=1, metadata={}
    )
    indexed = _by_metric(samples)
    self.assertEqual(indexed['startup_time_p50'].unit, 'seconds')
    self.assertEqual(indexed['startup_time_max'].unit, 'seconds')

  def testCountsAndConcurrency(self):
    samples = agent_sandbox_metrics.build_samples(
        self._two_success_records(), peak_concurrency=2, metadata={}
    )
    indexed = _by_metric(samples)
    self.assertEqual(indexed['peak_concurrency'].value, 2)
    self.assertEqual(indexed['success_count'].value, 2)
    self.assertEqual(indexed['error_count'].value, 0)

  def testErrorCount(self):
    # One success plus two claims that failed with the same error.
    mixed_records = [
        _record('c0', requested_at=100.0, ready_at=101.0, warm_served=False),
        _record('c1', requested_at=100.0, ready_at=None, error='Timeout'),
        _record('c2', requested_at=100.0, ready_at=None, error='Timeout'),
    ]
    samples = agent_sandbox_metrics.build_samples(
        mixed_records, peak_concurrency=1, metadata={}
    )
    indexed = _by_metric(samples)
    self.assertEqual(indexed['error_count'].value, 2)
    self.assertEqual(indexed['success_count'].value, 1)
    self.assertIn('error_count_Timeout', indexed)
    self.assertEqual(indexed['error_count_Timeout'].value, 2)

  def testMetadataAttachedToAllSamples(self):
    run_metadata = {'target_qps': 10.0, 'run_id': 'abc'}
    samples = agent_sandbox_metrics.build_samples(
        self._two_success_records(), peak_concurrency=2, metadata=run_metadata
    )
    for sample in samples:
      self.assertEqual(sample.metadata.get('target_qps'), 10.0)

  def testWarmServedFraction(self):
    samples = agent_sandbox_metrics.build_samples(
        self._two_success_records(), peak_concurrency=2, metadata={}
    )
    indexed = _by_metric(samples)
    self.assertIn('warm_served_fraction', indexed)
    self.assertAlmostEqual(indexed['warm_served_fraction'].value, 0.5)
    self.assertEqual(indexed['warm_served_fraction'].unit, 'fraction')

  def testQpsMetricsPresent(self):
    samples = agent_sandbox_metrics.build_samples(
        self._two_success_records(), peak_concurrency=2, metadata={}
    )
    indexed = _by_metric(samples)
    self.assertIn('submit_qps', indexed)
    self.assertIn('completion_qps', indexed)
    self.assertEqual(indexed['submit_qps'].unit, 'count/sec')
    self.assertEqual(indexed['completion_qps'].unit, 'count/sec')

  def testExecDurationAbsentWhenNotSet(self):
    # Neither record carries exec_started_at / exec_completed_at, so no exec
    # metrics may be produced.
    samples = agent_sandbox_metrics.build_samples(
        self._two_success_records(), peak_concurrency=2, metadata={}
    )
    indexed = _by_metric(samples)
    for pct in (50, 90, 95, 99):
      self.assertNotIn(f'exec_duration_s_p{pct}', indexed)
    self.assertNotIn('exec_duration_s_max', indexed)

  def testTotalLifecycleAbsentWhenNotSet(self):
    # Neither record carries released_at, so no lifecycle metrics may be
    # produced.
    samples = agent_sandbox_metrics.build_samples(
        self._two_success_records(), peak_concurrency=2, metadata={}
    )
    indexed = _by_metric(samples)
    for pct in (50, 90, 95, 99):
      self.assertNotIn(f'total_lifecycle_s_p{pct}', indexed)
    self.assertNotIn('total_lifecycle_s_max', indexed)

  def testExecDurationPresentWhenSet(self):
    exec_records = [
        _record(
            'c0',
            requested_at=100.0,
            ready_at=101.0,
            warm_served=False,
            exec_started_at=102.0,
            exec_completed_at=104.0,
        ),
        _record(
            'c1',
            requested_at=100.0,
            ready_at=103.0,
            warm_served=True,
            exec_started_at=104.0,
            exec_completed_at=107.0,
        ),
    ]
    samples = agent_sandbox_metrics.build_samples(
        exec_records, peak_concurrency=2, metadata={}
    )
    indexed = _by_metric(samples)
    self.assertIn('exec_duration_s_p50', indexed)
    self.assertIn('exec_duration_s_max', indexed)
    # Exec durations are 2.0 and 3.0, so p50 = 2.5 and max = 3.0.
    self.assertAlmostEqual(indexed['exec_duration_s_p50'].value, 2.5)
    self.assertAlmostEqual(indexed['exec_duration_s_max'].value, 3.0)
    self.assertEqual(indexed['exec_duration_s_p50'].unit, 'seconds')

  def testTotalLifecyclePresentWhenSet(self):
    released_records = [
        _record(
            'c0',
            requested_at=100.0,
            ready_at=101.0,
            warm_served=False,
            released_at=110.0,
        ),
        _record(
            'c1',
            requested_at=100.0,
            ready_at=103.0,
            warm_served=True,
            released_at=115.0,
        ),
    ]
    samples = agent_sandbox_metrics.build_samples(
        released_records, peak_concurrency=2, metadata={}
    )
    indexed = _by_metric(samples)
    self.assertIn('total_lifecycle_s_p50', indexed)
    self.assertIn('total_lifecycle_s_max', indexed)
    # Lifecycles are 10.0 and 15.0, so p50 = 12.5 and max = 15.0.
    self.assertAlmostEqual(indexed['total_lifecycle_s_p50'].value, 12.5)
    self.assertAlmostEqual(indexed['total_lifecycle_s_max'].value, 15.0)
    self.assertEqual(indexed['total_lifecycle_s_p50'].unit, 'seconds')


if __name__ == '__main__':
  unittest.main()


# ---------------------------------------------------------------------------
# Patch metadata carried over verbatim from the original collapsed text: the
# header and the first lines of the license banner of the next file in this
# diff (the banner continues on the following original line).
# diff --git a/tests/resources/kubernetes/k8s_agent_sandbox_test.py b/tests/resources/kubernetes/k8s_agent_sandbox_test.py
# new file mode 100644
# index 0000000000..4a44e8cdc9
# --- /dev/null
# +++ b/tests/resources/kubernetes/k8s_agent_sandbox_test.py
# @@ -0,0 +1,325 @@
# Copyright 2026 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for the Kubernetes agent sandbox spec and resource."""

import dataclasses
import json
import unittest
from unittest import mock

from absl import flags
from perfkitbenchmarker import configs
from perfkitbenchmarker.linux_benchmarks import agent_sandbox_benchmark
from perfkitbenchmarker.linux_benchmarks import agent_sandbox_loadgen
from perfkitbenchmarker.linux_benchmarks import agent_sandbox_metrics
from perfkitbenchmarker.resources import agent_sandbox
from perfkitbenchmarker.resources import agent_sandbox_spec
from perfkitbenchmarker.resources.container_service import kubectl
from perfkitbenchmarker.resources.container_service import kubernetes_commands
from perfkitbenchmarker.resources.kubernetes import k8s_agent_sandbox
from perfkitbenchmarker.resources.kubernetes import k8s_agent_sandbox_spec
from tests import pkb_common_test_case
import yaml

FLAGS = flags.FLAGS

_COMPONENT = 'test_component'


class K8sAgentSandboxSpecTest(pkb_common_test_case.PkbCommonTestCase):
  """Tests for constructing K8sAgentSandboxConfigSpec from config and flags."""

  def _Decode(self, **overrides):
    """Builds a K8sAgentSandboxConfigSpec of type 'Kubernetes'.

    Args:
      **overrides: Config keys merged on top of {'type': 'Kubernetes'}.

    Returns:
      The constructed K8sAgentSandboxConfigSpec.
    """
    config = {'type': 'Kubernetes'}
    config.update(overrides)
    return k8s_agent_sandbox_spec.K8sAgentSandboxConfigSpec(
        _COMPONENT, flag_values=FLAGS, **config
    )

  def testDefaults(self):
    """A spec built from only `type` exposes the expected default values."""
    spec = self._Decode()
    self.assertEqual(spec.type, 'Kubernetes')
    self.assertEqual(spec.namespace, 'default')
    self.assertIsInstance(spec.controller, k8s_agent_sandbox_spec.ControllerSpec)
    self.assertIsInstance(
        spec.sandbox_template, k8s_agent_sandbox_spec.SandboxTemplateSpec)
    self.assertIsInstance(
        spec.sandbox_warmpool, k8s_agent_sandbox_spec.SandboxWarmPoolSpec)
    self.assertEqual(spec.sandbox_template.runtime_class, 'runsc')
    self.assertEqual(spec.sandbox_warmpool.replicas, 0)
    self.assertFalse(spec.controller.leader_elect)

  def testNestedOverrides(self):
    """Values given for nested sub-specs are readable on the decoded spec."""
    spec = self._Decode(
        manifest_ref='abc123',
        controller={'claim_workers': 8, 'leader_elect': True},
        sandbox_template={'runtime_class': 'gvisor', 'cpu_limit': '4'},
        sandbox_warmpool={'replicas': 5},
    )
    self.assertEqual(spec.manifest_ref, 'abc123')
    self.assertEqual(spec.controller.claim_workers, 8)
    self.assertTrue(spec.controller.leader_elect)
    self.assertEqual(spec.sandbox_template.runtime_class, 'gvisor')
    self.assertEqual(spec.sandbox_template.cpu_limit, '4')
    self.assertEqual(spec.sandbox_warmpool.replicas, 5)

  def testFlagsOverrideConfig(self):
    """Values parsed into agent_sandbox_* flags appear on the decoded spec."""
    FLAGS['agent_sandbox_manifest_ref'].parse('deadbeef')
    FLAGS['agent_sandbox_runtime_class'].parse('gvisor')
    FLAGS['agent_sandbox_warmpool_replicas'].parse(7)
    spec = self._Decode()
    self.assertEqual(spec.manifest_ref, 'deadbeef')
    self.assertEqual(spec.sandbox_template.runtime_class, 'gvisor')
    self.assertEqual(spec.sandbox_warmpool.replicas, 7)


class ConfigureControllerManifestTest(pkb_common_test_case.PkbCommonTestCase):
  """Tests for k8s_agent_sandbox._configure_controller_manifest."""

  def _ManifestYaml(self):
    """Returns YAML for a Deployment with a single 'manager' container."""
    manifest = {
        'kind': 'Deployment',
        'spec': {'template': {'spec': {'containers': [{
            'name': 'manager',
            'image': 'placeholder',
            'args': ['--leader-elect=true', '--existing-arg'],
            'resources': {},
        }]}}},
    }
    return yaml.dump(manifest, default_flow_style=False)

  def testImageAndTuningInjected(self):
    """The image and tuning values end up in the container image and args."""
    result_yaml = k8s_agent_sandbox._configure_controller_manifest(
        self._ManifestYaml(),
        controller_image='my/image:tag',
        tuning={'claim_workers': 8, 'kube_api_qps': 50, 'leader_elect': True},
    )
    out = yaml.safe_load(result_yaml)
    container = out['spec']['template']['spec']['containers'][0]
    self.assertEqual(container['image'], 'my/image:tag')
    self.assertIn('--sandbox-claim-concurrent-workers=8', container['args'])
    self.assertIn('--kube-api-qps=50', container['args'])

  def testResourceDefaultsApplied(self):
    """With empty tuning, the module-level resource defaults are used."""
    result_yaml = k8s_agent_sandbox._configure_controller_manifest(
        self._ManifestYaml(), controller_image='img', tuning={})
    out = yaml.safe_load(result_yaml)
    res = out['spec']['template']['spec']['containers'][0]['resources']
    self.assertEqual(
        res['requests']['cpu'], k8s_agent_sandbox._DEFAULT_CPU_REQUEST)
    self.assertEqual(
        res['limits']['memory'], k8s_agent_sandbox._DEFAULT_MEMORY_LIMIT)

  def testResourceTuningOverridesDefaults(self):
    """Resource values supplied via tuning replace the defaults."""
    result_yaml = k8s_agent_sandbox._configure_controller_manifest(
        self._ManifestYaml(),
        controller_image='img',
        tuning={'cpu_request': '1', 'memory_limit': '2Gi'},
    )
    out = yaml.safe_load(result_yaml)
    res = out['spec']['template']['spec']['containers'][0]['resources']
    self.assertEqual(res['requests']['cpu'], '1')
    self.assertEqual(res['limits']['memory'], '2Gi')


class K8sAgentSandboxCreateTest(pkb_common_test_case.PkbCommonTestCase):
  """Tests for the K8sAgentSandbox resource lifecycle methods."""

  def _Sandbox(self, **template_overrides):
    """Builds a K8sAgentSandbox backed by a mock cluster.

    The mock cluster exposes a single nodepool named 'sandbox'.

    Args:
      **template_overrides: Used as the sandbox_template config; when empty,
        {'runtime_class': 'runsc'} is used instead.

    Returns:
      The constructed K8sAgentSandbox.
    """
    sandbox_spec = k8s_agent_sandbox_spec.K8sAgentSandboxConfigSpec(
        _COMPONENT, flag_values=FLAGS,
        type='Kubernetes', manifest_ref='ref123',
        sandbox_warmpool={'replicas': 3},
        sandbox_template=template_overrides or {'runtime_class': 'runsc'},
    )
    cluster = mock.Mock()
    nodepool = mock.Mock()
    # `name` is special-cased by the Mock constructor, so set it afterwards.
    nodepool.name = 'sandbox'
    cluster.nodepools = {'sandbox': nodepool}
    return k8s_agent_sandbox.K8sAgentSandbox(sandbox_spec, cluster)

  @mock.patch.object(k8s_agent_sandbox, 'install_crds_and_rbac')
  @mock.patch.object(k8s_agent_sandbox, 'install_gvisor')
  def testCreateOrchestration(self, mock_gvisor, mock_crds_rbac):
    """_Create() calls install_gvisor and install_crds_and_rbac once each."""
    sandbox = self._Sandbox()
    sandbox._Create()
    mock_gvisor.assert_called_once()
    mock_crds_rbac.assert_called_once_with('ref123')

  @mock.patch.object(k8s_agent_sandbox, 'install_warmpool')
  @mock.patch.object(k8s_agent_sandbox, 'apply_template')
  @mock.patch.object(k8s_agent_sandbox, 'install_controller')
  def testInstallWorkloadOrchestration(
      self, mock_controller, mock_template, mock_warmpool):
    """InstallWorkload() calls the controller, template and warmpool helpers."""
    sandbox = self._Sandbox()
    sandbox.InstallWorkload()
    mock_controller.assert_called_once()
    self.assertEqual(
        mock_controller.call_args.kwargs['controller_ref'], 'ref123')
    mock_template.assert_called_once()
    self.assertIs(mock_template.call_args.args[0], sandbox.spec.sandbox_template)
    mock_warmpool.assert_called_once()
    self.assertEqual(mock_warmpool.call_args.args[-1], 3)

  def testRefreshSpecFromFlagsAppliesCurrentFlags(self):
    """Flags parsed after construction are picked up by RefreshSpecFromFlags."""
    sandbox = self._Sandbox()
    FLAGS['agent_sandbox_warmpool_replicas'].parse(42)
    FLAGS['agent_sandbox_runtime_class'].parse('gvisor')
    sandbox.RefreshSpecFromFlags()
    self.assertEqual(sandbox.spec.sandbox_warmpool.replicas, 42)
    self.assertEqual(sandbox.spec.sandbox_template.runtime_class, 'gvisor')

  def testDeleteIsNoOp(self):
    """_Delete() returns None."""
    sandbox = self._Sandbox()
    self.assertIsNone(sandbox._Delete())


class AgentSandboxRunTest(pkb_common_test_case.PkbCommonTestCase):
  """Tests for agent_sandbox_benchmark.Run with cluster access mocked out."""

  def testRunUsesJobFlowAndParsesJsonl(self):
    """Run() applies the job, waits on it, and parses the records it logged."""
    sandbox_spec = k8s_agent_sandbox_spec.K8sAgentSandboxConfigSpec(
        _COMPONENT, flag_values=FLAGS, type='Kubernetes',
        namespace='sandboxes', sandbox_warmpool={'replicas': 4})
    sandbox = k8s_agent_sandbox.K8sAgentSandbox(sandbox_spec, mock.Mock())
    bm_spec = mock.Mock()
    bm_spec.agent_sandbox = sandbox
    bm_spec.container_cluster.GetResourceMetadata.return_value = {}

    rec = agent_sandbox_loadgen.ClaimRecord(
        name='claim-0', requested_at=100.0, ready_at=102.0,
        warm_served=False, released_at=103.0)
    rec_line = json.dumps(dataclasses.asdict(rec))
    summary_line = json.dumps({'summary': {'peak_concurrency': 3}})
    fake_logs = f'some startup output\n---RESULTS---\n{rec_line}\n{summary_line}\n'

    sentinel = [mock.sentinel.sample]
    # One canned result per RunKubectlCommand call, in call order: the first
    # call is 'delete job', the second is 'get pods', the third is 'logs'.
    kubectl_calls = [
        ('', '', 0),  # delete job --ignore-not-found
        ('pod-abc-123', '', 0),  # get pods -l job-name=...
        (fake_logs, '', 0),  # logs pod-abc-123
    ]
    with mock.patch.object(
        kubectl, 'RunKubectlCommand', side_effect=kubectl_calls), \
        mock.patch.object(
            kubernetes_commands, 'ApplyManifest') as mock_apply, \
        mock.patch.object(
            kubernetes_commands, 'WaitForResourceForMultiConditions',
            return_value='condition=Complete') as mock_wait, \
        mock.patch.object(
            agent_sandbox_metrics, 'build_samples',
            return_value=sentinel) as mock_build:
      result = agent_sandbox_benchmark.Run(bm_spec)

    self.assertIs(result, sentinel)
    mock_wait.assert_called_once()
    wait_call = mock_wait.call_args
    # Accept the conditions either as a keyword or as the second positional
    # argument.
    actual_conditions = wait_call.kwargs.get(
        'conditions', wait_call.args[1] if len(wait_call.args) > 1 else [])
    self.assertIn('condition=Complete', actual_conditions)
    self.assertIn('condition=Failed', actual_conditions)
    mock_apply.assert_called()
    apply_calls = [str(c) for c in mock_apply.call_args_list]
    self.assertTrue(
        any('load_runner_job' in c for c in apply_calls),
        f'Expected ApplyManifest call for load_runner_job manifest; got: {apply_calls}')
    mock_build.assert_called_once()
    build_args = mock_build.call_args
    parsed_records, peak, meta = build_args.args
    self.assertEqual(len(parsed_records), 1)
    self.assertEqual(parsed_records[0].name, 'claim-0')
    self.assertEqual(peak, 3)
    self.assertEqual(meta['warmpool_replicas'], 4)


class AgentSandboxBenchmarkConfigTest(pkb_common_test_case.PkbCommonTestCase):
  """Tests that the benchmark's default config yields a K8sAgentSandbox."""

  def testConfigBuildsK8sAgentSandbox(self):
    """The 'agent_sandbox' section of BENCHMARK_CONFIG decodes and builds."""
    config = configs.LoadConfig(
        agent_sandbox_benchmark.BENCHMARK_CONFIG, {},
        agent_sandbox_benchmark.BENCHMARK_NAME)
    agent_sandbox_dict = config['agent_sandbox']
    sandbox_spec = agent_sandbox_spec.AgentSandboxConfigDecoder(
        option='agent_sandbox').Decode(
            agent_sandbox_dict, 'test', FLAGS)
    sandbox = agent_sandbox.GetAgentSandbox(sandbox_spec, mock.Mock())
    self.assertIsInstance(sandbox, k8s_agent_sandbox.K8sAgentSandbox)


class SandboxSchedulingTest(pkb_common_test_case.PkbCommonTestCase):
  """Tests for the taint/toleration and manifest-rendering helpers."""

  def testTaintToTolerationWithValue(self):
    """A 'key=value:effect' taint maps to an 'Equal' toleration."""
    self.assertEqual(
        k8s_agent_sandbox._taint_to_toleration(
            'sandbox.gke.io/runtime=runsc:NoSchedule'),
        {
            'key': 'sandbox.gke.io/runtime',
            'operator': 'Equal',
            'value': 'runsc',
            'effect': 'NoSchedule',
        })

  def testTaintToTolerationNoValue(self):
    """A 'key:effect' taint maps to an 'Exists' toleration."""
    self.assertEqual(
        k8s_agent_sandbox._taint_to_toleration('dedicated:NoSchedule'),
        {'key': 'dedicated', 'operator': 'Exists', 'effect': 'NoSchedule'})

  def testTaintToTolerationMalformedRaises(self):
    """A taint string without an effect raises ValueError."""
    with self.assertRaises(ValueError):
      k8s_agent_sandbox._taint_to_toleration('no-effect')

  def testSandboxSchedulingSelectorAndToleration(self):
    """_sandbox_scheduling returns the nodepool selector and one toleration."""
    node_selector, tolerations = k8s_agent_sandbox._sandbox_scheduling('sandbox')
    self.assertEqual(node_selector, {'pkb_nodepool': 'sandbox'})
    self.assertEqual(tolerations, [{
        'key': 'sandbox.gke.io/runtime',
        'operator': 'Equal',
        'value': 'runsc',
        'effect': 'NoSchedule',
    }])

  def testRenderGvisorDaemonsetSchedulesOnPkbNodepool(self):
    """The rendered DaemonSet pod spec carries the selector and tolerations."""
    node_selector, tolerations = k8s_agent_sandbox._sandbox_scheduling('sandbox')
    manifest = yaml.safe_load(
        k8s_agent_sandbox._render_gvisor_daemonset(node_selector, tolerations))
    pod_spec = manifest['spec']['template']['spec']
    self.assertEqual(pod_spec['nodeSelector'], {'pkb_nodepool': 'sandbox'})
    self.assertEqual(pod_spec['tolerations'], tolerations)

  def testRenderTemplateManifestSchedulingAndRuntimeClass(self):
    """The rendered template keeps runtimeClassName and uses the selector."""
    template_spec = mock.Mock()
    template_spec.runtime_class = 'runsc'
    template_spec.image = 'img:latest'
    template_spec.cpu_request = '500m'
    template_spec.cpu_limit = '2'
    template_spec.memory_request = '256Mi'
    template_spec.memory_limit = '1Gi'
    template_spec.labels = {'sandbox': 'python-sandbox-bench'}
    node_selector, tolerations = k8s_agent_sandbox._sandbox_scheduling('sandbox')
    manifest = yaml.safe_load(
        k8s_agent_sandbox._render_template_manifest(
            template_spec, node_selector, tolerations))
    pod_spec = manifest['spec']['podTemplate']['spec']
    self.assertEqual(pod_spec['nodeSelector'], {'pkb_nodepool': 'sandbox'})
    self.assertEqual(pod_spec['tolerations'], tolerations)
    # runtimeClassName stays as runtime identity, not scheduling.
    self.assertEqual(pod_spec['runtimeClassName'], 'runsc')
    # The old runtime label is no longer used as a node selector.
    self.assertNotIn('sandbox.gke.io/runtime', pod_spec['nodeSelector'])


if __name__ == '__main__':
  unittest.main()