From 5573766adda2e0246004d7af58a76f00aa9dfb30 Mon Sep 17 00:00:00 2001 From: Amanda Gonsalves <64379712+amandagonsalves@users.noreply.github.com> Date: Wed, 20 May 2026 16:30:20 -0300 Subject: [PATCH 01/13] test(platform): add test for empty scv_map event * add test coverage for empty scv_map event values * prevent crashes or stalls when processing an empty scv_map event * ensure the observer continues processing and updates the cursor successfully --- .../stellar/StellarRpcPaymentObserverTest.kt | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/platform/src/test/kotlin/org/stellar/anchor/platform/observer/stellar/StellarRpcPaymentObserverTest.kt b/platform/src/test/kotlin/org/stellar/anchor/platform/observer/stellar/StellarRpcPaymentObserverTest.kt index cb8e57c32b..a3993807b2 100644 --- a/platform/src/test/kotlin/org/stellar/anchor/platform/observer/stellar/StellarRpcPaymentObserverTest.kt +++ b/platform/src/test/kotlin/org/stellar/anchor/platform/observer/stellar/StellarRpcPaymentObserverTest.kt @@ -31,6 +31,10 @@ import org.stellar.sdk.requests.sorobanrpc.GetEventsRequest import org.stellar.sdk.responses.sorobanrpc.GetEventsResponse import org.stellar.sdk.scval.Scv import org.stellar.sdk.xdr.OperationType +import org.stellar.sdk.xdr.SCMap +import org.stellar.sdk.xdr.SCMapEntry +import org.stellar.sdk.xdr.SCVal +import org.stellar.sdk.xdr.SCValType class StellarRpcPaymentObserverTest { private lateinit var config: StellarPaymentObserverConfig @@ -376,6 +380,46 @@ class StellarRpcPaymentObserverTest { assertDoesNotThrow { observer.fetchEvents() } assertEquals(ObserverStatus.STREAM_ERROR, observer.getStatus()) } + + @Test + fun `fetchEvents skips empty SCV_MAP poison event without crashing or stalling`() { + val distAccount = KeyPair.random().accountId + val attackerAccount = KeyPair.random().accountId + + val topics = + listOf( + Scv.toSymbol("transfer").toXdrBase64(), + Scv.toAddress(distAccount).toXdrBase64(), + Scv.toAddress(attackerAccount).toXdrBase64(), + Scv.toString("native").toXdrBase64(), + ) + + val emptyMapValue = + SCVal.builder() + .discriminant(SCValType.SCV_MAP) + .map(SCMap(emptyArray())) + .build() + .toXdrBase64() + + val poisonEvent = mockk() + every { poisonEvent.topic } returns topics + every { poisonEvent.value } returns emptyMapValue + + val response = mockk() + every { observer.buildEventRequest(any()) } returns mockk() + every { response.events } returns listOf(poisonEvent) + every { response.latestLedger } returns 777L + every { response.cursor } returns "SAFE_CUR" + every { sorobanServer.getEvents(any()) } returns response + justRun { paymentStreamerCursorStore.saveStellarRpcCursor(any()) } + + observer.setStatus(ObserverStatus.RUNNING) + + assertDoesNotThrow { observer.fetchEvents() } + + assertEquals(ObserverStatus.RUNNING, observer.getStatus()) + assertEquals("SAFE_CUR", observer.cursor) + } } /** From afbefe285ac719fce1b78a8a95d6e59b643067b9 Mon Sep 17 00:00:00 2001 From: Amanda Gonsalves <64379712+amandagonsalves@users.noreply.github.com> Date: Wed, 20 May 2026 16:34:38 -0300 Subject: [PATCH 02/13] fix(stellar-observer): handle muxed account creation * update `scv_u64` memo discriminant check * add error handling for invalid muxed account creation --- .../stellar/StellarRpcPaymentObserver.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/platform/src/main/java/org/stellar/anchor/platform/observer/stellar/StellarRpcPaymentObserver.java b/platform/src/main/java/org/stellar/anchor/platform/observer/stellar/StellarRpcPaymentObserver.java index 8e3911d113..95854603f5 100644 --- a/platform/src/main/java/org/stellar/anchor/platform/observer/stellar/StellarRpcPaymentObserver.java +++ b/platform/src/main/java/org/stellar/anchor/platform/observer/stellar/StellarRpcPaymentObserver.java @@ -229,12 +229,14 @@ private ShouldProcessResult shouldProcess(EventInfo event) { new String(Base64.getEncoder().encode(memoVal.getBytes().getSCBytes())); default -> null; }; - if (scValue.getMap().getSCMap()[1].getVal().getDiscriminant() == SCValType.SCV_U64) { - toAddr = - new MuxedAccount( - Scv.fromAddress(to).toString(), - Scv.fromUint64(scValue.getMap().getSCMap()[1].getVal())) - .getAddress(); + if (memoVal.getDiscriminant() == SCValType.SCV_U64) { + try { + toAddr = + new MuxedAccount(Scv.fromAddress(to).toString(), Scv.fromUint64(memoVal)) + .getAddress(); + } catch (IllegalArgumentException iae) { + warnF("Cannot build MuxedAccount for address '{}', using unmuxed value. ex={}", toAddr, iae.getMessage()); + } } } From d358bee8b0cb44f78405d40e94c8bd8ef9d45ccb Mon Sep 17 00:00:00 2001 From: Amanda Gonsalves <64379712+amandagonsalves@users.noreply.github.com> Date: Wed, 20 May 2026 16:35:04 -0300 Subject: [PATCH 03/13] refactor(observer): update stellar rpc observer error handling * update exception handling for payment event processing to catch all exceptions * update error logging to include full exception string * refactor logging format for muxed account creation --- .../observer/stellar/StellarRpcPaymentObserver.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/platform/src/main/java/org/stellar/anchor/platform/observer/stellar/StellarRpcPaymentObserver.java b/platform/src/main/java/org/stellar/anchor/platform/observer/stellar/StellarRpcPaymentObserver.java index 95854603f5..dfb89bbc6e 100644 --- a/platform/src/main/java/org/stellar/anchor/platform/observer/stellar/StellarRpcPaymentObserver.java +++ b/platform/src/main/java/org/stellar/anchor/platform/observer/stellar/StellarRpcPaymentObserver.java @@ -235,7 +235,10 @@ private ShouldProcessResult shouldProcess(EventInfo event) { new MuxedAccount(Scv.fromAddress(to).toString(), Scv.fromUint64(memoVal)) .getAddress(); } catch (IllegalArgumentException iae) { - warnF("Cannot build MuxedAccount for address '{}', using unmuxed value. ex={}", toAddr, iae.getMessage()); + warnF( + "Cannot build MuxedAccount for address '{}', using unmuxed value. ex={}", + toAddr, + iae.getMessage()); } } } @@ -254,11 +257,9 @@ private ShouldProcessResult shouldProcess(EventInfo event) { .eventMemo(eventMemo) .sep11Asset(asset.getStr().getSCString().toString()) .build(); - } catch (IOException ioex) { + } catch (Exception ex) { warnF( - "Skip processing event: {}. ex={}", - GsonUtils.getInstance().toJson(event), - ioex.getMessage()); + "Skip processing event: {}. ex={}", GsonUtils.getInstance().toJson(event), ex.toString()); return builder.build(); } } From b86ec5e59a39be59ecfbda0c1346862d30279d35 Mon Sep 17 00:00:00 2001 From: Amanda Gonsalves <64379712+amandagonsalves@users.noreply.github.com> Date: Wed, 20 May 2026 16:35:26 -0300 Subject: [PATCH 04/13] fix(stellar-rpc-observer): add error handling for events * add try-catch block around event processing * update processing to fix potential observer crashes from single event errors --- .../observer/stellar/StellarRpcPaymentObserver.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/platform/src/main/java/org/stellar/anchor/platform/observer/stellar/StellarRpcPaymentObserver.java b/platform/src/main/java/org/stellar/anchor/platform/observer/stellar/StellarRpcPaymentObserver.java index dfb89bbc6e..70bc5e21db 100644 --- a/platform/src/main/java/org/stellar/anchor/platform/observer/stellar/StellarRpcPaymentObserver.java +++ b/platform/src/main/java/org/stellar/anchor/platform/observer/stellar/StellarRpcPaymentObserver.java @@ -153,9 +153,16 @@ private void processEvents(List events) { debugF("Processing {} 'transfer' events", events.size()); for (EventInfo event : events) { - ShouldProcessResult result = shouldProcess(event); - if (result.shouldProcess) { - processTransferEvent(result); + try { + ShouldProcessResult result = shouldProcess(event); + if (result.shouldProcess) { + processTransferEvent(result); + } + } catch (Exception ex) { + warnF( + "Skip event due to unexpected error: {}. ex={}", + GsonUtils.getInstance().toJson(event), + ex.toString()); } } } From f58d2a11dbf7973fcd1061fd53e5d5f5a330edfb Mon Sep 17 00:00:00 2001 From: Amanda Gonsalves <64379712+amandagonsalves@users.noreply.github.com> Date: Wed, 20 May 2026 16:35:44 -0300 Subject: [PATCH 05/13] refactor(observer): update cursor saving logic * refactor cursor saving to execute in a `finally` block * update metric and cursor persistence to always run after event processing * fix potential re-processing of events on error during processing --- .../stellar/StellarRpcPaymentObserver.java | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/platform/src/main/java/org/stellar/anchor/platform/observer/stellar/StellarRpcPaymentObserver.java b/platform/src/main/java/org/stellar/anchor/platform/observer/stellar/StellarRpcPaymentObserver.java index 70bc5e21db..b79c02000b 100644 --- a/platform/src/main/java/org/stellar/anchor/platform/observer/stellar/StellarRpcPaymentObserver.java +++ b/platform/src/main/java/org/stellar/anchor/platform/observer/stellar/StellarRpcPaymentObserver.java @@ -123,17 +123,18 @@ void fetchEvents() { lastActivityTime = Instant.now(); silenceTimeoutCount = 0; metricLatestBlockRead.set(response.getLatestLedger()); - if (response.getEvents() != null && !response.getEvents().isEmpty()) { - processEvents(response.getEvents()); - } - // Save the cursor for the next request - cursor = response.getCursor(); try { - saveCursor(cursor); - metricLatestBlockProcessed.set(response.getLatestLedger()); - } catch (Exception tex) { - warnF("Failed to persist RPC cursor. Will retry next tick. ex={}", tex.getMessage()); - setStatus(ObserverStatus.DATABASE_ERROR); + if (response.getEvents() != null && !response.getEvents().isEmpty()) { + processEvents(response.getEvents()); + } + } finally { + try { + saveCursor(response.getCursor()); + metricLatestBlockProcessed.set(response.getLatestLedger()); + } catch (Exception tex) { + warnF("Failed to persist RPC cursor. Will retry next tick. ex={}", tex.getMessage()); + setStatus(ObserverStatus.DATABASE_ERROR); + } } } catch (IOException ioex) { warnF( From a1e51f81aa8bdd6f41a844a063524ec6b4c43fa7 Mon Sep 17 00:00:00 2001 From: Amanda Gonsalves <64379712+amandagonsalves@users.noreply.github.com> Date: Wed, 20 May 2026 16:35:58 -0300 Subject: [PATCH 06/13] fix(stellar): fix parsing malformed contract events * add null and length checks for scv_map entries * fix parsing by validating scv_i128 amount discriminant * refactor contract event data extraction for robustness --- .../observer/stellar/StellarRpcPaymentObserver.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/platform/src/main/java/org/stellar/anchor/platform/observer/stellar/StellarRpcPaymentObserver.java b/platform/src/main/java/org/stellar/anchor/platform/observer/stellar/StellarRpcPaymentObserver.java index b79c02000b..ef5e8644af 100644 --- a/platform/src/main/java/org/stellar/anchor/platform/observer/stellar/StellarRpcPaymentObserver.java +++ b/platform/src/main/java/org/stellar/anchor/platform/observer/stellar/StellarRpcPaymentObserver.java @@ -227,8 +227,16 @@ private ShouldProcessResult shouldProcess(EventInfo event) { if (scValue.getDiscriminant() == SCValType.SCV_I128) { amount = Scv.fromInt128(scValue).longValue(); } else if (scValue.getDiscriminant() == SCValType.SCV_MAP) { - amount = Scv.fromInt128(scValue.getMap().getSCMap()[0].getVal()).longValue(); - SCVal memoVal = scValue.getMap().getSCMap()[1].getVal(); + var entries = scValue.getMap() == null ? null : scValue.getMap().getSCMap(); + if (entries == null || entries.length < 2) { + return builder.build(); + } + SCVal amountVal = entries[0].getVal(); + SCVal memoVal = entries[1].getVal(); + if (amountVal.getDiscriminant() != SCValType.SCV_I128) { + return builder.build(); + } + amount = Scv.fromInt128(amountVal).longValue(); eventMemo = switch (memoVal.getDiscriminant()) { case SCV_STRING -> memoVal.getStr().getSCString().toString(); From cec4e2580368e38b0b9f4bc531dbbf892740a0ed Mon Sep 17 00:00:00 2001 From: Amanda Gonsalves <64379712+amandagonsalves@users.noreply.github.com> Date: Wed, 20 May 2026 16:40:55 -0300 Subject: [PATCH 07/13] test(observer): add tests for varied event map values * add tests for one-entry scv_map events * add tests for scv_map events with wrong first-entry type * add tests for contract-address recipient with scv_u64 memo * update helper function to mock varied poison responses --- .../stellar/StellarRpcPaymentObserverTest.kt | 112 +++++++++++++++++- 1 file changed, 107 insertions(+), 5 deletions(-) diff --git a/platform/src/test/kotlin/org/stellar/anchor/platform/observer/stellar/StellarRpcPaymentObserverTest.kt b/platform/src/test/kotlin/org/stellar/anchor/platform/observer/stellar/StellarRpcPaymentObserverTest.kt index a3993807b2..7125485036 100644 --- a/platform/src/test/kotlin/org/stellar/anchor/platform/observer/stellar/StellarRpcPaymentObserverTest.kt +++ b/platform/src/test/kotlin/org/stellar/anchor/platform/observer/stellar/StellarRpcPaymentObserverTest.kt @@ -22,6 +22,7 @@ import org.stellar.anchor.ledger.StellarRpc import org.stellar.anchor.platform.config.PaymentObserverConfig.StellarPaymentObserverConfig import org.stellar.anchor.platform.observer.PaymentListener import org.stellar.anchor.platform.observer.stellar.AbstractPaymentObserver.ObserverStatus +import org.stellar.sdk.Address import org.stellar.sdk.Asset import org.stellar.sdk.KeyPair import org.stellar.sdk.SorobanServer @@ -385,7 +386,7 @@ class StellarRpcPaymentObserverTest { fun `fetchEvents skips empty SCV_MAP poison event without crashing or stalling`() { val distAccount = KeyPair.random().accountId val attackerAccount = KeyPair.random().accountId - + val topics = listOf( Scv.toSymbol("transfer").toXdrBase64(), @@ -393,14 +394,14 @@ class StellarRpcPaymentObserverTest { Scv.toAddress(attackerAccount).toXdrBase64(), Scv.toString("native").toXdrBase64(), ) - + val emptyMapValue = SCVal.builder() .discriminant(SCValType.SCV_MAP) .map(SCMap(emptyArray())) .build() .toXdrBase64() - + val poisonEvent = mockk() every { poisonEvent.topic } returns topics every { poisonEvent.value } returns emptyMapValue @@ -414,12 +415,113 @@ class StellarRpcPaymentObserverTest { justRun { paymentStreamerCursorStore.saveStellarRpcCursor(any()) } observer.setStatus(ObserverStatus.RUNNING) - + assertDoesNotThrow { observer.fetchEvents() } - + assertEquals(ObserverStatus.RUNNING, observer.getStatus()) assertEquals("SAFE_CUR", observer.cursor) } + + @Test + fun `fetchEvents skips one-entry SCV_MAP without crashing or stalling`() { + val oneEntryMap = + SCVal.builder() + .discriminant(SCValType.SCV_MAP) + .map( + SCMap(arrayOf(SCMapEntry(Scv.toSymbol("amount"), Scv.toInt128(BigInteger.valueOf(100))))) + ) + .build() + .toXdrBase64() + + every { observer.buildEventRequest(any()) } returns mockk() + every { sorobanServer.getEvents(any()) } returns mockPoisonResponse(oneEntryMap) + justRun { paymentStreamerCursorStore.saveStellarRpcCursor(any()) } + observer.setStatus(ObserverStatus.RUNNING) + + assertDoesNotThrow { observer.fetchEvents() } + + assertEquals(ObserverStatus.RUNNING, observer.getStatus()) + assertEquals("POISON_CUR", observer.cursor) + } + + @Test + fun `fetchEvents skips SCV_MAP with wrong first-entry type without crashing or stalling`() { + val wrongTypeMap = + SCVal.builder() + .discriminant(SCValType.SCV_MAP) + .map( + SCMap( + arrayOf( + SCMapEntry(Scv.toSymbol("amount"), Scv.toUint64(BigInteger.valueOf(100))), + SCMapEntry(Scv.toSymbol("memo"), Scv.toString("memo123")), + ) + ) + ) + .build() + .toXdrBase64() + + every { observer.buildEventRequest(any()) } returns mockk() + every { sorobanServer.getEvents(any()) } returns mockPoisonResponse(wrongTypeMap) + justRun { paymentStreamerCursorStore.saveStellarRpcCursor(any()) } + observer.setStatus(ObserverStatus.RUNNING) + + assertDoesNotThrow { observer.fetchEvents() } + + assertEquals(ObserverStatus.RUNNING, observer.getStatus()) + assertEquals("POISON_CUR", observer.cursor) + } + + @Test + fun `fetchEvents handles contract-address recipient with SCV_U64 memo without crashing`() { + val contractAddress = Address.fromContract(ByteArray(32)).toSCVal() + val validMapWithU64Memo = + SCVal.builder() + .discriminant(SCValType.SCV_MAP) + .map( + SCMap( + arrayOf( + SCMapEntry(Scv.toSymbol("amount"), Scv.toInt128(BigInteger.valueOf(500))), + SCMapEntry(Scv.toSymbol("memo"), Scv.toUint64(BigInteger.valueOf(42))), + ) + ) + ) + .build() + .toXdrBase64() + + every { observer.buildEventRequest(any()) } returns mockk() + every { sorobanServer.getEvents(any()) } returns + mockPoisonResponse(validMapWithU64Memo, toSCVal = contractAddress) + justRun { paymentStreamerCursorStore.saveStellarRpcCursor(any()) } + observer.setStatus(ObserverStatus.RUNNING) + + assertDoesNotThrow { observer.fetchEvents() } + + assertEquals(ObserverStatus.RUNNING, observer.getStatus()) + assertEquals("POISON_CUR", observer.cursor) + } + + private fun mockPoisonResponse( + valueBase64: String, + fromAccount: String = KeyPair.random().accountId, + toSCVal: SCVal = Scv.toAddress(KeyPair.random().accountId), + ): GetEventsResponse { + val topics = + listOf( + Scv.toSymbol("transfer").toXdrBase64(), + Scv.toAddress(fromAccount).toXdrBase64(), + toSCVal.toXdrBase64(), + Scv.toString("native").toXdrBase64(), + ) + val event = mockk() + every { event.topic } returns topics + every { event.value } returns valueBase64 + + val response = mockk() + every { response.events } returns listOf(event) + every { response.latestLedger } returns 777L + every { response.cursor } returns "POISON_CUR" + return response + } } /** From 067f8346f9bb5ec47689bad6fc86c9784a8c87c2 Mon Sep 17 00:00:00 2001 From: Amanda Gonsalves <64379712+amandagonsalves@users.noreply.github.com> Date: Wed, 20 May 2026 17:30:09 -0300 Subject: [PATCH 08/13] test(payment-observer): update test assertions and timeout * update timeout for waiting for payment observer events. * refactor payment event assertions to explicitly check for null events. * add explicit failure assertion when payment event waiting times out. --- .../integrationtest/PaymentObserverTests.kt | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/essential-tests/src/testFixtures/kotlin/org/stellar/anchor/platform/integrationtest/PaymentObserverTests.kt b/essential-tests/src/testFixtures/kotlin/org/stellar/anchor/platform/integrationtest/PaymentObserverTests.kt index d3da10b82b..855250541a 100644 --- a/essential-tests/src/testFixtures/kotlin/org/stellar/anchor/platform/integrationtest/PaymentObserverTests.kt +++ b/essential-tests/src/testFixtures/kotlin/org/stellar/anchor/platform/integrationtest/PaymentObserverTests.kt @@ -9,6 +9,7 @@ import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.AfterAll import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull import org.junit.jupiter.api.Assertions.assertNull import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.Test @@ -301,9 +302,11 @@ class PaymentObserverTests { fromEvent: List?, toEvent: List?, ) { - assertEquals(1, fromEvent?.size) - assertEquals(1, toEvent?.size) - assertEquals(fromKeyPair.accountId, fromEvent!![0].from) + assertNotNull(fromEvent) { "Observer did not capture a fromEvent within timeout" } + assertNotNull(toEvent) { "Observer did not capture a toEvent within timeout" } + assertEquals(1, fromEvent!!.size) + assertEquals(1, toEvent!!.size) + assertEquals(fromKeyPair.accountId, fromEvent[0].from) assertEquals(toKeyPair.accountId, fromEvent[0].to) val paymentOperation: PaymentOperation = txn.operations[0] as PaymentOperation assertEquals( @@ -322,9 +325,11 @@ class PaymentObserverTests { fromEvent: List?, toEvent: List?, ) { - assertEquals(1, fromEvent?.size) - assertEquals(1, toEvent?.size) - assertEquals(fromKeyPair.accountId, fromEvent!![0].from) + assertNotNull(fromEvent) { "Observer did not capture a fromEvent within timeout" } + assertNotNull(toEvent) { "Observer did not capture a toEvent within timeout" } + assertEquals(1, fromEvent!!.size) + assertEquals(1, toEvent!!.size) + assertEquals(fromKeyPair.accountId, fromEvent[0].from) assertEquals(toKeyPair.accountId, fromEvent[0].to) val paymentOperation: PathPaymentStrictSendOperation = txn.operations[0] as PathPaymentStrictSendOperation @@ -342,7 +347,7 @@ class PaymentObserverTests { private suspend fun waitForEventsCoroutine( fromAccountId: String, listener: EventCapturingListener, - timeout: Long = 10000L, + timeout: Long = 30000L, ) { val startTime = System.currentTimeMillis() while (System.currentTimeMillis() - startTime <= timeout) { @@ -353,7 +358,9 @@ class PaymentObserverTests { } delay(1000) } - info("Timeout waiting for event for account: $fromAccountId") + assertNotNull(null) { + "Timed out after ${timeout}ms waiting for event from account $fromAccountId" + } } private fun sendTestPayment(fromKeyPair: KeyPair, toKeyPair: KeyPair): Transaction { From 583b3ef95070427d8403f44d291abb2fe4b98ba3 Mon Sep 17 00:00:00 2001 From: Amanda Gonsalves <64379712+amandagonsalves@users.noreply.github.com> Date: Wed, 20 May 2026 17:30:43 -0300 Subject: [PATCH 09/13] test(stellar-rpc-observer): add poison resilience tests * add new test file for stellar rpc payment observer * add tests to verify observer resilience against malformed soroban events * add scenarios where malformed events do not halt event processing * add checks to ensure observer health remains green after encountering poison events * add verification that the cursor advances correctly past mixed batches of events --- .../StellarRpcObserverPoisonResilienceTest.kt | 307 ++++++++++++++++++ 1 file changed, 307 insertions(+) create mode 100644 platform/src/test/kotlin/org/stellar/anchor/platform/observer/stellar/StellarRpcObserverPoisonResilienceTest.kt diff --git a/platform/src/test/kotlin/org/stellar/anchor/platform/observer/stellar/StellarRpcObserverPoisonResilienceTest.kt b/platform/src/test/kotlin/org/stellar/anchor/platform/observer/stellar/StellarRpcObserverPoisonResilienceTest.kt new file mode 100644 index 0000000000..4bd56b249c --- /dev/null +++ b/platform/src/test/kotlin/org/stellar/anchor/platform/observer/stellar/StellarRpcObserverPoisonResilienceTest.kt @@ -0,0 +1,307 @@ +package org.stellar.anchor.platform.observer.stellar + +import io.mockk.every +import io.mockk.mockk +import io.mockk.spyk +import java.math.BigInteger +import java.util.concurrent.atomic.AtomicInteger +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.stellar.anchor.api.platform.HealthCheckStatus.GREEN +import org.stellar.anchor.asset.AssetService +import org.stellar.anchor.ledger.StellarRpc +import org.stellar.anchor.platform.config.PaymentObserverConfig.StellarPaymentObserverConfig +import org.stellar.anchor.platform.observer.PaymentListener +import org.stellar.anchor.platform.observer.stellar.AbstractPaymentObserver.ObserverStatus +import org.stellar.sdk.Address +import org.stellar.sdk.KeyPair +import org.stellar.sdk.SorobanServer +import org.stellar.sdk.requests.sorobanrpc.GetEventsRequest +import org.stellar.sdk.responses.sorobanrpc.GetEventsResponse +import org.stellar.sdk.scval.Scv +import org.stellar.sdk.xdr.SCMap +import org.stellar.sdk.xdr.SCMapEntry +import org.stellar.sdk.xdr.SCVal +import org.stellar.sdk.xdr.SCValType + +class StellarRpcObserverPoisonResilienceTest { + private lateinit var config: StellarPaymentObserverConfig + private lateinit var observer: StellarRpcPaymentObserver + private lateinit var sorobanServer: SorobanServer + private lateinit var stellarRpc: StellarRpc + private lateinit var paymentObservingAccountsManager: PaymentObservingAccountsManager + private lateinit var cursorStore: InMemoryCursorStore + private lateinit var assetService: AssetService + + @BeforeEach + fun setUp() { + config = + StellarPaymentObserverConfig().apply { + silenceCheckInterval = 60 + silenceTimeout = 300 + silenceTimeoutRetries = 3 + initialStreamBackoffTime = 500 + maxStreamBackoffTime = 5000 + initialEventBackoffTime = 250 + maxEventBackoffTime = 2500 + } + stellarRpc = mockk(relaxed = true) + sorobanServer = mockk(relaxed = true) + every { stellarRpc.sorobanServer } returns sorobanServer + every { stellarRpc.getSorobanServer() } returns sorobanServer + every { sorobanServer.getLatestLedger() } returns mockk { every { sequence } returns 10 } + + paymentObservingAccountsManager = mockk(relaxed = true) + cursorStore = InMemoryCursorStore() + assetService = mockk(relaxed = true) + + observer = + spyk( + StellarRpcPaymentObserver( + stellarRpc, + config, + emptyList(), + paymentObservingAccountsManager, + cursorStore, + MockSacToAssetMapper(), + assetService, + ), + recordPrivateCalls = true, + ) + every { observer.buildEventRequest(any()) } returns mockk() + } + + @AfterEach + fun tearDown() { + runCatching { observer.shutdown() } + } + + @Test + fun `observer stays RUNNING after empty SCV_MAP poison event on real scheduler`() { + val poisonValue = + SCVal.builder() + .discriminant(SCValType.SCV_MAP) + .map(SCMap(emptyArray())) + .build() + .toXdrBase64() + + setupSequentialResponses(poisonValue) + observer.start() + + waitForCursor("NORMAL_CUR") + + assertEquals(ObserverStatus.RUNNING, observer.getStatus()) + assertEquals("NORMAL_CUR", cursorStore.loadStellarRpcCursor()) + } + + @Test + fun `observer stays RUNNING after one-entry SCV_MAP poison event on real scheduler`() { + val poisonValue = + SCVal.builder() + .discriminant(SCValType.SCV_MAP) + .map( + SCMap(arrayOf(SCMapEntry(Scv.toSymbol("amount"), Scv.toInt128(BigInteger.valueOf(100))))) + ) + .build() + .toXdrBase64() + + setupSequentialResponses(poisonValue) + observer.start() + + waitForCursor("NORMAL_CUR") + + assertEquals(ObserverStatus.RUNNING, observer.getStatus()) + assertEquals("NORMAL_CUR", cursorStore.loadStellarRpcCursor()) + } + + @Test + fun `observer stays RUNNING after SCV_MAP with wrong first-entry type on real scheduler`() { + val poisonValue = + SCVal.builder() + .discriminant(SCValType.SCV_MAP) + .map( + SCMap( + arrayOf( + SCMapEntry(Scv.toSymbol("amount"), Scv.toUint64(BigInteger.valueOf(100))), + SCMapEntry(Scv.toSymbol("memo"), Scv.toString("memo123")), + ) + ) + ) + .build() + .toXdrBase64() + + setupSequentialResponses(poisonValue) + observer.start() + + waitForCursor("NORMAL_CUR") + + assertEquals(ObserverStatus.RUNNING, observer.getStatus()) + assertEquals("NORMAL_CUR", cursorStore.loadStellarRpcCursor()) + } + + @Test + fun `observer stays RUNNING after contract-address recipient with SCV_U64 memo on real scheduler`() { + val poisonValue = + SCVal.builder() + .discriminant(SCValType.SCV_MAP) + .map( + SCMap( + arrayOf( + SCMapEntry(Scv.toSymbol("amount"), Scv.toInt128(BigInteger.valueOf(500))), + SCMapEntry(Scv.toSymbol("memo"), Scv.toUint64(BigInteger.valueOf(42))), + ) + ) + ) + .build() + .toXdrBase64() + val contractAddress = Address.fromContract(ByteArray(32)).toSCVal() + + setupSequentialResponses(poisonValue, toSCVal = contractAddress) + observer.start() + + waitForCursor("NORMAL_CUR") + + assertEquals(ObserverStatus.RUNNING, observer.getStatus()) + assertEquals("NORMAL_CUR", cursorStore.loadStellarRpcCursor()) + } + + @Test + fun `observer health check returns GREEN after processing poison events`() { + val poisonValue = + SCVal.builder() + .discriminant(SCValType.SCV_MAP) + .map(SCMap(emptyArray())) + .build() + .toXdrBase64() + + setupSequentialResponses(poisonValue) + observer.start() + + waitForCursor("NORMAL_CUR") + + val healthResult = observer.check() + assertNotNull(healthResult) + assertEquals(GREEN, healthResult.status) + } + + @Test + fun `observer advances cursor past mixed batch containing poison and valid events`() { + val emptyMapPoison = + SCVal.builder() + .discriminant(SCValType.SCV_MAP) + .map(SCMap(emptyArray())) + .build() + .toXdrBase64() + val validValue = Scv.toInt128(BigInteger.valueOf(100)).toXdrBase64() + + val callCount = AtomicInteger(0) + every { sorobanServer.getEvents(any()) } answers + { + if (callCount.incrementAndGet() == 1) + mockBatchResponse(listOf(emptyMapPoison, validValue), "BATCH_CUR") + else mockEmptyBatchResponse("NORMAL_CUR") + } + + observer.start() + + waitForCursor("NORMAL_CUR") + + assertEquals(ObserverStatus.RUNNING, observer.getStatus()) + assertEquals("NORMAL_CUR", cursorStore.loadStellarRpcCursor()) + } + + // ── helpers ────────────────────────────────────────────────────────────────── + + private fun setupSequentialResponses( + poisonValueBase64: String, + toSCVal: SCVal = Scv.toAddress(KeyPair.random().accountId), + ) { + val callCount = AtomicInteger(0) + every { sorobanServer.getEvents(any()) } answers + { + if (callCount.incrementAndGet() == 1) + mockPoisonResponse(poisonValueBase64, toSCVal, "POISON_CUR") + else mockEmptyBatchResponse("NORMAL_CUR") + } + } + + private fun mockPoisonResponse( + valueBase64: String, + toSCVal: SCVal, + cursor: String, + ): GetEventsResponse { + val topics = + listOf( + Scv.toSymbol("transfer").toXdrBase64(), + Scv.toAddress(KeyPair.random().accountId).toXdrBase64(), + toSCVal.toXdrBase64(), + Scv.toString("native").toXdrBase64(), + ) + val event = mockk() + every { event.topic } returns topics + every { event.value } returns valueBase64 + + return mockk().also { + every { it.events } returns listOf(event) + every { it.latestLedger } returns 100L + every { it.cursor } returns cursor + } + } + + private fun mockBatchResponse( + valuesBase64: List, + cursor: String, + ): GetEventsResponse { + val events = + valuesBase64.map { valueBase64 -> + val topics = + listOf( + Scv.toSymbol("transfer").toXdrBase64(), + Scv.toAddress(KeyPair.random().accountId).toXdrBase64(), + Scv.toAddress(KeyPair.random().accountId).toXdrBase64(), + Scv.toString("native").toXdrBase64(), + ) + mockk().also { + every { it.topic } returns topics + every { it.value } returns valueBase64 + } + } + return mockk().also { + every { it.events } returns events + every { it.latestLedger } returns 100L + every { it.cursor } returns cursor + } + } + + private fun mockEmptyBatchResponse(cursor: String): GetEventsResponse = + mockk().also { + every { it.events } returns emptyList() + every { it.latestLedger } returns 101L + every { it.cursor } returns cursor + } + + private fun waitForCursor(expected: String, timeoutMs: Long = 6000) { + val deadline = System.currentTimeMillis() + timeoutMs + while (System.currentTimeMillis() < deadline) { + if (cursorStore.loadStellarRpcCursor() == expected) return + Thread.sleep(100) + } + } +} + +private class InMemoryCursorStore : StellarPaymentStreamerCursorStore { + private var horizonCursor = "" + private var stellarRpcCursor = "" + + override fun saveHorizonCursor(cursor: String) { + horizonCursor = cursor + } + override fun loadHorizonCursor(): String = horizonCursor + override fun saveStellarRpcCursor(cursor: String) { + stellarRpcCursor = cursor + } + override fun loadStellarRpcCursor(): String = stellarRpcCursor +} From 3a4abce052771f929b24652624a09f92a78c446a Mon Sep 17 00:00:00 2001 From: Amanda Gonsalves <64379712+amandagonsalves@users.noreply.github.com> Date: Wed, 20 May 2026 17:57:36 -0300 Subject: [PATCH 10/13] test(payment-observer): update event wait timeout * update the default timeout for event capturing to 60 seconds * allow more time for payment observer events to be processed and captured in integration tests --- .../anchor/platform/integrationtest/PaymentObserverTests.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/essential-tests/src/testFixtures/kotlin/org/stellar/anchor/platform/integrationtest/PaymentObserverTests.kt b/essential-tests/src/testFixtures/kotlin/org/stellar/anchor/platform/integrationtest/PaymentObserverTests.kt index 855250541a..02c8fbd7e9 100644 --- a/essential-tests/src/testFixtures/kotlin/org/stellar/anchor/platform/integrationtest/PaymentObserverTests.kt +++ b/essential-tests/src/testFixtures/kotlin/org/stellar/anchor/platform/integrationtest/PaymentObserverTests.kt @@ -347,7 +347,7 @@ class PaymentObserverTests { private suspend fun waitForEventsCoroutine( fromAccountId: String, listener: EventCapturingListener, - timeout: Long = 30000L, + timeout: Long = 60000L, ) { val startTime = System.currentTimeMillis() while (System.currentTimeMillis() - startTime <= timeout) { From 37c5447d4838e1c1d8fae463cf3e28609bcef289 Mon Sep 17 00:00:00 2001 From: Amanda Gonsalves <64379712+amandagonsalves@users.noreply.github.com> Date: Thu, 21 May 2026 15:23:02 -0300 Subject: [PATCH 11/13] refactor(test): update test failure assertion * update test assertion from `assertNotNull(null)` to `fail()` * refactor test failure logic for clarity and explicitness --- .../anchor/platform/integrationtest/PaymentObserverTests.kt | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/essential-tests/src/testFixtures/kotlin/org/stellar/anchor/platform/integrationtest/PaymentObserverTests.kt b/essential-tests/src/testFixtures/kotlin/org/stellar/anchor/platform/integrationtest/PaymentObserverTests.kt index 02c8fbd7e9..ec4c3981b2 100644 --- a/essential-tests/src/testFixtures/kotlin/org/stellar/anchor/platform/integrationtest/PaymentObserverTests.kt +++ b/essential-tests/src/testFixtures/kotlin/org/stellar/anchor/platform/integrationtest/PaymentObserverTests.kt @@ -11,6 +11,7 @@ import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertNotNull import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.Assertions.fail import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.Test import org.stellar.anchor.asset.AssetService @@ -358,9 +359,7 @@ class PaymentObserverTests { } delay(1000) } - assertNotNull(null) { - "Timed out after ${timeout}ms waiting for event from account $fromAccountId" - } + fail("Timed out after ${timeout}ms waiting for event from account $fromAccountId") } private fun sendTestPayment(fromKeyPair: KeyPair, toKeyPair: KeyPair): Transaction { From 6d6275b0a37f36661e14daf4439bf69aa1b8afca Mon Sep 17 00:00:00 2001 From: Amanda Gonsalves <64379712+amandagonsalves@users.noreply.github.com> Date: Thu, 21 May 2026 15:24:13 -0300 Subject: [PATCH 12/13] fix(observer): shut down executor service * shut down the executor service when observer is stopped * prevent potential resource leaks --- .../platform/observer/stellar/StellarRpcPaymentObserver.java | 1 + 1 file changed, 1 insertion(+) diff --git a/platform/src/main/java/org/stellar/anchor/platform/observer/stellar/StellarRpcPaymentObserver.java b/platform/src/main/java/org/stellar/anchor/platform/observer/stellar/StellarRpcPaymentObserver.java index ef5e8644af..4fe514d7a6 100644 --- a/platform/src/main/java/org/stellar/anchor/platform/observer/stellar/StellarRpcPaymentObserver.java +++ b/platform/src/main/java/org/stellar/anchor/platform/observer/stellar/StellarRpcPaymentObserver.java @@ -82,6 +82,7 @@ void startInternal() { @Override void shutdownInternal() { task.cancel(true); + executorService.shutdownNow(); status = ObserverStatus.SHUTDOWN; } From 53b8d7ce90e01f07eca9e9db2c236a4ae60913fa Mon Sep 17 00:00:00 2001 From: Amanda Gonsalves <64379712+amandagonsalves@users.noreply.github.com> Date: Thu, 21 May 2026 16:36:01 -0300 Subject: [PATCH 13/13] refactor(tests): update assertion failure mechanism * delete junit assertions.fail import * refactor test timeout failure to directly throw assertionerror --- .../anchor/platform/integrationtest/PaymentObserverTests.kt | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/essential-tests/src/testFixtures/kotlin/org/stellar/anchor/platform/integrationtest/PaymentObserverTests.kt b/essential-tests/src/testFixtures/kotlin/org/stellar/anchor/platform/integrationtest/PaymentObserverTests.kt index ec4c3981b2..08499f3782 100644 --- a/essential-tests/src/testFixtures/kotlin/org/stellar/anchor/platform/integrationtest/PaymentObserverTests.kt +++ b/essential-tests/src/testFixtures/kotlin/org/stellar/anchor/platform/integrationtest/PaymentObserverTests.kt @@ -11,7 +11,6 @@ import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertNotNull import org.junit.jupiter.api.Assertions.assertNull -import org.junit.jupiter.api.Assertions.fail import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.Test import org.stellar.anchor.asset.AssetService @@ -359,7 +358,9 @@ class PaymentObserverTests { } delay(1000) } - fail("Timed out after ${timeout}ms waiting for event from account $fromAccountId") + throw AssertionError( + "Timed out after ${timeout}ms waiting for event from account $fromAccountId" + ) } private fun sendTestPayment(fromKeyPair: KeyPair, toKeyPair: KeyPair): Transaction {