class SwapConfigSpec(spec.BaseSpec):
  """Configurable swap options for a GKE/EKS nodepool.

  Declared in BENCHMARK_CONFIG under nodepools.<nodepool_name>.swap_config.
  Consumed by the cloud provider's _AddNodeParamsToCmd() / equivalent to
  apply the cloud-specific swap configuration during nodepool creation.

  Attributes:
    enabled: Whether to enable swap on the nodepool (default True).
    swappiness: vm.swappiness sysctl value (0-200, default 100).
    min_free_kbytes: vm.min_free_kbytes sysctl (default 200).
    watermark_scale_factor: vm.watermark_scale_factor sysctl (default 500).
    lssd: True if the nodepool uses local NVMe SSDs for the swap device.
    lssd_count: Number of local NVMe SSDs (GKE dedicatedLocalSsdProfile).
    boot_disk_iops: Provisioned IOPS for hyperdisk-balanced (0 = not set).
    boot_disk_throughput: Provisioned throughput MiB/s for hyperdisk-balanced.
  """

  def __init__(self, *args, **kwargs):
    # Defaults mirror the decoder defaults declared below; they are assigned
    # before delegating to the base class constructor.
    self.enabled: bool = True
    self.swappiness: int = 100
    self.min_free_kbytes: int = 200
    self.watermark_scale_factor: int = 500
    self.lssd: bool = False
    self.lssd_count: int = 0
    self.boot_disk_iops: int = 0
    self.boot_disk_throughput: int = 0
    super().__init__(*args, **kwargs)

  @classmethod
  def _GetOptionDecoderConstructions(cls):
    """Returns the decoder class and constructor kwargs for each option.

    Extends the mapping returned by the base class with one entry per swap
    option. Integer options are bounded below by 0; swappiness is also
    bounded above by 200.
    """
    result = super()._GetOptionDecoderConstructions()
    result.update({
        'enabled': (
            option_decoders.BooleanDecoder,
            {'default': True},
        ),
        'swappiness': (
            option_decoders.IntDecoder,
            {'default': 100, 'min': 0, 'max': 200},
        ),
        'min_free_kbytes': (
            option_decoders.IntDecoder,
            {'default': 200, 'min': 0},
        ),
        'watermark_scale_factor': (
            option_decoders.IntDecoder,
            {'default': 500, 'min': 0},
        ),
        'lssd': (
            option_decoders.BooleanDecoder,
            {'default': False},
        ),
        'lssd_count': (
            option_decoders.IntDecoder,
            {'default': 0, 'min': 0},
        ),
        'boot_disk_iops': (
            option_decoders.IntDecoder,
            {'default': 0, 'min': 0},
        ),
        'boot_disk_throughput': (
            option_decoders.IntDecoder,
            {'default': 0, 'min': 0},
        ),
    })
    return result


class _SwapConfigDecoder(option_decoders.TypeVerifier):
  """Decodes the swap_config option of a NodepoolSpec."""

  def __init__(self, **kwargs):
    """Initializes the decoder so that only dict values are accepted.

    Args:
      **kwargs: Keyword arguments forwarded to TypeVerifier (the decoder is
        registered with {'default': None}).
    """
    # TypeVerifier needs the tuple of accepted types as its first argument;
    # without this override the decoder cannot be constructed from the
    # kwargs-only registration used by the nodepool spec.
    # NOTE(review): mirrors how the other dict-valued decoders are assumed to
    # be initialised -- confirm against option_decoders.TypeVerifier.
    super().__init__((dict,), **kwargs)

  def Decode(self, value, component_full_name, flag_values):
    """Decodes the swap_config dictionary into a SwapConfigSpec.

    Args:
      value: dict. Keys match SwapConfigSpec._GetOptionDecoderConstructions.
      component_full_name: str. Fully qualified name of the parent component.
      flag_values: flags.FlagValues. Runtime flags propagated to BaseSpec.

    Returns:
      SwapConfigSpec instance.

    Raises:
      errors.Config.InvalidValue upon invalid input value.
    """
    # The base class call verifies the value's type before it is expanded
    # into keyword arguments below.
    super().Decode(value, component_full_name, flag_values)
    return SwapConfigSpec(
        self._GetOptionFullName(component_full_name),
        flag_values=flag_values,
        **value,
    )
+ PKB_SWAP_FOUND=0 + for _attempt in $(seq 1 30); do + if awk 'NR>1{found=1} END{exit !found}' /proc/swaps 2>/dev/null; then + PKB_SWAP_DEV=$(awk 'NR==2{print $1}' /proc/swaps) + echo "[pkb] Swap device active: $PKB_SWAP_DEV" + PKB_SWAP_FOUND=1 + break + fi + echo "[pkb] Waiting for swap device (attempt $_attempt/30)..." >&2 + sleep 5 + done + if [ "$PKB_SWAP_FOUND" != "1" ]; then + echo "[pkb] WARNING: no active swap device after 150s — " \ + "check linuxConfig.swapConfig / kubelet swap config." >&2 + fi + echo "[pkb] Measurement tools ready. Writing ready sentinel." + touch /tmp/pkb_ready + sleep infinity + securityContext: + privileged: true + capabilities: + add: ["SYS_ADMIN", "IPC_LOCK"] + resources: + requests: + memory: "512Mi" + env: + - name: NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + volumeMounts: + - name: dev + mountPath: /dev + - name: sys + mountPath: /sys + - name: run + mountPath: /run + - name: proc-host + mountPath: /proc-host + readOnly: true + - name: stateful-partition + mountPath: /mnt/stateful_partition + - name: lib-modules + mountPath: /lib/modules + readOnly: true + volumes: + - name: dev + hostPath: + path: /dev + - name: sys + hostPath: + path: /sys + - name: run + hostPath: + path: /run + - name: proc-host + hostPath: + path: /proc + - name: stateful-partition + hostPath: + path: /mnt/stateful_partition + type: DirectoryOrCreate + - name: lib-modules + hostPath: + path: /lib/modules + type: Directory diff --git a/perfkitbenchmarker/linux_benchmarks/swap_encryption_benchmark.py b/perfkitbenchmarker/linux_benchmarks/swap_encryption_benchmark.py new file mode 100644 index 0000000000..d5b4ec08db --- /dev/null +++ b/perfkitbenchmarker/linux_benchmarks/swap_encryption_benchmark.py @@ -0,0 +1,285 @@ +# Copyright 2026 PerfKitBenchmarker Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""swap_encryption_benchmark: verifies encrypted swap on GKE/EKS nodepools. + +Architecture: + BENCHMARK_CONFIG declares a 'benchmark' nodepool with swap_config. + GkeCluster._AddNodeParamsToCmd() reads nodepool_config.swap_config and + applies --system-config-from-file (linuxConfig.swapConfig + sysctl) + sets + UBUNTU_CONTAINERD + boot-disk-provisioned-iops/throughput automatically + during cluster creation. No separate nodepool lifecycle management needed. + + Prepare() deploys a privileged SwapDaemonSet on the swap-enabled nodepool + for in-pod benchmark execution (fio / stress-ng / kernel build in later PRs). + + Run() verifies swap is active and dm-crypt encryption is configured, then + reports swap device metadata as PKB samples. + + Cleanup() is empty — PKB auto-deletes spec.resources (SwapDaemonSet). + +Subsequent PRs add phases: + PR3: fio microbenchmarks on raw swap device (Tier 1) + PR4: stress-ng CPU overhead + I/O interference (Tier 2) + PR5: kernel build under cgroup memory constraint (Phase 3b) +""" + +import logging +from typing import Any + +from absl import flags +from perfkitbenchmarker import benchmark_spec +from perfkitbenchmarker import configs +from perfkitbenchmarker import sample +from perfkitbenchmarker.resources.container_service import swap_daemonset + +FLAGS = flags.FLAGS + +BENCHMARK_NAME = 'swap_encryption' +BENCHMARK_CONFIG = """ +swap_encryption: + description: > + Verify dm-crypt encrypted swap on GKE/EKS. Subsequent PRs add fio, + stress-ng, and kernel build phases. 
def GetConfig(user_config: dict[str, Any]) -> dict[str, Any]:
  """Loads the benchmark config and applies the nodepool override flags.

  Args:
    user_config: User-supplied config merged over BENCHMARK_CONFIG.

  Returns:
    The merged config dict. When --swap_encryption_machine_type or
    --swap_encryption_disk_type is set, the value is written into every
    per-cloud vm_spec of the 'benchmark' nodepool.
  """
  config = configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME)
  nodepool = config['container_cluster']['nodepools'][_BENCHMARK_NODEPOOL]
  if _MACHINE_TYPE.value:
    for cloud in nodepool['vm_spec']:
      nodepool['vm_spec'][cloud]['machine_type'] = _MACHINE_TYPE.value
  if _DISK_TYPE.value:
    for cloud in nodepool['vm_spec']:
      nodepool['vm_spec'][cloud]['boot_disk_type'] = _DISK_TYPE.value
  return config


