From b3201455717a7392a0b40b747b1f16b2e3fa2bc3 Mon Sep 17 00:00:00 2001 From: 0rlych1kk4 Date: Sat, 21 Feb 2026 14:56:53 +0800 Subject: [PATCH 01/13] test: add scheduled repair concurrency scenario for DatacenterAware multi-agent tests - Add scheduled repair concurrency test for multi-agent DatacenterAware mode - Make schedule interval and initial delay configurable per instance - Make schedule overrides opt-in to avoid affecting other tests - Configure fast schedules explicitly for this scenario --- ecchronos-binary/src/test/bin/ecc_config.py | 142 ++++++++++-------- .../bin/test_multi_agent_datacenter_aware.py | 87 +++++++++-- 2 files changed, 148 insertions(+), 81 deletions(-) diff --git a/ecchronos-binary/src/test/bin/ecc_config.py b/ecchronos-binary/src/test/bin/ecc_config.py index ab959c449..4ad8dd8e1 100644 --- a/ecchronos-binary/src/test/bin/ecc_config.py +++ b/ecchronos-binary/src/test/bin/ecc_config.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 -# vi: syntax=python # -# Copyright 2025 Telefonaktiebolaget LM Ericsson +# Copyright 2024 Telefonaktiebolaget LM Ericsson # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -29,6 +28,12 @@ def __init__( agent_type=global_vars.DEFAULT_AGENT_TYPE, initial_contact_point=global_vars.DEFAULT_INITIAL_CONTACT_POINT, instance_name=global_vars.DEFAULT_INSTANCE_NAME, + # NEW: schedule knobs (OPT-IN only) + schedule_interval_time=None, + schedule_interval_unit=None, + schedule_initial_delay_time=None, + schedule_initial_delay_unit=None, + ): self.container_mounts = {} self.datacenter_aware = ( @@ -39,6 +44,13 @@ def __init__( self.initial_contact_point = initial_contact_point self.instance_name = instance_name + # Optional overrides (None = do not modify schedule) + self.schedule_interval_time = schedule_interval_time + self.schedule_interval_unit = schedule_interval_unit + self.schedule_initial_delay_time = schedule_initial_delay_time + self.schedule_initial_delay_unit = schedule_initial_delay_unit + + def modify_configuration(self): self._modify_ecc_yaml_file() self._modify_security_yaml_file() @@ -46,27 +58,38 @@ def modify_configuration(self): self._modify_schedule_configuration() self._uncomment_head_options() self._modify_logback_configuration() + self.container_mounts["certificates"] = { "host": global_vars.CERTIFICATE_DIRECTORY, "container": global_vars.CONTAINER_CERTIFICATE_PATH, } + + # Per-instance logs (multi-agent safe) self.container_mounts["logs"] = { "host": f"{global_vars.HOST_LOGS_PATH}/{self.instance_name}", "container": global_vars.CONTAINER_LOGS_PATH, } - # Modify ecc.yaml file + # -------------------------------------------------- + # ECC YAML + # -------------------------------------------------- def _modify_ecc_yaml_file(self): data = self._read_yaml_data(global_vars.ECC_YAML_FILE_PATH) data = self._modify_connection_configuration(data) data = self._modify_scheduler_configuration(data) data = self._modify_twcs_configuration(data) + if global_vars.JOLOKIA_ENABLED == "true": data = self._modify_jolokia_configuration(data) - self.container_mounts["ecc"] = {"host": self.write_tmp(data), "container": global_vars.CONTAINER_ECC_YAML_PATH} + + self.container_mounts["ecc"] = { + "host": self.write_tmp(data), + "container": global_vars.CONTAINER_ECC_YAML_PATH, + } def _modify_connection_configuration(self, data): data["connection"]["cql"]["contactPoints"] = [{"host": self.initial_contact_point, "port": 9042}] + data["connection"]["cql"]["datacenterAware"]["datacenters"] = self.datacenter_aware data["connection"]["cql"]["instanceName"] = self.instance_name data["connection"]["cql"]["localDatacenter"] = self.local_dc @@ -87,13 +110,16 @@ def _modify_jolokia_configuration(self, data): data["connection"]["jmx"]["jolokia"]["usePem"] = True return data - # Modify security.yaml file + # -------------------------------------------------- + # SECURITY YAML + # -------------------------------------------------- def _modify_security_yaml_file(self): data = self._read_yaml_data(global_vars.SECURITY_YAML_FILE_PATH) if global_vars.LOCAL != "true": data = self._modify_security_configuration(data) else: data = self._modify_cql_configuration(data) + self.container_mounts["security"] = { "host": self.write_tmp(data), "container": global_vars.CONTAINER_SECURITY_YAML_PATH, @@ -103,6 +129,7 @@ def _modify_security_configuration(self, data): data["cql"]["credentials"]["enabled"] = True data["cql"]["credentials"]["username"] = "eccuser" data["cql"]["credentials"]["password"] = "eccpassword" + data["cql"]["tls"]["enabled"] = True data["cql"]["tls"]["keystore"] = f"{global_vars.CONTAINER_CERTIFICATE_PATH}/.keystore" data["cql"]["tls"]["keystore_password"] = "ecctest" @@ -113,6 +140,7 @@ def _modify_security_configuration(self, data): data["jmx"]["credentials"]["username"] = "cassandra" data["jmx"]["credentials"]["password"] = "cassandra" data["jmx"]["tls"]["enabled"] = True + if global_vars.PEM_ENABLED != "true": data["jmx"]["tls"]["keystore"] = f"{global_vars.CONTAINER_CERTIFICATE_PATH}/.keystore" data["jmx"]["tls"]["keystore_password"] = "ecctest" @@ -133,22 +161,23 @@ def _modify_cql_configuration(self, data): data["cql"]["credentials"]["password"] = "cassandra" return data - # Modify application.yaml file + # -------------------------------------------------- + # APPLICATION YAML + # -------------------------------------------------- def _modify_application_yaml_file(self): data = self._read_yaml_data(global_vars.APPLICATION_YAML_FILE_PATH) if global_vars.LOCAL != "true": data = self._modify_application_configuration(data) + data = self._modify_spring_doc_configuration(data) + self.container_mounts["application"] = { "host": self.write_tmp(data), "container": global_vars.CONTAINER_APPLICATION_YAML_PATH, } def _modify_application_configuration(self, data): - if "server" not in data: - data["server"] = {} - if "ssl" not in data["server"]: - data["server"]["ssl"] = {} + data.setdefault("server", {}).setdefault("ssl", {}) data["server"]["ssl"]["enabled"] = True data["server"]["ssl"]["key-store"] = f"{global_vars.CONTAINER_CERTIFICATE_PATH}/serverkeystore" @@ -165,19 +194,15 @@ def _modify_spring_doc_configuration(self, data): data["springdoc"]["api-docs"]["show-actuator"] = True return data + # -------------------------------------------------- + # JVM / LOGGING + # -------------------------------------------------- def _uncomment_head_options(self): pattern = re.compile(r"^#\s*(-X.*)") with open(global_vars.JVM_OPTIONS_FILE_PATH, "r", encoding="utf-8") as file: lines = file.readlines() - result = [] - - for line in lines: - match = pattern.match(line) - if match: - result.append(match.group(1) + "\n") - else: - result.append(line) + result = [pattern.match(l).group(1) + "\n" if pattern.match(l) else l for l in lines] self.container_mounts["jvm"] = { "host": self.write_tmp(result, ".options"), @@ -189,15 +214,12 @@ def _modify_logback_configuration(self): lines = file.readlines() pattern = re.compile(r'^(\s*)()\s*$') - result = [] for line in lines: match = pattern.match(line) if match: - indent = match.group(1) - content = match.group(2) - result.append(f"{indent}\n") + result.append(f"{match.group(1)}\n") else: result.append(line) @@ -206,58 +228,48 @@ def _modify_logback_configuration(self): "container": global_vars.CONTAINER_LOGBACK_FILE_PATH, } + # -------------------------------------------------- + # SAFE SCHEDULE PATCH (non-global) + # -------------------------------------------------- def _modify_schedule_configuration(self): data = self._read_yaml_data(global_vars.SCHEDULE_YAML_FILE_PATH) - data["keyspaces"] = [ - { - "name": "test", - "tables": [ - { - "name": "table1", - "interval": {"time": 1, "unit": "days"}, - "initial_delay": {"time": 1, "unit": "hours"}, - "unwind_ratio": 0.1, - }, - {"name": "table3", "enabled": False}, - ], - }, - { - "name": "test2", - "tables": [ - {"name": "table1", "repair_type": "incremental"}, - {"name": "table2", "repair_type": "parallel_vnode"}, - ], - }, - { - "name": "system_auth", - "tables": [ - {"name": "network_permissions", "enabled": False}, - {"name": "resource_role_permissons_index", "enabled": False}, - {"name": "role_members", "enabled": False}, - {"name": "role_permissions", "enabled": False}, - {"name": "roles", "enabled": False}, - ], - }, - { - "name": "ecchronos", - "tables": [ - {"name": "lock", "enabled": False}, - {"name": "lock_priority", "enabled": False}, - {"name": "on_demand_repair_status", "enabled": False}, - {"name": "reject_configuration", "enabled": False}, - {"name": "repair_history", "enabled": True}, - ], - }, - ] + + # Only modify when explicitly requested + if any( + [ + self.schedule_interval_time, + self.schedule_interval_unit, + self.schedule_initial_delay_time, + self.schedule_initial_delay_unit, + ] + ): + for ks in data.get("keyspaces", []): + if ks.get("name") != "test": + continue + for tbl in ks.get("tables", []): + if tbl.get("name") != "table1": + continue + + if self.schedule_interval_time is not None: + tbl.setdefault("interval", {})["time"] = self.schedule_interval_time + if self.schedule_interval_unit is not None: + tbl.setdefault("interval", {})["unit"] = self.schedule_interval_unit + + if self.schedule_initial_delay_time is not None: + tbl.setdefault("initial_delay", {})["time"] = self.schedule_initial_delay_time + if self.schedule_initial_delay_unit is not None: + tbl.setdefault("initial_delay", {})["unit"] = self.schedule_initial_delay_unit + break + self.container_mounts["schedule"] = { "host": self.write_tmp(data), "container": global_vars.CONTAINER_SCHEDULE_YAML_PATH, } + # -------------------------------------------------- def _read_yaml_data(self, filename): with open(filename, "r") as f: - data = yaml.safe_load(f) - return data + return yaml.safe_load(f) def write_tmp(self, data, suffix=".yaml") -> str: tmp = tempfile.NamedTemporaryFile(mode="w", suffix=suffix, delete=False) diff --git a/ecchronos-binary/src/test/bin/test_multi_agent_datacenter_aware.py b/ecchronos-binary/src/test/bin/test_multi_agent_datacenter_aware.py index 5dffa9ec0..92a298ac6 100644 --- a/ecchronos-binary/src/test/bin/test_multi_agent_datacenter_aware.py +++ b/ecchronos-binary/src/test/bin/test_multi_agent_datacenter_aware.py @@ -14,14 +14,21 @@ # limitations under the License. # -import pytest -import global_variables as global_vars import logging -from conftest import run_ecctool_state_nodes, assert_nodes_size_is_equal, run_ecctool_run_repair, run_ecctool_repairs -from ecc_config import EcchronosConfig from concurrent.futures import ThreadPoolExecutor from threading import Barrier -from tenacity import retry, stop_after_delay, wait_fixed, retry_if_result + +import global_variables as global_vars +import pytest +from ecc_config import EcchronosConfig +from tenacity import retry, retry_if_result, stop_after_delay, wait_fixed + +from conftest import ( + assert_nodes_size_is_equal, + run_ecctool_repairs, + run_ecctool_run_repair, + run_ecctool_state_nodes, +) logger = logging.getLogger(__name__) barrier = Barrier(2) @@ -29,6 +36,12 @@ ECC_INSTANCE_NAME_DC1 = "ecchronos-agent-dc1" ECC_INSTANCE_NAME_DC2 = "ecchronos-agent-dc2" +# Keep schedule values aligned across DCs so contention is deterministic in CI. +SCHEDULE_INTERVAL_TIME = 1 +SCHEDULE_INTERVAL_UNIT = "minutes" +SCHEDULE_INITIAL_DELAY_TIME = 1 +SCHEDULE_INITIAL_DELAY_UNIT = "minutes" + def run_repair(container, params): barrier.wait() @@ -47,9 +60,18 @@ def test_install_cassandra_cluster(install_cassandra_cluster): @pytest.mark.dependency(name="test_install_ecchronos_dc1", depends=["test_install_cassandra_cluster"]) def test_install_ecchronos_dc1(install_cassandra_cluster, test_environment): dcs = [{"name": global_vars.DC1}] - ecchronos_config = EcchronosConfig(datacenter_aware=dcs, instance_name=ECC_INSTANCE_NAME_DC1) + ecchronos_config = EcchronosConfig( + datacenter_aware=dcs, + instance_name=ECC_INSTANCE_NAME_DC1, + schedule_interval_time=SCHEDULE_INTERVAL_TIME, + schedule_interval_unit=SCHEDULE_INTERVAL_UNIT, + schedule_initial_delay_time=SCHEDULE_INITIAL_DELAY_TIME, + schedule_initial_delay_unit=SCHEDULE_INITIAL_DELAY_UNIT, + ) test_environment.start_ecchronos( - suffix="dc1", cassandra_network=install_cassandra_cluster.network, ecchronos_config=ecchronos_config + suffix="dc1", + cassandra_network=install_cassandra_cluster.network, + ecchronos_config=ecchronos_config, ) test_environment.wait_for_ecchronos_ready() out, _ = run_ecctool_state_nodes(ECC_INSTANCE_NAME_DC1) @@ -64,6 +86,10 @@ def test_install_ecchronos_dc2(install_cassandra_cluster, test_environment): instance_name=ECC_INSTANCE_NAME_DC2, local_dc=global_vars.DC2, initial_contact_point=global_vars.DEFAULT_SEED_IP_DC2, + schedule_interval_time=SCHEDULE_INTERVAL_TIME, + schedule_interval_unit=SCHEDULE_INTERVAL_UNIT, + schedule_initial_delay_time=SCHEDULE_INITIAL_DELAY_TIME, + schedule_initial_delay_unit=SCHEDULE_INITIAL_DELAY_UNIT, ) test_environment.start_ecchronos( suffix="dc2", @@ -85,7 +111,6 @@ def test_lock_concurrency(install_cassandra_cluster, test_environment): logger.info("Running parallel on demand jobs inside instances") try: - with ThreadPoolExecutor(max_workers=2) as executor: future_ecc1_dc1 = executor.submit(run_repair, ECC_INSTANCE_NAME_DC1, ["-k", "test", "--all"]) future_ecc1_dc2 = executor.submit(run_repair, ECC_INSTANCE_NAME_DC2, ["-k", "test", "--all"]) @@ -98,7 +123,9 @@ def test_lock_concurrency(install_cassandra_cluster, test_environment): _, exit_code_ecc2_dc2 = future_ecc2_dc2.result() logger.info( - f"On Demand Jobs created with exit code: ecchronos-agent-dc1: {exit_code_ecc1_dc1, exit_code_ecc2_dc1}, ecchronos-agent-dc2: {exit_code_ecc1_dc2, exit_code_ecc2_dc2}" + "On Demand Jobs created with exit code: " + f"{ECC_INSTANCE_NAME_DC1}: {(exit_code_ecc1_dc1, exit_code_ecc2_dc1)}, " + f"{ECC_INSTANCE_NAME_DC2}: {(exit_code_ecc1_dc2, exit_code_ecc2_dc2)}" ) if any( @@ -108,11 +135,9 @@ def test_lock_concurrency(install_cassandra_cluster, test_environment): logger.error("Fail to create on demand jobs") pytest.fail("Fail to create on demand jobs") - # Wait for jobs to be finished in both instances within a time limit try: - wait_for_repairs_completion(executor) + wait_for_repairs_completion_on_demand(executor) except Exception as e: - logger.error(f"Timeout: Repairs did not complete within 5 minutes") pytest.fail(f"Repairs did not complete within 5 minutes: {e}") except Exception as e: @@ -120,10 +145,28 @@ def test_lock_concurrency(install_cassandra_cluster, test_environment): pytest.fail(f"Failed to run behave tests: {e}") +@pytest.mark.dependency( + name="test_lock_concurrency_scheduled", + depends=["test_install_cassandra_cluster", "test_install_ecchronos_dc1", "test_install_ecchronos_dc2"], +) +def test_lock_concurrency_scheduled(install_cassandra_cluster, test_environment): + logger.info("Waiting for scheduled repair jobs to run in both instances") + + try: + with ThreadPoolExecutor(max_workers=2) as executor: + try: + wait_for_repairs_completion_scheduled(executor) + except Exception as e: + pytest.fail(f"Scheduled repairs did not complete within 10 minutes: {e}") + + except Exception as e: + logger.error(f"Failed to run scheduled repair tests: {e}") + pytest.fail(f"Failed to run scheduled repair tests: {e}") + + def handle_repair_output(output_data): output_data = output_data.decode("ascii").lstrip().rstrip().split("\n") rows = output_data[3:-2] - # Clean each row: remove pipes and strip whitespace return [row.strip().strip("|").strip() for row in rows] @@ -132,9 +175,19 @@ def all_repairs_completed(statuses): return all(status == "COMPLETED" for status in statuses) +@retry(stop=stop_after_delay(300), wait=wait_fixed(10), retry=retry_if_result(lambda x: not x)) +def wait_for_repairs_completion_on_demand(executor): + """Wait for all repairs to complete in both datacenters (on-demand timeout ~5min).""" + return _wait_for_repairs_completion_common(executor) + + @retry(stop=stop_after_delay(600), wait=wait_fixed(10), retry=retry_if_result(lambda x: not x)) -def wait_for_repairs_completion(executor): - """Wait for all repairs to complete in both datacenters""" +def wait_for_repairs_completion_scheduled(executor): + """Wait for all repairs to complete in both datacenters (scheduled timeout ~10min).""" + return _wait_for_repairs_completion_common(executor) + + +def _wait_for_repairs_completion_common(executor): params = ["-c", "4"] future_ecc_dc1 = executor.submit(verify_repair_completed, ECC_INSTANCE_NAME_DC1, params) future_ecc_dc2 = executor.submit(verify_repair_completed, ECC_INSTANCE_NAME_DC2, params) @@ -152,6 +205,8 @@ def wait_for_repairs_completion(executor): return True logger.info( - f"Waiting for repairs... DC1: {sum(1 for s in ecc_dc1_statuses if s == 'COMPLETED')}/{len(ecc_dc1_statuses)}, DC2: {sum(1 for s in ecc_dc2_statuses if s == 'COMPLETED')}/{len(ecc_dc2_statuses)}" + "Waiting for repairs... " + f"DC1: {sum(1 for s in ecc_dc1_statuses if s == 'COMPLETED')}/{len(ecc_dc1_statuses)}, " + f"DC2: {sum(1 for s in ecc_dc2_statuses if s == 'COMPLETED')}/{len(ecc_dc2_statuses)}" ) return False From 6e57bcade564318a80b9da42c5caabf51d319afa Mon Sep 17 00:00:00 2001 From: 0rlych1kk4 Date: Sun, 22 Feb 2026 14:04:29 +0800 Subject: [PATCH 02/13] test: harden ConfigRefresher Awaitility timeouts --- .../application/config/TestConfigRefresher.java | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/TestConfigRefresher.java b/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/TestConfigRefresher.java index e7077792a..8f8d04f45 100644 --- a/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/TestConfigRefresher.java +++ b/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/TestConfigRefresher.java @@ -43,10 +43,16 @@ public void testGetFileContent() throws Exception configRefresher.watch(file.toPath(), () -> reference.set(readFileContent(file))); writeToFile(file, "some content"); - await().atMost(1, TimeUnit.SECONDS).until(() -> "some content".equals(reference.get())); + await() + .pollInterval(50, TimeUnit.MILLISECONDS) + .atMost(5, TimeUnit.SECONDS) + .until(() -> "some content".equals(reference.get())); writeToFile(file, "some new content"); - await().atMost(1, TimeUnit.SECONDS).until(() -> "some new content".equals(reference.get())); + await() + .pollInterval(50, TimeUnit.MILLISECONDS) + .atMost(5, TimeUnit.SECONDS) + .until(() -> "some new content".equals(reference.get())); } } @@ -79,7 +85,10 @@ public void testRunnableThrowsAtFirstUpdate() throws Exception shouldThrow.set(false); writeToFile(file, "some new content"); - await().atMost(1, TimeUnit.SECONDS).until(() -> "some new content".equals(reference.get())); + await() + .pollInterval(50, TimeUnit.MILLISECONDS) + .atMost(5, TimeUnit.SECONDS) + .until(() -> "some new content".equals(reference.get())); } } @@ -112,4 +121,3 @@ private String readFileContent(File file) return null; } } - From abdf4736f8fcc23bfcf8ce184694769b7a9edffd Mon Sep 17 00:00:00 2001 From: 0rlych1kk4 Date: Sun, 22 Feb 2026 17:38:15 +0800 Subject: [PATCH 03/13] test: make TestScheduleManager idle status deterministic (avoid race) --- .../core/impl/repair/scheduler/TestScheduleManager.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/TestScheduleManager.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/TestScheduleManager.java index 38305cbdd..487450055 100644 --- a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/TestScheduleManager.java +++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/TestScheduleManager.java @@ -222,7 +222,7 @@ public void testGetCurrentJobStatus() throws InterruptedException } @Test - public void testGetCurrentJobStatusNoRunning() throws InterruptedException + public void testGetCurrentJobStatusNoRunning() { CountDownLatch latch = new CountDownLatch(1); UUID jobId = UUID.randomUUID(); @@ -235,7 +235,10 @@ public void testGetCurrentJobStatusNoRunning() throws InterruptedException jobId, latch); myScheduler.schedule(nodeID1, job1); - new Thread(() -> myScheduler.run(nodeID1)).start(); + + // Deterministic: do NOT start the scheduler thread here. + // This test asserts the idle-state contract (no running job => empty status), + // and starting the scheduler introduces a timing race. assertThat(myScheduler.getCurrentJobStatus()).isEqualTo(""); latch.countDown(); } From 1d8dbf712dd4f3f83bc3a4ff8cc05ef3a052d83a Mon Sep 17 00:00:00 2001 From: 0rlych1kk4 Date: Thu, 26 Feb 2026 11:23:33 +0800 Subject: [PATCH 04/13] style: format ecc_config.py with black (line-length 120) --- ecchronos-binary/src/test/bin/ecc_config.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/ecchronos-binary/src/test/bin/ecc_config.py b/ecchronos-binary/src/test/bin/ecc_config.py index 4ad8dd8e1..262a9b857 100644 --- a/ecchronos-binary/src/test/bin/ecc_config.py +++ b/ecchronos-binary/src/test/bin/ecc_config.py @@ -33,7 +33,6 @@ def __init__( schedule_interval_unit=None, schedule_initial_delay_time=None, schedule_initial_delay_unit=None, - ): self.container_mounts = {} self.datacenter_aware = ( @@ -50,7 +49,6 @@ def __init__( self.schedule_initial_delay_time = schedule_initial_delay_time self.schedule_initial_delay_unit = schedule_initial_delay_unit - def modify_configuration(self): self._modify_ecc_yaml_file() self._modify_security_yaml_file() From 8c6159a2922f9c1060ec665d84c553652d0552a0 Mon Sep 17 00:00:00 2001 From: 0rlych1kk4 Date: Sat, 14 Mar 2026 13:02:56 +0800 Subject: [PATCH 05/13] test: only mount custom schedule config when overrides are requested --- ecchronos-binary/src/test/bin/ecc_config.py | 67 ++++++++++++++------- 1 file changed, 44 insertions(+), 23 deletions(-) diff --git a/ecchronos-binary/src/test/bin/ecc_config.py b/ecchronos-binary/src/test/bin/ecc_config.py index 262a9b857..9ffed7483 100644 --- a/ecchronos-binary/src/test/bin/ecc_config.py +++ b/ecchronos-binary/src/test/bin/ecc_config.py @@ -14,9 +14,11 @@ # limitations under the License. # -import yaml import re import tempfile + +import yaml + import global_variables as global_vars @@ -87,7 +89,6 @@ def _modify_ecc_yaml_file(self): def _modify_connection_configuration(self, data): data["connection"]["cql"]["contactPoints"] = [{"host": self.initial_contact_point, "port": 9042}] - data["connection"]["cql"]["datacenterAware"]["datacenters"] = self.datacenter_aware data["connection"]["cql"]["instanceName"] = self.instance_name data["connection"]["cql"]["localDatacenter"] = self.local_dc @@ -200,7 +201,7 @@ def _uncomment_head_options(self): with open(global_vars.JVM_OPTIONS_FILE_PATH, "r", encoding="utf-8") as file: lines = file.readlines() - result = [pattern.match(l).group(1) + "\n" if pattern.match(l) else l for l in lines] + result = [pattern.match(line).group(1) + "\n" if pattern.match(line) else line for line in lines] self.container_mounts["jvm"] = { "host": self.write_tmp(result, ".options"), @@ -208,7 +209,7 @@ def _uncomment_head_options(self): } def _modify_logback_configuration(self): - with open(global_vars.LOGBACK_FILE_PATH, "r") as file: + with open(global_vars.LOGBACK_FILE_PATH, "r", encoding="utf-8") as file: lines = file.readlines() pattern = re.compile(r'^(\s*)()\s*$') @@ -229,34 +230,54 @@ def _modify_logback_configuration(self): # -------------------------------------------------- # SAFE SCHEDULE PATCH (non-global) # -------------------------------------------------- - def _modify_schedule_configuration(self): - data = self._read_yaml_data(global_vars.SCHEDULE_YAML_FILE_PATH) - - # Only modify when explicitly requested - if any( - [ + def _has_schedule_overrides(self): + return any( + value is not None + for value in ( self.schedule_interval_time, self.schedule_interval_unit, self.schedule_initial_delay_time, self.schedule_initial_delay_unit, - ] - ): - for ks in data.get("keyspaces", []): - if ks.get("name") != "test": + ) + ) + + def _modify_schedule_configuration(self): + # Leave the upstream schedule.yaml untouched unless this instance + # explicitly requests schedule overrides. + if not self._has_schedule_overrides(): + return + + data = self._read_yaml_data(global_vars.SCHEDULE_YAML_FILE_PATH) + + # safe_load may return None for an empty file + if data is None: + data = {} + + if isinstance(data, dict): + for keyspace in data.get("keyspaces") or []: + if not isinstance(keyspace, dict): continue - for tbl in ks.get("tables", []): - if tbl.get("name") != "table1": + if keyspace.get("name") != "test": + continue + + for table in keyspace.get("tables") or []: + if not isinstance(table, dict): + continue + if table.get("name") != "table1": continue + # Only patch timing-related schedule fields. + # Do not touch repair type or unrelated schedule entries. if self.schedule_interval_time is not None: - tbl.setdefault("interval", {})["time"] = self.schedule_interval_time + table.setdefault("interval", {})["time"] = self.schedule_interval_time if self.schedule_interval_unit is not None: - tbl.setdefault("interval", {})["unit"] = self.schedule_interval_unit + table.setdefault("interval", {})["unit"] = self.schedule_interval_unit if self.schedule_initial_delay_time is not None: - tbl.setdefault("initial_delay", {})["time"] = self.schedule_initial_delay_time + table.setdefault("initial_delay", {})["time"] = self.schedule_initial_delay_time if self.schedule_initial_delay_unit is not None: - tbl.setdefault("initial_delay", {})["unit"] = self.schedule_initial_delay_unit + table.setdefault("initial_delay", {})["unit"] = self.schedule_initial_delay_unit + break self.container_mounts["schedule"] = { @@ -266,11 +287,11 @@ def _modify_schedule_configuration(self): # -------------------------------------------------- def _read_yaml_data(self, filename): - with open(filename, "r") as f: - return yaml.safe_load(f) + with open(filename, "r", encoding="utf-8") as file: + return yaml.safe_load(file) def write_tmp(self, data, suffix=".yaml") -> str: - tmp = tempfile.NamedTemporaryFile(mode="w", suffix=suffix, delete=False) + tmp = tempfile.NamedTemporaryFile(mode="w", suffix=suffix, delete=False, encoding="utf-8") if suffix == ".yaml": yaml.safe_dump(data, tmp) else: From 67e9beaa4828dc02b92952c9ceddd81f46bbda0d Mon Sep 17 00:00:00 2001 From: 0rlych1kk4 Date: Sat, 14 Mar 2026 14:11:29 +0800 Subject: [PATCH 06/13] test: verify completion across all repairs --- .../src/test/bin/test_multi_agent_datacenter_aware.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ecchronos-binary/src/test/bin/test_multi_agent_datacenter_aware.py b/ecchronos-binary/src/test/bin/test_multi_agent_datacenter_aware.py index 92a298ac6..10124c3d0 100644 --- a/ecchronos-binary/src/test/bin/test_multi_agent_datacenter_aware.py +++ b/ecchronos-binary/src/test/bin/test_multi_agent_datacenter_aware.py @@ -188,7 +188,10 @@ def wait_for_repairs_completion_scheduled(executor): def _wait_for_repairs_completion_common(executor): - params = ["-c", "4"] + # Do not limit results, otherwise we may only inspect a subset of repairs + # and get false positives/false negatives in CI. + params = [] + future_ecc_dc1 = executor.submit(verify_repair_completed, ECC_INSTANCE_NAME_DC1, params) future_ecc_dc2 = executor.submit(verify_repair_completed, ECC_INSTANCE_NAME_DC2, params) output_ecc_dc1, _ = future_ecc_dc1.result() From c1eae78f570be8b664eb95a80a67484bb7cbe66a Mon Sep 17 00:00:00 2001 From: 0rlych1kk4 Date: Sat, 14 Mar 2026 14:57:42 +0800 Subject: [PATCH 07/13] test: increase repair query window to avoid partial results --- .../src/test/bin/test_multi_agent_datacenter_aware.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/ecchronos-binary/src/test/bin/test_multi_agent_datacenter_aware.py b/ecchronos-binary/src/test/bin/test_multi_agent_datacenter_aware.py index 10124c3d0..9990a9ab5 100644 --- a/ecchronos-binary/src/test/bin/test_multi_agent_datacenter_aware.py +++ b/ecchronos-binary/src/test/bin/test_multi_agent_datacenter_aware.py @@ -42,6 +42,10 @@ SCHEDULE_INITIAL_DELAY_TIME = 1 SCHEDULE_INITIAL_DELAY_UNIT = "minutes" +# Keep a bounded but sufficiently large query window so shared helper behavior +# remains compatible with the rest of the Python integration suites. +REPAIR_QUERY_COUNT = "50" + def run_repair(container, params): barrier.wait() @@ -188,9 +192,9 @@ def wait_for_repairs_completion_scheduled(executor): def _wait_for_repairs_completion_common(executor): - # Do not limit results, otherwise we may only inspect a subset of repairs - # and get false positives/false negatives in CI. - params = [] + # Keep the CLI shape stable for shared helpers and fetch enough rows to avoid + # only inspecting a partial subset of repairs. + params = ["-c", REPAIR_QUERY_COUNT] future_ecc_dc1 = executor.submit(verify_repair_completed, ECC_INSTANCE_NAME_DC1, params) future_ecc_dc2 = executor.submit(verify_repair_completed, ECC_INSTANCE_NAME_DC2, params) From 6bc0b037ed54134b25ddabcb742527e7e9d0801e Mon Sep 17 00:00:00 2001 From: 0rlych1kk4 Date: Sat, 14 Mar 2026 16:40:32 +0800 Subject: [PATCH 08/13] test: relax on-demand multi-agent repair timeout in CI --- .../bin/test_multi_agent_datacenter_aware.py | 25 ++++++++++++++----- 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/ecchronos-binary/src/test/bin/test_multi_agent_datacenter_aware.py b/ecchronos-binary/src/test/bin/test_multi_agent_datacenter_aware.py index 9990a9ab5..3ff6e6ff4 100644 --- a/ecchronos-binary/src/test/bin/test_multi_agent_datacenter_aware.py +++ b/ecchronos-binary/src/test/bin/test_multi_agent_datacenter_aware.py @@ -46,6 +46,11 @@ # remains compatible with the rest of the Python integration suites. REPAIR_QUERY_COUNT = "50" +# CI can be slower for parallel datacenter-aware on-demand repairs. +ON_DEMAND_TIMEOUT_SECONDS = 600 +SCHEDULED_TIMEOUT_SECONDS = 600 +REPAIR_POLL_INTERVAL_SECONDS = 10 + def run_repair(container, params): barrier.wait() @@ -142,7 +147,7 @@ def test_lock_concurrency(install_cassandra_cluster, test_environment): try: wait_for_repairs_completion_on_demand(executor) except Exception as e: - pytest.fail(f"Repairs did not complete within 5 minutes: {e}") + pytest.fail(f"Repairs did not complete within {ON_DEMAND_TIMEOUT_SECONDS // 60} minutes: {e}") except Exception as e: logger.error(f"Failed to run behave tests: {e}") @@ -161,7 +166,7 @@ def test_lock_concurrency_scheduled(install_cassandra_cluster, test_environment) try: wait_for_repairs_completion_scheduled(executor) except Exception as e: - pytest.fail(f"Scheduled repairs did not complete within 10 minutes: {e}") + pytest.fail(f"Scheduled repairs did not complete within {SCHEDULED_TIMEOUT_SECONDS // 60} minutes: {e}") except Exception as e: logger.error(f"Failed to run scheduled repair tests: {e}") @@ -179,15 +184,23 @@ def all_repairs_completed(statuses): return all(status == "COMPLETED" for status in statuses) -@retry(stop=stop_after_delay(300), wait=wait_fixed(10), retry=retry_if_result(lambda x: not x)) +@retry( + stop=stop_after_delay(ON_DEMAND_TIMEOUT_SECONDS), + wait=wait_fixed(REPAIR_POLL_INTERVAL_SECONDS), + retry=retry_if_result(lambda x: not x), +) def wait_for_repairs_completion_on_demand(executor): - """Wait for all repairs to complete in both datacenters (on-demand timeout ~5min).""" + """Wait for all repairs to complete in both datacenters for on-demand execution.""" return _wait_for_repairs_completion_common(executor) -@retry(stop=stop_after_delay(600), wait=wait_fixed(10), retry=retry_if_result(lambda x: not x)) +@retry( + stop=stop_after_delay(SCHEDULED_TIMEOUT_SECONDS), + wait=wait_fixed(REPAIR_POLL_INTERVAL_SECONDS), + retry=retry_if_result(lambda x: not x), +) def wait_for_repairs_completion_scheduled(executor): - """Wait for all repairs to complete in both datacenters (scheduled timeout ~10min).""" + """Wait for all repairs to complete in both datacenters for scheduled execution.""" return _wait_for_repairs_completion_common(executor) From 37197434ff96568be982e7fe3901b042c180a40d Mon Sep 17 00:00:00 2001 From: 0rlych1kk4 Date: Sat, 14 Mar 2026 17:14:33 +0800 Subject: [PATCH 09/13] test: parse multi-agent repair statuses more robustly --- .../bin/test_multi_agent_datacenter_aware.py | 37 ++++++++++++++----- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/ecchronos-binary/src/test/bin/test_multi_agent_datacenter_aware.py b/ecchronos-binary/src/test/bin/test_multi_agent_datacenter_aware.py index 3ff6e6ff4..980f9dd7c 100644 --- a/ecchronos-binary/src/test/bin/test_multi_agent_datacenter_aware.py +++ b/ecchronos-binary/src/test/bin/test_multi_agent_datacenter_aware.py @@ -174,14 +174,32 @@ def test_lock_concurrency_scheduled(install_cassandra_cluster, test_environment) def handle_repair_output(output_data): - output_data = output_data.decode("ascii").lstrip().rstrip().split("\n") - rows = output_data[3:-2] - return [row.strip().strip("|").strip() for row in rows] + lines = output_data.decode("ascii").strip().split("\n") + rows = lines[3:-2] + parsed_rows = [row.strip().strip("|").strip() for row in rows if row.strip()] + return [row for row in parsed_rows if row] + + +def extract_terminal_statuses(rows): + terminal_statuses = {"COMPLETED", "RUNNING", "FAILED", "STARTED"} + statuses = [] + + for row in rows: + columns = [column.strip() for column in row.split("|") if column.strip()] + if not columns: + continue + + for column in reversed(columns): + if column in terminal_statuses: + statuses.append(column) + break + + return statuses def all_repairs_completed(statuses): - """Check if all repair statuses are COMPLETED""" - return all(status == "COMPLETED" for status in statuses) + """Check if all visible repair statuses are COMPLETED.""" + return bool(statuses) and all(status == "COMPLETED" for status in statuses) @retry( @@ -205,8 +223,6 @@ def wait_for_repairs_completion_scheduled(executor): def _wait_for_repairs_completion_common(executor): - # Keep the CLI shape stable for shared helpers and fetch enough rows to avoid - # only inspecting a partial subset of repairs. params = ["-c", REPAIR_QUERY_COUNT] future_ecc_dc1 = executor.submit(verify_repair_completed, ECC_INSTANCE_NAME_DC1, params) @@ -214,8 +230,11 @@ def _wait_for_repairs_completion_common(executor): output_ecc_dc1, _ = future_ecc_dc1.result() output_ecc_dc2, _ = future_ecc_dc2.result() - ecc_dc1_statuses = handle_repair_output(output_ecc_dc1) - ecc_dc2_statuses = handle_repair_output(output_ecc_dc2) + ecc_dc1_rows = handle_repair_output(output_ecc_dc1) + ecc_dc2_rows = handle_repair_output(output_ecc_dc2) + + ecc_dc1_statuses = extract_terminal_statuses(ecc_dc1_rows) + ecc_dc2_statuses = extract_terminal_statuses(ecc_dc2_rows) dc1_completed = all_repairs_completed(ecc_dc1_statuses) dc2_completed = all_repairs_completed(ecc_dc2_statuses) From afc4453fceeb58e63fe0b8e407bd1a7d0148d96c Mon Sep 17 00:00:00 2001 From: 0rlych1kk4 Date: Fri, 3 Apr 2026 23:45:07 +0800 Subject: [PATCH 10/13] test: avoid modifying global scheduler frequency (prevent cross-test impact) --- ecchronos-binary/src/test/bin/ecc_config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ecchronos-binary/src/test/bin/ecc_config.py b/ecchronos-binary/src/test/bin/ecc_config.py index 9ffed7483..5938c6d0d 100644 --- a/ecchronos-binary/src/test/bin/ecc_config.py +++ b/ecchronos-binary/src/test/bin/ecc_config.py @@ -95,7 +95,7 @@ def _modify_connection_configuration(self, data): return data def _modify_scheduler_configuration(self, data): - data["scheduler"]["frequency"]["time"] = 1 + # return data def _modify_twcs_configuration(self, data): From 05354862c6b1b81143119b33d2a73ce8d0b5f420 Mon Sep 17 00:00:00 2001 From: 0rlych1kk4 Date: Fri, 3 Apr 2026 23:52:53 +0800 Subject: [PATCH 11/13] style: format ecc_config.py with black --- ecchronos-binary/src/test/bin/ecc_config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ecchronos-binary/src/test/bin/ecc_config.py b/ecchronos-binary/src/test/bin/ecc_config.py index 5938c6d0d..0c4b6e00a 100644 --- a/ecchronos-binary/src/test/bin/ecc_config.py +++ b/ecchronos-binary/src/test/bin/ecc_config.py @@ -95,7 +95,7 @@ def _modify_connection_configuration(self, data): return data def _modify_scheduler_configuration(self, data): - # + # return data def _modify_twcs_configuration(self, data): From cc7350a37b6323cd6bc17e17c362cdb6594b2564 Mon Sep 17 00:00:00 2001 From: 0rlych1kk4 Date: Sat, 4 Apr 2026 00:12:02 +0800 Subject: [PATCH 12/13] test: always mount schedule.yaml and apply overrides only when specified --- ecchronos-binary/src/test/bin/ecc_config.py | 30 +-------------------- 1 file changed, 1 insertion(+), 29 deletions(-) diff --git a/ecchronos-binary/src/test/bin/ecc_config.py b/ecchronos-binary/src/test/bin/ecc_config.py index 0c4b6e00a..ec162e991 100644 --- a/ecchronos-binary/src/test/bin/ecc_config.py +++ b/ecchronos-binary/src/test/bin/ecc_config.py @@ -30,7 +30,6 @@ def __init__( agent_type=global_vars.DEFAULT_AGENT_TYPE, initial_contact_point=global_vars.DEFAULT_INITIAL_CONTACT_POINT, instance_name=global_vars.DEFAULT_INSTANCE_NAME, - # NEW: schedule knobs (OPT-IN only) schedule_interval_time=None, schedule_interval_unit=None, schedule_initial_delay_time=None, @@ -45,7 +44,6 @@ def __init__( self.initial_contact_point = initial_contact_point self.instance_name = instance_name - # Optional overrides (None = do not modify schedule) self.schedule_interval_time = schedule_interval_time self.schedule_interval_unit = schedule_interval_unit self.schedule_initial_delay_time = schedule_initial_delay_time @@ -64,15 +62,11 @@ def modify_configuration(self): "container": global_vars.CONTAINER_CERTIFICATE_PATH, } - # Per-instance logs (multi-agent safe) self.container_mounts["logs"] = { "host": f"{global_vars.HOST_LOGS_PATH}/{self.instance_name}", "container": global_vars.CONTAINER_LOGS_PATH, } - # -------------------------------------------------- - # ECC YAML - # -------------------------------------------------- def _modify_ecc_yaml_file(self): data = self._read_yaml_data(global_vars.ECC_YAML_FILE_PATH) data = self._modify_connection_configuration(data) @@ -95,7 +89,6 @@ def _modify_connection_configuration(self, data): return data def _modify_scheduler_configuration(self, data): - # return data def _modify_twcs_configuration(self, data): @@ -109,9 +102,6 @@ def _modify_jolokia_configuration(self, data): data["connection"]["jmx"]["jolokia"]["usePem"] = True return data - # -------------------------------------------------- - # SECURITY YAML - # -------------------------------------------------- def _modify_security_yaml_file(self): data = self._read_yaml_data(global_vars.SECURITY_YAML_FILE_PATH) if global_vars.LOCAL != "true": @@ -160,9 +150,6 @@ def _modify_cql_configuration(self, data): data["cql"]["credentials"]["password"] = "cassandra" return data - # -------------------------------------------------- - # APPLICATION YAML - # -------------------------------------------------- def _modify_application_yaml_file(self): data = self._read_yaml_data(global_vars.APPLICATION_YAML_FILE_PATH) if global_vars.LOCAL != "true": @@ -193,9 +180,6 @@ def _modify_spring_doc_configuration(self, data): data["springdoc"]["api-docs"]["show-actuator"] = True return data - # -------------------------------------------------- - # JVM / LOGGING - # -------------------------------------------------- def _uncomment_head_options(self): pattern = re.compile(r"^#\s*(-X.*)") with open(global_vars.JVM_OPTIONS_FILE_PATH, "r", encoding="utf-8") as file: @@ -227,9 +211,6 @@ def _modify_logback_configuration(self): "container": global_vars.CONTAINER_LOGBACK_FILE_PATH, } - # -------------------------------------------------- - # SAFE SCHEDULE PATCH (non-global) - # -------------------------------------------------- def _has_schedule_overrides(self): return any( value is not None @@ -242,18 +223,12 @@ def _has_schedule_overrides(self): ) def _modify_schedule_configuration(self): - # Leave the upstream schedule.yaml untouched unless this instance - # explicitly requests schedule overrides. - if not self._has_schedule_overrides(): - return - data = self._read_yaml_data(global_vars.SCHEDULE_YAML_FILE_PATH) - # safe_load may return None for an empty file if data is None: data = {} - if isinstance(data, dict): + if self._has_schedule_overrides() and isinstance(data, dict): for keyspace in data.get("keyspaces") or []: if not isinstance(keyspace, dict): continue @@ -266,8 +241,6 @@ def _modify_schedule_configuration(self): if table.get("name") != "table1": continue - # Only patch timing-related schedule fields. - # Do not touch repair type or unrelated schedule entries. if self.schedule_interval_time is not None: table.setdefault("interval", {})["time"] = self.schedule_interval_time if self.schedule_interval_unit is not None: @@ -285,7 +258,6 @@ def _modify_schedule_configuration(self): "container": global_vars.CONTAINER_SCHEDULE_YAML_PATH, } - # -------------------------------------------------- def _read_yaml_data(self, filename): with open(filename, "r", encoding="utf-8") as file: return yaml.safe_load(file) From 2625d5fa9381c894f9f10076e346733811ac7f71 Mon Sep 17 00:00:00 2001 From: 0rlych1kk4 Date: Sat, 4 Apr 2026 00:33:52 +0800 Subject: [PATCH 13/13] fix: preserve upstream schedule behavior when YAML is empty --- ecchronos-binary/src/test/bin/ecc_config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ecchronos-binary/src/test/bin/ecc_config.py b/ecchronos-binary/src/test/bin/ecc_config.py index ec162e991..5906e1113 100644 --- a/ecchronos-binary/src/test/bin/ecc_config.py +++ b/ecchronos-binary/src/test/bin/ecc_config.py @@ -226,7 +226,7 @@ def _modify_schedule_configuration(self): data = self._read_yaml_data(global_vars.SCHEDULE_YAML_FILE_PATH) if data is None: - data = {} + return if self._has_schedule_overrides() and isinstance(data, dict): for keyspace in data.get("keyspaces") or []: