Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.Assertions.assertNull
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
Expand Down Expand Up @@ -301,9 +302,11 @@ class PaymentObserverTests {
fromEvent: List<PaymentTransferEvent>?,
toEvent: List<PaymentTransferEvent>?,
) {
assertEquals(1, fromEvent?.size)
assertEquals(1, toEvent?.size)
assertEquals(fromKeyPair.accountId, fromEvent!![0].from)
assertNotNull(fromEvent) { "Observer did not capture a fromEvent within timeout" }
assertNotNull(toEvent) { "Observer did not capture a toEvent within timeout" }
assertEquals(1, fromEvent!!.size)
assertEquals(1, toEvent!!.size)
assertEquals(fromKeyPair.accountId, fromEvent[0].from)
assertEquals(toKeyPair.accountId, fromEvent[0].to)
val paymentOperation: PaymentOperation = txn.operations[0] as PaymentOperation
assertEquals(
Expand All @@ -322,9 +325,11 @@ class PaymentObserverTests {
fromEvent: List<PaymentTransferEvent>?,
toEvent: List<PaymentTransferEvent>?,
) {
assertEquals(1, fromEvent?.size)
assertEquals(1, toEvent?.size)
assertEquals(fromKeyPair.accountId, fromEvent!![0].from)
assertNotNull(fromEvent) { "Observer did not capture a fromEvent within timeout" }
assertNotNull(toEvent) { "Observer did not capture a toEvent within timeout" }
assertEquals(1, fromEvent!!.size)
assertEquals(1, toEvent!!.size)
assertEquals(fromKeyPair.accountId, fromEvent[0].from)
assertEquals(toKeyPair.accountId, fromEvent[0].to)
val paymentOperation: PathPaymentStrictSendOperation =
txn.operations[0] as PathPaymentStrictSendOperation
Expand All @@ -342,7 +347,7 @@ class PaymentObserverTests {
private suspend fun waitForEventsCoroutine(
fromAccountId: String,
listener: EventCapturingListener,
timeout: Long = 10000L,
timeout: Long = 60000L,
) {
Comment thread
amandagonsalves marked this conversation as resolved.
val startTime = System.currentTimeMillis()
while (System.currentTimeMillis() - startTime <= timeout) {
Expand All @@ -353,7 +358,9 @@ class PaymentObserverTests {
}
delay(1000)
}
info("Timeout waiting for event for account: $fromAccountId")
throw AssertionError(
"Timed out after ${timeout}ms waiting for event from account $fromAccountId"
)
}

private fun sendTestPayment(fromKeyPair: KeyPair, toKeyPair: KeyPair): Transaction {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ void startInternal() {
@Override
void shutdownInternal() {
task.cancel(true);
executorService.shutdownNow();
status = ObserverStatus.SHUTDOWN;
}

Expand Down Expand Up @@ -123,17 +124,18 @@ void fetchEvents() {
lastActivityTime = Instant.now();
silenceTimeoutCount = 0;
metricLatestBlockRead.set(response.getLatestLedger());
if (response.getEvents() != null && !response.getEvents().isEmpty()) {
processEvents(response.getEvents());
}
// Save the cursor for the next request
cursor = response.getCursor();
try {
saveCursor(cursor);
metricLatestBlockProcessed.set(response.getLatestLedger());
} catch (Exception tex) {
warnF("Failed to persist RPC cursor. Will retry next tick. ex={}", tex.getMessage());
setStatus(ObserverStatus.DATABASE_ERROR);
if (response.getEvents() != null && !response.getEvents().isEmpty()) {
processEvents(response.getEvents());
}
} finally {
try {
saveCursor(response.getCursor());
metricLatestBlockProcessed.set(response.getLatestLedger());
Comment on lines +133 to +134
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need streamBackoffTimer.reset(); here after a successful saveCursor. This is reporter's Fix #4

Without this, an operator could still observe slow walks toward self-shutdown that have nothing to do with the malformed-event attack.

} catch (Exception tex) {
warnF("Failed to persist RPC cursor. Will retry next tick. ex={}", tex.getMessage());
setStatus(ObserverStatus.DATABASE_ERROR);
}
}
} catch (IOException ioex) {
warnF(
Expand All @@ -153,9 +155,16 @@ private void processEvents(List<EventInfo> events) {
debugF("Processing {} 'transfer' events", events.size());

for (EventInfo event : events) {
ShouldProcessResult result = shouldProcess(event);
if (result.shouldProcess) {
processTransferEvent(result);
try {
ShouldProcessResult result = shouldProcess(event);
if (result.shouldProcess) {
processTransferEvent(result);
}
} catch (Exception ex) {
warnF(
"Skip event due to unexpected error: {}. ex={}",
GsonUtils.getInstance().toJson(event),
ex.toString());
}
}
}
Expand Down Expand Up @@ -219,8 +228,16 @@ private ShouldProcessResult shouldProcess(EventInfo event) {
if (scValue.getDiscriminant() == SCValType.SCV_I128) {
amount = Scv.fromInt128(scValue).longValue();
} else if (scValue.getDiscriminant() == SCValType.SCV_MAP) {
amount = Scv.fromInt128(scValue.getMap().getSCMap()[0].getVal()).longValue();
SCVal memoVal = scValue.getMap().getSCMap()[1].getVal();
var entries = scValue.getMap() == null ? null : scValue.getMap().getSCMap();
if (entries == null || entries.length < 2) {
return builder.build();
}
SCVal amountVal = entries[0].getVal();
SCVal memoVal = entries[1].getVal();
if (amountVal.getDiscriminant() != SCValType.SCV_I128) {
return builder.build();
}
amount = Scv.fromInt128(amountVal).longValue();
eventMemo =
switch (memoVal.getDiscriminant()) {
case SCV_STRING -> memoVal.getStr().getSCString().toString();
Expand All @@ -229,12 +246,17 @@ private ShouldProcessResult shouldProcess(EventInfo event) {
new String(Base64.getEncoder().encode(memoVal.getBytes().getSCBytes()));
default -> null;
};
if (scValue.getMap().getSCMap()[1].getVal().getDiscriminant() == SCValType.SCV_U64) {
toAddr =
new MuxedAccount(
Scv.fromAddress(to).toString(),
Scv.fromUint64(scValue.getMap().getSCMap()[1].getVal()))
.getAddress();
if (memoVal.getDiscriminant() == SCValType.SCV_U64) {
try {
toAddr =
new MuxedAccount(Scv.fromAddress(to).toString(), Scv.fromUint64(memoVal))
.getAddress();
} catch (IllegalArgumentException iae) {
warnF(
"Cannot build MuxedAccount for address '{}', using unmuxed value. ex={}",
toAddr,
iae.getMessage());
}
}
}

Expand All @@ -252,11 +274,9 @@ private ShouldProcessResult shouldProcess(EventInfo event) {
.eventMemo(eventMemo)
.sep11Asset(asset.getStr().getSCString().toString())
.build();
} catch (IOException ioex) {
} catch (Exception ex) {
warnF(
"Skip processing event: {}. ex={}",
GsonUtils.getInstance().toJson(event),
ioex.getMessage());
"Skip processing event: {}. ex={}", GsonUtils.getInstance().toJson(event), ex.toString());
return builder.build();
}
}
Expand Down
Loading
Loading