def CheckPrerequisites(_) -> None:
  """Verifies that benchmark setup is correct (currently no checks)."""


def Prepare(spec: _BenchmarkSpec) -> None:
  """Deploys the privileged benchmark DaemonSet on the swap-enabled nodepool.

  The swap-enabled 'benchmark' nodepool is already created by GKE cluster
  creation (swap_config declared in BENCHMARK_CONFIG). Prepare() deploys the
  privileged DaemonSet used for in-pod command execution across all phases.

  After the DaemonSet pod is Running the dummy e2-medium default-pool is
  deleted to stop its cost.

  Args:
    spec: PKB BenchmarkSpec with spec.container_cluster already created.
  """
  cluster = spec.container_cluster
  daemonset = swap_daemonset.SwapDaemonSet(
      name=_DS_NAME,
      namespace=_DS_NAMESPACE,
      label=_DS_LABEL,
      nodepool=_BENCHMARK_NODEPOOL,
      image=_DAEMONSET_IMAGE.value,
  )
  daemonset.Create()
  # Registering the resource on the spec is what lets Cleanup() stay empty.
  spec.resources.append(daemonset)
  pod = daemonset.WaitForPod()
  logging.info('[swap_encryption] Benchmark pod ready: %s', pod)
  # Only drop the default pool once the benchmark pod is up.
  _delete_default_pool(cluster)


def Run(spec: _BenchmarkSpec) -> list[sample.Sample]:
  """Verify swap is active and dm-crypt encryption is configured.

  Each check is best-effort: a failing in-pod command is logged and the
  corresponding sample is simply omitted.

  Args:
    spec: PKB BenchmarkSpec whose resources contain the SwapDaemonSet.

  Returns:
    PKB samples: swap_active, swap_encrypted, swap_cipher, swap_total_kb,
    plus oom_events when any OOM event was recorded during this run.
  """
  daemonset = _get_daemonset(spec)
  daemonset.WaitForPod()
  # Reset the per-run accumulators so earlier runs do not leak into this one.
  daemonset.oom_events.clear()
  daemonset.pod_lost.clear()

  swap_dev = _detect_swap_device(daemonset)
  base_meta = _build_metadata(daemonset, swap_dev)
  results: list[sample.Sample] = []

  # --- Verify swap is active -------------------------------------------------
  try:
    swap_out, _ = daemonset.PodExec('cat /proc/swaps')
    # Any non-empty line other than the 'Filename ...' header is a swap device.
    active = any(
        line and not line.startswith('Filename')
        for line in swap_out.splitlines()
    )
    results.append(sample.Sample('swap_active', int(active), 'bool', base_meta))
    logging.info(
        '[swap_encryption] swap_active=%s /proc/swaps:\n%s', active, swap_out
    )
  except Exception as e:  # pylint: disable=broad-except
    logging.warning('[swap_encryption] Could not read /proc/swaps: %s', e)

  # --- Verify dm-crypt encryption --------------------------------------------
  if swap_dev:
    try:
      dm_out, _ = daemonset.PodExec(
          f'dmsetup status {swap_dev} 2>/dev/null || echo not_encrypted'
      )
      # Match 'crypt' as a whole token. A substring test is wrong here: the
      # fallback marker 'not_encrypted' itself contains 'crypt', which would
      # report every dmsetup failure as an encrypted device.
      encrypted = 'crypt' in dm_out.lower().split()
      # NOTE(review): `dmsetup status` may not print the cipher for crypt
      # targets (`dmsetup table` does) -- confirm swap_cipher is ever emitted.
      cipher = _parse_cipher(dm_out)
      meta = {**base_meta, 'dmsetup_status': dm_out.strip()[:200]}
      results.append(
          sample.Sample('swap_encrypted', int(encrypted), 'bool', meta)
      )
      if cipher:
        results.append(sample.Sample('swap_cipher', 0, cipher, meta))
      logging.info(
          '[swap_encryption] encrypted=%s cipher=%s', encrypted, cipher
      )
    except Exception as e:  # pylint: disable=broad-except
      logging.warning('[swap_encryption] dm-crypt check failed: %s', e)

  # --- Swap size -------------------------------------------------------------
  try:
    sz_out, _ = daemonset.PodExec(
        "awk '/^SwapTotal/ {print $2}' /proc/meminfo"
    )
    # Empty output is treated as zero swap.
    swap_kb = int(sz_out.strip() or '0')
    results.append(sample.Sample('swap_total_kb', swap_kb, 'KB', base_meta))
    logging.info(
        '[swap_encryption] SwapTotal: %d KB (%.1f GiB)',
        swap_kb, swap_kb / 1024 / 1024,
    )
  except Exception as e:  # pylint: disable=broad-except
    logging.warning('[swap_encryption] Could not read SwapTotal: %s', e)

  if daemonset.oom_events:
    results.append(
        sample.Sample(
            'oom_events', len(daemonset.oom_events), 'count', base_meta
        )
    )
  return results


