Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,16 @@ public void testGetFileContent() throws Exception
configRefresher.watch(file.toPath(), () -> reference.set(readFileContent(file)));

writeToFile(file, "some content");
await().atMost(1, TimeUnit.SECONDS).until(() -> "some content".equals(reference.get()));
await()
.pollInterval(50, TimeUnit.MILLISECONDS)
.atMost(5, TimeUnit.SECONDS)
.until(() -> "some content".equals(reference.get()));

writeToFile(file, "some new content");
await().atMost(1, TimeUnit.SECONDS).until(() -> "some new content".equals(reference.get()));
await()
.pollInterval(50, TimeUnit.MILLISECONDS)
.atMost(5, TimeUnit.SECONDS)
.until(() -> "some new content".equals(reference.get()));
}
}

Expand Down Expand Up @@ -79,7 +85,10 @@ public void testRunnableThrowsAtFirstUpdate() throws Exception
shouldThrow.set(false);

writeToFile(file, "some new content");
await().atMost(1, TimeUnit.SECONDS).until(() -> "some new content".equals(reference.get()));
await()
.pollInterval(50, TimeUnit.MILLISECONDS)
.atMost(5, TimeUnit.SECONDS)
.until(() -> "some new content".equals(reference.get()));
}
}

Expand Down Expand Up @@ -112,4 +121,3 @@ private String readFileContent(File file)
return null;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public void testGetCurrentJobStatus() throws InterruptedException
}

@Test
public void testGetCurrentJobStatusNoRunning() throws InterruptedException
public void testGetCurrentJobStatusNoRunning()
{
CountDownLatch latch = new CountDownLatch(1);
UUID jobId = UUID.randomUUID();
Expand All @@ -235,7 +235,10 @@ public void testGetCurrentJobStatusNoRunning() throws InterruptedException
jobId,
latch);
myScheduler.schedule(nodeID1, job1);
new Thread(() -> myScheduler.run(nodeID1)).start();

// Deterministic: do NOT start the scheduler thread here.
// This test asserts the idle-state contract (no running job => empty status),
// and starting the scheduler introduces a timing race.
assertThat(myScheduler.getCurrentJobStatus()).isEqualTo("");
latch.countDown();
}
Expand Down
169 changes: 100 additions & 69 deletions ecchronos-binary/src/test/bin/ecc_config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#!/usr/bin/env python3
# vi: syntax=python
#
# Copyright 2025 Telefonaktiebolaget LM Ericsson
# Copyright 2024 Telefonaktiebolaget LM Ericsson
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -15,9 +14,11 @@
# limitations under the License.
#

import yaml
import re
import tempfile

import yaml

import global_variables as global_vars


Expand All @@ -29,6 +30,11 @@ def __init__(
agent_type=global_vars.DEFAULT_AGENT_TYPE,
initial_contact_point=global_vars.DEFAULT_INITIAL_CONTACT_POINT,
instance_name=global_vars.DEFAULT_INSTANCE_NAME,
# NEW: schedule knobs (OPT-IN only)
schedule_interval_time=None,
schedule_interval_unit=None,
schedule_initial_delay_time=None,
schedule_initial_delay_unit=None,
):
self.container_mounts = {}
self.datacenter_aware = (
Expand All @@ -39,31 +45,47 @@ def __init__(
self.initial_contact_point = initial_contact_point
self.instance_name = instance_name

# Optional overrides (None = do not modify schedule)
self.schedule_interval_time = schedule_interval_time
self.schedule_interval_unit = schedule_interval_unit
self.schedule_initial_delay_time = schedule_initial_delay_time
self.schedule_initial_delay_unit = schedule_initial_delay_unit

def modify_configuration(self):
self._modify_ecc_yaml_file()
self._modify_security_yaml_file()
self._modify_application_yaml_file()
self._modify_schedule_configuration()
self._uncomment_head_options()
self._modify_logback_configuration()

self.container_mounts["certificates"] = {
"host": global_vars.CERTIFICATE_DIRECTORY,
"container": global_vars.CONTAINER_CERTIFICATE_PATH,
}

# Per-instance logs (multi-agent safe)
self.container_mounts["logs"] = {
"host": f"{global_vars.HOST_LOGS_PATH}/{self.instance_name}",
"container": global_vars.CONTAINER_LOGS_PATH,
}

# Modify ecc.yaml file
# --------------------------------------------------
# ECC YAML
# --------------------------------------------------
def _modify_ecc_yaml_file(self):
data = self._read_yaml_data(global_vars.ECC_YAML_FILE_PATH)
data = self._modify_connection_configuration(data)
data = self._modify_scheduler_configuration(data)
data = self._modify_twcs_configuration(data)

if global_vars.JOLOKIA_ENABLED == "true":
data = self._modify_jolokia_configuration(data)
self.container_mounts["ecc"] = {"host": self.write_tmp(data), "container": global_vars.CONTAINER_ECC_YAML_PATH}

self.container_mounts["ecc"] = {
"host": self.write_tmp(data),
"container": global_vars.CONTAINER_ECC_YAML_PATH,
}

def _modify_connection_configuration(self, data):
data["connection"]["cql"]["contactPoints"] = [{"host": self.initial_contact_point, "port": 9042}]
Expand All @@ -87,13 +109,16 @@ def _modify_jolokia_configuration(self, data):
data["connection"]["jmx"]["jolokia"]["usePem"] = True
return data

# Modify security.yaml file
# --------------------------------------------------
# SECURITY YAML
# --------------------------------------------------
def _modify_security_yaml_file(self):
data = self._read_yaml_data(global_vars.SECURITY_YAML_FILE_PATH)
if global_vars.LOCAL != "true":
data = self._modify_security_configuration(data)
else:
data = self._modify_cql_configuration(data)

self.container_mounts["security"] = {
"host": self.write_tmp(data),
"container": global_vars.CONTAINER_SECURITY_YAML_PATH,
Expand All @@ -103,6 +128,7 @@ def _modify_security_configuration(self, data):
data["cql"]["credentials"]["enabled"] = True
data["cql"]["credentials"]["username"] = "eccuser"
data["cql"]["credentials"]["password"] = "eccpassword"

data["cql"]["tls"]["enabled"] = True
data["cql"]["tls"]["keystore"] = f"{global_vars.CONTAINER_CERTIFICATE_PATH}/.keystore"
data["cql"]["tls"]["keystore_password"] = "ecctest"
Expand All @@ -113,6 +139,7 @@ def _modify_security_configuration(self, data):
data["jmx"]["credentials"]["username"] = "cassandra"
data["jmx"]["credentials"]["password"] = "cassandra"
data["jmx"]["tls"]["enabled"] = True

if global_vars.PEM_ENABLED != "true":
data["jmx"]["tls"]["keystore"] = f"{global_vars.CONTAINER_CERTIFICATE_PATH}/.keystore"
data["jmx"]["tls"]["keystore_password"] = "ecctest"
Expand All @@ -133,22 +160,23 @@ def _modify_cql_configuration(self, data):
data["cql"]["credentials"]["password"] = "cassandra"
return data

# Modify application.yaml file
# --------------------------------------------------
# APPLICATION YAML
# --------------------------------------------------
def _modify_application_yaml_file(self):
data = self._read_yaml_data(global_vars.APPLICATION_YAML_FILE_PATH)
if global_vars.LOCAL != "true":
data = self._modify_application_configuration(data)

data = self._modify_spring_doc_configuration(data)

self.container_mounts["application"] = {
"host": self.write_tmp(data),
"container": global_vars.CONTAINER_APPLICATION_YAML_PATH,
}

def _modify_application_configuration(self, data):
if "server" not in data:
data["server"] = {}
if "ssl" not in data["server"]:
data["server"]["ssl"] = {}
data.setdefault("server", {}).setdefault("ssl", {})

data["server"]["ssl"]["enabled"] = True
data["server"]["ssl"]["key-store"] = f"{global_vars.CONTAINER_CERTIFICATE_PATH}/serverkeystore"
Expand All @@ -165,39 +193,32 @@ def _modify_spring_doc_configuration(self, data):
data["springdoc"]["api-docs"]["show-actuator"] = True
return data

# --------------------------------------------------
# JVM / LOGGING
# --------------------------------------------------
def _uncomment_head_options(self):
pattern = re.compile(r"^#\s*(-X.*)")
with open(global_vars.JVM_OPTIONS_FILE_PATH, "r", encoding="utf-8") as file:
lines = file.readlines()

result = []

for line in lines:
match = pattern.match(line)
if match:
result.append(match.group(1) + "\n")
else:
result.append(line)
result = [pattern.match(line).group(1) + "\n" if pattern.match(line) else line for line in lines]

self.container_mounts["jvm"] = {
"host": self.write_tmp(result, ".options"),
"container": global_vars.CONTAINER_JVM_OPTION_PATH,
}

def _modify_logback_configuration(self):
with open(global_vars.LOGBACK_FILE_PATH, "r") as file:
with open(global_vars.LOGBACK_FILE_PATH, "r", encoding="utf-8") as file:
lines = file.readlines()

pattern = re.compile(r'^(\s*)(<appender-ref ref="STDOUT" />)\s*$')

result = []

for line in lines:
match = pattern.match(line)
if match:
indent = match.group(1)
content = match.group(2)
result.append(f"{indent}<!-- {content} -->\n")
result.append(f"{match.group(1)}<!-- {match.group(2)} -->\n")
else:
result.append(line)

Expand All @@ -206,61 +227,71 @@ def _modify_logback_configuration(self):
"container": global_vars.CONTAINER_LOGBACK_FILE_PATH,
}

# --------------------------------------------------
# SAFE SCHEDULE PATCH (non-global)
# --------------------------------------------------
def _has_schedule_overrides(self):
return any(
value is not None
for value in (
self.schedule_interval_time,
self.schedule_interval_unit,
self.schedule_initial_delay_time,
self.schedule_initial_delay_unit,
)
)

def _modify_schedule_configuration(self):
# Leave the upstream schedule.yaml untouched unless this instance
# explicitly requests schedule overrides.
if not self._has_schedule_overrides():
return

data = self._read_yaml_data(global_vars.SCHEDULE_YAML_FILE_PATH)
data["keyspaces"] = [
{
"name": "test",
"tables": [
{
"name": "table1",
"interval": {"time": 1, "unit": "days"},
"initial_delay": {"time": 1, "unit": "hours"},
"unwind_ratio": 0.1,
},
{"name": "table3", "enabled": False},
],
},
{
"name": "test2",
"tables": [
{"name": "table1", "repair_type": "incremental"},
{"name": "table2", "repair_type": "parallel_vnode"},
],
},
{
"name": "system_auth",
"tables": [
{"name": "network_permissions", "enabled": False},
{"name": "resource_role_permissons_index", "enabled": False},
{"name": "role_members", "enabled": False},
{"name": "role_permissions", "enabled": False},
{"name": "roles", "enabled": False},
],
},
{
"name": "ecchronos",
"tables": [
{"name": "lock", "enabled": False},
{"name": "lock_priority", "enabled": False},
{"name": "on_demand_repair_status", "enabled": False},
{"name": "reject_configuration", "enabled": False},
{"name": "repair_history", "enabled": True},
],
},
]

# safe_load may return None for an empty file
if data is None:
data = {}

if isinstance(data, dict):
for keyspace in data.get("keyspaces") or []:
if not isinstance(keyspace, dict):
continue
if keyspace.get("name") != "test":
continue

for table in keyspace.get("tables") or []:
if not isinstance(table, dict):
continue
if table.get("name") != "table1":
continue

# Only patch timing-related schedule fields.
# Do not touch repair type or unrelated schedule entries.
if self.schedule_interval_time is not None:
table.setdefault("interval", {})["time"] = self.schedule_interval_time
if self.schedule_interval_unit is not None:
table.setdefault("interval", {})["unit"] = self.schedule_interval_unit

if self.schedule_initial_delay_time is not None:
table.setdefault("initial_delay", {})["time"] = self.schedule_initial_delay_time
if self.schedule_initial_delay_unit is not None:
table.setdefault("initial_delay", {})["unit"] = self.schedule_initial_delay_unit

break

self.container_mounts["schedule"] = {
"host": self.write_tmp(data),
"container": global_vars.CONTAINER_SCHEDULE_YAML_PATH,
}

# --------------------------------------------------
def _read_yaml_data(self, filename):
with open(filename, "r") as f:
data = yaml.safe_load(f)
return data
with open(filename, "r", encoding="utf-8") as file:
return yaml.safe_load(file)

def write_tmp(self, data, suffix=".yaml") -> str:
tmp = tempfile.NamedTemporaryFile(mode="w", suffix=suffix, delete=False)
tmp = tempfile.NamedTemporaryFile(mode="w", suffix=suffix, delete=False, encoding="utf-8")
if suffix == ".yaml":
yaml.safe_dump(data, tmp)
else:
Expand Down
Loading
Loading