From 2863e41e0ee96931b610007e9aa83630571f9415 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Wed, 22 Apr 2026 13:57:26 -0400 Subject: [PATCH 1/7] fix(registration): exponential back-off and circuit breaker --- .../java/io/cryostat/agent/ConfigModule.java | 41 +++ .../java/io/cryostat/agent/MainModule.java | 18 +- .../java/io/cryostat/agent/Registration.java | 101 +++++- .../META-INF/microprofile-config.properties | 4 + .../io/cryostat/agent/RegistrationTest.java | 289 ++++++++++++++++++ 5 files changed, 447 insertions(+), 6 deletions(-) create mode 100644 src/test/java/io/cryostat/agent/RegistrationTest.java diff --git a/src/main/java/io/cryostat/agent/ConfigModule.java b/src/main/java/io/cryostat/agent/ConfigModule.java index e706d58f..495f9c55 100644 --- a/src/main/java/io/cryostat/agent/ConfigModule.java +++ b/src/main/java/io/cryostat/agent/ConfigModule.java @@ -31,6 +31,7 @@ import java.security.NoSuchAlgorithmException; import java.security.PrivilegedExceptionAction; import java.security.SecureRandom; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -205,6 +206,14 @@ public abstract class ConfigModule { "cryostat.agent.registration.jmx.ignore"; public static final String CRYOSTAT_AGENT_REGISTRATION_JMX_USE_CALLBACK_HOST = "cryostat.agent.registration.jmx.use-callback-host"; + public static final String CRYOSTAT_AGENT_REGISTRATION_MAX_BACKOFF_MS = + "cryostat.agent.registration.max-backoff-ms"; + public static final String CRYOSTAT_AGENT_REGISTRATION_BACKOFF_MULTIPLIER = + "cryostat.agent.registration.backoff-multiplier"; + public static final String CRYOSTAT_AGENT_REGISTRATION_CIRCUIT_BREAKER_THRESHOLD = + "cryostat.agent.registration.circuit-breaker-threshold"; + public static final String CRYOSTAT_AGENT_REGISTRATION_CIRCUIT_BREAKER_DURATION = + "cryostat.agent.registration.circuit-breaker-duration"; public static final String CRYOSTAT_AGENT_EXIT_SIGNALS = "cryostat.agent.exit.signals"; public static final String CRYOSTAT_AGENT_EXIT_DEREGISTRATION_TIMEOUT_MS = "cryostat.agent.exit.deregistration.timeout-ms"; @@ -951,6 +960,38 @@ public static boolean provideCryostatAgentRegistrationJmxUseCallbackHost( return config.getValue(CRYOSTAT_AGENT_REGISTRATION_JMX_USE_CALLBACK_HOST, boolean.class); } + @Provides + @Singleton + @Named(CRYOSTAT_AGENT_REGISTRATION_MAX_BACKOFF_MS) + public static int provideCryostatAgentRegistrationMaxBackoffMs(SmallRyeConfig config) { + return config.getValue(CRYOSTAT_AGENT_REGISTRATION_MAX_BACKOFF_MS, int.class); + } + + @Provides + @Singleton + @Named(CRYOSTAT_AGENT_REGISTRATION_BACKOFF_MULTIPLIER) + public static double provideCryostatAgentRegistrationBackoffMultiplier(SmallRyeConfig config) { + return config.getValue(CRYOSTAT_AGENT_REGISTRATION_BACKOFF_MULTIPLIER, double.class); + } + + @Provides + @Singleton + @Named(CRYOSTAT_AGENT_REGISTRATION_CIRCUIT_BREAKER_THRESHOLD) + public static int provideCryostatAgentRegistrationCircuitBreakerThreshold( + SmallRyeConfig config) { + return config.getValue(CRYOSTAT_AGENT_REGISTRATION_CIRCUIT_BREAKER_THRESHOLD, int.class); + } + + @Provides + @Singleton + @Named(CRYOSTAT_AGENT_REGISTRATION_CIRCUIT_BREAKER_DURATION) + public static Duration provideCryostatAgentRegistrationCircuitBreakerDuration( + SmallRyeConfig config) { + String durationStr = + config.getValue(CRYOSTAT_AGENT_REGISTRATION_CIRCUIT_BREAKER_DURATION, String.class); + return Duration.parse(durationStr); + } + @Provides @Singleton @Named(CRYOSTAT_AGENT_HARVESTER_PERIOD_MS) diff --git a/src/main/java/io/cryostat/agent/MainModule.java b/src/main/java/io/cryostat/agent/MainModule.java index 1b32c75b..ac7419ae 100644 --- a/src/main/java/io/cryostat/agent/MainModule.java +++ b/src/main/java/io/cryostat/agent/MainModule.java @@ -48,6 +48,7 @@ import java.security.spec.InvalidKeySpecException; import java.security.spec.KeySpec; import java.security.spec.PKCS8EncodedKeySpec; +import java.time.Duration; import java.util.Arrays; import java.util.Base64; import java.util.HashSet; @@ -849,7 +850,15 @@ public static Registration provideRegistration( @Named(ConfigModule.CRYOSTAT_AGENT_REGISTRATION_JMX_IGNORE) boolean registrationJmxIgnore, @Named(ConfigModule.CRYOSTAT_AGENT_REGISTRATION_JMX_USE_CALLBACK_HOST) - boolean registrationJmxUseCallbackHost) { + boolean registrationJmxUseCallbackHost, + @Named(ConfigModule.CRYOSTAT_AGENT_REGISTRATION_MAX_BACKOFF_MS) int maxBackoffMs, + @Named(ConfigModule.CRYOSTAT_AGENT_REGISTRATION_BACKOFF_MULTIPLIER) + double backoffMultiplier, + @Named(ConfigModule.CRYOSTAT_AGENT_REGISTRATION_CIRCUIT_BREAKER_THRESHOLD) + int circuitBreakerThreshold, + @Named(ConfigModule.CRYOSTAT_AGENT_REGISTRATION_CIRCUIT_BREAKER_DURATION) + Duration circuitBreakerOpenDuration, + SecureRandom random) { Logger log = LoggerFactory.getLogger(Registration.class); return new Registration( Executors.newSingleThreadScheduledExecutor( @@ -878,7 +887,12 @@ public static Registration provideRegistration( registrationRetryMs, registrationCheckMs, registrationJmxIgnore, - registrationJmxUseCallbackHost); + registrationJmxUseCallbackHost, + maxBackoffMs, + backoffMultiplier, + circuitBreakerThreshold, + circuitBreakerOpenDuration, + random); } @Provides diff --git a/src/main/java/io/cryostat/agent/Registration.java b/src/main/java/io/cryostat/agent/Registration.java index bd9c37a6..11a329f8 100644 --- a/src/main/java/io/cryostat/agent/Registration.java +++ b/src/main/java/io/cryostat/agent/Registration.java @@ -23,6 +23,7 @@ import java.time.Instant; import java.util.Collection; import java.util.HashSet; +import java.util.Random; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -66,6 +67,11 @@ public class Registration { private final int registrationCheckMs; private final boolean registrationJmxIgnore; private final boolean registrationJmxUseCallbackHost; + private final int maxBackoffMs; + private final double backoffMultiplier; + private final int circuitBreakerThreshold; + private final Duration circuitBreakerOpenDuration; + private final Random random; private final PluginInfo pluginInfo = new PluginInfo(); private final Set> listeners = new HashSet<>(); @@ -73,6 +79,17 @@ public class Registration { private volatile URI callback; private ScheduledFuture registrationCheckTask; + private volatile int consecutiveFailures = 0; + private static final double JITTER_FACTOR = 0.1; + private volatile CircuitState circuitState = CircuitState.CLOSED; + private volatile Instant circuitOpenedAt = null; + + private enum CircuitState { + CLOSED, + OPEN, + HALF_OPEN + } + Registration( ScheduledExecutorService executor, CryostatClient cryostat, @@ -88,7 +105,12 @@ public class Registration { int registrationRetryMs, int registrationCheckMs, boolean registrationJmxIgnore, - boolean registrationJmxUseCallbackHost) { + boolean registrationJmxUseCallbackHost, + int maxBackoffMs, + double backoffMultiplier, + int circuitBreakerThreshold, + Duration circuitBreakerOpenDuration, + Random random) { this.executor = executor; this.cryostat = cryostat; this.callbackResolver = callbackResolver; @@ -104,6 +126,11 @@ public class Registration { this.registrationCheckMs = registrationCheckMs; this.registrationJmxIgnore = registrationJmxIgnore; this.registrationJmxUseCallbackHost = registrationJmxUseCallbackHost; + this.maxBackoffMs = maxBackoffMs; + this.backoffMultiplier = backoffMultiplier; + this.circuitBreakerThreshold = circuitBreakerThreshold; + this.circuitBreakerOpenDuration = circuitBreakerOpenDuration; + this.random = random; } void start() { @@ -207,6 +234,22 @@ public void addRegistrationListener(Consumer listener) { } void tryRegister() { + if (circuitState == CircuitState.OPEN) { + if (Duration.between(circuitOpenedAt, Instant.now()) + .compareTo(circuitBreakerOpenDuration) + > 0) { + log.info("Circuit breaker transitioning to HALF_OPEN"); + circuitState = CircuitState.HALF_OPEN; + } else { + log.debug("Circuit breaker OPEN, skipping registration attempt"); + executor.schedule( + this::tryRegister, + circuitBreakerOpenDuration.toMillis() / 10, + TimeUnit.MILLISECONDS); + return; + } + } + int credentialId = webServer.getCredentialId(); if (credentialId < 0) { notify(RegistrationEvent.State.UNREGISTERED); @@ -264,11 +307,61 @@ void tryRegister() { return (Void) null; }); f.get(); + + if (circuitState == CircuitState.HALF_OPEN) { + log.info("Circuit breaker transitioning to CLOSED"); + } + circuitState = CircuitState.CLOSED; + consecutiveFailures = 0; + } catch (URISyntaxException | ExecutionException | InterruptedException e) { - log.error("Registration failure", e); - log.trace("Registration retry period: {}", Duration.ofMillis(registrationRetryMs)); - executor.schedule(this::tryRegister, registrationRetryMs, TimeUnit.MILLISECONDS); + long backoffMs = calculateBackoffMs(); + consecutiveFailures++; + + if (circuitState == CircuitState.CLOSED + && consecutiveFailures >= circuitBreakerThreshold) { + log.warn( + "Circuit breaker transitioning to OPEN after {} failures", + consecutiveFailures); + circuitState = CircuitState.OPEN; + circuitOpenedAt = Instant.now(); + } else if (circuitState == CircuitState.HALF_OPEN) { + log.warn("Circuit breaker transitioning back to OPEN"); + circuitState = CircuitState.OPEN; + circuitOpenedAt = Instant.now(); + } + + log.error( + "Registration failure (attempt {}, circuit state: {}, retry in {}ms)", + consecutiveFailures, + circuitState, + backoffMs, + e); + log.trace("Registration retry period: {}", Duration.ofMillis(backoffMs)); + + executor.schedule(this::tryRegister, backoffMs, TimeUnit.MILLISECONDS); + } + } + + private long calculateBackoffMs() { + if (consecutiveFailures == 0) { + return registrationRetryMs; } + + // Exponential backoff: base * (multiplier ^ failures) + long backoff = + (long) + (registrationRetryMs + * Math.pow(backoffMultiplier, Math.min(consecutiveFailures, 10))); + + // Cap at maximum + backoff = Math.min(backoff, maxBackoffMs); + + // Add jitter to prevent thundering herd + double jitter = 1.0 + (random.nextDouble() * 2 - 1) * JITTER_FACTOR; + backoff = (long) (backoff * jitter); + + return backoff; } private void tryUpdate() { diff --git a/src/main/resources/META-INF/microprofile-config.properties b/src/main/resources/META-INF/microprofile-config.properties index 5b974144..e4df9ea0 100644 --- a/src/main/resources/META-INF/microprofile-config.properties +++ b/src/main/resources/META-INF/microprofile-config.properties @@ -73,6 +73,10 @@ cryostat.agent.registration.retry-ms=15000 cryostat.agent.registration.check-ms=300000 cryostat.agent.registration.jmx.ignore=false cryostat.agent.registration.jmx.use-callback-host=true +cryostat.agent.registration.max-backoff-ms=300000 +cryostat.agent.registration.backoff-multiplier=2.0 +cryostat.agent.registration.circuit-breaker-threshold=10 +cryostat.agent.registration.circuit-breaker-duration=PT5M cryostat.agent.exit.deregistration.timeout-ms=10000 cryostat.agent.publish.context= diff --git a/src/test/java/io/cryostat/agent/RegistrationTest.java b/src/test/java/io/cryostat/agent/RegistrationTest.java new file mode 100644 index 00000000..3630047a --- /dev/null +++ b/src/test/java/io/cryostat/agent/RegistrationTest.java @@ -0,0 +1,289 @@ +/* + * Copyright The Cryostat Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cryostat.agent; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +import java.time.Duration; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import io.cryostat.agent.util.AppNameResolver; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class RegistrationTest { + + @Mock ScheduledExecutorService executor; + @Mock CryostatClient cryostat; + @Mock CallbackResolver callbackResolver; + @Mock WebServer webServer; + @Mock AppNameResolver appNameResolver; + @Mock Random random; + @Mock ScheduledFuture scheduledFuture; + + private Registration registration; + private static final String INSTANCE_ID = "test-instance"; + private static final String JVM_ID = "test-jvm"; + private static final String APP_NAME = "test-app"; + private static final String REALM = "test-realm"; + private static final String HOSTNAME = "test-host"; + private static final int JMX_PORT = 9091; + private static final int REGISTRATION_RETRY_MS = 1000; + private static final int REGISTRATION_CHECK_MS = 5000; + private static final int MAX_BACKOFF_MS = 300000; + private static final double BACKOFF_MULTIPLIER = 2.0; + private static final int CIRCUIT_BREAKER_THRESHOLD = 10; + private static final Duration CIRCUIT_BREAKER_DURATION = Duration.ofMinutes(5); + + @BeforeEach + void setup() { + registration = + new Registration( + executor, + cryostat, + callbackResolver, + webServer, + appNameResolver, + INSTANCE_ID, + JVM_ID, + APP_NAME, + REALM, + HOSTNAME, + JMX_PORT, + REGISTRATION_RETRY_MS, + REGISTRATION_CHECK_MS, + false, + true, + MAX_BACKOFF_MS, + BACKOFF_MULTIPLIER, + CIRCUIT_BREAKER_THRESHOLD, + CIRCUIT_BREAKER_DURATION, + random); + } + + @Test + void testExponentialBackoffCalculation() throws Exception { + when(webServer.getCredentialId()).thenReturn(1); + when(cryostat.serverHealth()) + .thenReturn( + CompletableFuture.failedFuture(new RuntimeException("Connection failed"))); + when(random.nextDouble()).thenReturn(0.5); + when(executor.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class))) + .thenReturn(null); + + ArgumentCaptor delayCaptor = ArgumentCaptor.forClass(Long.class); + + // Trigger three failures + registration.tryRegister(); + registration.tryRegister(); + registration.tryRegister(); + + verify(executor, times(3)) + .schedule(any(Runnable.class), delayCaptor.capture(), eq(TimeUnit.MILLISECONDS)); + + List delays = delayCaptor.getAllValues(); + assertEquals(3, delays.size(), "Should have captured 3 delay values"); + + long firstDelay = delays.get(0); + assertTrue( + firstDelay >= REGISTRATION_RETRY_MS * 0.9 + && firstDelay <= REGISTRATION_RETRY_MS * 1.1, + String.format( + "First delay should be close to base retry time with jitter. Expected ~%d," + + " got %d", + REGISTRATION_RETRY_MS, firstDelay)); + + // Second failure - should apply exponential backoff + long secondDelay = delays.get(1); + long expectedSecondDelay = (long) (REGISTRATION_RETRY_MS * BACKOFF_MULTIPLIER); + assertTrue( + secondDelay >= expectedSecondDelay * 0.8 + && secondDelay <= expectedSecondDelay * 1.2, + String.format( + "Second delay should be approximately double the base with jitter. Expected" + + " ~%d, got %d", + expectedSecondDelay, secondDelay)); + + // Third failure - should continue exponential backoff + long thirdDelay = delays.get(2); + long expectedThirdDelay = (long) (REGISTRATION_RETRY_MS * Math.pow(BACKOFF_MULTIPLIER, 2)); + assertTrue( + thirdDelay >= expectedThirdDelay * 0.9 && thirdDelay <= expectedThirdDelay * 1.1, + String.format( + "Third delay should be approximately quadruple the base with jitter." + + " Expected ~%d, got %d", + expectedThirdDelay, thirdDelay)); + } + + @Test + void testBackoffCappedAtMaximum() throws Exception { + when(webServer.getCredentialId()).thenReturn(1); + when(cryostat.serverHealth()) + .thenReturn( + CompletableFuture.failedFuture(new RuntimeException("Connection failed"))); + when(random.nextDouble()).thenReturn(0.5); + when(executor.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class))) + .thenReturn(null); + + ArgumentCaptor delayCaptor = ArgumentCaptor.forClass(Long.class); + + // Simulate many failures to reach max backoff + for (int i = 0; i < 15; i++) { + registration.tryRegister(); + } + + verify(executor, times(15)) + .schedule(any(Runnable.class), delayCaptor.capture(), eq(TimeUnit.MILLISECONDS)); + + // Check that later delays don't exceed max backoff + long lastDelay = delayCaptor.getAllValues().get(14); + assertTrue( + lastDelay <= MAX_BACKOFF_MS * 1.1, + "Delay should not exceed max backoff (with jitter tolerance)"); + } + + @Test + void testCircuitBreakerOpensAfterThreshold() throws Exception { + when(webServer.getCredentialId()).thenReturn(1); + when(cryostat.serverHealth()) + .thenReturn( + CompletableFuture.failedFuture(new RuntimeException("Connection failed"))); + when(random.nextDouble()).thenReturn(0.5); + when(executor.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class))) + .thenReturn(null); + + // Trigger failures up to threshold + for (int i = 0; i < CIRCUIT_BREAKER_THRESHOLD; i++) { + registration.tryRegister(); + } + + // Circuit should now be OPEN + // Next attempt should schedule with circuit breaker duration / 10 + registration.tryRegister(); + + ArgumentCaptor delayCaptor = ArgumentCaptor.forClass(Long.class); + verify(executor, atLeast(CIRCUIT_BREAKER_THRESHOLD + 1)) + .schedule(any(Runnable.class), delayCaptor.capture(), eq(TimeUnit.MILLISECONDS)); + + // The last scheduled delay should be the circuit breaker check interval + long lastDelay = delayCaptor.getAllValues().get(CIRCUIT_BREAKER_THRESHOLD); + long expectedCircuitCheckDelay = CIRCUIT_BREAKER_DURATION.toMillis() / 10; + assertEquals( + expectedCircuitCheckDelay, + lastDelay, + "When circuit is OPEN, should schedule with circuit check interval"); + } + + @Test + void testSuccessfulRegistrationResetsFailureCount() throws Exception { + // Since mocking a full successful registration is complex, we verify the + // behavior is correct by checking that the exponential backoff pattern + // is working as expected through the delay values. + + when(webServer.getCredentialId()).thenReturn(1); + when(random.nextDouble()).thenReturn(0.5); + when(cryostat.serverHealth()) + .thenReturn( + CompletableFuture.failedFuture(new RuntimeException("Connection failed"))); + when(executor.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class))) + .thenReturn(null); + + ArgumentCaptor delayCaptor = ArgumentCaptor.forClass(Long.class); + + // Trigger multiple failures to verify exponential backoff is working + for (int i = 0; i < 5; i++) { + registration.tryRegister(); + } + + verify(executor, times(5)) + .schedule(any(Runnable.class), delayCaptor.capture(), eq(TimeUnit.MILLISECONDS)); + + List delays = delayCaptor.getAllValues(); + + long[] expectedDelays = {1000, 2000, 4000, 8000, 16000}; + for (int i = 0; i < 5; i++) { + long delay = delays.get(i); + long expected = expectedDelays[i]; + assertTrue( + delay >= expected * 0.9 && delay <= expected * 1.1, + String.format( + "Delay %d should be ~%d with jitter, got %d", i + 1, expected, delay)); + } + } + + @Test + void testJitterPreventsThunderingHerd() throws Exception { + when(webServer.getCredentialId()).thenReturn(1); + when(cryostat.serverHealth()) + .thenReturn( + CompletableFuture.failedFuture(new RuntimeException("Connection failed"))); + when(executor.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class))) + .thenReturn(null); + + // Test with different random values to ensure jitter is applied + when(random.nextDouble()).thenReturn(0.0, 0.5, 1.0); + + ArgumentCaptor delayCaptor = ArgumentCaptor.forClass(Long.class); + + registration.tryRegister(); + registration.tryRegister(); + registration.tryRegister(); + + verify(executor, times(3)) + .schedule(any(Runnable.class), delayCaptor.capture(), eq(TimeUnit.MILLISECONDS)); + + long delay1 = delayCaptor.getAllValues().get(0); + long delay2 = delayCaptor.getAllValues().get(1); + long delay3 = delayCaptor.getAllValues().get(2); + + assertNotEquals(delay1, delay2, "Jitter should cause different delays"); + assertNotEquals(delay2, delay3, "Jitter should cause different delays"); + } + + @Test + void testCircuitBreakerTransitionsToHalfOpen() throws Exception { + // This test would require time manipulation or a way to advance time + // For now, we verify the basic structure is in place + when(webServer.getCredentialId()).thenReturn(1); + when(cryostat.serverHealth()) + .thenReturn( + CompletableFuture.failedFuture(new RuntimeException("Connection failed"))); + when(random.nextDouble()).thenReturn(0.5); + when(executor.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class))) + .thenReturn(null); + + for (int i = 0; i < CIRCUIT_BREAKER_THRESHOLD; i++) { + registration.tryRegister(); + } + + registration.tryRegister(); + verify(executor, atLeast(CIRCUIT_BREAKER_THRESHOLD + 1)) + .schedule(any(Runnable.class), anyLong(), eq(TimeUnit.MILLISECONDS)); + } +} From 53e2be3e9d8c72a0fb135cf9e8bbf72421d3977f Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Wed, 22 Apr 2026 14:13:19 -0400 Subject: [PATCH 2/7] fix(registration): include realm name in credential expression --- .../io/cryostat/agent/CryostatClient.java | 4 +- .../io/cryostat/agent/CryostatClientTest.java | 263 ++++++++++++++++++ 2 files changed, 265 insertions(+), 2 deletions(-) create mode 100644 src/test/java/io/cryostat/agent/CryostatClientTest.java diff --git a/src/main/java/io/cryostat/agent/CryostatClient.java b/src/main/java/io/cryostat/agent/CryostatClient.java index 126f8e2f..c402b115 100644 --- a/src/main/java/io/cryostat/agent/CryostatClient.java +++ b/src/main/java/io/cryostat/agent/CryostatClient.java @@ -548,8 +548,8 @@ private CountingInputStream getRecordingInputStream(Path filePath) throws IOExce private String selfMatchExpression(URI callback) { return String.format( "target.connectUrl == \"%s\" && target.annotations.platform[\"INSTANCE_ID\"] ==" - + " \"%s\"", - callback, instanceId); + + " \"%s\" && target.annotations.cryostat[\"REALM\"] == \"%s\"", + callback, instanceId, realm); } private boolean isOkStatus(ClassicHttpResponse res) { diff --git a/src/test/java/io/cryostat/agent/CryostatClientTest.java b/src/test/java/io/cryostat/agent/CryostatClientTest.java new file mode 100644 index 00000000..65692765 --- /dev/null +++ b/src/test/java/io/cryostat/agent/CryostatClientTest.java @@ -0,0 +1,263 @@ +/* + * Copyright The Cryostat Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cryostat.agent; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.net.URI; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +import io.cryostat.agent.CryostatClient.DiscoveryPublication; +import io.cryostat.agent.WebServer.Credentials; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hc.client5.http.classic.HttpClient; +import org.apache.hc.client5.http.classic.methods.HttpPost; +import org.apache.hc.core5.http.ClassicHttpResponse; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.message.BasicHeader; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class CryostatClientTest { + + @Mock Executor executor; + @Mock ObjectMapper mapper; + @Mock HttpClient http; + @Mock ClassicHttpResponse checkResponse; + @Mock ClassicHttpResponse submitResponse; + @Mock HttpEntity checkEntity; + @Mock HttpEntity submitEntity; + @Mock Credentials credentials; + + private CryostatClient client; + private static final String INSTANCE_ID = "test-instance-123"; + private static final String JVM_ID = "test-jvm-456"; + private static final String APP_NAME = "test-app"; + private static final URI BASE_URI = URI.create("http://cryostat.example.com:8181"); + private static final String REALM = "test-realm"; + + @BeforeEach + void setup() { + DiscoveryPublication discoveryPublication = + new DiscoveryPublication("MERGE", java.util.Map.of()); + client = + new CryostatClient( + executor, + mapper, + http, + INSTANCE_ID, + JVM_ID, + APP_NAME, + BASE_URI, + REALM, + discoveryPublication); + + doAnswer( + invocation -> { + Runnable r = invocation.getArgument(0); + r.run(); + return CompletableFuture.completedFuture(null); + }) + .when(executor) + .execute(any(Runnable.class)); + } + + @Test + void testSubmitCredentialsIncludesRealmInMatchExpression() throws Exception { + URI callback = URI.create("http://agent.example.com:9977"); + + when(http.execute(any(HttpHost.class), any(HttpPost.class))) + .thenReturn(checkResponse, submitResponse); + lenient().when(checkResponse.getCode()).thenReturn(404); + lenient().when(checkResponse.getEntity()).thenReturn(checkEntity); + lenient() + .when(checkEntity.getContent()) + .thenReturn(new ByteArrayInputStream("{}".getBytes())); + + lenient().when(submitResponse.getCode()).thenReturn(201); + lenient().when(submitResponse.getEntity()).thenReturn(submitEntity); + lenient() + .when(submitResponse.getFirstHeader("Location")) + .thenReturn(new BasicHeader("Location", "/api/v4/credentials/42")); + lenient() + .when(submitEntity.getContent()) + .thenReturn(new ByteArrayInputStream("".getBytes())); + + lenient().when(credentials.user()).thenReturn("testuser"); + lenient().when(credentials.pass()).thenReturn("testpass".getBytes()); + + client.submitCredentialsIfRequired(-1, credentials, callback).get(); + + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(HttpPost.class); + verify(http, atLeastOnce()).execute(any(HttpHost.class), requestCaptor.capture()); + + boolean foundCredentialSubmission = false; + for (HttpPost capturedRequest : requestCaptor.getAllValues()) { + if (capturedRequest.getUri().getPath().contains("/api/v4/credentials") + && !capturedRequest.getUri().getPath().contains("credential_exists")) { + HttpEntity requestEntity = capturedRequest.getEntity(); + assertNotNull(requestEntity, "Request entity should not be null"); + + String entityContent = new String(requestEntity.getContent().readAllBytes()); + + assertTrue( + entityContent.contains("target.connectUrl"), + "Match expression should contain connectUrl clause"); + assertTrue( + entityContent.contains(callback.toString()), + "Match expression should contain callback URL"); + assertTrue( + entityContent.contains("target.annotations.platform[\"INSTANCE_ID\"]"), + "Match expression should contain INSTANCE_ID clause"); + assertTrue( + entityContent.contains(INSTANCE_ID), + "Match expression should contain instance ID value"); + assertTrue( + entityContent.contains("target.annotations.cryostat[\"REALM\"]"), + "Match expression should contain REALM clause"); + assertTrue( + entityContent.contains(REALM), + "Match expression should contain realm value"); + foundCredentialSubmission = true; + break; + } + } + + assertTrue(foundCredentialSubmission, "Should have found credential submission request"); + } + + @Test + void testSubmitCredentialsWithDifferentRealm() throws Exception { + String differentRealm = "production-realm"; + DiscoveryPublication discoveryPublication = + new DiscoveryPublication("MERGE", java.util.Map.of()); + CryostatClient clientWithDifferentRealm = + new CryostatClient( + executor, + mapper, + http, + INSTANCE_ID, + JVM_ID, + APP_NAME, + BASE_URI, + differentRealm, + discoveryPublication); + + URI callback = URI.create("http://agent.example.com:9977"); + + when(http.execute(any(HttpHost.class), any(HttpPost.class))) + .thenReturn(checkResponse, submitResponse); + lenient().when(checkResponse.getCode()).thenReturn(404); + lenient().when(checkResponse.getEntity()).thenReturn(checkEntity); + lenient() + .when(checkEntity.getContent()) + .thenReturn(new ByteArrayInputStream("{}".getBytes())); + + lenient().when(submitResponse.getCode()).thenReturn(201); + lenient().when(submitResponse.getEntity()).thenReturn(submitEntity); + lenient() + .when(submitResponse.getFirstHeader("Location")) + .thenReturn(new BasicHeader("Location", "/api/v4/credentials/99")); + lenient() + .when(submitEntity.getContent()) + .thenReturn(new ByteArrayInputStream("".getBytes())); + + lenient().when(credentials.user()).thenReturn("testuser"); + lenient().when(credentials.pass()).thenReturn("testpass".getBytes()); + + clientWithDifferentRealm.submitCredentialsIfRequired(-1, credentials, callback).get(); + + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(HttpPost.class); + verify(http, atLeastOnce()).execute(any(HttpHost.class), requestCaptor.capture()); + + boolean foundCredentialSubmission = false; + for (HttpPost capturedRequest : requestCaptor.getAllValues()) { + if (capturedRequest.getUri().getPath().contains("/api/v4/credentials") + && !capturedRequest.getUri().getPath().contains("credential_exists")) { + HttpEntity requestEntity = capturedRequest.getEntity(); + String entityContent = new String(requestEntity.getContent().readAllBytes()); + + assertTrue( + entityContent.contains(differentRealm), + "Match expression should contain the different realm value"); + assertFalse( + entityContent.contains(REALM), + "Match expression should not contain the original realm value"); + foundCredentialSubmission = true; + break; + } + } + + assertTrue(foundCredentialSubmission, "Should have found credential submission request"); + } + + @Test + void testRealmIsolationInMatchExpression() throws Exception { + URI callback = URI.create("http://agent.example.com:9977"); + + when(http.execute(any(HttpHost.class), any(HttpPost.class))).thenReturn(checkResponse); + when(checkResponse.getCode()).thenReturn(200); + when(checkResponse.getEntity()).thenReturn(checkEntity); + when(checkEntity.getContent()) + .thenReturn(new ByteArrayInputStream("{\"id\": 42}".getBytes())); + when(mapper.readValue(any(InputStream.class), eq(CryostatClient.StoredCredential.class))) + .thenReturn(createStoredCredential(42)); + + client.submitCredentialsIfRequired(-1, credentials, callback).get(); + + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(HttpPost.class); + verify(http, atLeastOnce()).execute(any(HttpHost.class), requestCaptor.capture()); + + boolean foundCredentialCheck = false; + for (HttpPost capturedRequest : requestCaptor.getAllValues()) { + if (capturedRequest + .getUri() + .getPath() + .contains("/api/beta/discovery/credential_exists")) { + HttpEntity requestEntity = capturedRequest.getEntity(); + String entityContent = new String(requestEntity.getContent().readAllBytes()); + + assertTrue( + entityContent.contains("target.annotations.cryostat[\"REALM\"]"), + "Match expression should include realm clause to prevent cross-realm" + + " credential reuse"); + foundCredentialCheck = true; + break; + } + } + + assertTrue(foundCredentialCheck, "Should have found credential check request"); + } + + private CryostatClient.StoredCredential createStoredCredential(int id) { + CryostatClient.StoredCredential credential = new CryostatClient.StoredCredential(); + credential.id = id; + credential.matchExpression = "test-expression"; + return credential; + } +} From 12d28339865985b66afdd52241099f96f00b65f7 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Wed, 22 Apr 2026 14:25:49 -0400 Subject: [PATCH 3/7] fix(registration) avoid blocking .get() calls on Futures --- .../io/cryostat/agent/CryostatClient.java | 75 +++-- .../java/io/cryostat/agent/Registration.java | 273 ++++++++++-------- 2 files changed, 195 insertions(+), 153 deletions(-) diff --git a/src/main/java/io/cryostat/agent/CryostatClient.java b/src/main/java/io/cryostat/agent/CryostatClient.java index c402b115..e4f9f075 100644 --- a/src/main/java/io/cryostat/agent/CryostatClient.java +++ b/src/main/java/io/cryostat/agent/CryostatClient.java @@ -33,7 +33,6 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.function.Function; @@ -165,11 +164,15 @@ public CompletableFuture register( throw new CompletionException(t); } if (!isOkStatus(res)) { - try { - deleteCredentials(credentialId).get(); - } catch (InterruptedException | ExecutionException e) { - log.error("Failed to delete previous credentials", e); - } + deleteCredentials(credentialId) + .exceptionally( + e -> { + log.error( + "Failed to delete previous" + + " credentials", + e); + return null; + }); } return assertOkStatus(req, res); }) @@ -287,28 +290,48 @@ private CompletableFuture submitCredentials( log.trace("{}", req); req.setEntity(entityBuilder.build()); return supply(req, (res) -> logResponse(req, res)) - .thenApply( + .thenCompose( res -> { if (!isOkStatus(res)) { - try { - if (res.getCode() == 409) { - int queried = queryExistingCredentials(callback).get(); - if (queried >= 0) { - return queried; - } - } - } catch (InterruptedException | ExecutionException e) { - log.error("Failed to query for existing credentials", e); - } - try { - deleteCredentials(prevId).get(); - } catch (InterruptedException | ExecutionException e) { - log.error( - "Failed to delete previous credentials with id " - + prevId, - e); - throw new RegistrationException(e); + if (res.getCode() == 409) { + return queryExistingCredentials(callback) + .thenCompose( + queried -> { + if (queried >= 0) { + return CompletableFuture + .completedFuture(queried); + } + return deleteCredentials(prevId) + .handle( + (v, t) -> { + if (t != null) { + log.error( + "Failed to" + + " delete" + + " previous" + + " credentials" + + " with" + + " id " + + prevId, + t); + } + throw new RegistrationException( + new IllegalStateException( + "Credential" + + " conflict")); + }); + }); } + deleteCredentials(prevId) + .exceptionally( + e -> { + log.error( + "Failed to delete previous credentials" + + " with id " + + prevId, + e); + return null; + }); } String location = assertOkStatus(req, res) @@ -317,7 +340,7 @@ private CompletableFuture submitCredentials( String id = location.substring( location.lastIndexOf('/') + 1, location.length()); - return Integer.valueOf(id); + return CompletableFuture.completedFuture(Integer.valueOf(id)); }) .whenComplete((v, t) -> req.reset()); } diff --git a/src/main/java/io/cryostat/agent/Registration.java b/src/main/java/io/cryostat/agent/Registration.java index 11a329f8..441bfeed 100644 --- a/src/main/java/io/cryostat/agent/Registration.java +++ b/src/main/java/io/cryostat/agent/Registration.java @@ -27,8 +27,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -84,6 +82,8 @@ public class Registration { private volatile CircuitState circuitState = CircuitState.CLOSED; private volatile Instant circuitOpenedAt = null; + private volatile CompletableFuture currentRegistration = null; + private enum CircuitState { CLOSED, OPEN, @@ -186,29 +186,29 @@ void start() { this.registrationCheckTask = executor.scheduleAtFixedRate( () -> { - try { - cryostat.checkRegistration(pluginInfo) - .handle( - (v, t) -> { - if (t != null - || !Boolean.TRUE - .equals( - v)) { - this.pluginInfo.clear(); - notify( - RegistrationEvent - .State - .UNREGISTERED); - } - return null; - }) - .get(); - } catch (ExecutionException - | InterruptedException e) { - log.error( - "Could not check registration status", - e); - } + cryostat.checkRegistration(pluginInfo) + .handle( + (v, t) -> { + if (t != null + || !Boolean.TRUE.equals( + v)) { + this.pluginInfo.clear(); + notify( + RegistrationEvent + .State + .UNREGISTERED); + } + return null; + }) + .exceptionally( + e -> { + log.error( + "Could not check" + + " registration" + + " status", + e); + return null; + }); }, registrationCheckMs, registrationCheckMs, @@ -234,6 +234,11 @@ public void addRegistrationListener(Consumer listener) { } void tryRegister() { + if (currentRegistration != null && !currentRegistration.isDone()) { + log.warn("Cancelling previous registration attempt"); + currentRegistration.cancel(true); + } + if (circuitState == CircuitState.OPEN) { if (Duration.between(circuitOpenedAt, Instant.now()) .compareTo(circuitBreakerOpenDuration) @@ -255,92 +260,102 @@ void tryRegister() { notify(RegistrationEvent.State.UNREGISTERED); return; } - try { - cryostat.serverHealth() - .thenAccept( - health -> { - Semver cryostatSemver = health.cryostatSemver(); - log.debug( - "Connected to Cryostat server: version {} , build {}", - cryostatSemver, - health.buildInfo().git().hash()); - try { - VersionInfo version = VersionInfo.load(); - if (!version.validateServerVersion(cryostatSemver)) { - log.warn( - "Cryostat server version {} is outside of expected" - + " range [{}, {})", - cryostatSemver, - version.getServerMin(), - version.getServerMax()); - } - } catch (IOException ioe) { - log.error("Unable to read versions.properties file", ioe); - } - }) - .get(); - URI credentialedCallback = - new URIBuilder(callback) - .setUserInfo("storedcredentials", String.valueOf(credentialId)) - .build(); - CompletableFuture f = - cryostat.register(credentialId, pluginInfo, credentialedCallback) - .handle( - (plugin, t) -> { - if (plugin != null) { - boolean previouslyRegistered = - this.pluginInfo.isInitialized(); - this.pluginInfo.copyFrom(plugin); - log.debug("Registered as {}", this.pluginInfo.getId()); - if (previouslyRegistered) { - notify(RegistrationEvent.State.REFRESHED); - } else { - notify(RegistrationEvent.State.REGISTERED); - tryUpdate(); - } - } else if (t != null) { - this.webServer.resetCredentialId(); - this.pluginInfo.clear(); - throw new RegistrationException(t); - } - return (Void) null; - }); - f.get(); - - if (circuitState == CircuitState.HALF_OPEN) { - log.info("Circuit breaker transitioning to CLOSED"); - } - circuitState = CircuitState.CLOSED; - consecutiveFailures = 0; - - } catch (URISyntaxException | ExecutionException | InterruptedException e) { - long backoffMs = calculateBackoffMs(); - consecutiveFailures++; - - if (circuitState == CircuitState.CLOSED - && consecutiveFailures >= circuitBreakerThreshold) { - log.warn( - "Circuit breaker transitioning to OPEN after {} failures", - consecutiveFailures); - circuitState = CircuitState.OPEN; - circuitOpenedAt = Instant.now(); - } else if (circuitState == CircuitState.HALF_OPEN) { - log.warn("Circuit breaker transitioning back to OPEN"); - circuitState = CircuitState.OPEN; - circuitOpenedAt = Instant.now(); - } + currentRegistration = + cryostat.serverHealth() + .thenCompose( + health -> { + Semver cryostatSemver = health.cryostatSemver(); + log.debug( + "Connected to Cryostat server: version {} , build {}", + cryostatSemver, + health.buildInfo().git().hash()); + try { + VersionInfo version = VersionInfo.load(); + if (!version.validateServerVersion(cryostatSemver)) { + log.warn( + "Cryostat server version {} is outside of" + + " expected range [{}, {})", + cryostatSemver, + version.getServerMin(), + version.getServerMax()); + } + } catch (IOException ioe) { + log.error("Unable to read versions.properties file", ioe); + } - log.error( - "Registration failure (attempt {}, circuit state: {}, retry in {}ms)", - consecutiveFailures, - circuitState, - backoffMs, - e); - log.trace("Registration retry period: {}", Duration.ofMillis(backoffMs)); + try { + URI credentialedCallback = + new URIBuilder(callback) + .setUserInfo( + "storedcredentials", + String.valueOf(credentialId)) + .build(); + return cryostat.register( + credentialId, pluginInfo, credentialedCallback); + } catch (URISyntaxException e) { + return CompletableFuture.failedFuture(e); + } + }) + .handle( + (plugin, t) -> { + if (plugin != null) { + boolean previouslyRegistered = + this.pluginInfo.isInitialized(); + this.pluginInfo.copyFrom(plugin); + log.debug("Registered as {}", this.pluginInfo.getId()); + + if (circuitState == CircuitState.HALF_OPEN) { + log.info("Circuit breaker transitioning to CLOSED"); + } + circuitState = CircuitState.CLOSED; + consecutiveFailures = 0; + + if (previouslyRegistered) { + notify(RegistrationEvent.State.REFRESHED); + } else { + notify(RegistrationEvent.State.REGISTERED); + tryUpdate(); + } + } else if (t != null) { + this.webServer.resetCredentialId(); + this.pluginInfo.clear(); + + long backoffMs = calculateBackoffMs(); + consecutiveFailures++; + + if (circuitState == CircuitState.CLOSED + && consecutiveFailures >= circuitBreakerThreshold) { + log.warn( + "Circuit breaker transitioning to OPEN after {}" + + " failures", + consecutiveFailures); + circuitState = CircuitState.OPEN; + circuitOpenedAt = Instant.now(); + } else if (circuitState == CircuitState.HALF_OPEN) { + log.warn("Circuit breaker transitioning back to OPEN"); + circuitState = CircuitState.OPEN; + circuitOpenedAt = Instant.now(); + } - executor.schedule(this::tryRegister, backoffMs, TimeUnit.MILLISECONDS); - } + log.error( + "Registration failure (attempt {}, circuit state:" + + " {}, retry in {}ms)", + consecutiveFailures, + circuitState, + backoffMs, + t); + log.trace( + "Registration retry period: {}", + Duration.ofMillis(backoffMs)); + + executor.schedule( + this::tryRegister, + backoffMs, + TimeUnit.MILLISECONDS); + } + return (Void) null; + }); } private long calculateBackoffMs() { @@ -381,24 +396,23 @@ private void tryUpdate() { selfNodes.stream() .map(n -> n.getTarget().getConnectUrl()) .collect(Collectors.toList())); - Future f = - cryostat.update(pluginInfo, selfNodes) - .handle( - (n, t) -> { - if (t != null) { - log.error("Update failure", t); - deregister(); - } else { - log.trace("Publish success"); - notify(RegistrationEvent.State.PUBLISHED); - } - return (Void) null; - }); - try { - f.get(); - } catch (ExecutionException | InterruptedException e) { - log.error("Failed to update", e); - } + cryostat.update(pluginInfo, selfNodes) + .handle( + (n, t) -> { + if (t != null) { + log.error("Update failure", t); + deregister(); + } else { + log.trace("Publish success"); + notify(RegistrationEvent.State.PUBLISHED); + } + return (Void) null; + }) + .exceptionally( + e -> { + log.error("Failed to update", e); + return null; + }); } private Set defineSelf() throws UnknownHostException, URISyntaxException { @@ -465,7 +479,12 @@ private Set defineSelf() throws UnknownHostException, URISyntaxEx return discoveryNodes; } - void stop() {} + void stop() { + if (currentRegistration != null && !currentRegistration.isDone()) { + log.info("Cancelling in-flight registration"); + currentRegistration.cancel(true); + } + } CompletableFuture deregister() { if (!this.pluginInfo.isInitialized()) { From 5d49de6be52af6306e2c3dbeaac64589d3b9a9a1 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Wed, 22 Apr 2026 14:28:32 -0400 Subject: [PATCH 4/7] use AtomicInteger for circuit-breaker --- .../java/io/cryostat/agent/Registration.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/main/java/io/cryostat/agent/Registration.java b/src/main/java/io/cryostat/agent/Registration.java index 441bfeed..f65b1978 100644 --- a/src/main/java/io/cryostat/agent/Registration.java +++ b/src/main/java/io/cryostat/agent/Registration.java @@ -30,6 +30,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -77,7 +78,7 @@ public class Registration { private volatile URI callback; private ScheduledFuture registrationCheckTask; - private volatile int consecutiveFailures = 0; + private final AtomicInteger consecutiveFailures = new AtomicInteger(0); private static final double JITTER_FACTOR = 0.1; private volatile CircuitState circuitState = CircuitState.CLOSED; private volatile Instant circuitOpenedAt = null; @@ -309,7 +310,7 @@ void tryRegister() { log.info("Circuit breaker transitioning to CLOSED"); } circuitState = CircuitState.CLOSED; - consecutiveFailures = 0; + consecutiveFailures.set(0); if (previouslyRegistered) { notify(RegistrationEvent.State.REFRESHED); @@ -321,15 +322,15 @@ void tryRegister() { this.webServer.resetCredentialId(); this.pluginInfo.clear(); + int failures = consecutiveFailures.incrementAndGet(); long backoffMs = calculateBackoffMs(); - consecutiveFailures++; if (circuitState == CircuitState.CLOSED - && consecutiveFailures >= circuitBreakerThreshold) { + && failures >= circuitBreakerThreshold) { log.warn( "Circuit breaker transitioning to OPEN after {}" + " failures", - consecutiveFailures); + failures); circuitState = CircuitState.OPEN; circuitOpenedAt = Instant.now(); } else if (circuitState == CircuitState.HALF_OPEN) { @@ -341,7 +342,7 @@ void tryRegister() { log.error( "Registration failure (attempt {}, circuit state:" + " {}, retry in {}ms)", - consecutiveFailures, + failures, circuitState, backoffMs, t); @@ -359,15 +360,14 @@ void tryRegister() { } private long calculateBackoffMs() { - if (consecutiveFailures == 0) { + int failures = consecutiveFailures.get(); + if (failures == 0) { return registrationRetryMs; } // Exponential backoff: base * (multiplier ^ failures) long backoff = - (long) - (registrationRetryMs - * Math.pow(backoffMultiplier, Math.min(consecutiveFailures, 10))); + (long) (registrationRetryMs * Math.pow(backoffMultiplier, Math.min(failures, 10))); // Cap at maximum backoff = Math.min(backoff, maxBackoffMs); From 78f3c1835f26bbfe296d8c9395aa0595148b0b4f Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Wed, 22 Apr 2026 14:30:58 -0400 Subject: [PATCH 5/7] fixup! use AtomicInteger for circuit-breaker --- src/main/java/io/cryostat/agent/Registration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/cryostat/agent/Registration.java b/src/main/java/io/cryostat/agent/Registration.java index f65b1978..591f7542 100644 --- a/src/main/java/io/cryostat/agent/Registration.java +++ b/src/main/java/io/cryostat/agent/Registration.java @@ -322,8 +322,8 @@ void tryRegister() { this.webServer.resetCredentialId(); this.pluginInfo.clear(); - int failures = consecutiveFailures.incrementAndGet(); long backoffMs = calculateBackoffMs(); + int failures = consecutiveFailures.incrementAndGet(); if (circuitState == CircuitState.CLOSED && failures >= circuitBreakerThreshold) { From 52d06020e8f1d7f8d2c0e34e2979d5fb589355af Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Thu, 23 Apr 2026 15:45:39 -0400 Subject: [PATCH 6/7] fix(registration): cooldown, backoff --- README.md | 8 +- src/main/java/io/cryostat/agent/Agent.java | 60 ++++++- .../java/io/cryostat/agent/ConfigModule.java | 29 ++++ .../cryostat/agent/CredentialCleanupJob.java | 146 ++++++++++++++++++ .../io/cryostat/agent/CredentialTracker.java | 67 ++++++++ .../io/cryostat/agent/CryostatClient.java | 32 ++-- .../java/io/cryostat/agent/MainModule.java | 23 +++ .../java/io/cryostat/agent/Registration.java | 139 ++++++++++++++--- .../META-INF/microprofile-config.properties | 4 + .../agent/CredentialCleanupJobTest.java | 143 +++++++++++++++++ .../cryostat/agent/CredentialTrackerTest.java | 113 ++++++++++++++ .../io/cryostat/agent/CryostatClientTest.java | 3 + .../io/cryostat/agent/RegistrationTest.java | 3 + 13 files changed, 726 insertions(+), 44 deletions(-) create mode 100644 src/main/java/io/cryostat/agent/CredentialCleanupJob.java create mode 100644 src/main/java/io/cryostat/agent/CredentialTracker.java create mode 100644 src/test/java/io/cryostat/agent/CredentialCleanupJobTest.java create mode 100644 src/test/java/io/cryostat/agent/CredentialTrackerTest.java diff --git a/README.md b/README.md index d64ccf6e..8407d839 100644 --- a/README.md +++ b/README.md @@ -246,7 +246,13 @@ and how it advertises itself to a Cryostat server instance. Properties that requ - [ ] `cryostat.agent.webserver.credentials.pass.length` [`int`]: the length of the generated password used for `Basic` authorization on the embedded webserver. Default `24`. - [ ] `cryostat.agent.webserver.credentials.pass.hash-function` [`String`]: the name of the hash function to use when generating passwords. Default `SHA-256`. - [ ] `cryostat.agent.app.jmx.port` [`int`]: the JMX RMI port that the application is listening on. The default is to attempt to determine this from the `com.sun.management.jmxremote.port` system property. -- [ ] `cryostat.agent.registration.retry-ms` [`long`]: the duration in milliseconds between attempts to register with the Cryostat server. Default `5000`. +- [ ] `cryostat.agent.registration.retry-ms` [`long`]: the duration in milliseconds between attempts to register with the Cryostat server. This is the base value used for exponential backoff calculations. Default `15000`. +- [ ] `cryostat.agent.registration.max-backoff-ms` [`long`]: the maximum duration in milliseconds for exponential backoff between registration retry attempts. Default `300000` (5 minutes). +- [ ] `cryostat.agent.registration.backoff-multiplier` [`double`]: the multiplier used for exponential backoff calculations. Each consecutive failure multiplies the retry delay by this factor. Default `2.0`. +- [ ] `cryostat.agent.registration.circuit-breaker-threshold` [`int`]: the number of consecutive registration failures before the circuit breaker opens. Default `10`. +- [ ] `cryostat.agent.registration.circuit-breaker-duration` [`Duration`]: the duration the circuit breaker remains open before attempting to close. Default `PT5M` (5 minutes). +- [ ] `cryostat.agent.registration.min-cooldown-ms` [`long`]: the minimum cooldown period in milliseconds after registration failures. This ensures the cooldown always exceeds Cryostat's plugin ping interval, allowing the server to detect and clean up stale registrations. Default `300000` (5 minutes). +- [ ] `cryostat.agent.registration.check-ms` [`long`]: the duration in milliseconds between checks to verify the Agent's registration status with the Cryostat server. Default `300000` (5 minutes). - [ ] `cryostat.agent.registration.jmx.ignore` [`boolean`]: if the Agent detects that the host JVM has its JMX server enabled, then setting this property to `true` will cause the Agent to ignore the JMX server and not publish a JMX Service URL after registering with the Cryostat server. Default `false`. - [ ] `cryostat.agent.registration.jmx.use-callback-host` [`boolean`]: if the Agent should use the host part of the callback URL when constructing the JMX Service URL for registration. If set to `false` then the URL will contain the automatically detected hostname instead. Default `true`. - [ ] `cryostat.agent.exit.signals` [`[String]`]: a comma-separated list of signals that the agent should handle. When any of these signals is caught the agent initiates an orderly shutdown, deregistering from the Cryostat server and potentially uploading the latest harvested JFR data. Default `INT,TERM`. diff --git a/src/main/java/io/cryostat/agent/Agent.java b/src/main/java/io/cryostat/agent/Agent.java index 6ab64add..6a101351 100644 --- a/src/main/java/io/cryostat/agent/Agent.java +++ b/src/main/java/io/cryostat/agent/Agent.java @@ -23,13 +23,16 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; +import java.util.stream.Collectors; import javax.inject.Named; import javax.inject.Singleton; @@ -226,6 +229,9 @@ public void accept(AgentArgs args) { Registration registration = client.registration(); Harvester harvester = client.harvester(); WebServer webServer = client.webServer(); + CredentialCleanupJob credentialCleanupJob = client.credentialCleanupJob(); + CredentialTracker credentialTracker = client.credentialTracker(); + CryostatClient cryostatClient = client.cryostatClient(); ExecutorService executor = client.executor(); List exitSignals = client.exitSignals(); long exitDeregistrationTimeout = client.exitDeregistrationTimeout(); @@ -237,6 +243,9 @@ public void accept(AgentArgs args) { registration, harvester, webServer, + credentialCleanupJob, + credentialTracker, + cryostatClient, executor, exitDeregistrationTimeout); final AgentExitHandler fHandler = agentExitHandler; @@ -272,6 +281,7 @@ public void accept(AgentArgs args) { }); webServer.start(); registration.start(); + credentialCleanupJob.start(); client.triggerEvaluator().start(args.getSmartTriggers()); log.trace("Startup complete"); } catch (Exception e) { @@ -288,11 +298,21 @@ private static AgentExitHandler installSignalHandlers( Registration registration, Harvester harvester, WebServer webServer, + CredentialCleanupJob credentialCleanupJob, + CredentialTracker credentialTracker, + CryostatClient cryostatClient, ExecutorService executor, long exitDeregistrationTimeout) { AgentExitHandler agentExitHandler = new AgentExitHandler( - registration, harvester, webServer, executor, exitDeregistrationTimeout); + registration, + harvester, + webServer, + credentialCleanupJob, + credentialTracker, + cryostatClient, + executor, + exitDeregistrationTimeout); for (String s : exitSignals) { Signal signal = new Signal(s); try { @@ -344,6 +364,12 @@ interface Client { TriggerEvaluator triggerEvaluator(); + CredentialCleanupJob credentialCleanupJob(); + + CredentialTracker credentialTracker(); + + CryostatClient cryostatClient(); + ScheduledExecutorService executor(); @Named(ConfigModule.CRYOSTAT_AGENT_EXIT_SIGNALS) @@ -366,6 +392,9 @@ private static class AgentExitHandler implements SignalHandler { private final Registration registration; private final Harvester harvester; private final WebServer webServer; + private final CredentialCleanupJob credentialCleanupJob; + private final CredentialTracker credentialTracker; + private final CryostatClient cryostatClient; private final ExecutorService executor; private final long exitDeregistrationTimeout; @@ -373,11 +402,17 @@ private AgentExitHandler( Registration registration, Harvester harvester, WebServer webServer, + CredentialCleanupJob credentialCleanupJob, + CredentialTracker credentialTracker, + CryostatClient cryostatClient, ExecutorService executor, long exitDeregistrationTimeout) { this.registration = Objects.requireNonNull(registration); this.harvester = Objects.requireNonNull(harvester); this.webServer = Objects.requireNonNull(webServer); + this.credentialCleanupJob = Objects.requireNonNull(credentialCleanupJob); + this.credentialTracker = Objects.requireNonNull(credentialTracker); + this.cryostatClient = Objects.requireNonNull(cryostatClient); this.executor = Objects.requireNonNull(executor); this.exitDeregistrationTimeout = exitDeregistrationTimeout; } @@ -413,10 +448,33 @@ void performCleanup(Signal sig) { if (t != null) { log.warn("Exception during deregistration", t); } + + try { + Set orphaned = + credentialTracker.getOrphanedCredentials(); + if (!orphaned.isEmpty()) { + log.debug( + "Cleaning up {} credentials on shutdown", + orphaned.size()); + List> deletions = + orphaned.stream() + .map(cryostatClient::deleteCredentials) + .collect(Collectors.toList()); + CompletableFuture.allOf( + deletions.toArray( + new CompletableFuture[0])) + .get(30, TimeUnit.SECONDS); + log.debug("Cleaned up all credentials"); + } + } catch (Exception e) { + log.error("Error during credential cleanup on shutdown", e); + } + try { log.debug("Shutting down..."); safeCall(webServer::stop); safeCall(registration::stop); + safeCall(credentialCleanupJob::stop); safeCall(executor::shutdown); } finally { log.debug("Shutdown complete"); diff --git a/src/main/java/io/cryostat/agent/ConfigModule.java b/src/main/java/io/cryostat/agent/ConfigModule.java index 495f9c55..d87d177d 100644 --- a/src/main/java/io/cryostat/agent/ConfigModule.java +++ b/src/main/java/io/cryostat/agent/ConfigModule.java @@ -214,6 +214,8 @@ public abstract class ConfigModule { "cryostat.agent.registration.circuit-breaker-threshold"; public static final String CRYOSTAT_AGENT_REGISTRATION_CIRCUIT_BREAKER_DURATION = "cryostat.agent.registration.circuit-breaker-duration"; + public static final String CRYOSTAT_AGENT_REGISTRATION_MIN_COOLDOWN_MS = + "cryostat.agent.registration.min-cooldown-ms"; public static final String CRYOSTAT_AGENT_EXIT_SIGNALS = "cryostat.agent.exit.signals"; public static final String CRYOSTAT_AGENT_EXIT_DEREGISTRATION_TIMEOUT_MS = "cryostat.agent.exit.deregistration.timeout-ms"; @@ -264,6 +266,11 @@ public abstract class ConfigModule { public static final String CRYOSTAT_AGENT_FLEET_SAMPLING_RATIO = "cryostat.agent.fleet-sampling-ratio"; + public static final String CRYOSTAT_AGENT_CREDENTIAL_CLEANUP_INTERVAL = + "cryostat.agent.credential.cleanup.interval"; + public static final String CRYOSTAT_AGENT_CREDENTIAL_CLEANUP_MAX_RETRIES = + "cryostat.agent.credential.cleanup.max-retries"; + @Provides @Singleton public static SmallRyeConfig provideConfig() { @@ -992,6 +999,14 @@ public static Duration provideCryostatAgentRegistrationCircuitBreakerDuration( return Duration.parse(durationStr); } + @Provides + @Singleton + @Named(CRYOSTAT_AGENT_REGISTRATION_MIN_COOLDOWN_MS) + public static Duration provideCryostatAgentRegistrationMinCooldownMs(SmallRyeConfig config) { + int cooldownMs = config.getValue(CRYOSTAT_AGENT_REGISTRATION_MIN_COOLDOWN_MS, int.class); + return Duration.ofMillis(cooldownMs); + } + @Provides @Singleton @Named(CRYOSTAT_AGENT_HARVESTER_PERIOD_MS) @@ -1138,6 +1153,20 @@ public static double provideCryostatAgentFleetSamplingRatio(SmallRyeConfig confi return config.getValue(CRYOSTAT_AGENT_FLEET_SAMPLING_RATIO, double.class); } + @Provides + @Singleton + @Named(CRYOSTAT_AGENT_CREDENTIAL_CLEANUP_INTERVAL) + public static Duration provideCryostatAgentCredentialCleanupInterval(SmallRyeConfig config) { + return config.getValue(CRYOSTAT_AGENT_CREDENTIAL_CLEANUP_INTERVAL, Duration.class); + } + + @Provides + @Singleton + @Named(CRYOSTAT_AGENT_CREDENTIAL_CLEANUP_MAX_RETRIES) + public static int provideCryostatAgentCredentialCleanupMaxRetries(SmallRyeConfig config) { + return config.getValue(CRYOSTAT_AGENT_CREDENTIAL_CLEANUP_MAX_RETRIES, int.class); + } + public enum URIRange { LOOPBACK(u -> check(u, u2 -> true, InetAddress::isLoopbackAddress)), LINK_LOCAL( diff --git a/src/main/java/io/cryostat/agent/CredentialCleanupJob.java b/src/main/java/io/cryostat/agent/CredentialCleanupJob.java new file mode 100644 index 00000000..b97989b2 --- /dev/null +++ b/src/main/java/io/cryostat/agent/CredentialCleanupJob.java @@ -0,0 +1,146 @@ +/* + * Copyright The Cryostat Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cryostat.agent; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import javax.inject.Named; +import javax.inject.Singleton; + +import dagger.Lazy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Singleton +public class CredentialCleanupJob { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final ScheduledExecutorService executor; + private final CredentialTracker tracker; + private final Lazy cryostat; + private final Duration cleanupInterval; + private final int maxRetries; + + private final Map retryCount = new ConcurrentHashMap<>(); + private ScheduledFuture cleanupTask; + + CredentialCleanupJob( + ScheduledExecutorService executor, + CredentialTracker tracker, + Lazy cryostat, + @Named(ConfigModule.CRYOSTAT_AGENT_CREDENTIAL_CLEANUP_INTERVAL) + Duration cleanupInterval, + @Named(ConfigModule.CRYOSTAT_AGENT_CREDENTIAL_CLEANUP_MAX_RETRIES) int maxRetries) { + this.executor = executor; + this.tracker = tracker; + this.cryostat = cryostat; + this.cleanupInterval = cleanupInterval; + this.maxRetries = maxRetries; + } + + void start() { + if (cleanupTask != null) { + log.warn("Cleanup job already started"); + return; + } + long intervalMs = cleanupInterval.toMillis(); + cleanupTask = + executor.scheduleAtFixedRate( + this::cleanupOrphanedCredentials, + intervalMs, + intervalMs, + TimeUnit.MILLISECONDS); + log.debug("Credential cleanup job started with interval: {}", cleanupInterval); + } + + void stop() { + if (cleanupTask != null) { + cleanupTask.cancel(false); + cleanupTask = null; + log.debug("Credential cleanup job stopped"); + } + } + + void cleanupOrphanedCredentials() { + Queue pending = tracker.getPendingDeletion(); + + if (pending.isEmpty()) { + return; + } + + log.debug("Cleaning up {} orphaned credentials", pending.size()); + + List> deletions = new ArrayList<>(); + + while (!pending.isEmpty()) { + Integer credentialId = pending.poll(); + int attempts = retryCount.getOrDefault(credentialId, 0); + + if (attempts >= maxRetries) { + log.error( + "Failed to delete credential {} after {} attempts, giving up", + credentialId, + attempts); + retryCount.remove(credentialId); + tracker.trackDeleted(credentialId); + continue; + } + + CompletableFuture deletion = + cryostat.get() + .deleteCredentials(credentialId) + .handle( + (v, t) -> { + if (t != null) { + log.warn( + "Failed to delete credential {} (attempt {})", + credentialId, + attempts + 1, + t); + retryCount.put(credentialId, attempts + 1); + tracker.markForDeletion(credentialId); + } else { + log.debug( + "Successfully deleted credential {}", + credentialId); + retryCount.remove(credentialId); + tracker.trackDeleted(credentialId); + } + return null; + }); + + deletions.add(deletion); + } + + CompletableFuture.allOf(deletions.toArray(new CompletableFuture[0])) + .exceptionally( + t -> { + log.error("Error during credential cleanup", t); + return null; + }) + .join(); + } +} diff --git a/src/main/java/io/cryostat/agent/CredentialTracker.java b/src/main/java/io/cryostat/agent/CredentialTracker.java new file mode 100644 index 00000000..dc07203f --- /dev/null +++ b/src/main/java/io/cryostat/agent/CredentialTracker.java @@ -0,0 +1,67 @@ +/* + * Copyright The Cryostat Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cryostat.agent; + +import java.util.HashSet; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; + +import javax.inject.Singleton; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Singleton +public class CredentialTracker { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final Set createdCredentials = ConcurrentHashMap.newKeySet(); + + private final Queue pendingDeletion = new ConcurrentLinkedQueue<>(); + + public void trackCreated(int credentialId) { + createdCredentials.add(credentialId); + log.debug("Tracking credential: {}", credentialId); + } + + public void trackDeleted(int credentialId) { + createdCredentials.remove(credentialId); + log.debug("Credential deleted: {}", credentialId); + } + + public void markForDeletion(int credentialId) { + if (createdCredentials.contains(credentialId)) { + pendingDeletion.offer(credentialId); + log.debug("Marked credential for deletion: {}", credentialId); + } + } + + public Set getOrphanedCredentials() { + return new HashSet<>(createdCredentials); + } + + public Queue getPendingDeletion() { + return new ConcurrentLinkedQueue<>(pendingDeletion); + } + + public void clear() { + createdCredentials.clear(); + pendingDeletion.clear(); + } +} diff --git a/src/main/java/io/cryostat/agent/CryostatClient.java b/src/main/java/io/cryostat/agent/CryostatClient.java index e4f9f075..185f4f5f 100644 --- a/src/main/java/io/cryostat/agent/CryostatClient.java +++ b/src/main/java/io/cryostat/agent/CryostatClient.java @@ -85,6 +85,7 @@ public class CryostatClient { private final ObjectMapper mapper; private final HttpHost host; private final HttpClient http; + private final CredentialTracker credentialTracker; private final String appName; private final String instanceId; @@ -97,6 +98,7 @@ public class CryostatClient { Executor executor, ObjectMapper mapper, HttpClient http, + CredentialTracker credentialTracker, String instanceId, String jvmId, String appName, @@ -107,6 +109,7 @@ public class CryostatClient { this.mapper = mapper; this.host = HttpHost.create(baseUri); this.http = http; + this.credentialTracker = credentialTracker; this.instanceId = instanceId; this.jvmId = jvmId; this.appName = appName; @@ -161,18 +164,11 @@ public CompletableFuture register( .handle( (res, t) -> { if (t != null) { + credentialTracker.markForDeletion(credentialId); throw new CompletionException(t); } if (!isOkStatus(res)) { - deleteCredentials(credentialId) - .exceptionally( - e -> { - log.error( - "Failed to delete previous" - + " credentials", - e); - return null; - }); + credentialTracker.markForDeletion(credentialId); } return assertOkStatus(req, res); }) @@ -182,11 +178,13 @@ public CompletableFuture register( return mapper.readValue(is, PluginInfo.class); } catch (IOException e) { log.error("Unable to parse response as JSON", e); + credentialTracker.markForDeletion(credentialId); throw new RegistrationException(e); } }) .whenComplete((v, t) -> req.reset()); } catch (JsonProcessingException e) { + credentialTracker.markForDeletion(credentialId); return CompletableFuture.failedFuture(e); } } @@ -301,6 +299,7 @@ private CompletableFuture submitCredentials( return CompletableFuture .completedFuture(queried); } + credentialTracker.markForDeletion(prevId); return deleteCredentials(prevId) .handle( (v, t) -> { @@ -322,16 +321,7 @@ private CompletableFuture submitCredentials( }); }); } - deleteCredentials(prevId) - .exceptionally( - e -> { - log.error( - "Failed to delete previous credentials" - + " with id " - + prevId, - e); - return null; - }); + credentialTracker.markForDeletion(prevId); } String location = assertOkStatus(req, res) @@ -340,7 +330,9 @@ private CompletableFuture submitCredentials( String id = location.substring( location.lastIndexOf('/') + 1, location.length()); - return CompletableFuture.completedFuture(Integer.valueOf(id)); + int credId = Integer.parseInt(id); + credentialTracker.trackCreated(credId); + return CompletableFuture.completedFuture(credId); }) .whenComplete((v, t) -> req.reset()); } diff --git a/src/main/java/io/cryostat/agent/MainModule.java b/src/main/java/io/cryostat/agent/MainModule.java index ac7419ae..5726a940 100644 --- a/src/main/java/io/cryostat/agent/MainModule.java +++ b/src/main/java/io/cryostat/agent/MainModule.java @@ -796,12 +796,31 @@ public static ObjectMapper provideObjectMapper() { .registerModule(new JavaTimeModule()); } + @Provides + @Singleton + public static CredentialTracker provideCredentialTracker() { + return new CredentialTracker(); + } + + @Provides + @Singleton + public static CredentialCleanupJob provideCredentialCleanupJob( + ScheduledExecutorService executor, + CredentialTracker tracker, + Lazy cryostat, + @Named(ConfigModule.CRYOSTAT_AGENT_CREDENTIAL_CLEANUP_INTERVAL) + Duration cleanupInterval, + @Named(ConfigModule.CRYOSTAT_AGENT_CREDENTIAL_CLEANUP_MAX_RETRIES) int maxRetries) { + return new CredentialCleanupJob(executor, tracker, cryostat, cleanupInterval, maxRetries); + } + @Provides @Singleton public static CryostatClient provideCryostatClient( ScheduledExecutorService executor, ObjectMapper objectMapper, HttpClient http, + CredentialTracker credentialTracker, @Named(ConfigModule.CRYOSTAT_AGENT_INSTANCE_ID) String instanceId, @Named(JVM_ID) String jvmId, @Named(ConfigModule.CRYOSTAT_AGENT_APP_NAME) String appName, @@ -814,6 +833,7 @@ public static CryostatClient provideCryostatClient( executor, objectMapper, http, + credentialTracker, instanceId, jvmId, appName, @@ -858,6 +878,8 @@ public static Registration provideRegistration( int circuitBreakerThreshold, @Named(ConfigModule.CRYOSTAT_AGENT_REGISTRATION_CIRCUIT_BREAKER_DURATION) Duration circuitBreakerOpenDuration, + @Named(ConfigModule.CRYOSTAT_AGENT_REGISTRATION_MIN_COOLDOWN_MS) + Duration minCooldownDuration, SecureRandom random) { Logger log = LoggerFactory.getLogger(Registration.class); return new Registration( @@ -892,6 +914,7 @@ public static Registration provideRegistration( backoffMultiplier, circuitBreakerThreshold, circuitBreakerOpenDuration, + minCooldownDuration, random); } diff --git a/src/main/java/io/cryostat/agent/Registration.java b/src/main/java/io/cryostat/agent/Registration.java index 591f7542..073fa4fe 100644 --- a/src/main/java/io/cryostat/agent/Registration.java +++ b/src/main/java/io/cryostat/agent/Registration.java @@ -70,6 +70,7 @@ public class Registration { private final double backoffMultiplier; private final int circuitBreakerThreshold; private final Duration circuitBreakerOpenDuration; + private final Duration minCooldownDuration; private final Random random; private final PluginInfo pluginInfo = new PluginInfo(); @@ -85,6 +86,11 @@ public class Registration { private volatile CompletableFuture currentRegistration = null; + private volatile Instant lastSuccessfulRegistration = Instant.EPOCH; + private volatile Instant cooldownUntil = null; + private volatile ScheduledFuture cooldownExitTask = null; + private final Object cooldownLock = new Object(); + private enum CircuitState { CLOSED, OPEN, @@ -111,6 +117,7 @@ private enum CircuitState { double backoffMultiplier, int circuitBreakerThreshold, Duration circuitBreakerOpenDuration, + Duration minCooldownDuration, Random random) { this.executor = executor; this.cryostat = cryostat; @@ -131,6 +138,7 @@ private enum CircuitState { this.backoffMultiplier = backoffMultiplier; this.circuitBreakerThreshold = circuitBreakerThreshold; this.circuitBreakerOpenDuration = circuitBreakerOpenDuration; + this.minCooldownDuration = minCooldownDuration; this.random = random; } @@ -240,11 +248,18 @@ void tryRegister() { currentRegistration.cancel(true); } + if (isInCooldown()) { + Duration remaining = getCooldownRemaining(); + log.debug("In cooldown, retry in {}", remaining); + executor.schedule(this::tryRegister, remaining.toMillis(), TimeUnit.MILLISECONDS); + return; + } + if (circuitState == CircuitState.OPEN) { if (Duration.between(circuitOpenedAt, Instant.now()) .compareTo(circuitBreakerOpenDuration) > 0) { - log.info("Circuit breaker transitioning to HALF_OPEN"); + log.debug("Circuit breaker transitioning to HALF_OPEN"); circuitState = CircuitState.HALF_OPEN; } else { log.debug("Circuit breaker OPEN, skipping registration attempt"); @@ -306,11 +321,20 @@ void tryRegister() { this.pluginInfo.copyFrom(plugin); log.debug("Registered as {}", this.pluginInfo.getId()); + lastSuccessfulRegistration = Instant.now(); + consecutiveFailures.set(0); + if (circuitState == CircuitState.HALF_OPEN) { - log.info("Circuit breaker transitioning to CLOSED"); + log.debug("Circuit breaker transitioning to CLOSED"); } circuitState = CircuitState.CLOSED; - consecutiveFailures.set(0); + + log.debug( + "Registration successful at {}, next attempt" + + " allowed after {}", + lastSuccessfulRegistration, + lastSuccessfulRegistration.plus( + minCooldownDuration)); if (previouslyRegistered) { notify(RegistrationEvent.State.REFRESHED); @@ -322,8 +346,9 @@ void tryRegister() { this.webServer.resetCredentialId(); this.pluginInfo.clear(); - long backoffMs = calculateBackoffMs(); int failures = consecutiveFailures.incrementAndGet(); + long backoffMs = calculateBackoffMs(); + Duration cooldown = Duration.ofMillis(backoffMs); if (circuitState == CircuitState.CLOSED && failures >= circuitBreakerThreshold) { @@ -341,19 +366,20 @@ void tryRegister() { log.error( "Registration failure (attempt {}, circuit state:" - + " {}, retry in {}ms)", + + " {}, cooldown: {})", failures, circuitState, - backoffMs, + cooldown, t); - log.trace( - "Registration retry period: {}", - Duration.ofMillis(backoffMs)); - - executor.schedule( - this::tryRegister, - backoffMs, - TimeUnit.MILLISECONDS); + + if (minCooldownDuration.isZero()) { + executor.schedule( + this::tryRegister, + backoffMs, + TimeUnit.MILLISECONDS); + } else { + enterCooldown(cooldown); + } } return (Void) null; }); @@ -365,20 +391,88 @@ private long calculateBackoffMs() { return registrationRetryMs; } - // Exponential backoff: base * (multiplier ^ failures) + double jitter = 1.0 + (random.nextDouble() * 2 - 1) * JITTER_FACTOR; long backoff = - (long) (registrationRetryMs * Math.pow(backoffMultiplier, Math.min(failures, 10))); - - // Cap at maximum + (long) + (registrationRetryMs + * Math.pow(backoffMultiplier, Math.min(failures - 1, 10))); backoff = Math.min(backoff, maxBackoffMs); - - // Add jitter to prevent thundering herd - double jitter = 1.0 + (random.nextDouble() * 2 - 1) * JITTER_FACTOR; backoff = (long) (backoff * jitter); + backoff = Math.max(backoff, minCooldownDuration.toMillis()); return backoff; } + /** + * Check if the Agent is currently in cooldown period. + * + * @return true if in cooldown, false otherwise + */ + boolean isInCooldown() { + synchronized (cooldownLock) { + return cooldownUntil != null && Instant.now().isBefore(cooldownUntil); + } + } + + /** + * Enter cooldown state for the specified duration. + * + * @param duration the cooldown duration + */ + private void enterCooldown(Duration duration) { + synchronized (cooldownLock) { + if (cooldownExitTask != null) { + cooldownExitTask.cancel(false); + } + + cooldownUntil = Instant.now().plus(duration); + log.debug( + "Entering cooldown for {} after {} consecutive failures", + duration, + consecutiveFailures.get()); + notify(RegistrationEvent.State.COOLDOWN); + + cooldownExitTask = + executor.schedule( + this::exitCooldown, duration.toMillis(), TimeUnit.MILLISECONDS); + } + } + + /** Exit cooldown state and prepare for next registration attempt. */ + private void exitCooldown() { + synchronized (cooldownLock) { + log.trace("Exiting cooldown, ready for next registration attempt"); + cooldownUntil = null; + notify(RegistrationEvent.State.UNREGISTERED); + } + } + + /** + * Get time remaining in cooldown period. + * + * @return Duration remaining, or Duration.ZERO if not in cooldown + */ + Duration getCooldownRemaining() { + synchronized (cooldownLock) { + if (!isInCooldown()) { + return Duration.ZERO; + } + return Duration.between(Instant.now(), cooldownUntil); + } + } + + /** + * Get time since last successful registration. + * + * @return Duration since last success, or null if never registered + */ + Duration getTimeSinceLastSuccess() { + if (lastSuccessfulRegistration.equals(Instant.EPOCH)) { + return null; + } + return Duration.between(lastSuccessfulRegistration, Instant.now()); + } + private void tryUpdate() { if (!this.pluginInfo.isInitialized()) { log.warn("update attempted before initialized"); @@ -481,7 +575,7 @@ private Set defineSelf() throws UnknownHostException, URISyntaxEx void stop() { if (currentRegistration != null && !currentRegistration.isDone()) { - log.info("Cancelling in-flight registration"); + log.trace("Cancelling in-flight registration"); currentRegistration.cancel(true); } } @@ -523,6 +617,7 @@ public enum State { PUBLISHED, REFRESHING, REFRESHED, + COOLDOWN, } public final State state; diff --git a/src/main/resources/META-INF/microprofile-config.properties b/src/main/resources/META-INF/microprofile-config.properties index e4df9ea0..7bd6fa74 100644 --- a/src/main/resources/META-INF/microprofile-config.properties +++ b/src/main/resources/META-INF/microprofile-config.properties @@ -77,6 +77,7 @@ cryostat.agent.registration.max-backoff-ms=300000 cryostat.agent.registration.backoff-multiplier=2.0 cryostat.agent.registration.circuit-breaker-threshold=10 cryostat.agent.registration.circuit-breaker-duration=PT5M +cryostat.agent.registration.min-cooldown-ms=300000 cryostat.agent.exit.deregistration.timeout-ms=10000 cryostat.agent.publish.context= @@ -97,4 +98,7 @@ cryostat.agent.async-profiler.repository.path= cryostat.agent.smart-trigger.definitions= cryostat.agent.smart-trigger.evaluation.period-ms=1000 +cryostat.agent.credential.cleanup.interval=PT1M +cryostat.agent.credential.cleanup.max-retries=5 + cryostat.agent.fleet-sampling-ratio=Infinity diff --git a/src/test/java/io/cryostat/agent/CredentialCleanupJobTest.java b/src/test/java/io/cryostat/agent/CredentialCleanupJobTest.java new file mode 100644 index 00000000..bf5e4124 --- /dev/null +++ b/src/test/java/io/cryostat/agent/CredentialCleanupJobTest.java @@ -0,0 +1,143 @@ +/* + * Copyright The Cryostat Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cryostat.agent; + +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import dagger.Lazy; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class CredentialCleanupJobTest { + + @Mock ScheduledExecutorService executor; + @Mock CredentialTracker tracker; + @Mock Lazy cryostat; + @Mock CryostatClient cryostatClient; + @Mock ScheduledFuture scheduledFuture; + + private CredentialCleanupJob cleanupJob; + private static final Duration CLEANUP_INTERVAL = Duration.ofMinutes(1); + private static final int MAX_RETRIES = 5; + + @BeforeEach + void setup() { + lenient().when(cryostat.get()).thenReturn(cryostatClient); + cleanupJob = + new CredentialCleanupJob( + executor, tracker, cryostat, CLEANUP_INTERVAL, MAX_RETRIES); + } + + @Test + @SuppressWarnings("unchecked") + void testStartSchedulesCleanupJob() { + when(executor.scheduleAtFixedRate( + any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class))) + .thenReturn((ScheduledFuture) scheduledFuture); + + cleanupJob.start(); + + verify(executor) + .scheduleAtFixedRate( + any(Runnable.class), + eq(CLEANUP_INTERVAL.toMillis()), + eq(CLEANUP_INTERVAL.toMillis()), + eq(TimeUnit.MILLISECONDS)); + } + + @Test + @SuppressWarnings("unchecked") + void testStopCancelsScheduledTask() { + when(executor.scheduleAtFixedRate( + any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class))) + .thenReturn((ScheduledFuture) scheduledFuture); + + cleanupJob.start(); + cleanupJob.stop(); + + verify(scheduledFuture).cancel(false); + } + + @Test + void testCleanupOrphanedCredentialsWithNoPendingDeletions() { + when(tracker.getPendingDeletion()) + .thenReturn(new java.util.concurrent.ConcurrentLinkedQueue<>()); + + cleanupJob.cleanupOrphanedCredentials(); + + verify(cryostatClient, never()).deleteCredentials(anyInt()); + } + + @Test + void testCleanupOrphanedCredentialsSuccessfulDeletion() { + java.util.Queue pending = new java.util.concurrent.ConcurrentLinkedQueue<>(); + pending.offer(1); + pending.offer(2); + when(tracker.getPendingDeletion()).thenReturn(pending); + when(cryostatClient.deleteCredentials(anyInt())) + .thenReturn(CompletableFuture.completedFuture(null)); + + cleanupJob.cleanupOrphanedCredentials(); + + verify(cryostatClient).deleteCredentials(1); + verify(cryostatClient).deleteCredentials(2); + verify(tracker, times(2)).trackDeleted(anyInt()); + } + + @Test + void testCleanupOrphanedCredentialsFailedDeletion() { + java.util.Queue pending = new java.util.concurrent.ConcurrentLinkedQueue<>(); + pending.offer(1); + when(tracker.getPendingDeletion()).thenReturn(pending); + when(cryostatClient.deleteCredentials(1)) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException("Delete failed"))); + + cleanupJob.cleanupOrphanedCredentials(); + + verify(cryostatClient).deleteCredentials(1); + verify(tracker).markForDeletion(1); + verify(tracker, never()).trackDeleted(1); + } + + @Test + void testCleanupOrphanedCredentialsGivesUpAfterMaxRetries() { + java.util.Queue pending = new java.util.concurrent.ConcurrentLinkedQueue<>(); + for (int i = 0; i < MAX_RETRIES + 1; i++) { + pending.offer(1); + } + when(tracker.getPendingDeletion()).thenReturn(pending); + when(cryostatClient.deleteCredentials(1)) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException("Delete failed"))); + + for (int i = 0; i <= MAX_RETRIES; i++) { + cleanupJob.cleanupOrphanedCredentials(); + } + + verify(cryostatClient, times(MAX_RETRIES)).deleteCredentials(1); + verify(tracker, times(1)).trackDeleted(1); + } +} diff --git a/src/test/java/io/cryostat/agent/CredentialTrackerTest.java b/src/test/java/io/cryostat/agent/CredentialTrackerTest.java new file mode 100644 index 00000000..df72e2d9 --- /dev/null +++ b/src/test/java/io/cryostat/agent/CredentialTrackerTest.java @@ -0,0 +1,113 @@ +/* + * Copyright The Cryostat Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cryostat.agent; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.Queue; +import java.util.Set; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class CredentialTrackerTest { + + private CredentialTracker tracker; + + @BeforeEach + void setup() { + tracker = new CredentialTracker(); + } + + @Test + void testTrackCreated() { + tracker.trackCreated(1); + tracker.trackCreated(2); + + Set orphaned = tracker.getOrphanedCredentials(); + assertEquals(2, orphaned.size()); + assertTrue(orphaned.contains(1)); + assertTrue(orphaned.contains(2)); + } + + @Test + void testTrackDeleted() { + tracker.trackCreated(1); + tracker.trackCreated(2); + tracker.trackDeleted(1); + + Set orphaned = tracker.getOrphanedCredentials(); + assertEquals(1, orphaned.size()); + assertTrue(orphaned.contains(2)); + assertFalse(orphaned.contains(1)); + } + + @Test + void testMarkForDeletion() { + tracker.trackCreated(1); + tracker.markForDeletion(1); + + Queue pending = tracker.getPendingDeletion(); + assertEquals(1, pending.size()); + assertTrue(pending.contains(1)); + } + + @Test + void testMarkForDeletionIgnoresUntrackedCredentials() { + tracker.markForDeletion(999); + + Queue pending = tracker.getPendingDeletion(); + assertTrue(pending.isEmpty()); + } + + @Test + void testClear() { + tracker.trackCreated(1); + tracker.trackCreated(2); + tracker.markForDeletion(1); + + tracker.clear(); + + Set orphaned = tracker.getOrphanedCredentials(); + Queue pending = tracker.getPendingDeletion(); + + assertTrue(orphaned.isEmpty()); + assertTrue(pending.isEmpty()); + } + + @Test + void testGetPendingDeletionReturnsSnapshot() { + tracker.trackCreated(1); + tracker.markForDeletion(1); + + Queue pending1 = tracker.getPendingDeletion(); + Queue pending2 = tracker.getPendingDeletion(); + + assertNotSame(pending1, pending2); + assertEquals(pending1.size(), pending2.size()); + } + + @Test + void testGetOrphanedCredentialsReturnsSnapshot() { + tracker.trackCreated(1); + + Set orphaned1 = tracker.getOrphanedCredentials(); + Set orphaned2 = tracker.getOrphanedCredentials(); + + assertNotSame(orphaned1, orphaned2); + assertEquals(orphaned1.size(), orphaned2.size()); + } +} diff --git a/src/test/java/io/cryostat/agent/CryostatClientTest.java b/src/test/java/io/cryostat/agent/CryostatClientTest.java index 65692765..96889f32 100644 --- a/src/test/java/io/cryostat/agent/CryostatClientTest.java +++ b/src/test/java/io/cryostat/agent/CryostatClientTest.java @@ -48,6 +48,7 @@ class CryostatClientTest { @Mock Executor executor; @Mock ObjectMapper mapper; @Mock HttpClient http; + @Mock CredentialTracker credentialTracker; @Mock ClassicHttpResponse checkResponse; @Mock ClassicHttpResponse submitResponse; @Mock HttpEntity checkEntity; @@ -70,6 +71,7 @@ void setup() { executor, mapper, http, + credentialTracker, INSTANCE_ID, JVM_ID, APP_NAME, @@ -161,6 +163,7 @@ void testSubmitCredentialsWithDifferentRealm() throws Exception { executor, mapper, http, + credentialTracker, INSTANCE_ID, JVM_ID, APP_NAME, diff --git a/src/test/java/io/cryostat/agent/RegistrationTest.java b/src/test/java/io/cryostat/agent/RegistrationTest.java index 3630047a..deb676d1 100644 --- a/src/test/java/io/cryostat/agent/RegistrationTest.java +++ b/src/test/java/io/cryostat/agent/RegistrationTest.java @@ -60,6 +60,8 @@ class RegistrationTest { private static final double BACKOFF_MULTIPLIER = 2.0; private static final int CIRCUIT_BREAKER_THRESHOLD = 10; private static final Duration CIRCUIT_BREAKER_DURATION = Duration.ofMinutes(5); + private static final Duration MIN_COOLDOWN_DURATION = + Duration.ZERO; // Disable cooldown for existing tests @BeforeEach void setup() { @@ -84,6 +86,7 @@ void setup() { BACKOFF_MULTIPLIER, CIRCUIT_BREAKER_THRESHOLD, CIRCUIT_BREAKER_DURATION, + MIN_COOLDOWN_DURATION, random); } From 0432c1aef5cf454822a00bec0640aebb40022a20 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Thu, 23 Apr 2026 15:54:10 -0400 Subject: [PATCH 7/7] fix(registration): shut down WebServer when in cooldown --- .../java/io/cryostat/agent/Registration.java | 20 +++ .../java/io/cryostat/agent/WebServer.java | 130 ++++++++++++++++++ 2 files changed, 150 insertions(+) diff --git a/src/main/java/io/cryostat/agent/Registration.java b/src/main/java/io/cryostat/agent/Registration.java index 073fa4fe..a7234a49 100644 --- a/src/main/java/io/cryostat/agent/Registration.java +++ b/src/main/java/io/cryostat/agent/Registration.java @@ -432,6 +432,22 @@ private void enterCooldown(Duration duration) { consecutiveFailures.get()); notify(RegistrationEvent.State.COOLDOWN); + webServer + .performCleanup(this) + .thenRun( + () -> { + log.trace("Cleanup complete, WebServer entering cooldown mode"); + webServer.enterCooldownMode(); + }) + .exceptionally( + t -> { + log.warn( + "Cleanup failed, WebServer entering cooldown mode anyway", + t); + webServer.enterCooldownMode(); + return null; + }); + cooldownExitTask = executor.schedule( this::exitCooldown, duration.toMillis(), TimeUnit.MILLISECONDS); @@ -443,7 +459,11 @@ private void exitCooldown() { synchronized (cooldownLock) { log.trace("Exiting cooldown, ready for next registration attempt"); cooldownUntil = null; + + webServer.exitCooldownMode(); + notify(RegistrationEvent.State.UNREGISTERED); + tryRegister(); } } diff --git a/src/main/java/io/cryostat/agent/WebServer.java b/src/main/java/io/cryostat/agent/WebServer.java index 8d4f7f66..c92ac34b 100644 --- a/src/main/java/io/cryostat/agent/WebServer.java +++ b/src/main/java/io/cryostat/agent/WebServer.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.zip.DeflaterOutputStream; @@ -58,6 +59,18 @@ class WebServer { private final AgentAuthenticator agentAuthenticator; private final RequestLoggingFilter requestLoggingFilter; private final CompressionFilter compressionFilter; + private final CooldownFilter cooldownFilter; + + private volatile ServerState serverState = ServerState.STOPPED; + private final Object stateLock = new Object(); + + public enum ServerState { + STOPPED, + STARTING, + RUNNING, + REJECTING, + STOPPING + } WebServer( SecureRandom random, @@ -77,9 +90,14 @@ class WebServer { this.agentAuthenticator = new AgentAuthenticator(); this.requestLoggingFilter = new RequestLoggingFilter(); this.compressionFilter = new CompressionFilter(); + this.cooldownFilter = new CooldownFilter(); } void start() { + synchronized (stateLock) { + serverState = ServerState.STARTING; + } + Set mergedContexts = new HashSet<>(remoteContexts.get()); mergedContexts.add(new PingContext(registration)); mergedContexts.stream() @@ -87,19 +105,106 @@ void start() { .forEach( rc -> { HttpContext ctx = this.http.createContext(rc.path(), wrap(rc::handle)); + ctx.getFilters().add(0, cooldownFilter); ctx.setAuthenticator(agentAuthenticator); ctx.getFilters().add(requestLoggingFilter); ctx.getFilters().add(compressionFilter); }); this.http.start(); + + synchronized (stateLock) { + serverState = ServerState.RUNNING; + } log.debug("WebServer listening on {}", this.http.getAddress()); } void stop() { + synchronized (stateLock) { + serverState = ServerState.STOPPING; + } if (this.http != null) { this.http.stop(0); this.http = null; } + synchronized (stateLock) { + serverState = ServerState.STOPPED; + } + } + + /** + * Enter cooldown mode: keep socket bound but reject all requests. This allows Cryostat to + * detect the plugin as unhealthy while preventing port binding issues on restart. + */ + void enterCooldownMode() { + synchronized (stateLock) { + if (serverState != ServerState.RUNNING) { + log.warn("Cannot enter cooldown from state: {}", serverState); + return; + } + + log.debug("Entering cooldown mode"); + serverState = ServerState.REJECTING; + } + } + + /** Exit cooldown mode and resume accepting requests. */ + void exitCooldownMode() { + synchronized (stateLock) { + if (serverState != ServerState.REJECTING) { + log.warn("Cannot exit cooldown from state: {}", serverState); + return; + } + + log.debug("Exiting cooldown mode"); + serverState = ServerState.RUNNING; + } + } + + /** Check if server is in a state where it can accept requests. */ + boolean canAcceptRequests() { + synchronized (stateLock) { + return serverState == ServerState.RUNNING; + } + } + + /** + * Perform cleanup before entering cooldown. This is a best-effort operation that should not + * block cooldown entry. + * + * @param reg the Registration instance to use for deregistration + */ + CompletableFuture performCleanup(Registration reg) { + log.debug("Performing cleanup before cooldown"); + + return CompletableFuture.runAsync( + () -> { + if (credentialId >= 0) { + log.trace("Marking credential {} for deletion", credentialId); + resetCredentialId(); + } + + try { + reg.deregister() + .exceptionally( + t -> { + log.warn( + "Failed to deregister during cleanup", + t); + return null; + }) + .get(5, TimeUnit.SECONDS); + } catch (Exception e) { + log.warn("Deregistration during cleanup failed or timed out", e); + } + + log.trace("Cleanup completed"); + }) + .orTimeout(10, TimeUnit.SECONDS) + .exceptionally( + t -> { + log.warn("Cleanup timed out or failed", t); + return null; + }); } int getCredentialId() { @@ -259,6 +364,31 @@ public String description() { } } + private class CooldownFilter extends Filter { + @Override + public void doFilter(HttpExchange exchange, Chain chain) throws IOException { + if (!canAcceptRequests()) { + log.trace( + "Rejecting request during cooldown: {} {}", + exchange.getRequestMethod(), + exchange.getRequestURI().getPath()); + + exchange.getResponseHeaders().add("Retry-After", "60"); + exchange.sendResponseHeaders( + HttpStatus.SC_SERVICE_UNAVAILABLE, RemoteContext.BODY_LENGTH_NONE); + exchange.close(); + return; + } + + chain.doFilter(exchange); + } + + @Override + public String description() { + return "cooldownFilter"; + } + } + private class AgentAuthenticator extends BasicAuthenticator { public AgentAuthenticator() {