def Cleanup(_: _BenchmarkSpec) -> None:
  """Empty — PKB auto-deletes spec.resources (SwapDaemonSet)."""


def _get_daemonset(spec: _BenchmarkSpec) -> swap_daemonset.SwapDaemonSet:
  """Returns the first SwapDaemonSet registered in spec.resources.

  Raises:
    RuntimeError: if no SwapDaemonSet is present in spec.resources.
  """
  for r in spec.resources:
    if isinstance(r, swap_daemonset.SwapDaemonSet):
      return r
  raise RuntimeError(
      '[swap_encryption] SwapDaemonSet not found in spec.resources'
  )
def _detect_swap_device(ds: swap_daemonset.SwapDaemonSet) -> str:
  """Return the first active swap device name (e.g. 'dm-0') or ''.

  Reads /proc/swaps inside the benchmark pod, skips the header row and
  returns the basename of the first listed device. Any failure is logged and
  reported as ''.
  """
  try:
    out, _ = ds.PodExec("awk 'NR>1 {print $1}' /proc/swaps")
    lines = out.strip().splitlines()
    dev = lines[0].strip() if lines else ''
    # Keep only the last path component: '/dev/dm-0' -> 'dm-0'.
    return dev.split('/')[-1] if dev else ''
  except Exception as e:  # pylint: disable=broad-except
    logging.warning('[swap_encryption] _detect_swap_device: %s', e)
    return ''


def _build_metadata(
    ds: swap_daemonset.SwapDaemonSet, swap_dev: str
) -> dict[str, Any]:
  """Build base metadata dict for all samples.

  Args:
    ds: DaemonSet used to run `uname -r` inside the benchmark pod.
    swap_dev: Detected swap device name; '' is recorded as 'unknown'.

  Returns:
    Dict with 'swap_device' and, when it could be read, 'kernel_version'.
  """
  meta: dict[str, Any] = {'swap_device': swap_dev or 'unknown'}
  try:
    kver, _ = ds.PodExec('uname -r')
    meta['kernel_version'] = kver.strip()
  except Exception as e:  # pylint: disable=broad-except
    # Metadata is best-effort, but leave a trace instead of failing silently.
    logging.warning('[swap_encryption] Could not read kernel version: %s', e)
  return meta


def _parse_cipher(dmsetup_status: str) -> str:
  """Extract cipher name from dmsetup status output.

  Returns the whitespace-separated token that follows the first 'crypt'
  token, or '' when there is no 'crypt' token or nothing follows it.
  """
  parts = dmsetup_status.split()
  if 'crypt' not in parts:
    return ''
  nxt = parts.index('crypt') + 1
  return parts[nxt] if nxt < len(parts) else ''


def _delete_default_pool(cluster) -> None:
  """Delete the dummy e2-medium default-pool once the benchmark pod is Running.

  GKE requires at least one nodepool at cluster creation time; the e2-medium
  default-pool satisfies that requirement. Deleting it before the DaemonSet
  pod is Running can trigger a brief API-server timeout while two concurrent
  nodepool operations are in progress.

  Failures are logged and never raised: the pool only costs money, it does
  not affect the measurement.

  Args:
    cluster: Cluster object exposing _GcloudCommand() and a name attribute.
  """
  try:
    cmd = cluster._GcloudCommand(  # pylint: disable=protected-access
        'container', 'node-pools', 'delete', _DEFAULT_POOL,
        '--cluster', cluster.name,
    )
    # --quiet suppresses the interactive confirmation prompt.
    cmd.args.append('--quiet')
    logging.info(
        '[swap_encryption] Deleting default nodepool: %s', _DEFAULT_POOL
    )
    _, stderr, rc = cmd.Issue(timeout=300, raise_on_failure=False)
    if rc != 0:
      logging.warning(
          '[swap_encryption] Could not delete default nodepool (rc=%d): %s',
          rc, stderr,
      )
    else:
      logging.info('[swap_encryption] Default nodepool deleted')
  except Exception as e:  # pylint: disable=broad-except
    logging.warning('[swap_encryption] _delete_default_pool failed: %s', e)
