Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
278 changes: 275 additions & 3 deletions perfkitbenchmarker/linux_benchmarks/kubernetes_management_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
# limitations under the License.
"""Benchmark for Kubernetes management plane operations.

Measures GKE/EKS/AKS control-plane API responsiveness via two scenarios:
Measures GKE/EKS/AKS control-plane API responsiveness via three scenarios:
concurrent_node_pool_ops: concurrent node-pool create/delete.
overlapping_cluster_update: node-pool create overlapping a cluster update.
large_scale_provisioning: large-scale node-pool provisioning (scale/sweep).

Optimizations for minimum run time:
- Streaming concurrency in large_scale_provisioning (no batch barriers)
- Reduced poll_interval in provider WaitForOperation (5s vs 10s)
- Per-op threads capped at _MAX_CONCURRENT to avoid OS limits
- Accurate delete success rate via attempted_ops denominator
Expand Down Expand Up @@ -66,9 +68,13 @@
# overlapping_cluster_update: run a cluster update and a node-pool create
# simultaneously; measures behaviour when a cluster-scoped op overlaps a
# node-pool-scoped one.
# large_scale_provisioning: create then delete a large number of node pools
# (optionally swept via --k8s_mgmt_scale_sweep); measures scaling limits
# and large-batch provisioning latency.
# Closed set of scenario names accepted by --k8s_mgmt_scenarios.
_VALID_SCENARIOS = frozenset((
    "concurrent_node_pool_ops",
    "overlapping_cluster_update",
    "large_scale_provisioning",
))

# ── Shared flags (apply across all scenarios) ──
Expand All @@ -77,9 +83,11 @@
[
"concurrent_node_pool_ops",
"overlapping_cluster_update",
"large_scale_provisioning",
],
"Comma-separated subset of scenarios to run. Valid values: "
+ "concurrent_node_pool_ops, overlapping_cluster_update.",
+ "concurrent_node_pool_ops, overlapping_cluster_update, "
+ "large_scale_provisioning.",
)
_NODES_PER_NODEPOOL = flags.DEFINE_integer(
"k8s_mgmt_nodes_per_nodepool",
Expand All @@ -106,6 +114,24 @@
"Kubernetes version for newly-created node pools (N-1). None = auto.",
)

# ── large_scale_provisioning flags ──
_LARGE_SCALE_NODEPOOLS = flags.DEFINE_integer(
    "k8s_mgmt_large_scale_nodepools",
    1000,
    "Number of node pools to provision in the large_scale_provisioning "
    + "scenario. Spec target is 1000; ensure VPC/quota is available before "
    + "running.",
    # Reject 0/negative counts at flag-parse time: a non-positive goal produces
    # no create/delete work, so the scenario could only fail late or emit
    # meaningless counts.
    lower_bound=1,
)
# Optional list of goal node-pool counts; when non-empty it takes precedence
# over --k8s_mgmt_large_scale_nodepools (see the help text).
_SCALE_SWEEP = flags.DEFINE_list(
    name="k8s_mgmt_scale_sweep",
    default=[],
    help=(
        "Comma-separated list of node-pool counts for the "
        "large_scale_provisioning "
        "scale sweep. Each scale runs as a separate sub-run with full "
        "create/delete cycle. Example:"
        " --k8s_mgmt_scale_sweep=10,50,100,500,1000. "
        "If empty, uses --k8s_mgmt_large_scale_nodepools."
    ),
)

# AKS caps node-pool names at 12 chars — keep all names within that limit.
# This shared prefix is 4 chars; pool names in this file are built by appending
# a short scenario-specific suffix to it.
_PREFIX = "pkbm"

Expand All @@ -121,13 +147,18 @@ def _ConcurrentPoolName(i):
_OVERLAPPING_POOL_NAME = f"{_PREFIX}b"


def _ScalePoolName(i):
  """Returns the large_scale_provisioning node-pool name for index `i`.

  The name is the shared prefix, the letter "c", and the index zero-padded to
  at least four digits (index 7 -> "<prefix>c0007").
  """
  return "{}c{:04d}".format(_PREFIX, i)


@dataclasses.dataclass
class OpTiming:
"""Latency of a single async management-plane operation.

Pure timing data — the metric name is supplied by the sample builder, and
failures abort the run rather than being recorded here (so there is no
error field).
error field). large_scale_provisioning, which tolerates partial failure,
tracks failed pool names separately.

Attributes:
initiation_latency: Seconds from issuing the async API call until it is
Expand All @@ -154,6 +185,19 @@ def CheckPrerequisites(
f"Invalid value(s) for --k8s_mgmt_scenarios: {invalid}. "
+ f"Valid options: {sorted(_VALID_SCENARIOS)}."
)
selected = {s.strip() for s in _SCENARIOS.value}
if _SCALE_SWEEP.value and "large_scale_provisioning" not in selected:
raise errors.Config.InvalidValue(
"--k8s_mgmt_scale_sweep applies only to the large_scale_provisioning "
+ "scenario, which is not selected."
)
for s in _SCALE_SWEEP.value:
try:
int(s.strip())
except ValueError as e:
raise errors.Config.InvalidValue(
f"Non-integer value in --k8s_mgmt_scale_sweep: {s!r}"
) from e
if benchmark_config.container_cluster.type != "Kubernetes":
raise errors.Config.InvalidValue(
"kubernetes_management benchmark requires a Kubernetes"
Expand Down Expand Up @@ -234,6 +278,9 @@ def Run(benchmark_spec: bm_spec.BenchmarkSpec) -> list[sample.Sample]:
if "overlapping_cluster_update" in scenarios:
samples += _RunOverlappingClusterUpdate(cluster, initial)
_ClearNodePools(cluster)
if "large_scale_provisioning" in scenarios:
samples += _SweepScales(cluster, initial)
_ClearNodePools(cluster)

# Tag all samples with version path and run config for published results.
run_meta = {
Expand Down Expand Up @@ -262,6 +309,33 @@ def Cleanup(benchmark_spec: bm_spec.BenchmarkSpec) -> None:
_ClearNodePools(cluster)


def _SweepScales(
    cluster: kubernetes_cluster.KubernetesCluster,
    initial: str,
) -> list[sample.Sample]:
  """Runs large-scale provisioning at each requested goal node-pool count.

  Goal counts come from --k8s_mgmt_scale_sweep when set, else the single
  --k8s_mgmt_large_scale_nodepools value. Each count's samples are tagged
  with goal_nodepools so results stay distinguishable.

  Args:
    cluster: cluster handed to _ScaleToPoolCount for every goal count.
    initial: version string forwarded unchanged to _ScaleToPoolCount.

  Returns:
    The samples of every goal count, concatenated in sweep order.

  Raises:
    errors.Benchmarks.RunError: if any goal count is zero or negative. The
      whole sweep is validated before any provisioning starts, so a bad entry
      cannot waste the scales that precede it.
  """
  goal_counts = (
      [int(x.strip()) for x in _SCALE_SWEEP.value]
      if _SCALE_SWEEP.value
      else [_LARGE_SCALE_NODEPOOLS.value]
  )
  # A non-positive goal creates no pools: zero would only fail later inside
  # the sample builder, and a negative value would publish negative counts.
  non_positive = [count for count in goal_counts if count <= 0]
  if non_positive:
    raise errors.Benchmarks.RunError(
        "large_scale_provisioning: goal node-pool counts must be positive; "
        f"got {non_positive} from --k8s_mgmt_scale_sweep / "
        "--k8s_mgmt_large_scale_nodepools."
    )
  logging.info(
      "large_scale_provisioning: goal node-pool counts = %s", goal_counts
  )
  samples: list[sample.Sample] = []
  for goal_nodepools in goal_counts:
    goal_samples = _ScaleToPoolCount(cluster, initial, goal_nodepools)
    # Tag every sample so results from different scales can be told apart.
    for s in goal_samples:
      s.metadata["goal_nodepools"] = str(goal_nodepools)
    samples += goal_samples
  return samples


def _RunConcurrentNodePoolOps(
cluster: kubernetes_cluster.KubernetesCluster,
initial: str,
Expand Down Expand Up @@ -344,6 +418,86 @@ def DoCreate():
return samples


def _ScaleToPoolCount(
    cluster: kubernetes_cluster.KubernetesCluster,
    initial: str,
    goal_nodepools: int,
) -> list[sample.Sample]:
  """Creates, then deletes, `goal_nodepools` node pools and reports samples.

  All creates are handed to _RunAsyncTolerant in one call, which bounds the
  concurrency and tolerates per-pool failures. Deletes are issued only for
  the pools returned by _LiveNodePoolNames after the create phase, but both
  the create and the delete samples use `goal_nodepools` as the attempted-op
  count, so rates are relative to the original request.

  Args:
    cluster: cluster on which node pools are created and deleted.
    initial: node version passed to every node-pool create.
    goal_nodepools: number of node pools to request.

  Returns:
    LargeScale_Create samples followed by LargeScale_Delete samples.
  """
  logging.info(
      "large_scale goal=%d, max_concurrent=%d, initial_version=%s",
      goal_nodepools,
      _MAX_CONCURRENT.value,
      initial,
  )
  configs_ = [
      _MakeNodePoolConfig(cluster, _ScalePoolName(index))
      for index in range(goal_nodepools)
  ]

  def StartCreate(cfg):
    return cluster.CreateNodePoolAsync(cfg, node_version=initial)

  def ConfigName(cfg):
    return cfg.name

  # Create phase: partial failure is expected at scale, hence the tolerant
  # runner rather than the abort-on-error one.
  created, create_failed = _RunAsyncTolerant(
      kickoff=StartCreate,
      wait_fn=cluster.WaitForOperation,
      items=configs_,
      get_name=ConfigName,
  )
  logging.info(
      "large_scale goal=%d: %d/%d creates succeeded (%d failed)",
      goal_nodepools,
      len(created),
      goal_nodepools,
      len(create_failed),
  )
  samples: list[sample.Sample] = list(
      _LargeScaleSamples(
          "LargeScale_Create",
          created,
          create_failed,
          attempted_ops=goal_nodepools,
      )
  )

  # Delete phase: only pools that currently exist are sent a delete request.
  alive = _LiveNodePoolNames(cluster, f"{_PREFIX}c")
  logging.info(
      "large_scale goal=%d: %d live pools for delete (originally %d;"
      " %d rolled back by cloud)",
      goal_nodepools,
      len(alive),
      goal_nodepools,
      goal_nodepools - len(alive),
  )
  if alive:
    deleted, delete_failed = _RunAsyncTolerant(
        kickoff=cluster.DeleteNodePoolAsync,
        wait_fn=cluster.WaitForOperation,
        items=alive,
        get_name=str,
    )
  else:
    logging.info(
        "large_scale goal=%d: all creates rolled back.", goal_nodepools
    )
    deleted, delete_failed = [], []

  # The denominator stays goal_nodepools so the delete rate is measured
  # against the original request count, not against the surviving pools.
  samples += _LargeScaleSamples(
      "LargeScale_Delete",
      deleted,
      delete_failed,
      attempted_ops=goal_nodepools,
  )
  return samples


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -418,6 +572,44 @@ def DoWrap(item):
return results.entries


def _RunAsyncTolerant(
    kickoff: Callable,
    wait_fn: Callable[[str], None],
    items: list,
    get_name: Callable[[object], str],
) -> tuple[list[tuple[str, OpTiming]], list[str]]:
  """Failure-tolerant variant of _RunAsync, used by large_scale only.

  Every item is timed through _TimedAsync on a thread pool whose size is the
  smaller of len(items) and --the _MAX_CONCURRENT flag value. An op that
  raises is logged and recorded by name instead of aborting the run, because
  overshooting quota at large scale is an expected outcome rather than a
  benchmark failure.

  Args:
    kickoff: called with one item to start its async operation.
    wait_fn: passed to _TimedAsync to wait for the started operation.
    items: work items, one operation each.
    get_name: maps an item to the name used in results and log lines.

  Returns:
    A pair (entries, failed) as collected by ThreadSafeResults: the
    (name, OpTiming) pairs of successful ops and the names of failed ops.
    Both are empty lists when `items` is empty.
  """
  if not items:
    return [], []

  outcomes = ThreadSafeResults()

  def DoWrap(item):
    name = get_name(item)

    def Kickoff():
      return kickoff(item)

    try:
      timing = _TimedAsync(Kickoff, wait_fn)
    except Exception as exc:  # pylint: disable=broad-except
      outcomes.add_failure(name)
      # Cap the message so one verbose cloud error cannot flood the log.
      logging.warning("%s FAILED: %s", name, str(exc)[:200])
    else:
      outcomes.add(name, timing)
      logging.info(
          "%s initiation=%.2fs end_to_end=%.2fs",
          name,
          timing.initiation_latency,
          timing.end_to_end_latency,
      )

  background_tasks.RunThreaded(
      DoWrap,
      items,
      max_concurrent_threads=min(len(items), _MAX_CONCURRENT.value),
  )
  return outcomes.entries, outcomes.failed


def _MakeNodePoolConfig(
cluster: kubernetes_cluster.KubernetesCluster,
name: str,
Expand Down Expand Up @@ -483,6 +675,86 @@ def _OpSamples(
return samples


def _LargeScaleSamples(
    metric_prefix: str,
    results: list[tuple[str, OpTiming]],
    failed: list[str],
    attempted_ops: int,
) -> list[sample.Sample]:
  """Latency + success/failure accounting for the tolerant large-scale path.

  Unlike _OpSamples, large_scale_provisioning tolerates partial failure, so
  this reports how many ops succeeded/failed (against the originally-attempted
  count) and lists the failed pool names in metadata.

  Args:
    metric_prefix: prefix for all metric names.
    results: successful (name, OpTiming) pairs.
    failed: names of ops that failed.
    attempted_ops: total ops originally requested (the denominator).

  Returns:
    Per-op initiation/end-to-end latency samples, then the TotalOps,
    SucceededOps, FailedOps and SuccessRate samples, then whatever
    _AggregateAndOutlierSamples produces for the collected latencies.

  Raises:
    errors.Benchmarks.RunError: if attempted_ops is not positive, i.e. the
      scenario dispatched no work.
  """
  # Check the denominator before building anything: zero would divide by zero
  # and a negative count would publish negative FailedOps / SuccessRate.
  if attempted_ops <= 0:
    raise errors.Benchmarks.RunError(
        f"{metric_prefix}: zero operations attempted — the scenario produced "
        "no work, which indicates a setup or dispatch failure."
    )

  samples: list[sample.Sample] = []
  init_latencies: list[float] = []
  e2e_latencies: list[float] = []

  for name, timing in results:
    meta = {"operation_name": name}
    init_latencies.append(timing.initiation_latency)
    e2e_latencies.append(timing.end_to_end_latency)
    # Each sample gets its own metadata dict so later tagging of one sample
    # cannot leak into another.
    samples.append(
        sample.Sample(
            f"{metric_prefix}_InitiationLatency",
            timing.initiation_latency,
            "seconds",
            dict(meta),
        )
    )
    samples.append(
        sample.Sample(
            f"{metric_prefix}_EndToEndLatency",
            timing.end_to_end_latency,
            "seconds",
            dict(meta),
        )
    )

  succeeded = len(results)
  # Failures are counted against the attempted total, so ops that were never
  # issued (e.g. deletes for pools that no longer exist) count as failed too.
  failed_ops = attempted_ops - succeeded
  count_meta = {
      "total_ops": str(attempted_ops),
      "succeeded_ops": str(succeeded),
      "failed_ops": str(failed_ops),
      "failed_pools": ",".join(failed) if failed else "none",
  }
  for label, value in (
      ("TotalOps", attempted_ops),
      ("SucceededOps", succeeded),
      ("FailedOps", failed_ops),
  ):
    samples.append(
        sample.Sample(
            f"{metric_prefix}_{label}", value, "count", dict(count_meta)
        )
    )
  samples.append(
      sample.Sample(
          f"{metric_prefix}_SuccessRate",
          100.0 * succeeded / attempted_ops,
          "percent",
          dict(count_meta),
      )
  )

  samples += _AggregateAndOutlierSamples(
      metric_prefix, init_latencies, e2e_latencies
  )
  return samples


def _AggregateAndOutlierSamples(
metric_prefix: str,
init_latencies: list[float],
Expand Down
Loading