diff --git a/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/TestConfigRefresher.java b/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/TestConfigRefresher.java index e7077792a..8f8d04f45 100644 --- a/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/TestConfigRefresher.java +++ b/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/TestConfigRefresher.java @@ -43,10 +43,16 @@ public void testGetFileContent() throws Exception configRefresher.watch(file.toPath(), () -> reference.set(readFileContent(file))); writeToFile(file, "some content"); - await().atMost(1, TimeUnit.SECONDS).until(() -> "some content".equals(reference.get())); + await() + .pollInterval(50, TimeUnit.MILLISECONDS) + .atMost(5, TimeUnit.SECONDS) + .until(() -> "some content".equals(reference.get())); writeToFile(file, "some new content"); - await().atMost(1, TimeUnit.SECONDS).until(() -> "some new content".equals(reference.get())); + await() + .pollInterval(50, TimeUnit.MILLISECONDS) + .atMost(5, TimeUnit.SECONDS) + .until(() -> "some new content".equals(reference.get())); } } @@ -79,7 +85,10 @@ public void testRunnableThrowsAtFirstUpdate() throws Exception shouldThrow.set(false); writeToFile(file, "some new content"); - await().atMost(1, TimeUnit.SECONDS).until(() -> "some new content".equals(reference.get())); + await() + .pollInterval(50, TimeUnit.MILLISECONDS) + .atMost(5, TimeUnit.SECONDS) + .until(() -> "some new content".equals(reference.get())); } } @@ -112,4 +121,3 @@ private String readFileContent(File file) return null; } } - diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/TestScheduleManager.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/TestScheduleManager.java index 38305cbdd..487450055 100644 --- a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/TestScheduleManager.java +++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/TestScheduleManager.java @@ -222,7 +222,7 @@ public void testGetCurrentJobStatus() throws InterruptedException } @Test - public void testGetCurrentJobStatusNoRunning() throws InterruptedException + public void testGetCurrentJobStatusNoRunning() { CountDownLatch latch = new CountDownLatch(1); UUID jobId = UUID.randomUUID(); @@ -235,7 +235,10 @@ public void testGetCurrentJobStatusNoRunning() throws InterruptedException jobId, latch); myScheduler.schedule(nodeID1, job1); - new Thread(() -> myScheduler.run(nodeID1)).start(); + + // Deterministic: do NOT start the scheduler thread here. + // This test asserts the idle-state contract (no running job => empty status), + // and starting the scheduler introduces a timing race. assertThat(myScheduler.getCurrentJobStatus()).isEqualTo(""); latch.countDown(); } diff --git a/ecchronos-binary/src/test/bin/ecc_config.py b/ecchronos-binary/src/test/bin/ecc_config.py index ab959c449..5906e1113 100644 --- a/ecchronos-binary/src/test/bin/ecc_config.py +++ b/ecchronos-binary/src/test/bin/ecc_config.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 -# vi: syntax=python # -# Copyright 2025 Telefonaktiebolaget LM Ericsson +# Copyright 2024 Telefonaktiebolaget LM Ericsson # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,9 +14,11 @@ # limitations under the License. # -import yaml import re import tempfile + +import yaml + import global_variables as global_vars @@ -29,6 +30,10 @@ def __init__( agent_type=global_vars.DEFAULT_AGENT_TYPE, initial_contact_point=global_vars.DEFAULT_INITIAL_CONTACT_POINT, instance_name=global_vars.DEFAULT_INSTANCE_NAME, + schedule_interval_time=None, + schedule_interval_unit=None, + schedule_initial_delay_time=None, + schedule_initial_delay_unit=None, ): self.container_mounts = {} self.datacenter_aware = ( @@ -39,6 +44,11 @@ def __init__( self.initial_contact_point = initial_contact_point self.instance_name = instance_name + self.schedule_interval_time = schedule_interval_time + self.schedule_interval_unit = schedule_interval_unit + self.schedule_initial_delay_time = schedule_initial_delay_time + self.schedule_initial_delay_unit = schedule_initial_delay_unit + def modify_configuration(self): self._modify_ecc_yaml_file() self._modify_security_yaml_file() @@ -46,24 +56,30 @@ def modify_configuration(self): self._modify_schedule_configuration() self._uncomment_head_options() self._modify_logback_configuration() + self.container_mounts["certificates"] = { "host": global_vars.CERTIFICATE_DIRECTORY, "container": global_vars.CONTAINER_CERTIFICATE_PATH, } + self.container_mounts["logs"] = { "host": f"{global_vars.HOST_LOGS_PATH}/{self.instance_name}", "container": global_vars.CONTAINER_LOGS_PATH, } - # Modify ecc.yaml file def _modify_ecc_yaml_file(self): data = self._read_yaml_data(global_vars.ECC_YAML_FILE_PATH) data = self._modify_connection_configuration(data) data = self._modify_scheduler_configuration(data) data = self._modify_twcs_configuration(data) + if global_vars.JOLOKIA_ENABLED == "true": data = self._modify_jolokia_configuration(data) - self.container_mounts["ecc"] = {"host": self.write_tmp(data), "container": global_vars.CONTAINER_ECC_YAML_PATH} + + self.container_mounts["ecc"] = { + "host": self.write_tmp(data), + "container": global_vars.CONTAINER_ECC_YAML_PATH, + } def _modify_connection_configuration(self, data): data["connection"]["cql"]["contactPoints"] = [{"host": self.initial_contact_point, "port": 9042}] @@ -73,7 +89,6 @@ def _modify_connection_configuration(self, data): return data def _modify_scheduler_configuration(self, data): - data["scheduler"]["frequency"]["time"] = 1 return data def _modify_twcs_configuration(self, data): @@ -87,13 +102,13 @@ def _modify_jolokia_configuration(self, data): data["connection"]["jmx"]["jolokia"]["usePem"] = True return data - # Modify security.yaml file def _modify_security_yaml_file(self): data = self._read_yaml_data(global_vars.SECURITY_YAML_FILE_PATH) if global_vars.LOCAL != "true": data = self._modify_security_configuration(data) else: data = self._modify_cql_configuration(data) + self.container_mounts["security"] = { "host": self.write_tmp(data), "container": global_vars.CONTAINER_SECURITY_YAML_PATH, @@ -103,6 +118,7 @@ def _modify_security_configuration(self, data): data["cql"]["credentials"]["enabled"] = True data["cql"]["credentials"]["username"] = "eccuser" data["cql"]["credentials"]["password"] = "eccpassword" + data["cql"]["tls"]["enabled"] = True data["cql"]["tls"]["keystore"] = f"{global_vars.CONTAINER_CERTIFICATE_PATH}/.keystore" data["cql"]["tls"]["keystore_password"] = "ecctest" @@ -113,6 +129,7 @@ def _modify_security_configuration(self, data): data["jmx"]["credentials"]["username"] = "cassandra" data["jmx"]["credentials"]["password"] = "cassandra" data["jmx"]["tls"]["enabled"] = True + if global_vars.PEM_ENABLED != "true": data["jmx"]["tls"]["keystore"] = f"{global_vars.CONTAINER_CERTIFICATE_PATH}/.keystore" data["jmx"]["tls"]["keystore_password"] = "ecctest" @@ -133,22 +150,20 @@ def _modify_cql_configuration(self, data): data["cql"]["credentials"]["password"] = "cassandra" return data - # Modify application.yaml file def _modify_application_yaml_file(self): data = self._read_yaml_data(global_vars.APPLICATION_YAML_FILE_PATH) if global_vars.LOCAL != "true": data = self._modify_application_configuration(data) + data = self._modify_spring_doc_configuration(data) + self.container_mounts["application"] = { "host": self.write_tmp(data), "container": global_vars.CONTAINER_APPLICATION_YAML_PATH, } def _modify_application_configuration(self, data): - if "server" not in data: - data["server"] = {} - if "ssl" not in data["server"]: - data["server"]["ssl"] = {} + data.setdefault("server", {}).setdefault("ssl", {}) data["server"]["ssl"]["enabled"] = True data["server"]["ssl"]["key-store"] = f"{global_vars.CONTAINER_CERTIFICATE_PATH}/serverkeystore" @@ -170,14 +185,7 @@ def _uncomment_head_options(self): with open(global_vars.JVM_OPTIONS_FILE_PATH, "r", encoding="utf-8") as file: lines = file.readlines() - result = [] - - for line in lines: - match = pattern.match(line) - if match: - result.append(match.group(1) + "\n") - else: - result.append(line) + result = [pattern.match(line).group(1) + "\n" if pattern.match(line) else line for line in lines] self.container_mounts["jvm"] = { "host": self.write_tmp(result, ".options"), @@ -185,19 +193,16 @@ def _uncomment_head_options(self): } def _modify_logback_configuration(self): - with open(global_vars.LOGBACK_FILE_PATH, "r") as file: + with open(global_vars.LOGBACK_FILE_PATH, "r", encoding="utf-8") as file: lines = file.readlines() pattern = re.compile(r'^(\s*)()\s*$') - result = [] for line in lines: match = pattern.match(line) if match: - indent = match.group(1) - content = match.group(2) - result.append(f"{indent}\n") + result.append(f"{match.group(1)}\n") else: result.append(line) @@ -206,61 +211,59 @@ def _modify_logback_configuration(self): "container": global_vars.CONTAINER_LOGBACK_FILE_PATH, } + def _has_schedule_overrides(self): + return any( + value is not None + for value in ( + self.schedule_interval_time, + self.schedule_interval_unit, + self.schedule_initial_delay_time, + self.schedule_initial_delay_unit, + ) + ) + def _modify_schedule_configuration(self): data = self._read_yaml_data(global_vars.SCHEDULE_YAML_FILE_PATH) - data["keyspaces"] = [ - { - "name": "test", - "tables": [ - { - "name": "table1", - "interval": {"time": 1, "unit": "days"}, - "initial_delay": {"time": 1, "unit": "hours"}, - "unwind_ratio": 0.1, - }, - {"name": "table3", "enabled": False}, - ], - }, - { - "name": "test2", - "tables": [ - {"name": "table1", "repair_type": "incremental"}, - {"name": "table2", "repair_type": "parallel_vnode"}, - ], - }, - { - "name": "system_auth", - "tables": [ - {"name": "network_permissions", "enabled": False}, - {"name": "resource_role_permissons_index", "enabled": False}, - {"name": "role_members", "enabled": False}, - {"name": "role_permissions", "enabled": False}, - {"name": "roles", "enabled": False}, - ], - }, - { - "name": "ecchronos", - "tables": [ - {"name": "lock", "enabled": False}, - {"name": "lock_priority", "enabled": False}, - {"name": "on_demand_repair_status", "enabled": False}, - {"name": "reject_configuration", "enabled": False}, - {"name": "repair_history", "enabled": True}, - ], - }, - ] + + if data is None: + return + + if self._has_schedule_overrides() and isinstance(data, dict): + for keyspace in data.get("keyspaces") or []: + if not isinstance(keyspace, dict): + continue + if keyspace.get("name") != "test": + continue + + for table in keyspace.get("tables") or []: + if not isinstance(table, dict): + continue + if table.get("name") != "table1": + continue + + if self.schedule_interval_time is not None: + table.setdefault("interval", {})["time"] = self.schedule_interval_time + if self.schedule_interval_unit is not None: + table.setdefault("interval", {})["unit"] = self.schedule_interval_unit + + if self.schedule_initial_delay_time is not None: + table.setdefault("initial_delay", {})["time"] = self.schedule_initial_delay_time + if self.schedule_initial_delay_unit is not None: + table.setdefault("initial_delay", {})["unit"] = self.schedule_initial_delay_unit + + break + self.container_mounts["schedule"] = { "host": self.write_tmp(data), "container": global_vars.CONTAINER_SCHEDULE_YAML_PATH, } def _read_yaml_data(self, filename): - with open(filename, "r") as f: - data = yaml.safe_load(f) - return data + with open(filename, "r", encoding="utf-8") as file: + return yaml.safe_load(file) def write_tmp(self, data, suffix=".yaml") -> str: - tmp = tempfile.NamedTemporaryFile(mode="w", suffix=suffix, delete=False) + tmp = tempfile.NamedTemporaryFile(mode="w", suffix=suffix, delete=False, encoding="utf-8") if suffix == ".yaml": yaml.safe_dump(data, tmp) else: diff --git a/ecchronos-binary/src/test/bin/test_multi_agent_datacenter_aware.py b/ecchronos-binary/src/test/bin/test_multi_agent_datacenter_aware.py index 5dffa9ec0..980f9dd7c 100644 --- a/ecchronos-binary/src/test/bin/test_multi_agent_datacenter_aware.py +++ b/ecchronos-binary/src/test/bin/test_multi_agent_datacenter_aware.py @@ -14,14 +14,21 @@ # limitations under the License. # -import pytest -import global_variables as global_vars import logging -from conftest import run_ecctool_state_nodes, assert_nodes_size_is_equal, run_ecctool_run_repair, run_ecctool_repairs -from ecc_config import EcchronosConfig from concurrent.futures import ThreadPoolExecutor from threading import Barrier -from tenacity import retry, stop_after_delay, wait_fixed, retry_if_result + +import global_variables as global_vars +import pytest +from ecc_config import EcchronosConfig +from tenacity import retry, retry_if_result, stop_after_delay, wait_fixed + +from conftest import ( + assert_nodes_size_is_equal, + run_ecctool_repairs, + run_ecctool_run_repair, + run_ecctool_state_nodes, +) logger = logging.getLogger(__name__) barrier = Barrier(2) @@ -29,6 +36,21 @@ ECC_INSTANCE_NAME_DC1 = "ecchronos-agent-dc1" ECC_INSTANCE_NAME_DC2 = "ecchronos-agent-dc2" +# Keep schedule values aligned across DCs so contention is deterministic in CI. +SCHEDULE_INTERVAL_TIME = 1 +SCHEDULE_INTERVAL_UNIT = "minutes" +SCHEDULE_INITIAL_DELAY_TIME = 1 +SCHEDULE_INITIAL_DELAY_UNIT = "minutes" + +# Keep a bounded but sufficiently large query window so shared helper behavior +# remains compatible with the rest of the Python integration suites. +REPAIR_QUERY_COUNT = "50" + +# CI can be slower for parallel datacenter-aware on-demand repairs. +ON_DEMAND_TIMEOUT_SECONDS = 600 +SCHEDULED_TIMEOUT_SECONDS = 600 +REPAIR_POLL_INTERVAL_SECONDS = 10 + def run_repair(container, params): barrier.wait() @@ -47,9 +69,18 @@ def test_install_cassandra_cluster(install_cassandra_cluster): @pytest.mark.dependency(name="test_install_ecchronos_dc1", depends=["test_install_cassandra_cluster"]) def test_install_ecchronos_dc1(install_cassandra_cluster, test_environment): dcs = [{"name": global_vars.DC1}] - ecchronos_config = EcchronosConfig(datacenter_aware=dcs, instance_name=ECC_INSTANCE_NAME_DC1) + ecchronos_config = EcchronosConfig( + datacenter_aware=dcs, + instance_name=ECC_INSTANCE_NAME_DC1, + schedule_interval_time=SCHEDULE_INTERVAL_TIME, + schedule_interval_unit=SCHEDULE_INTERVAL_UNIT, + schedule_initial_delay_time=SCHEDULE_INITIAL_DELAY_TIME, + schedule_initial_delay_unit=SCHEDULE_INITIAL_DELAY_UNIT, + ) test_environment.start_ecchronos( - suffix="dc1", cassandra_network=install_cassandra_cluster.network, ecchronos_config=ecchronos_config + suffix="dc1", + cassandra_network=install_cassandra_cluster.network, + ecchronos_config=ecchronos_config, ) test_environment.wait_for_ecchronos_ready() out, _ = run_ecctool_state_nodes(ECC_INSTANCE_NAME_DC1) @@ -64,6 +95,10 @@ def test_install_ecchronos_dc2(install_cassandra_cluster, test_environment): instance_name=ECC_INSTANCE_NAME_DC2, local_dc=global_vars.DC2, initial_contact_point=global_vars.DEFAULT_SEED_IP_DC2, + schedule_interval_time=SCHEDULE_INTERVAL_TIME, + schedule_interval_unit=SCHEDULE_INTERVAL_UNIT, + schedule_initial_delay_time=SCHEDULE_INITIAL_DELAY_TIME, + schedule_initial_delay_unit=SCHEDULE_INITIAL_DELAY_UNIT, ) test_environment.start_ecchronos( suffix="dc2", @@ -85,7 +120,6 @@ def test_lock_concurrency(install_cassandra_cluster, test_environment): logger.info("Running parallel on demand jobs inside instances") try: - with ThreadPoolExecutor(max_workers=2) as executor: future_ecc1_dc1 = executor.submit(run_repair, ECC_INSTANCE_NAME_DC1, ["-k", "test", "--all"]) future_ecc1_dc2 = executor.submit(run_repair, ECC_INSTANCE_NAME_DC2, ["-k", "test", "--all"]) @@ -98,7 +132,9 @@ def test_lock_concurrency(install_cassandra_cluster, test_environment): _, exit_code_ecc2_dc2 = future_ecc2_dc2.result() logger.info( - f"On Demand Jobs created with exit code: ecchronos-agent-dc1: {exit_code_ecc1_dc1, exit_code_ecc2_dc1}, ecchronos-agent-dc2: {exit_code_ecc1_dc2, exit_code_ecc2_dc2}" + "On Demand Jobs created with exit code: " + f"{ECC_INSTANCE_NAME_DC1}: {(exit_code_ecc1_dc1, exit_code_ecc2_dc1)}, " + f"{ECC_INSTANCE_NAME_DC2}: {(exit_code_ecc1_dc2, exit_code_ecc2_dc2)}" ) if any( @@ -108,41 +144,97 @@ def test_lock_concurrency(install_cassandra_cluster, test_environment): logger.error("Fail to create on demand jobs") pytest.fail("Fail to create on demand jobs") - # Wait for jobs to be finished in both instances within a time limit try: - wait_for_repairs_completion(executor) + wait_for_repairs_completion_on_demand(executor) except Exception as e: - logger.error(f"Timeout: Repairs did not complete within 5 minutes") - pytest.fail(f"Repairs did not complete within 5 minutes: {e}") + pytest.fail(f"Repairs did not complete within {ON_DEMAND_TIMEOUT_SECONDS // 60} minutes: {e}") except Exception as e: logger.error(f"Failed to run behave tests: {e}") pytest.fail(f"Failed to run behave tests: {e}") +@pytest.mark.dependency( + name="test_lock_concurrency_scheduled", + depends=["test_install_cassandra_cluster", "test_install_ecchronos_dc1", "test_install_ecchronos_dc2"], +) +def test_lock_concurrency_scheduled(install_cassandra_cluster, test_environment): + logger.info("Waiting for scheduled repair jobs to run in both instances") + + try: + with ThreadPoolExecutor(max_workers=2) as executor: + try: + wait_for_repairs_completion_scheduled(executor) + except Exception as e: + pytest.fail(f"Scheduled repairs did not complete within {SCHEDULED_TIMEOUT_SECONDS // 60} minutes: {e}") + + except Exception as e: + logger.error(f"Failed to run scheduled repair tests: {e}") + pytest.fail(f"Failed to run scheduled repair tests: {e}") + + def handle_repair_output(output_data): - output_data = output_data.decode("ascii").lstrip().rstrip().split("\n") - rows = output_data[3:-2] - # Clean each row: remove pipes and strip whitespace - return [row.strip().strip("|").strip() for row in rows] + lines = output_data.decode("ascii").strip().split("\n") + rows = lines[3:-2] + parsed_rows = [row.strip().strip("|").strip() for row in rows if row.strip()] + return [row for row in parsed_rows if row] + + +def extract_terminal_statuses(rows): + terminal_statuses = {"COMPLETED", "RUNNING", "FAILED", "STARTED"} + statuses = [] + + for row in rows: + columns = [column.strip() for column in row.split("|") if column.strip()] + if not columns: + continue + + for column in reversed(columns): + if column in terminal_statuses: + statuses.append(column) + break + + return statuses def all_repairs_completed(statuses): - """Check if all repair statuses are COMPLETED""" - return all(status == "COMPLETED" for status in statuses) + """Check if all visible repair statuses are COMPLETED.""" + return bool(statuses) and all(status == "COMPLETED" for status in statuses) + + +@retry( + stop=stop_after_delay(ON_DEMAND_TIMEOUT_SECONDS), + wait=wait_fixed(REPAIR_POLL_INTERVAL_SECONDS), + retry=retry_if_result(lambda x: not x), +) +def wait_for_repairs_completion_on_demand(executor): + """Wait for all repairs to complete in both datacenters for on-demand execution.""" + return _wait_for_repairs_completion_common(executor) -@retry(stop=stop_after_delay(600), wait=wait_fixed(10), retry=retry_if_result(lambda x: not x)) -def wait_for_repairs_completion(executor): - """Wait for all repairs to complete in both datacenters""" - params = ["-c", "4"] +@retry( + stop=stop_after_delay(SCHEDULED_TIMEOUT_SECONDS), + wait=wait_fixed(REPAIR_POLL_INTERVAL_SECONDS), + retry=retry_if_result(lambda x: not x), +) +def wait_for_repairs_completion_scheduled(executor): + """Wait for all repairs to complete in both datacenters for scheduled execution.""" + return _wait_for_repairs_completion_common(executor) + + +def _wait_for_repairs_completion_common(executor): + params = ["-c", REPAIR_QUERY_COUNT] + future_ecc_dc1 = executor.submit(verify_repair_completed, ECC_INSTANCE_NAME_DC1, params) future_ecc_dc2 = executor.submit(verify_repair_completed, ECC_INSTANCE_NAME_DC2, params) output_ecc_dc1, _ = future_ecc_dc1.result() output_ecc_dc2, _ = future_ecc_dc2.result() - ecc_dc1_statuses = handle_repair_output(output_ecc_dc1) - ecc_dc2_statuses = handle_repair_output(output_ecc_dc2) + ecc_dc1_rows = handle_repair_output(output_ecc_dc1) + ecc_dc2_rows = handle_repair_output(output_ecc_dc2) + + ecc_dc1_statuses = extract_terminal_statuses(ecc_dc1_rows) + ecc_dc2_statuses = extract_terminal_statuses(ecc_dc2_rows) dc1_completed = all_repairs_completed(ecc_dc1_statuses) dc2_completed = all_repairs_completed(ecc_dc2_statuses) @@ -152,6 +244,8 @@ def wait_for_repairs_completion(executor): return True logger.info( - f"Waiting for repairs... DC1: {sum(1 for s in ecc_dc1_statuses if s == 'COMPLETED')}/{len(ecc_dc1_statuses)}, DC2: {sum(1 for s in ecc_dc2_statuses if s == 'COMPLETED')}/{len(ecc_dc2_statuses)}" + "Waiting for repairs... " + f"DC1: {sum(1 for s in ecc_dc1_statuses if s == 'COMPLETED')}/{len(ecc_dc1_statuses)}, " + f"DC2: {sum(1 for s in ecc_dc2_statuses if s == 'COMPLETED')}/{len(ecc_dc2_statuses)}" ) return False