+ swap_cfg = getattr(nodepool, '_gke_swap_config', None) + try: + self._IssueResourceCreationCommand(cmd) + finally: + if swap_cfg is not None: + swap_cfg.CleanupYaml() + nodepool._gke_swap_config = None # pylint: disable=protected-access self._CreateCustomComputeClass(nodepool) def _CreateCustomComputeClass( @@ -570,13 +575,30 @@ def _AddNodeParamsToCmd( ): cmd.args.append('--enable-fast-socket') - if FLAGS.gke_node_system_config is not None: + # Per-nodepool swap config takes precedence over the global flag. + if nodepool_config.swap_config is not None: + gke_swap = swap_config_lib.GkeSwapConfig.from_spec(nodepool_config.swap_config) + cmd.flags['system-config-from-file'] = gke_swap.WriteLinuxConfigYaml() + # Store on nodepool so _CreateNodePools() can clean up the tempfile. + nodepool_config._gke_swap_config = gke_swap # pylint: disable=protected-access + # dm-crypt requires UBUNTU_CONTAINERD (Ajay r3472549985). + cmd.flags['image-type'] = 'UBUNTU_CONTAINERD' + # Prevent GKE from replacing the node after swap setup is complete. + cmd.args.append('--no-enable-autorepair') + sc = nodepool_config.swap_config + if sc.boot_disk_iops and not sc.lssd: + cmd.flags['boot-disk-provisioned-iops'] = sc.boot_disk_iops + cmd.flags['boot-disk-provisioned-throughput'] = ( + gke_swap.ValidHyperdiskThroughput() + ) + elif FLAGS.gke_node_system_config is not None: + # Fall back to global flag when no per-nodepool swap config is set. 
cmd.flags['system-config-from-file'] = FLAGS.gke_node_system_config if nodepool_config.sandbox_config is not None: cmd.flags['sandbox'] = nodepool_config.sandbox_config.ToSandboxFlag() - if self.image_type: + if self.image_type and 'image-type' not in cmd.flags: cmd.flags['image-type'] = self.image_type cmd.flags['node-labels'] = f'pkb_nodepool={nodepool_config.name}' diff --git a/perfkitbenchmarker/resources/container_service/container.py b/perfkitbenchmarker/resources/container_service/container.py index 3e05a1ec2b..b652eaab32 100644 --- a/perfkitbenchmarker/resources/container_service/container.py +++ b/perfkitbenchmarker/resources/container_service/container.py @@ -187,6 +187,10 @@ def __init__( # Defined by GceVirtualMachineConfig. Used by google_kubernetes_engine # pylint: disable=g-missing-from-attributes self.sandbox_config: container_spec_lib.SandboxSpec | None = None + # Set by container_cluster._InitializeNodePool() when NodepoolSpec + # declares swap_config. Consumed by _AddNodeParamsToCmd() in the cloud + # provider to apply swap configuration during nodepool creation. 
+ self.swap_config: container_spec_lib.SwapConfigSpec | None = None self.max_local_disks: int | None self.ssd_interface: str | None self.threads_per_core: int diff --git a/perfkitbenchmarker/resources/container_service/container_cluster.py b/perfkitbenchmarker/resources/container_service/container_cluster.py index 9458662c98..ed67ff7adb 100644 --- a/perfkitbenchmarker/resources/container_service/container_cluster.py +++ b/perfkitbenchmarker/resources/container_service/container_cluster.py @@ -116,6 +116,7 @@ def _InitializeNodePool( nodepool_spec.machine_families, ) nodepool_config.sandbox_config = nodepool_spec.sandbox_config + nodepool_config.swap_config = nodepool_spec.swap_config nodepool_config.zone = zone nodepool_config.num_nodes = nodepool_spec.vm_count if nodepool_spec.min_vm_count is None: diff --git a/perfkitbenchmarker/resources/container_service/swap_config.py b/perfkitbenchmarker/resources/container_service/swap_config.py new file mode 100644 index 0000000000..ca36dbad8b --- /dev/null +++ b/perfkitbenchmarker/resources/container_service/swap_config.py @@ -0,0 +1,320 @@ +# Copyright 2026 PerfKitBenchmarker Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Swap configuration as PKB BaseResource: BaseSwapConfig, GkeSwapConfig, EksSwapConfig. + +These resources encapsulate cloud-specific swap configuration for GKE and EKS +nodepools. 
class BaseSwapConfig(resource.BaseResource):
  """Abstract base class for cloud-specific nodepool swap configuration.

  Subclasses (GkeSwapConfig, EksSwapConfig) implement the cloud-specific
  method for applying swap configuration during nodepool creation.

  Common sysctl attributes (vm.swappiness, vm.min_free_kbytes,
  vm.watermark_scale_factor) are shared across all cloud providers.

  _Create() and _Delete() are no-ops: the swap config is applied as a
  parameter to nodepool creation, not as a standalone cloud resource.
  """

  RESOURCE_TYPE = 'BaseSwapConfig'
  REQUIRED_ATTRS = []

  def __init__(
      self,
      swappiness: int = 100,
      min_free_kbytes: int = 200,
      watermark_scale_factor: int = 500,
  ) -> None:
    super().__init__()
    self.swappiness = swappiness
    self.min_free_kbytes = min_free_kbytes
    self.watermark_scale_factor = watermark_scale_factor

  @classmethod
  def from_spec(cls, swap_spec) -> 'BaseSwapConfig':
    """Create a BaseSwapConfig subclass from a SwapConfigSpec.

    Subclasses must override this to instantiate with cloud-specific attrs.

    Raises:
      NotImplementedError: always, on the base class.
    """
    raise NotImplementedError(
        f'{cls.__name__}.from_spec() must be implemented by subclasses.'
    )

  def _Create(self) -> None:
    """No-op: swap config is applied during nodepool creation."""

  def _Delete(self) -> None:
    """No-op: cleaned up when the nodepool is deleted."""


class GkeSwapConfig(BaseSwapConfig):
  """GKE swap configuration for a nodepool.

  Encapsulates the linuxConfig (swapConfig + sysctl) YAML for
  --system-config-from-file and optional Hyperdisk IOPS/throughput overrides.

  Consumed by GkeCluster._AddNodeParamsToCmd() when nodepool_config.swap_config
  is set.

  Attributes:
    swappiness: vm.swappiness sysctl value (0-200, default 100).
    min_free_kbytes: vm.min_free_kbytes sysctl (default 200).
    watermark_scale_factor: vm.watermark_scale_factor sysctl (default 500).
    lssd: True if the nodepool uses local NVMe SSDs for swap device.
    lssd_count: Number of local NVMe SSDs (dedicatedLocalSsdProfile.diskCount).
    boot_disk_iops: Provisioned IOPS for hyperdisk-balanced (0 = not set).
    boot_disk_throughput: Provisioned throughput MiB/s for hyperdisk-balanced.
    enabled: Value written to linuxConfig.swapConfig.enabled (default True).
  """

  RESOURCE_TYPE = 'GkeSwapConfig'
  REQUIRED_ATTRS = []

  def __init__(
      self,
      swappiness: int = 100,
      min_free_kbytes: int = 200,
      watermark_scale_factor: int = 500,
      lssd: bool = False,
      lssd_count: int = 0,
      boot_disk_iops: int = 0,
      boot_disk_throughput: int = 0,
      enabled: bool = True,
  ) -> None:
    super().__init__(
        swappiness=swappiness,
        min_free_kbytes=min_free_kbytes,
        watermark_scale_factor=watermark_scale_factor,
    )
    self.lssd = lssd
    self.lssd_count = lssd_count
    self.boot_disk_iops = boot_disk_iops
    self.boot_disk_throughput = boot_disk_throughput
    self.enabled = enabled
    # Path of the tempfile written by WriteLinuxConfigYaml(), if any.
    self._yaml_path: str | None = None

  @classmethod
  def from_spec(cls, swap_spec) -> 'GkeSwapConfig':
    """Create a GkeSwapConfig from a SwapConfigSpec decoded from BENCHMARK_CONFIG."""
    return cls(
        swappiness=swap_spec.swappiness,
        min_free_kbytes=swap_spec.min_free_kbytes,
        watermark_scale_factor=swap_spec.watermark_scale_factor,
        lssd=swap_spec.lssd,
        lssd_count=swap_spec.lssd_count,
        boot_disk_iops=swap_spec.boot_disk_iops,
        boot_disk_throughput=swap_spec.boot_disk_throughput,
        # Propagate the spec's `enabled` switch instead of dropping it; fall
        # back to True for spec objects that predate the attribute.
        enabled=getattr(swap_spec, 'enabled', True),
    )

  def _Delete(self) -> None:
    """Cleans up any written YAML tempfile."""
    self._CleanupYaml()

  def WriteLinuxConfigYaml(self) -> str:
    """Write the GKE linuxConfig YAML to a tempfile; return the path.

    Called by GkeCluster._AddNodeParamsToCmd() to supply
    --system-config-from-file. The caller is responsible for deleting the
    tempfile via CleanupYaml() after the gcloud command completes.

    Per review feedback, linuxConfig.swapConfig.enabled=true is expected to
    set kubeletConfig.memorySwapBehavior=LimitedSwap automatically, so
    kubeletConfig is not written here. For LSSD machines,
    dedicatedLocalSsdProfile.diskCount instructs GKE to use local NVMe as
    the swap device.

    Returns:
      Absolute path to the written tempfile.
    """
    # A previous call may have left a tempfile behind; remove it first so
    # repeated calls do not leak files.
    self.CleanupYaml()

    enabled_str = 'true' if self.enabled else 'false'
    swap_block = '  swapConfig:\n' + f'    enabled: {enabled_str}\n'
    # The LSSD profile only makes sense when swap is actually enabled.
    if self.enabled and self.lssd and self.lssd_count > 0:
      swap_block += (
          '    dedicatedLocalSsdProfile:\n'
          f'      diskCount: {self.lssd_count}\n'
      )

    yaml_content = (
        'linuxConfig:\n'
        + swap_block
        + '  sysctl:\n'
        + f'    vm.swappiness: {self.swappiness}\n'
        + f'    vm.min_free_kbytes: {self.min_free_kbytes}\n'
        + f'    vm.watermark_scale_factor: {self.watermark_scale_factor}\n'
    )

    # delete=False: the file must outlive this method so gcloud can read it.
    with tempfile.NamedTemporaryFile(
        mode='w', suffix='.yaml', delete=False
    ) as tmp:
      # Record the path first so a failed write can still be cleaned up.
      self._yaml_path = tmp.name
      tmp.write(yaml_content)

    logging.info(
        '[swap_config] Wrote linuxConfig YAML (lssd=%s, lssd_count=%d)'
        ' to %s:\n%s',
        self.lssd,
        self.lssd_count,
        self._yaml_path,
        yaml_content,
    )
    return self._yaml_path

  def ValidHyperdiskThroughput(self) -> int:
    """Return clamped throughput satisfying GCP Hyperdisk Balanced constraints.

    GCP Hyperdisk Balanced requires: provisioned_iops <= 256 × throughput_MiB_s.
    Clamps throughput UP so a mismatched pair cannot abort nodepool creation.

    Returns:
      boot_disk_throughput unchanged when either IOPS or throughput is unset
      (0) or when it already satisfies the constraint; otherwise the minimum
      throughput that does.
    """
    if not self.boot_disk_iops or not self.boot_disk_throughput:
      return self.boot_disk_throughput
    # Ceiling division without floats: ceil(iops / 256).
    min_tput = -(-int(self.boot_disk_iops) // _HYPERDISK_MAX_IOPS_PER_MBPS)
    if self.boot_disk_throughput < min_tput:
      logging.warning(
          '[swap_config] boot disk throughput %d MiB/s too low for %d IOPS;'
          ' clamping to minimum %d MiB/s',
          self.boot_disk_throughput,
          self.boot_disk_iops,
          min_tput,
      )
      return min_tput
    return self.boot_disk_throughput

  def CleanupYaml(self) -> None:
    """Delete the linuxConfig tempfile if it was written.

    Safe to call repeatedly; a file that is already gone is not an error.
    """
    path = self._yaml_path
    self._yaml_path = None
    if not path:
      return
    try:
      os.unlink(path)
    except FileNotFoundError:
      # Already removed; nothing to do.
      return
    except OSError as e:
      # Best-effort cleanup, but do not hide the failure.
      logging.warning(
          '[swap_config] Could not remove YAML tempfile %s: %s', path, e
      )
      return
    logging.info('[swap_config] Cleaned up YAML tempfile: %s', path)

  def _CleanupYaml(self) -> None:
    """Private alias of CleanupYaml(), used by _Delete()."""
    self.CleanupYaml()


class EksSwapConfig(BaseSwapConfig):
  """EKS swap configuration for a nodepool (stub).

  Configures kubelet LimitedSwap via nodeadm bootstrap configuration.
  Full implementation deferred to PR #6780.

  Attributes:
    swappiness: vm.swappiness sysctl value (inherited from BaseSwapConfig).
    min_free_kbytes: vm.min_free_kbytes sysctl (inherited from BaseSwapConfig).
    watermark_scale_factor: vm.watermark_scale_factor (inherited from
      BaseSwapConfig).
    memory_swap_behavior: kubelet memorySwapBehavior value ('LimitedSwap').
    fail_swap_on: kubelet failSwapOn setting (False to allow swap on EKS).
  """

  RESOURCE_TYPE = 'EksSwapConfig'
  REQUIRED_ATTRS = []

  def __init__(
      self,
      swappiness: int = 100,
      min_free_kbytes: int = 200,
      watermark_scale_factor: int = 500,
      memory_swap_behavior: str = 'LimitedSwap',
      fail_swap_on: bool = False,
  ) -> None:
    super().__init__(
        swappiness=swappiness,
        min_free_kbytes=min_free_kbytes,
        watermark_scale_factor=watermark_scale_factor,
    )
    self.memory_swap_behavior = memory_swap_behavior
    self.fail_swap_on = fail_swap_on

  @classmethod
  def from_spec(cls, swap_spec) -> 'EksSwapConfig':
    """Create an EksSwapConfig from a SwapConfigSpec.

    Only the three sysctl values are taken from the spec; the kubelet
    settings keep their constructor defaults.
    """
    return cls(
        swappiness=swap_spec.swappiness,
        min_free_kbytes=swap_spec.min_free_kbytes,
        watermark_scale_factor=swap_spec.watermark_scale_factor,
    )

  def _Create(self) -> None:
    """Stub: EKS kubelet LimitedSwap config via nodeadm (deferred to PR #6780)."""
    logging.warning(
        '[swap_config] EksSwapConfig._Create() is a stub. '
        'EKS kubelet LimitedSwap config via nodeadm not yet implemented '
        '(deferred to PR #6780). Swap will not be enabled on EKS nodes.'
    )

  def GetNodeadmConfig(self) -> str:
    """Return nodeadm bootstrap YAML for kubelet swap settings."""
    # NOTE(review): the vm.* sysctls are emitted under spec.containerd.config,
    # which looks like the wrong home for kernel sysctls -- confirm against
    # the nodeadm NodeConfig schema before this stub is wired up.
    return (
        'apiVersion: node.eks.aws/v1alpha1\n'
        'kind: NodeConfig\n'
        'spec:\n'
        '  kubelet:\n'
        '    config:\n'
        f'      memorySwapBehavior: {self.memory_swap_behavior}\n'
        f'      failSwapOn: {str(self.fail_swap_on).lower()}\n'
        '  containerd:\n'
        '    config:\n'
        f'      vm.swappiness: {self.swappiness}\n'
        f'      vm.min_free_kbytes: {self.min_free_kbytes}\n'
        f'      vm.watermark_scale_factor: {self.watermark_scale_factor}\n'
    )
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""SwapDaemonSet: PKB BaseResource for the swap-encryption privileged DaemonSet. + +Manages the full lifecycle of the privileged benchmark pod used by the +swap_encryption benchmark: + + _Create() — apply the Jinja2 manifest via kubernetes_commands.ApplyManifest + and wait for the pod to reach Running + /tmp/pkb_ready. + _Delete() — run in-pod cleanup (swapoff, dmsetup remove, losetup teardown, + pkill fio/stress-ng) then kubectl delete daemonset. + PodExec() — kubectl exec wrapper with transient-reset retry, OOM-kill (rc=137) + detection, and automatic RecoverPod() after eviction or container + restart. + WaitForPod() — polls for Running phase + sentinel; updates self.pod_name. + RecoverPod() — waits for DaemonSet to recreate / restart the container, + checking deletionTimestamp to avoid false-positive Running state. + +Extracted from swap_encryption_benchmark.py to satisfy PKB resource pattern +(go/pkb-resources): infrastructure lifecycle belongs in BaseResource subclasses, +not in benchmark files. +""" + +import logging +import textwrap +import time +from typing import Optional + +from perfkitbenchmarker import errors +from perfkitbenchmarker import resource +from perfkitbenchmarker.resources.container_service import kubectl +from perfkitbenchmarker.resources.container_service import kubernetes_commands + +# Transient kubectl errors that are safe to retry automatically. 
_TRANSIENT_KUBECTL_ERRORS = ('connection reset by peer', 'websocket: close')

# Errors indicating the container / pod is gone and needs full recovery.
# Matched case-insensitively against kubectl's stderr.
_CONTAINER_GONE_KUBECTL_ERRORS = (
    'container not found',
    'procready not received',
    'unable to upgrade connection',
    'not found',
    'deleted state',
)

# The bare 'not found' pattern also matches ordinary command output such as
# "bash: foo: command not found". A match on it alone is therefore confirmed
# against the API server before the pod is declared lost.
_GENERIC_NOT_FOUND = 'not found'


class SwapDaemonSet(resource.BaseResource):
    """PKB resource for the swap-encryption benchmark privileged DaemonSet.

    _Create() applies the DaemonSet manifest and blocks until a pod is
    Running and the /tmp/pkb_ready sentinel exists inside it. Benchmark
    phases then run commands in that pod through PodExec().

    Attributes:
      name: DaemonSet metadata.name (e.g. 'pkb-swap-benchmark').
      namespace: Kubernetes namespace (typically 'default').
      label: Pod label value for the app= selector.
      nodepool: Value passed to the manifest as benchmark_nodepool.
      image: Container image (e.g. 'ubuntu:22.04').
      pod_name: Name of the currently active pod; updated by WaitForPod /
        RecoverPod / PodExec.
      oom_events: Pod names for which a PodExec command returned rc=137.
      pod_lost: Pod names that were found gone during PodExec.
    """

    RESOURCE_TYPE = 'SwapDaemonSet'
    REQUIRED_ATTRS = []

    def __init__(
        self,
        name: str,
        namespace: str,
        label: str,
        nodepool: str,
        image: str,
    ) -> None:
        """Record the DaemonSet identity; nothing is created until _Create()."""
        super().__init__()
        self.name = name
        self.namespace = namespace
        self.label = label
        self.nodepool = nodepool
        self.image = image
        # Active pod tracking, updated by WaitForPod / RecoverPod.
        self.pod_name: Optional[str] = None
        # Per-run accumulators for callers that gate on degradation.
        self.oom_events: list[str] = []
        self.pod_lost: list[str] = []

    # ── PKB lifecycle ─────────────────────────────────────────────────────

    def _Create(self) -> None:
        """Apply the DaemonSet manifest and wait for the pod to be ready.

        Raises:
          errors.Benchmarks.PrepareException: if WaitForPod() times out.
        """
        kubernetes_commands.ApplyManifest(
            'cluster/swap_encryption_daemonset.yaml.j2',
            ds_name=self.name,
            ds_namespace=self.namespace,
            ds_label=self.label,
            benchmark_nodepool=self.nodepool,
            image=self.image,
        )
        logging.info('[swap_encryption] Swap-infra DaemonSet applied')
        if self.WaitForPod() is None:
            raise errors.Benchmarks.PrepareException(
                '[swap_encryption] DaemonSet pod did not become ready within'
                ' timeout'
            )

    def _Delete(self) -> None:
        """Run in-pod teardown then delete the DaemonSet.

        The in-pod steps (swapoff, dmsetup remove, losetup cleanup, pkill)
        are best-effort. PodExec can still raise from its recovery path even
        with ignore_failure=True, so that error is caught here; otherwise a
        failed recovery would skip the `kubectl delete` and leak the
        DaemonSet.
        """
        # Try to get the pod name quickly if not set.
        if self.pod_name is None:
            self.WaitForPod(timeout=30)

        teardown_scripts = (
            'swapoff -a 2>/dev/null || true',
            textwrap.dedent("""\
                swapoff /dev/mapper/swap_encrypted 2>/dev/null || true
                dmsetup remove --noudevrules --noudevsync \
                    swap_encrypted 2>/dev/null || true
                """),
            textwrap.dedent("""\
                for backing in \
                    /var/pkb_swap_backing \
                    /run/pkb_swap_backing \
                    /mnt/stateful_partition/pkb_swap_backing
                do
                    losetup -j "$backing" 2>/dev/null \
                        | awk -F: '{print $1}' \
                        | while read dev
                    do losetup -d "$dev" 2>/dev/null || true; done
                    rm -f "$backing"
                done
                """),
            "pkill -9 'stress-ng|fio' 2>/dev/null || true",
        )

        if self.pod_name:
            try:
                for script in teardown_scripts:
                    self.PodExec(script, ignore_failure=True, _retries=0)
            except errors.VmUtil.IssueCommandError as e:
                logging.warning(
                    '[swap_encryption] In-pod teardown aborted (%s);'
                    ' deleting DaemonSet anyway',
                    e,
                )

        kubectl.RunKubectlCommand(
            [
                'delete',
                'daemonset',
                self.name,
                '-n',
                self.namespace,
                '--ignore-not-found',
            ],
            raise_on_failure=False,
        )
        logging.info('[swap_encryption] DaemonSet deleted')

    # ── Pod lifecycle helpers ─────────────────────────────────────────────

    def WaitForPod(self, timeout: int = 600) -> Optional[str]:
        """Wait until the DaemonSet pod is Running AND /tmp/pkb_ready exists.

        Two-phase poll, repeated every 15 s until the deadline:
          1. List pods by the app= label and pick the first one whose
             status.phase is Running.
          2. `kubectl exec <pod> -- test -f /tmp/pkb_ready`.

        If step 2 fails because the container or pod has gone away, the
        chosen pod is dropped and step 1 runs again, so the loop does not
        keep polling a pod name that no longer exists.

        Args:
          timeout: Maximum seconds to wait.

        Returns:
          Pod name on success; None on timeout. Also updates self.pod_name
          on success.
        """
        deadline = time.time() + timeout
        last_phase = ''
        ready_pod = None

        while time.time() < deadline:
            # Step 1: wait for Running phase.
            if ready_pod is None:
                out, _, rc = kubectl.RunKubectlCommand(
                    [
                        'get',
                        'pods',
                        '-l',
                        f'app={self.label}',
                        '-n',
                        self.namespace,
                        '-o',
                        (
                            r'jsonpath={range .items[*]}'
                            r'{.metadata.name}{"\t"}'
                            r'{.status.phase}{"\n"}{end}'
                        ),
                    ],
                    raise_on_failure=False,
                )
                if rc == 0 and out.strip():
                    for line in out.strip().splitlines():
                        parts = line.split('\t')
                        if len(parts) != 2:
                            continue
                        pod_name = parts[0].strip()
                        phase = parts[1].strip()
                        if phase == 'Running':
                            logging.info(
                                '[swap_encryption] Pod %s is Running'
                                ' — waiting for sentinel...',
                                pod_name,
                            )
                            ready_pod = pod_name
                            break
                        if phase != last_phase:
                            logging.info(
                                '[swap_encryption] Pod %s phase: %s',
                                pod_name,
                                phase,
                            )
                            last_phase = phase
                            if phase == 'Pending':
                                self._LogPodEvents(pod_name)
                else:
                    logging.info(
                        '[swap_encryption] Waiting for DaemonSet pod to'
                        ' appear...'
                    )

            # Step 2: poll for /tmp/pkb_ready sentinel.
            if ready_pod is not None:
                _, sentinel_err, sentinel_rc = kubectl.RunKubectlCommand(
                    [
                        'exec',
                        ready_pod,
                        '-n',
                        self.namespace,
                        '--',
                        'test',
                        '-f',
                        '/tmp/pkb_ready',
                    ],
                    raise_on_failure=False,
                )
                if sentinel_rc == 0:
                    logging.info(
                        '[swap_encryption] Pod %s ready (swap device active)',
                        ready_pod,
                    )
                    self.pod_name = ready_pod
                    return ready_pod
                # Container crashed or pod was replaced: forget this pod
                # and re-check pod state on the next iteration.
                if self._IsContainerGoneError(sentinel_err, ready_pod):
                    logging.warning(
                        '[swap_encryption] Pod %s: container not running'
                        ' (%s) — will re-check pod state',
                        ready_pod,
                        (sentinel_err or '').strip(),
                    )
                    ready_pod = None
                    last_phase = ''
                else:
                    logging.info(
                        '[swap_encryption] Pod %s: still installing tools...',
                        ready_pod,
                    )

            time.sleep(15)

        logging.warning(
            '[swap_encryption] Benchmark pod not ready after %ds', timeout
        )
        return None

    def _LogPodEvents(self, pod_name: str) -> None:
        """Log the Events section of `kubectl describe pod` for a pod.

        Logs at most 30 lines starting at the 'Events:' line. When no such
        line is present, logs the last 2000 characters of the describe
        output instead.
        """
        events_out, _, _ = kubectl.RunKubectlCommand(
            ['describe', 'pod', pod_name, '-n', self.namespace],
            raise_on_failure=False,
        )
        in_events = False
        lines = []
        for line in events_out.splitlines():
            if line.startswith('Events:'):
                in_events = True
            if in_events:
                lines.append(line)
        if lines:
            logging.info(
                '[swap_encryption] Pod events:\n%s', '\n'.join(lines[:30])
            )
        else:
            logging.info(
                '[swap_encryption] kubectl describe output:\n%s',
                events_out[-2000:],
            )

    def _IsPodGone(self, pod: str) -> bool:
        """Return True if `kubectl get pod` reports the named pod not found.

        Any other outcome, including an exception from the kubectl call,
        returns False.
        """
        try:
            _, err, rc = kubectl.RunKubectlCommand(
                [
                    'get',
                    'pod',
                    pod,
                    '-n',
                    self.namespace,
                    '-o',
                    'jsonpath={.metadata.name}',
                ],
                raise_on_failure=False,
                timeout=15,
            )
            return rc != 0 and 'not found' in (err or '').lower()
        except Exception:  # pylint: disable=broad-except
            return False

    def _IsContainerGoneError(self, err: str, pod: str) -> bool:
        """Decide whether kubectl stderr means the container/pod is gone.

        Any pattern in _CONTAINER_GONE_KUBECTL_ERRORS other than the bare
        'not found' is accepted directly. A match on 'not found' alone is
        ambiguous, because stderr also carries the in-pod command's own
        output, so it only counts when _IsPodGone() confirms it.

        Args:
          err: stderr returned by the kubectl exec call.
          pod: Name of the pod the command was sent to.

        Returns:
          True if the error indicates a missing container or pod.
        """
        text = (err or '').lower()
        hits = [e for e in _CONTAINER_GONE_KUBECTL_ERRORS if e in text]
        if not hits:
            return False
        if any(e != _GENERIC_NOT_FOUND for e in hits):
            return True
        return self._IsPodGone(pod)

    def PodExec(
        self,
        cmd: str,
        ignore_failure: bool = False,
        timeout: int = 300,
        _retries: int = 2,
    ) -> tuple[str, str]:
        """Run a shell command inside the benchmark pod via kubectl exec.

        Handles:
          - Transient connection resets: automatic retry after 10 s (up to
            _retries times).
          - rc=137: records the pod in self.oom_events, sleeps 15 s, calls
            RecoverPod, and does NOT re-run the command.
          - Container/pod gone: records the pod in self.pod_lost, calls
            RecoverPod, and retries the command on the recovered pod while
            retries remain.

        Uses self.pod_name as the active pod and updates it when recovery
        yields a different pod.

        Args:
          cmd: Shell command string passed to bash -c.
          ignore_failure: When True, a non-zero exit code does not raise.
          timeout: Seconds passed to the kubectl call as its timeout.
          _retries: Max automatic retries; negative values are treated as 0.

        Returns:
          Tuple of (stdout, stderr) strings. ('', '') when there is no
          active pod and ignore_failure is True.

        Raises:
          errors.VmUtil.IssueCommandError: on a non-zero exit code when
            ignore_failure is False, when there is no active pod and
            ignore_failure is False, or when RecoverPod times out.
        """
        active = self.pod_name
        if not active:
            message = (
                '[swap_encryption] PodExec called with no active pod'
                ' (pod_name is unset)'
            )
            if ignore_failure:
                logging.warning(message)
                return '', ''
            raise errors.VmUtil.IssueCommandError(message)

        # Always run at least once so out/err/rc are bound after the loop.
        retries = max(_retries, 0)

        for attempt in range(retries + 1):
            out, err, rc = kubectl.RunKubectlCommand(
                [
                    'exec',
                    active,
                    '-n',
                    self.namespace,
                    '--',
                    'bash',
                    '-c',
                    cmd,
                ],
                raise_on_failure=False,
                raise_on_timeout=False,
                timeout=timeout,
            )

            # Retry transient connection resets.
            is_transient = rc != 0 and any(
                e in err for e in _TRANSIENT_KUBECTL_ERRORS
            )
            if is_transient and attempt < retries:
                logging.warning(
                    '[swap_encryption] kubectl exec connection reset (attempt'
                    ' %d/%d); retrying in 10 s',
                    attempt + 1,
                    retries + 1,
                )
                time.sleep(10)
                continue

            # rc=137 (128 + SIGKILL): treated as an OOM kill. Do NOT retry;
            # log, recover, and fall through to the common exit path.
            if rc == 137:
                if active not in self.oom_events:
                    self.oom_events.append(active)
                # Give Kubernetes time to update pod state before checking.
                logging.warning(
                    '[swap_encryption] rc=137 — sleeping 15 s for Kubernetes'
                    ' to update pod state before recovery check'
                )
                time.sleep(15)
                if self._IsPodGone(active):
                    logging.warning(
                        '[swap_encryption] OOM-eviction detected (rc=137, pod'
                        ' gone) — recovering pod name for subsequent commands'
                    )
                else:
                    logging.warning(
                        '[swap_encryption] Container OOM-killed (rc=137, pod'
                        ' still exists) — waiting for container restart'
                    )
                new_pod = self.RecoverPod(active)
                if new_pod != active:
                    logging.info(
                        '[swap_encryption] Pod name updated: %s → %s',
                        active,
                        new_pod,
                    )
                    self.pod_name = new_pod
                    active = new_pod
                break  # OOM cmd is never re-run on the recovered pod.

            # Container or pod gone: record loss, try RecoverPod, retry cmd.
            if rc != 0 and self._IsContainerGoneError(err, active):
                if active not in self.pod_lost:
                    self.pod_lost.append(active)
                    logging.error(
                        '[swap_encryption] Benchmark pod %s is gone (%s) —'
                        ' recording run as degraded',
                        active,
                        (err or '').strip()[:160],
                    )
                if attempt < retries:
                    logging.warning(
                        '[swap_encryption] Container gone/restarting (attempt'
                        ' %d/%d) — waiting for pod to recover...',
                        attempt + 1,
                        retries + 1,
                    )
                    new_pod = self.RecoverPod(active)
                    if new_pod != active:
                        logging.info(
                            '[swap_encryption] Pod name updated: %s → %s',
                            active,
                            new_pod,
                        )
                        self.pod_name = new_pod
                        active = new_pod
                    continue

            # Success, ordinary failure, or retries exhausted: stop here.
            break

        if rc != 0 and not ignore_failure:
            raise errors.VmUtil.IssueCommandError(
                f'[swap_encryption] PodExec failed (rc={rc}): {err}'
            )
        return out, err

    def RecoverPod(self, pod: str, timeout_sec: int = 600) -> str:
        """Wait for a usable pod after an OOM kill or eviction.

        Phase 1 polls the original pod every 10 s:
          - If it is Running with no metadata.deletionTimestamp, keep it.
          - If it is not found or has a deletionTimestamp, look for another
            Running pod with the same app= label and switch to it.
        Phase 2 polls every 15 s until /tmp/pkb_ready exists in the chosen
        pod. Both phases share one deadline.

        Args:
          pod: Original pod name to monitor.
          timeout_sec: Maximum seconds to wait for recovery.

        Returns:
          The (possibly new) pod name once the sentinel is present. Also
          stored in self.pod_name.

        Raises:
          errors.VmUtil.IssueCommandError: if no Running pod is found, or
            the sentinel does not appear, before the deadline.
        """
        deadline = time.time() + timeout_sec
        logging.info(
            '[swap_encryption] Waiting for pod %s to recover (up to %ds)...',
            pod,
            timeout_sec,
        )

        # Phase 1: find a Running pod that is NOT being terminated.
        recovered_pod = pod
        while time.time() < deadline:
            # Query both phase and deletionTimestamp in a single call.
            status_out, status_err, status_rc = kubectl.RunKubectlCommand(
                [
                    'get',
                    'pod',
                    pod,
                    '-n',
                    self.namespace,
                    '-o',
                    'jsonpath={.status.phase}|{.metadata.deletionTimestamp}',
                ],
                raise_on_failure=False,
                timeout=30,
            )
            phase, _, deletion_ts = status_out.strip().partition('|')
            phase = phase.strip()
            is_terminating = bool(deletion_ts.strip())

            # Genuine Running (not being deleted): move to Phase 2.
            if status_rc == 0 and phase == 'Running' and not is_terminating:
                break

            # Pod gone or Terminating: look for a replacement by label.
            pod_missing = (
                status_rc != 0
                and 'not found' in (status_out + status_err).lower()
            )
            if pod_missing or is_terminating:
                label_out, _, label_rc = kubectl.RunKubectlCommand(
                    [
                        'get',
                        'pods',
                        '-n',
                        self.namespace,
                        '-l',
                        f'app={self.label}',
                        '-o',
                        (
                            'jsonpath={range'
                            ' .items[?(@.status.phase=="Running")]}'
                            '{.metadata.name}{"\\n"}{end}'
                        ),
                    ],
                    raise_on_failure=False,
                    timeout=30,
                )
                new_pods = [
                    p.strip()
                    for p in label_out.strip().splitlines()
                    if p.strip() and p.strip() != pod
                ]
                if label_rc == 0 and new_pods:
                    recovered_pod = new_pods[0]
                    logging.info(
                        '[swap_encryption] Original pod %s gone/terminating;'
                        ' found replacement %s',
                        pod,
                        recovered_pod,
                    )
                    break

            time.sleep(10)
        else:
            # Loop ran out of time without a break.
            raise errors.VmUtil.IssueCommandError(
                f'[swap_encryption] No Running pod found (original: {pod})'
                f' within {timeout_sec}s after OOM kill / eviction'
            )

        # Phase 2: wait for the readiness sentinel in the chosen pod.
        while time.time() < deadline:
            ready_out, _, ready_rc = kubectl.RunKubectlCommand(
                [
                    'exec',
                    recovered_pod,
                    '-n',
                    self.namespace,
                    '--',
                    'bash',
                    '-c',
                    'test -f /tmp/pkb_ready && echo READY',
                ],
                raise_on_failure=False,
                timeout=30,
            )
            if ready_rc == 0 and 'READY' in ready_out:
                logging.info(
                    '[swap_encryption] Pod %s recovered (swap device active)',
                    recovered_pod,
                )
                self.pod_name = recovered_pod
                return recovered_pod
            time.sleep(15)

        raise errors.VmUtil.IssueCommandError(
            f'[swap_encryption] Pod {recovered_pod} did not become ready'
            f' within {timeout_sec}s after OOM kill / eviction'
        )