From 9565417622df87ce16cd958589e3c49796a448e0 Mon Sep 17 00:00:00 2001 From: Ashish Suneja Date: Thu, 18 Jun 2026 20:08:48 +0000 Subject: [PATCH] kubernetes_management: add large_scale_provisioning scenario --- .../kubernetes_management_benchmark.py | 278 ++++++++++++++- .../kubernetes_management_benchmark_test.py | 329 +++++++++++++++++- 2 files changed, 602 insertions(+), 5 deletions(-) diff --git a/perfkitbenchmarker/linux_benchmarks/kubernetes_management_benchmark.py b/perfkitbenchmarker/linux_benchmarks/kubernetes_management_benchmark.py index 596a324d04..41540b7855 100644 --- a/perfkitbenchmarker/linux_benchmarks/kubernetes_management_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/kubernetes_management_benchmark.py @@ -13,11 +13,13 @@ # limitations under the License. """Benchmark for Kubernetes management plane operations. -Measures GKE/EKS/AKS control-plane API responsiveness via two scenarios: +Measures GKE/EKS/AKS control-plane API responsiveness via three scenarios: concurrent_node_pool_ops: concurrent node-pool create/delete. overlapping_cluster_update: node-pool create overlapping a cluster update. + large_scale_provisioning: large-scale node-pool provisioning (scale/sweep). Optimizations for minimum run time: + - Streaming concurrency in large_scale_provisioning (no batch barriers) - Reduced poll_interval in provider WaitForOperation (5s vs 10s) - Per-op threads capped at _MAX_CONCURRENT to avoid OS limits - Accurate delete success rate via attempted_ops denominator @@ -66,9 +68,13 @@ # overlapping_cluster_update: run a cluster update and a node-pool create # simultaneously; measures behaviour when a cluster-scoped op overlaps a # node-pool-scoped one. +# large_scale_provisioning: create then delete a large number of node pools +# (optionally swept via --k8s_mgmt_scale_sweep); measures scaling limits +# and large-batch provisioning latency. _VALID_SCENARIOS = frozenset({ "concurrent_node_pool_ops", "overlapping_cluster_update", + "large_scale_provisioning", }) # ── Shared flags (apply across all scenarios) ── @@ -77,9 +83,11 @@ [ "concurrent_node_pool_ops", "overlapping_cluster_update", + "large_scale_provisioning", ], "Comma-separated subset of scenarios to run. Valid values: " - + "concurrent_node_pool_ops, overlapping_cluster_update.", + + "concurrent_node_pool_ops, overlapping_cluster_update, " + + "large_scale_provisioning.", ) _NODES_PER_NODEPOOL = flags.DEFINE_integer( "k8s_mgmt_nodes_per_nodepool", @@ -106,6 +114,24 @@ "Kubernetes version for newly-created node pools (N-1). None = auto.", ) +# ── large_scale_provisioning flags ── +_LARGE_SCALE_NODEPOOLS = flags.DEFINE_integer( + "k8s_mgmt_large_scale_nodepools", + 1000, + "Number of node pools to provision in the large_scale_provisioning " + + "scenario. Spec target is 1000; ensure VPC/quota is available before " + + "running.", +) +_SCALE_SWEEP = flags.DEFINE_list( + "k8s_mgmt_scale_sweep", + [], + "Comma-separated list of node-pool counts for the large_scale_provisioning " + + "scale sweep. Each scale runs as a separate sub-run with full " + + "create/delete cycle. Example:" + " --k8s_mgmt_scale_sweep=10,50,100,500,1000. " + + "If empty, uses --k8s_mgmt_large_scale_nodepools.", +) + # AKS caps node-pool names at 12 chars — keep all names within that limit. _PREFIX = "pkbm" @@ -121,13 +147,18 @@ def _ConcurrentPoolName(i): _OVERLAPPING_POOL_NAME = f"{_PREFIX}b" +def _ScalePoolName(i): + return f"{_PREFIX}c{i:04d}" + + @dataclasses.dataclass class OpTiming: """Latency of a single async management-plane operation. Pure timing data — the metric name is supplied by the sample builder, and failures abort the run rather than being recorded here (so there is no - error field). + error field). large_scale_provisioning, which tolerates partial failure, + tracks failed pool names separately. Attributes: initiation_latency: Seconds from issuing the async API call until it is @@ -154,6 +185,19 @@ def CheckPrerequisites( f"Invalid value(s) for --k8s_mgmt_scenarios: {invalid}. " + f"Valid options: {sorted(_VALID_SCENARIOS)}." ) + selected = {s.strip() for s in _SCENARIOS.value} + if _SCALE_SWEEP.value and "large_scale_provisioning" not in selected: + raise errors.Config.InvalidValue( + "--k8s_mgmt_scale_sweep applies only to the large_scale_provisioning " + + "scenario, which is not selected." + ) + for s in _SCALE_SWEEP.value: + try: + int(s.strip()) + except ValueError as e: + raise errors.Config.InvalidValue( + f"Non-integer value in --k8s_mgmt_scale_sweep: {s!r}" + ) from e if benchmark_config.container_cluster.type != "Kubernetes": raise errors.Config.InvalidValue( "kubernetes_management benchmark requires a Kubernetes" @@ -234,6 +278,9 @@ def Run(benchmark_spec: bm_spec.BenchmarkSpec) -> list[sample.Sample]: if "overlapping_cluster_update" in scenarios: samples += _RunOverlappingClusterUpdate(cluster, initial) _ClearNodePools(cluster) + if "large_scale_provisioning" in scenarios: + samples += _SweepScales(cluster, initial) + _ClearNodePools(cluster) # Tag all samples with version path and run config for published results. run_meta = { @@ -262,6 +309,33 @@ def Cleanup(benchmark_spec: bm_spec.BenchmarkSpec) -> None: _ClearNodePools(cluster) +def _SweepScales( + cluster: kubernetes_cluster.KubernetesCluster, + initial: str, +) -> list[sample.Sample]: + """Runs large-scale provisioning at each requested goal node-pool count. + + Goal counts come from --k8s_mgmt_scale_sweep when set, else the single + --k8s_mgmt_large_scale_nodepools value. Each count's samples are tagged + with goal_nodepools so results stay distinguishable. + """ + goal_counts = ( + [int(x.strip()) for x in _SCALE_SWEEP.value] + if _SCALE_SWEEP.value + else [_LARGE_SCALE_NODEPOOLS.value] + ) + logging.info( + "large_scale_provisioning: goal node-pool counts = %s", goal_counts + ) + samples: list[sample.Sample] = [] + for goal_nodepools in goal_counts: + goal_samples = _ScaleToPoolCount(cluster, initial, goal_nodepools) + for s in goal_samples: + s.metadata["goal_nodepools"] = str(goal_nodepools) + samples += goal_samples + return samples + + def _RunConcurrentNodePoolOps( cluster: kubernetes_cluster.KubernetesCluster, initial: str, @@ -344,6 +418,86 @@ def DoCreate(): return samples +def _ScaleToPoolCount( + cluster: kubernetes_cluster.KubernetesCluster, + initial: str, + goal_nodepools: int, +) -> list[sample.Sample]: + """Large-goal_nodepools node-pool provisioning to a goal node-pool count. + + Streams all `goal_nodepools` creates through a single executor capped at + _MAX_CONCURRENT workers — as each op completes the next starts immediately + (no batch barriers). Delete uses a live-list so EKS-rolled-back pools are + excluded from the denominator correctly. + """ + logging.info( + "large_scale goal=%d, max_concurrent=%d, initial_version=%s", + goal_nodepools, + _MAX_CONCURRENT.value, + initial, + ) + pool_names = [_ScalePoolName(i) for i in range(goal_nodepools)] + configs_ = [_MakeNodePoolConfig(cluster, name) for name in pool_names] + samples: list[sample.Sample] = [] + + # ── Creates (tolerant — partial failure expected at scale) ────────────── + create_results, create_failed = _RunAsyncTolerant( + kickoff=lambda cfg: cluster.CreateNodePoolAsync( + cfg, node_version=initial + ), + wait_fn=cluster.WaitForOperation, + items=configs_, + get_name=lambda cfg: cfg.name, + ) + logging.info( + "large_scale goal=%d: %d/%d creates succeeded (%d failed)", + goal_nodepools, + len(create_results), + goal_nodepools, + len(create_failed), + ) + samples += _LargeScaleSamples( + "LargeScale_Create", + create_results, + create_failed, + attempted_ops=goal_nodepools, + ) + + # ── Deletes (live-list) ────────────────────────────────────────────────── + alive = _LiveNodePoolNames(cluster, f"{_PREFIX}c") + logging.info( + "large_scale goal=%d: %d live pools for delete (originally %d;" + + " %d rolled back by cloud)", + goal_nodepools, + len(alive), + goal_nodepools, + goal_nodepools - len(alive), + ) + if not alive: + logging.info( + "large_scale goal=%d: all creates rolled back.", goal_nodepools + ) + samples += _LargeScaleSamples( + "LargeScale_Delete", [], [], attempted_ops=goal_nodepools + ) + return samples + + delete_results, delete_failed = _RunAsyncTolerant( + kickoff=cluster.DeleteNodePoolAsync, + wait_fn=cluster.WaitForOperation, + items=alive, + get_name=str, + ) + # attempted_ops=goal_nodepools: accurate rate against original request count. + samples += _LargeScaleSamples( + "LargeScale_Delete", + delete_results, + delete_failed, + attempted_ops=goal_nodepools, + ) + return samples + + # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- @@ -418,6 +572,44 @@ def DoWrap(item): return results.entries +def _RunAsyncTolerant( + kickoff: Callable, + wait_fn: Callable[[str], None], + items: list, + get_name: Callable[[object], str], +) -> tuple[list[tuple[str, OpTiming]], list[str]]: + """Like _RunAsync but tolerates per-op failures (large_scale only). + + Returns (successful (name, OpTiming) pairs, failed names). A failing op is + caught, its name recorded, and execution continues — appropriate only for + large-scale provisioning where overshooting quota is an expected scenario, + not a benchmark failure. + """ + if not items: + return [], [] + results = ThreadSafeResults() + cap = min(len(items), _MAX_CONCURRENT.value) + + def DoWrap(item): + name = get_name(item) + try: + timing = _TimedAsync(lambda: kickoff(item), wait_fn) + except Exception as exc: # pylint: disable=broad-except + results.add_failure(name) + logging.warning("%s FAILED: %s", name, str(exc)[:200]) + return + results.add(name, timing) + logging.info( + "%s initiation=%.2fs end_to_end=%.2fs", + name, + timing.initiation_latency, + timing.end_to_end_latency, + ) + + background_tasks.RunThreaded(DoWrap, items, max_concurrent_threads=cap) + return results.entries, results.failed + + def _MakeNodePoolConfig( cluster: kubernetes_cluster.KubernetesCluster, name: str, @@ -483,6 +675,86 @@ def _OpSamples( return samples +def _LargeScaleSamples( + metric_prefix: str, + results: list[tuple[str, OpTiming]], + failed: list[str], + attempted_ops: int, +) -> list[sample.Sample]: + """Latency + success/failure accounting for the tolerant large-scale path. + + Unlike _OpSamples, large_scale_provisioning tolerates partial failure, so + this reports how many ops succeeded/failed (against the originally-attempted + count) and lists the failed pool names in metadata. + + Args: + metric_prefix: prefix for all metric names. + results: successful (name, OpTiming) pairs. + failed: names of ops that failed. + attempted_ops: total ops originally requested (the denominator). + """ + samples: list[sample.Sample] = [] + init_latencies: list[float] = [] + e2e_latencies: list[float] = [] + + for name, timing in results: + meta = {"operation_name": name} + init_latencies.append(timing.initiation_latency) + e2e_latencies.append(timing.end_to_end_latency) + samples.append( + sample.Sample( + f"{metric_prefix}_InitiationLatency", + timing.initiation_latency, + "seconds", + dict(meta), + ) + ) + samples.append( + sample.Sample( + f"{metric_prefix}_EndToEndLatency", + timing.end_to_end_latency, + "seconds", + dict(meta), + ) + ) + + if attempted_ops == 0: + raise errors.Benchmarks.RunError( + f"{metric_prefix}: zero operations attempted — the scenario produced " + "no work, which indicates a setup or dispatch failure." + ) + succeeded = len(results) + count_meta = { + "total_ops": str(attempted_ops), + "succeeded_ops": str(succeeded), + "failed_ops": str(attempted_ops - succeeded), + "failed_pools": ",".join(failed) if failed else "none", + } + for label, value in ( + ("TotalOps", attempted_ops), + ("SucceededOps", succeeded), + ("FailedOps", attempted_ops - succeeded), + ): + samples.append( + sample.Sample( + f"{metric_prefix}_{label}", value, "count", dict(count_meta) + ) + ) + samples.append( + sample.Sample( + f"{metric_prefix}_SuccessRate", + 100.0 * succeeded / attempted_ops, + "percent", + dict(count_meta), + ) + ) + + samples += _AggregateAndOutlierSamples( + metric_prefix, init_latencies, e2e_latencies + ) + return samples + + def _AggregateAndOutlierSamples( metric_prefix: str, init_latencies: list[float], diff --git a/tests/linux_benchmarks/kubernetes_management_benchmark_test.py b/tests/linux_benchmarks/kubernetes_management_benchmark_test.py index d2afdd72f1..c9fa2b90ea 100644 --- a/tests/linux_benchmarks/kubernetes_management_benchmark_test.py +++ b/tests/linux_benchmarks/kubernetes_management_benchmark_test.py @@ -94,6 +94,16 @@ def testConcurrentPoolNameZeroPadsToThreeDigits(self, index, expected): expected, kubernetes_management_benchmark._ConcurrentPoolName(index) ) + @parameterized.named_parameters( + ('zero', 0, 'pkbmc0000'), + ('single_digit', 7, 'pkbmc0007'), + ('four_digit', 1000, 'pkbmc1000'), + ) + def testScalePoolNameZeroPadsToFourDigits(self, index, expected): + self.assertEqual( + expected, kubernetes_management_benchmark._ScalePoolName(index) + ) + def testOverlappingPoolNameIsConstant(self): self.assertEqual( 'pkbmb', kubernetes_management_benchmark._OVERLAPPING_POOL_NAME @@ -104,6 +114,10 @@ def testAllNamesWithinAksLimit(self): self.assertLessEqual( len(kubernetes_management_benchmark._ConcurrentPoolName(i)), 12 ) + for i in range(10000): + self.assertLessEqual( + len(kubernetes_management_benchmark._ScalePoolName(i)), 12 + ) self.assertLessEqual( len(kubernetes_management_benchmark._OVERLAPPING_POOL_NAME), 12 ) @@ -117,6 +131,7 @@ def testValidScenariosPass(self): k8s_mgmt_scenarios=[ 'concurrent_node_pool_ops', 'overlapping_cluster_update', + 'large_scale_provisioning', ] ): kubernetes_management_benchmark.CheckPrerequisites(_make_mock_config()) @@ -140,11 +155,34 @@ def testNonKubernetesClusterTypeRaises(self): _make_mock_config(cluster_type='Mesos') ) + def testInvalidScaleSweepRaises(self): + with flagsaver.flagsaver( + k8s_mgmt_scenarios=['large_scale_provisioning'], + k8s_mgmt_scale_sweep=['10', 'abc'], + ): + with self.assertRaises(errors.Config.InvalidValue): + kubernetes_management_benchmark.CheckPrerequisites(_make_mock_config()) + + def testValidScaleSweepPasses(self): + with flagsaver.flagsaver( + k8s_mgmt_scenarios=['large_scale_provisioning'], + k8s_mgmt_scale_sweep=['10', '50', '100'], + ): + kubernetes_management_benchmark.CheckPrerequisites(_make_mock_config()) + def testLowercaseScenarioRaises(self): with flagsaver.flagsaver(k8s_mgmt_scenarios=['a']): with self.assertRaises(errors.Config.InvalidValue): kubernetes_management_benchmark.CheckPrerequisites(_make_mock_config()) + def testScaleSweepWithoutLargeScaleRaises(self): + with flagsaver.flagsaver( + k8s_mgmt_scenarios=['concurrent_node_pool_ops'], + k8s_mgmt_scale_sweep=['10', '50'], + ): + with self.assertRaises(errors.Config.InvalidValue): + kubernetes_management_benchmark.CheckPrerequisites(_make_mock_config()) + class PrepareTest(pkb_common_test_case.PkbCommonTestCase): """Tests for the Prepare benchmark lifecycle function.""" @@ -387,6 +425,46 @@ def testGetNameCallableApplied(self): self.assertEqual('poolname', name) +class RunAsyncTolerantTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the _RunAsyncTolerant helper (large_scale path).""" + + @flagsaver.flagsaver(k8s_mgmt_max_concurrent=50) + def testAllSucceed(self): + results, failed = kubernetes_management_benchmark._RunAsyncTolerant( + kickoff=mock.Mock(return_value='op'), + wait_fn=mock.Mock(return_value=None), + items=['a', 'b'], + get_name=str, + ) + self.assertLen(results, 2) + self.assertEmpty(failed) + + @flagsaver.flagsaver(k8s_mgmt_max_concurrent=50) + def testFailuresRecordedNotRaised(self): + """Tolerant path catches failures and records their names.""" + + def _kickoff(item): + if item == 'b': + raise RuntimeError('b failed') + return 'op' + + results, failed = kubernetes_management_benchmark._RunAsyncTolerant( + kickoff=_kickoff, + wait_fn=mock.Mock(return_value=None), + items=['a', 'b', 'c'], + get_name=str, + ) + self.assertLen(results, 2) + self.assertEqual(['b'], failed) + + def testEmptyItemsReturnsEmpty(self): + results, failed = kubernetes_management_benchmark._RunAsyncTolerant( + kickoff=mock.Mock(), wait_fn=mock.Mock(), items=[], get_name=str + ) + self.assertEmpty(results) + self.assertEmpty(failed) + + class MakeNodePoolConfigTest(pkb_common_test_case.PkbCommonTestCase): """Tests for the _MakeNodePoolConfig factory.""" @@ -490,6 +568,54 @@ def testOutliersNotGeneratedForThreeOrFewer(self): ) +class LargeScaleSamplesTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the _LargeScaleSamples tolerant-path helper.""" + + def testSuccessRateHundredWhenAllSucceed(self): + results = [ + ('op1', kubernetes_management_benchmark.OpTiming(1.0, 2.0)), + ('op2', kubernetes_management_benchmark.OpTiming(0.5, 1.5)), + ] + samples = kubernetes_management_benchmark._LargeScaleSamples( + 'Op', results, [], attempted_ops=2 + ) + rate = next(s for s in samples if s.metric == 'Op_SuccessRate') + self.assertAlmostEqual(100.0, rate.value) + + def testSuccessRateReflectsFailures(self): + results = [('op1', kubernetes_management_benchmark.OpTiming(1.0, 2.0))] + samples = kubernetes_management_benchmark._LargeScaleSamples( + 'Op', results, ['op2', 'op3'], attempted_ops=3 + ) + rate = next(s for s in samples if s.metric == 'Op_SuccessRate') + self.assertAlmostEqual(100.0 / 3, rate.value, places=3) + + def testFailedPoolsListedInMetadata(self): + results = [('op1', kubernetes_management_benchmark.OpTiming(1.0, 2.0))] + samples = kubernetes_management_benchmark._LargeScaleSamples( + 'Op', results, ['op2', 'op3'], attempted_ops=3 + ) + failed = next(s for s in samples if s.metric == 'Op_FailedOps') + self.assertEqual(2, failed.value) + self.assertEqual('op2,op3', failed.metadata['failed_pools']) + + def testCountMetricsExposed(self): + results = [('op1', kubernetes_management_benchmark.OpTiming(1.0, 2.0))] + samples = kubernetes_management_benchmark._LargeScaleSamples( + 'Op', results, ['op2'], attempted_ops=2 + ) + metrics = {s.metric: s.value for s in samples} + self.assertEqual(2, metrics['Op_TotalOps']) + self.assertEqual(1, metrics['Op_SucceededOps']) + self.assertEqual(1, metrics['Op_FailedOps']) + + def testZeroAttemptedRaisesRunError(self): + with self.assertRaises(errors.Benchmarks.RunError): + kubernetes_management_benchmark._LargeScaleSamples( + 'Op', [], [], attempted_ops=0 + ) + + class AggregateSamplesTest(pkb_common_test_case.PkbCommonTestCase): """Tests for the _AggregateSamples statistics helper.""" @@ -598,8 +724,41 @@ def testReturnsSingleSample(self): class RunTest(pkb_common_test_case.PkbCommonTestCase): """Tests for the Run benchmark entry-point function.""" + @flagsaver.flagsaver( + k8s_mgmt_scenarios=[ + 'concurrent_node_pool_ops', + 'overlapping_cluster_update', + 'large_scale_provisioning', + ], + k8s_mgmt_scale_sweep=[], + k8s_mgmt_large_scale_nodepools=10, + ) + def testRunSweepsAfterEachScenario(self): + """Run sweeps node pools after each scenario that executes.""" + cluster = _make_mock_cluster() + bm_spec = _make_mock_benchmark_spec(cluster) + with mock.patch.object( + kubernetes_management_benchmark, '_ClearNodePools' + ) as mock_clean, mock.patch.object( + kubernetes_management_benchmark, + '_RunConcurrentNodePoolOps', + return_value=[], + ), mock.patch.object( + kubernetes_management_benchmark, + '_RunOverlappingClusterUpdate', + return_value=[], + ), mock.patch.object( + kubernetes_management_benchmark, '_ScaleToPoolCount', return_value=[] + ): + kubernetes_management_benchmark.Run(bm_spec) + # All three scenarios run by default -> one sweep after each. + self.assertEqual(mock_clean.call_count, 3) + mock_clean.assert_called_with(cluster) + @flagsaver.flagsaver( k8s_mgmt_scenarios=['concurrent_node_pool_ops'], + k8s_mgmt_scale_sweep=[], + k8s_mgmt_large_scale_nodepools=10, ) def testRunOnlyScenarioACallsOnlyA(self): """Run dispatches only to _RunConcurrentNodePoolOps for that scenario.""" @@ -615,13 +774,18 @@ def testRunOnlyScenarioACallsOnlyA(self): kubernetes_management_benchmark, '_RunOverlappingClusterUpdate', return_value=[], - ) as mock_b: + ) as mock_b, mock.patch.object( + kubernetes_management_benchmark, '_ScaleToPoolCount', return_value=[] + ) as mock_c: kubernetes_management_benchmark.Run(bm_spec) mock_a.assert_called_once() mock_b.assert_not_called() + mock_c.assert_not_called() @flagsaver.flagsaver( k8s_mgmt_scenarios=['overlapping_cluster_update'], + k8s_mgmt_scale_sweep=[], + k8s_mgmt_large_scale_nodepools=10, ) def testRunOnlyScenarioBCallsOnlyB(self): """Run dispatches only to _RunOverlappingClusterUpdate for that scenario.""" @@ -637,13 +801,104 @@ def testRunOnlyScenarioBCallsOnlyB(self): kubernetes_management_benchmark, '_RunOverlappingClusterUpdate', return_value=[], - ) as mock_b: + ) as mock_b, mock.patch.object( + kubernetes_management_benchmark, '_ScaleToPoolCount', return_value=[] + ) as mock_c: kubernetes_management_benchmark.Run(bm_spec) mock_a.assert_not_called() mock_b.assert_called_once() + mock_c.assert_not_called() + + @flagsaver.flagsaver( + k8s_mgmt_scenarios=['large_scale_provisioning'], + k8s_mgmt_scale_sweep=[], + k8s_mgmt_large_scale_nodepools=42, + ) + def testRunScenarioCPassesLargeScaleFlag(self): + """Run passes the large-scale-nodepools flag down to _ScaleToPoolCount.""" + cluster = _make_mock_cluster() + bm_spec = _make_mock_benchmark_spec(cluster) + with mock.patch.object( + kubernetes_management_benchmark, '_ClearNodePools' + ), mock.patch.object( + kubernetes_management_benchmark, + '_RunConcurrentNodePoolOps', + return_value=[], + ), mock.patch.object( + kubernetes_management_benchmark, + '_RunOverlappingClusterUpdate', + return_value=[], + ), mock.patch.object( + kubernetes_management_benchmark, '_ScaleToPoolCount', return_value=[] + ) as mock_c: + kubernetes_management_benchmark.Run(bm_spec) + mock_c.assert_called_once() + _, _, scale = mock_c.call_args.args + self.assertEqual(42, scale) + + @flagsaver.flagsaver( + k8s_mgmt_scenarios=['large_scale_provisioning'], + k8s_mgmt_scale_sweep=['10', '50'], + k8s_mgmt_large_scale_nodepools=100, + ) + def testRunScenarioCScaleSweepRunsTwice(self): + """Tests that Run calls _ScaleToPoolCount once per scale in the sweep.""" + cluster = _make_mock_cluster() + bm_spec = _make_mock_benchmark_spec(cluster) + with mock.patch.object( + kubernetes_management_benchmark, '_ClearNodePools' + ), mock.patch.object( + kubernetes_management_benchmark, + '_RunConcurrentNodePoolOps', + return_value=[], + ), mock.patch.object( + kubernetes_management_benchmark, + '_RunOverlappingClusterUpdate', + return_value=[], + ), mock.patch.object( + kubernetes_management_benchmark, + '_ScaleToPoolCount', + return_value=[_make_sample('m', 1.0)], + ) as mock_c: + kubernetes_management_benchmark.Run(bm_spec) + self.assertEqual(2, mock_c.call_count) + scales = [call.args[2] for call in mock_c.call_args_list] + self.assertIn(10, scales) + self.assertIn(50, scales) + + @flagsaver.flagsaver( + k8s_mgmt_scenarios=['large_scale_provisioning'], + k8s_mgmt_scale_sweep=['10'], + k8s_mgmt_large_scale_nodepools=10, + ) + def testRunTagsScenarioCGoalInMetadata(self): + """Tests that Run adds goal_nodepools to each sample's metadata.""" + cluster = _make_mock_cluster() + bm_spec = _make_mock_benchmark_spec(cluster) + test_sample = _make_sample('metric', 1.0) + with mock.patch.object( + kubernetes_management_benchmark, '_ClearNodePools' + ), mock.patch.object( + kubernetes_management_benchmark, + '_RunConcurrentNodePoolOps', + return_value=[], + ), mock.patch.object( + kubernetes_management_benchmark, + '_RunOverlappingClusterUpdate', + return_value=[], + ), mock.patch.object( + kubernetes_management_benchmark, + '_ScaleToPoolCount', + return_value=[test_sample], + ): + samples = kubernetes_management_benchmark.Run(bm_spec) + self.assertIn('goal_nodepools', samples[0].metadata) + self.assertEqual('10', samples[0].metadata['goal_nodepools']) @flagsaver.flagsaver( k8s_mgmt_scenarios=['concurrent_node_pool_ops'], + k8s_mgmt_scale_sweep=[], + k8s_mgmt_large_scale_nodepools=10, ) def testRunTagsAllSamplesWithRunMetadata(self): """Tests that Run adds version and config keys to all sample metadata.""" @@ -660,6 +915,8 @@ def testRunTagsAllSamplesWithRunMetadata(self): kubernetes_management_benchmark, '_RunOverlappingClusterUpdate', return_value=[], + ), mock.patch.object( + kubernetes_management_benchmark, '_ScaleToPoolCount', return_value=[] ): samples = kubernetes_management_benchmark.Run(bm_spec) meta = samples[0].metadata @@ -674,6 +931,8 @@ def testRunTagsAllSamplesWithRunMetadata(self): @flagsaver.flagsaver( k8s_mgmt_scenarios=['concurrent_node_pool_ops'], k8s_mgmt_initial_version='1.30', + k8s_mgmt_scale_sweep=[], + k8s_mgmt_large_scale_nodepools=10, ) def testRunUsesExplicitVersionFlags(self): """Tests that Run uses explicit version flags over auto-resolved ones.""" @@ -689,6 +948,8 @@ def testRunUsesExplicitVersionFlags(self): kubernetes_management_benchmark, '_RunOverlappingClusterUpdate', return_value=[], + ), mock.patch.object( + kubernetes_management_benchmark, '_ScaleToPoolCount', return_value=[] ): samples = kubernetes_management_benchmark.Run(bm_spec) cluster.ResolveNodePoolVersions.assert_not_called() @@ -696,6 +957,8 @@ def testRunUsesExplicitVersionFlags(self): @flagsaver.flagsaver( k8s_mgmt_scenarios=['concurrent_node_pool_ops'], + k8s_mgmt_scale_sweep=[], + k8s_mgmt_large_scale_nodepools=10, ) def testRunAutoResolvesVersionsWhenFlagsAbsent(self): """Tests Run calls ResolveNodePoolVersions when version flags absent.""" @@ -712,6 +975,8 @@ def testRunAutoResolvesVersionsWhenFlagsAbsent(self): kubernetes_management_benchmark, '_RunOverlappingClusterUpdate', return_value=[], + ), mock.patch.object( + kubernetes_management_benchmark, '_ScaleToPoolCount', return_value=[] ): samples = kubernetes_management_benchmark.Run(bm_spec) cluster.ResolveNodePoolVersions.assert_called_once() @@ -831,5 +1096,65 @@ def testPassesInitialVersionToCreate(self): self.assertEqual('1.33', node_version) +class RunScenarioCTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the _ScaleToPoolCount large-scale create-and-delete scenario.""" + + @flagsaver.flagsaver( + k8s_mgmt_nodes_per_nodepool=1, + k8s_mgmt_max_concurrent=50, + ) + def testProducesCreateAndDeleteSamples(self): + cluster = _make_mock_cluster(pool_names=['pkbmc0000', 'pkbmc0001']) + samples = kubernetes_management_benchmark._ScaleToPoolCount( + cluster, '1.33', goal_nodepools=2 + ) + metrics = {s.metric for s in samples} + self.assertTrue(any('LargeScale_Create' in m for m in metrics)) + self.assertTrue(any('LargeScale_Delete' in m for m in metrics)) + + @flagsaver.flagsaver( + k8s_mgmt_nodes_per_nodepool=1, + k8s_mgmt_max_concurrent=50, + ) + def testZeroLivePoolsRecordsZeroDeleteSuccessRate(self): + """Tests Scenario C records 0% delete rate when no live pools exist.""" + cluster = _make_mock_cluster(pool_names=[]) + samples = kubernetes_management_benchmark._ScaleToPoolCount( + cluster, '1.33', goal_nodepools=3 + ) + delete_rate = next( + s for s in samples if s.metric == 'LargeScale_Delete_SuccessRate' + ) + self.assertEqual(0.0, delete_rate.value) + cluster.DeleteNodePoolAsync.assert_not_called() + + @flagsaver.flagsaver( + k8s_mgmt_nodes_per_nodepool=1, + k8s_mgmt_max_concurrent=50, + ) + def testDeleteUsesLiveListNotOriginalCreateList(self): + cluster = _make_mock_cluster(pool_names=['pkbmc0000', 'pkbmc0001']) + kubernetes_management_benchmark._ScaleToPoolCount( + cluster, '1.33', goal_nodepools=3 + ) + self.assertEqual(2, cluster.DeleteNodePoolAsync.call_count) + + @flagsaver.flagsaver( + k8s_mgmt_nodes_per_nodepool=1, + k8s_mgmt_max_concurrent=50, + ) + def testCreateSuccessRateUsesScaleAsDenominator(self): + """Tests Scenario C create success rate uses scale as total_ops.""" + cluster = _make_mock_cluster(pool_names=['pkbmc0000']) + samples = kubernetes_management_benchmark._ScaleToPoolCount( + cluster, '1.33', goal_nodepools=3 + ) + create_rate = next( + s for s in samples if s.metric == 'LargeScale_Create_SuccessRate' + ) + self.assertLessEqual(create_rate.value, 100.0) + self.assertEqual('3', create_rate.metadata['total_ops']) + + if __name__ == '__main__': unittest.main()