Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions zetaclient/chains/sui/observer/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,12 @@ func (ob *Observer) observeGatewayInbound(ctx context.Context, packageID string)
Int("events", len(events)).
Msg("processing inbound events")

txCache := make(map[string]*models.SuiTransactionBlockResponse)

for _, event := range events {
// Note: we can make this concurrent if needed.
// Let's revisit later
err := ob.processInboundEvent(ctx, event, nil, false, false)
err := ob.processInboundEvent(ctx, event, nil, txCache, false, false)

switch {
case errors.Is(err, errTxNotFound),
Expand Down Expand Up @@ -159,6 +161,7 @@ func (ob *Observer) processInboundEvent(
ctx context.Context,
raw models.SuiEventResponse,
tx *models.SuiTransactionBlockResponse,
txCache map[string]*models.SuiTransactionBlockResponse,
fromTracker bool,
isInternalTracker bool,
) error {
Expand All @@ -169,11 +172,12 @@ func (ob *Observer) processInboundEvent(
return nil
case err != nil:
return errors.Wrap(err, "unable to parse event")
case event.EventIndex != 0:
// Is it possible to have multiple events per tx?
// e.g. contract "A" calls Gateway multiple times in a single tx (deposit to multiple accounts)
// most likely not, so let's explicitly fail to prevent undefined behavior.
return errors.Errorf("unexpected event index %d for tx %s", event.EventIndex, event.TxHash)
}

if tx == nil && txCache != nil {
if cached, ok := txCache[event.TxHash]; ok {
tx = cached
}
}

if tx == nil {
Expand All @@ -187,6 +191,9 @@ func (ob *Observer) processInboundEvent(
}

tx = &txFresh
if txCache != nil {
txCache[event.TxHash] = tx
}
}

msg, err := ob.constructInboundVote(event, *tx)
Expand Down Expand Up @@ -226,7 +233,7 @@ func (ob *Observer) processInboundTracker(ctx context.Context, tracker cctypes.I
}

for _, event := range tx.Events {
if err := ob.processInboundEvent(ctx, event, &tx, true, isInternal); err != nil {
if err := ob.processInboundEvent(ctx, event, &tx, nil, true, isInternal); err != nil {
return errors.Wrapf(err, "unable to process inbound event %s", event.Id.EventSeq)
}
}
Expand Down
120 changes: 119 additions & 1 deletion zetaclient/chains/sui/observer/observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,63 @@ func TestObserver(t *testing.T) {
require.Empty(t, ts.inboundVotesBag)
})

t.Run("ObserveInbound handles multiple gateway events in the same tx", func(t *testing.T) {
// ARRANGE
ts := newTestSuite(t)
packageID := ts.gateway.PackageID()
txHash := "TX_MULTI_EVENT"

evmBob := sample.EthAddress()
evmAlice := sample.EthAddress()

expectedQuery := client.EventQuery{
PackageID: packageID,
Module: sui.GatewayModule,
Cursor: "",
Limit: client.DefaultEventsLimit,
}

events := []models.SuiEventResponse{
ts.SampleEventWithSeq(packageID, txHash, "0", string(sui.DepositEvent), map[string]any{
"coin_type": string(sui.SUI),
"amount": "200",
"sender": "SUI_BOB",
"receiver": evmBob.String(),
}),
ts.SampleEventWithSeq(packageID, txHash, "1", string(sui.DepositAndCallEvent), map[string]any{
"coin_type": string(sui.SUI),
"amount": "300",
"sender": "SUI_ALICE",
"receiver": evmAlice.String(),
"payload": preparePayload([]byte{1, 2, 3}),
}),
}

ts.suiMock.On("QueryModuleEvents", mock.Anything, expectedQuery).Return(events, "", nil)
ts.OnGetTx(txHash, "10000", true, false, nil)

getCctxByHashErr := grpcstatus.Error(grpccodes.InvalidArgument, "anything")
ts.zetaMock.MockGetCctxByHash("", getCctxByHashErr)
ts.CatchInboundVotes()

// ACT
err := ts.ObserveInbound(ts.ctx)

// ASSERT
require.NoError(t, err)
require.Equal(t, "TX_MULTI_EVENT,1", ts.GetAuxString(packageID))
require.Len(t, ts.inboundVotesBag, 2)

require.Equal(t, uint64(0), ts.inboundVotesBag[0].EventIndex)
require.Equal(t, evmBob.String(), ts.inboundVotesBag[0].Receiver)
require.Equal(t, math.NewUint(200), ts.inboundVotesBag[0].Amount)

require.Equal(t, uint64(1), ts.inboundVotesBag[1].EventIndex)
require.Equal(t, evmAlice.String(), ts.inboundVotesBag[1].Receiver)
require.Equal(t, math.NewUint(300), ts.inboundVotesBag[1].Amount)
require.Equal(t, "010203", ts.inboundVotesBag[1].Message)
})

t.Run("ObserveInbound restricted address", func(t *testing.T) {
// ARRANGE
ts := newTestSuite(t)
Expand Down Expand Up @@ -387,6 +444,57 @@ func TestObserver(t *testing.T) {
require.Equal(t, evmAlice.String(), vote.Receiver)
})

t.Run("ProcessInboundTrackers handles multiple gateway events in the same tx", func(t *testing.T) {
// ARRANGE
ts := newTestSuite(t)
packageID := ts.gateway.PackageID()
txHash := "TX_TRACKER_MULTI_EVENT"
chainID := ts.Chain().ChainId

trackers := []cctypes.InboundTracker{
{
ChainId: chainID,
TxHash: txHash,
CoinType: coin.CoinType_Gas,
},
}
ts.zetaMock.On("GetInboundTrackersForChain", mock.Anything, chainID).Return(trackers, nil)

evmBob := sample.EthAddress()
evmAlice := sample.EthAddress()
ts.OnGetTx(txHash, "15000", true, true, []models.SuiEventResponse{
ts.SampleEventWithSeq(packageID, txHash, "0", string(sui.DepositEvent), map[string]any{
"coin_type": string(sui.SUI),
"amount": "1000",
"sender": "SUI_BOB",
"receiver": evmBob.String(),
}),
ts.SampleEventWithSeq(packageID, txHash, "1", string(sui.DepositEvent), map[string]any{
"coin_type": string(sui.SUI),
"amount": "2000",
"sender": "SUI_ALICE",
"receiver": evmAlice.String(),
}),
})

getCctxByHashErr := grpcstatus.Error(grpccodes.InvalidArgument, "anything")
ts.zetaMock.MockGetCctxByHash("", getCctxByHashErr)
ts.CatchInboundVotes()

// ACT
err := ts.ProcessInboundTrackers(ts.ctx)

// ASSERT
require.NoError(t, err)
require.Len(t, ts.inboundVotesBag, 2)
require.Equal(t, uint64(0), ts.inboundVotesBag[0].EventIndex)
require.Equal(t, evmBob.String(), ts.inboundVotesBag[0].Receiver)
require.Equal(t, math.NewUint(1000), ts.inboundVotesBag[0].Amount)
require.Equal(t, uint64(1), ts.inboundVotesBag[1].EventIndex)
require.Equal(t, evmAlice.String(), ts.inboundVotesBag[1].Receiver)
require.Equal(t, math.NewUint(2000), ts.inboundVotesBag[1].Amount)
})

t.Run("ProcessOutboundTrackers", func(t *testing.T) {
// ARRANGE
ts := newTestSuite(t)
Expand Down Expand Up @@ -804,12 +912,22 @@ func newTestSuite(t *testing.T, opts ...func(*testSuiteConfig)) *testSuite {
}

func (ts *testSuite) SampleEvent(packageID, txHash, event string, kv map[string]any) models.SuiEventResponse {
return ts.SampleEventWithSeq(packageID, txHash, "0", event, kv)
}

func (ts *testSuite) SampleEventWithSeq(
packageID,
txHash,
eventSeq,
event string,
kv map[string]any,
) models.SuiEventResponse {
eventType := fmt.Sprintf("%s::%s::%s", packageID, sui.GatewayModule, event)

return models.SuiEventResponse{
Id: models.EventId{
TxDigest: txHash,
EventSeq: "0",
EventSeq: eventSeq,
},
PackageId: packageID,
TransactionModule: "gateway",
Expand Down
Loading