Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
DROP TABLE IF EXISTS default.libp2p_join ON CLUSTER '{cluster}' SYNC;
DROP TABLE IF EXISTS default.libp2p_join_local ON CLUSTER '{cluster}' SYNC;
DROP TABLE IF EXISTS default.libp2p_leave ON CLUSTER '{cluster}' SYNC;
DROP TABLE IF EXISTS default.libp2p_leave_local ON CLUSTER '{cluster}' SYNC;

CREATE TABLE IF NOT EXISTS default.libp2p_join_local ON CLUSTER '{cluster}'
(
`updated_date_time` DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)),
`event_date_time` DateTime64(3) COMMENT 'Timestamp of the event' CODEC(DoubleDelta, ZSTD(1)),
`topic_layer` LowCardinality(String) COMMENT 'Layer of the topic',
`topic_fork_digest_value` LowCardinality(String) COMMENT 'Fork digest value of the topic',
`topic_name` LowCardinality(String) COMMENT 'Name of the topic',
`topic_encoding` LowCardinality(String) COMMENT 'Encoding of the topic',
`peer_id_unique_key` Int64 COMMENT 'Unique key associated with the identifier of the peer that joined the topic',
`meta_client_name` LowCardinality(String) COMMENT 'Name of the client that generated the event',
`meta_client_version` LowCardinality(String) COMMENT 'Version of the client that generated the event',
`meta_client_implementation` LowCardinality(String) COMMENT 'Implementation of the client that generated the event',
`meta_client_os` LowCardinality(String) COMMENT 'Operating system of the client that generated the event',
`meta_client_ip` Nullable(IPv6) COMMENT 'IP address of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_city` LowCardinality(String) COMMENT 'City of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_country` LowCardinality(String) COMMENT 'Country of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_country_code` LowCardinality(String) COMMENT 'Country code of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_continent_code` LowCardinality(String) COMMENT 'Continent code of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_longitude` Nullable(Float64) COMMENT 'Longitude of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_latitude` Nullable(Float64) COMMENT 'Latitude of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_autonomous_system_number` Nullable(UInt32) COMMENT 'Autonomous system number of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_autonomous_system_organization` Nullable(String) COMMENT 'Autonomous system organization of the client that generated the event' CODEC(ZSTD(1)),
`meta_network_name` LowCardinality(String) COMMENT 'Ethereum network name'
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', '{replica}', updated_date_time)
PARTITION BY (meta_network_name, toYYYYMM(event_date_time))
ORDER BY (meta_network_name, event_date_time, meta_client_name, peer_id_unique_key, topic_fork_digest_value, topic_name)
COMMENT 'Contains the details of the JOIN events from the libp2p client.';

CREATE TABLE IF NOT EXISTS default.libp2p_leave_local ON CLUSTER '{cluster}'
(
`updated_date_time` DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)),
`event_date_time` DateTime64(3) COMMENT 'Timestamp of the event' CODEC(DoubleDelta, ZSTD(1)),
`topic_layer` LowCardinality(String) COMMENT 'Layer of the topic',
`topic_fork_digest_value` LowCardinality(String) COMMENT 'Fork digest value of the topic',
`topic_name` LowCardinality(String) COMMENT 'Name of the topic',
`topic_encoding` LowCardinality(String) COMMENT 'Encoding of the topic',
`peer_id_unique_key` Int64 COMMENT 'Unique key associated with the identifier of the peer that left the topic',
`meta_client_name` LowCardinality(String) COMMENT 'Name of the client that generated the event',
`meta_client_version` LowCardinality(String) COMMENT 'Version of the client that generated the event',
`meta_client_implementation` LowCardinality(String) COMMENT 'Implementation of the client that generated the event',
`meta_client_os` LowCardinality(String) COMMENT 'Operating system of the client that generated the event',
`meta_client_ip` Nullable(IPv6) COMMENT 'IP address of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_city` LowCardinality(String) COMMENT 'City of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_country` LowCardinality(String) COMMENT 'Country of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_country_code` LowCardinality(String) COMMENT 'Country code of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_continent_code` LowCardinality(String) COMMENT 'Continent code of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_longitude` Nullable(Float64) COMMENT 'Longitude of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_latitude` Nullable(Float64) COMMENT 'Latitude of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_autonomous_system_number` Nullable(UInt32) COMMENT 'Autonomous system number of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_autonomous_system_organization` Nullable(String) COMMENT 'Autonomous system organization of the client that generated the event' CODEC(ZSTD(1)),
`meta_network_name` LowCardinality(String) COMMENT 'Ethereum network name'
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', '{replica}', updated_date_time)
PARTITION BY (meta_network_name, toYYYYMM(event_date_time))
ORDER BY (meta_network_name, event_date_time, meta_client_name, peer_id_unique_key, topic_fork_digest_value, topic_name)
COMMENT 'Contains the details of the LEAVE events from the libp2p client.';

CREATE TABLE IF NOT EXISTS default.libp2p_join ON CLUSTER '{cluster}'
AS default.libp2p_join_local
ENGINE = Distributed('{cluster}', 'default', 'libp2p_join_local', cityHash64(event_date_time, meta_network_name, meta_client_name, peer_id_unique_key, topic_fork_digest_value, topic_name));

CREATE TABLE IF NOT EXISTS default.libp2p_leave ON CLUSTER '{cluster}'
AS default.libp2p_leave_local
ENGINE = Distributed('{cluster}', 'default', 'libp2p_leave_local', cityHash64(event_date_time, meta_network_name, meta_client_name, peer_id_unique_key, topic_fork_digest_value, topic_name));
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
DROP TABLE IF EXISTS default.libp2p_join ON CLUSTER '{cluster}' SYNC;
DROP TABLE IF EXISTS default.libp2p_join_local ON CLUSTER '{cluster}' SYNC;
DROP TABLE IF EXISTS default.libp2p_leave ON CLUSTER '{cluster}' SYNC;
DROP TABLE IF EXISTS default.libp2p_leave_local ON CLUSTER '{cluster}' SYNC;

CREATE TABLE IF NOT EXISTS default.libp2p_join_local ON CLUSTER '{cluster}'
(
`updated_date_time` DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)),
`event_date_time` DateTime64(3) COMMENT 'Timestamp of the event' CODEC(DoubleDelta, ZSTD(1)),
`topic_layer` LowCardinality(String) COMMENT 'Layer of the topic',
`topic_fork_digest_value` LowCardinality(String) COMMENT 'Fork digest value of the topic',
`topic_name` LowCardinality(String) COMMENT 'Name of the topic',
`topic_encoding` LowCardinality(String) COMMENT 'Encoding of the topic',
`local_peer_id_unique_key` Int64 COMMENT 'Unique key derived from the libp2p peer.ID of the local host that joined the topic',
`meta_client_name` LowCardinality(String) COMMENT 'Name of the client that generated the event',
`meta_client_version` LowCardinality(String) COMMENT 'Version of the client that generated the event',
`meta_client_implementation` LowCardinality(String) COMMENT 'Implementation of the client that generated the event',
`meta_client_os` LowCardinality(String) COMMENT 'Operating system of the client that generated the event',
`meta_client_ip` Nullable(IPv6) COMMENT 'IP address of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_city` LowCardinality(String) COMMENT 'City of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_country` LowCardinality(String) COMMENT 'Country of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_country_code` LowCardinality(String) COMMENT 'Country code of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_continent_code` LowCardinality(String) COMMENT 'Continent code of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_longitude` Nullable(Float64) COMMENT 'Longitude of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_latitude` Nullable(Float64) COMMENT 'Latitude of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_autonomous_system_number` Nullable(UInt32) COMMENT 'Autonomous system number of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_autonomous_system_organization` Nullable(String) COMMENT 'Autonomous system organization of the client that generated the event' CODEC(ZSTD(1)),
`meta_network_name` LowCardinality(String) COMMENT 'Ethereum network name'
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', '{replica}', updated_date_time)
PARTITION BY (meta_network_name, toYYYYMM(event_date_time))
ORDER BY (meta_network_name, event_date_time, meta_client_name, local_peer_id_unique_key, topic_fork_digest_value, topic_name)
COMMENT 'Contains the details of the JOIN events from the libp2p client.';

CREATE TABLE IF NOT EXISTS default.libp2p_leave_local ON CLUSTER '{cluster}'
(
`updated_date_time` DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)),
`event_date_time` DateTime64(3) COMMENT 'Timestamp of the event' CODEC(DoubleDelta, ZSTD(1)),
`topic_layer` LowCardinality(String) COMMENT 'Layer of the topic',
`topic_fork_digest_value` LowCardinality(String) COMMENT 'Fork digest value of the topic',
`topic_name` LowCardinality(String) COMMENT 'Name of the topic',
`topic_encoding` LowCardinality(String) COMMENT 'Encoding of the topic',
`local_peer_id_unique_key` Int64 COMMENT 'Unique key derived from the libp2p peer.ID of the local host that left the topic',
`meta_client_name` LowCardinality(String) COMMENT 'Name of the client that generated the event',
`meta_client_version` LowCardinality(String) COMMENT 'Version of the client that generated the event',
`meta_client_implementation` LowCardinality(String) COMMENT 'Implementation of the client that generated the event',
`meta_client_os` LowCardinality(String) COMMENT 'Operating system of the client that generated the event',
`meta_client_ip` Nullable(IPv6) COMMENT 'IP address of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_city` LowCardinality(String) COMMENT 'City of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_country` LowCardinality(String) COMMENT 'Country of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_country_code` LowCardinality(String) COMMENT 'Country code of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_continent_code` LowCardinality(String) COMMENT 'Continent code of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_longitude` Nullable(Float64) COMMENT 'Longitude of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_latitude` Nullable(Float64) COMMENT 'Latitude of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_autonomous_system_number` Nullable(UInt32) COMMENT 'Autonomous system number of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_autonomous_system_organization` Nullable(String) COMMENT 'Autonomous system organization of the client that generated the event' CODEC(ZSTD(1)),
`meta_network_name` LowCardinality(String) COMMENT 'Ethereum network name'
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', '{replica}', updated_date_time)
PARTITION BY (meta_network_name, toYYYYMM(event_date_time))
ORDER BY (meta_network_name, event_date_time, meta_client_name, local_peer_id_unique_key, topic_fork_digest_value, topic_name)
COMMENT 'Contains the details of the LEAVE events from the libp2p client.';

CREATE TABLE IF NOT EXISTS default.libp2p_join ON CLUSTER '{cluster}'
AS default.libp2p_join_local
ENGINE = Distributed('{cluster}', 'default', 'libp2p_join_local', cityHash64(event_date_time, meta_network_name, meta_client_name, local_peer_id_unique_key, topic_fork_digest_value, topic_name));

CREATE TABLE IF NOT EXISTS default.libp2p_leave ON CLUSTER '{cluster}'
AS default.libp2p_leave_local
ENGINE = Distributed('{cluster}', 'default', 'libp2p_leave_local', cityHash64(event_date_time, meta_network_name, meta_client_name, local_peer_id_unique_key, topic_fork_digest_value, topic_name));
8 changes: 4 additions & 4 deletions pkg/clickhouse/route/libp2p/libp2p_join.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 4 additions & 12 deletions pkg/clickhouse/route/libp2p/libp2p_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,8 @@ func (b *libp2pJoinBatch) FlattenTo(
}

func (b *libp2pJoinBatch) validate(event *xatu.DecoratedEvent) error {
peerID := peerIDFromMetadata(event, func(c *xatu.ClientMeta) peerIDMetadataProvider {
return c.GetLibp2PTraceJoin()
})

if peerID == "" {
return fmt.Errorf("nil PeerId: %w", route.ErrInvalidEvent)
if event.GetMeta().GetClient().GetLibp2PTraceJoin().GetLocalPeerId() == "" {
return fmt.Errorf("nil LocalPeerId: %w", route.ErrInvalidEvent)
}

return nil
Expand Down Expand Up @@ -92,11 +88,7 @@ func (b *libp2pJoinBatch) appendPayload(
b.TopicEncoding.Append("")
}

// Compute peer_id_unique_key from client metadata peer ID.
peerID := peerIDFromMetadata(event, func(c *xatu.ClientMeta) peerIDMetadataProvider {
return c.GetLibp2PTraceJoin()
})

localPeerID := event.GetMeta().GetClient().GetLibp2PTraceJoin().GetLocalPeerId()
networkName := event.GetMeta().GetClient().GetEthereum().GetNetwork().GetName()
b.PeerIDUniqueKey.Append(computePeerIDUniqueKey(peerID, networkName))
b.LocalPeerIDUniqueKey.Append(computePeerIDUniqueKey(localPeerID, networkName))
}
22 changes: 14 additions & 8 deletions pkg/clickhouse/route/libp2p/libp2p_join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,21 @@ package libp2p
import (
"testing"

"github.com/ethpandaops/xatu/pkg/clickhouse/route"
"github.com/ethpandaops/xatu/pkg/clickhouse/route/testfixture"
libp2ppb "github.com/ethpandaops/xatu/pkg/proto/libp2p"
"github.com/ethpandaops/xatu/pkg/proto/xatu"
"google.golang.org/protobuf/types/known/wrapperspb"
)

func TestSnapshot_libp2p_join(t *testing.T) {
const testTopic = "/eth2/bba4da96/beacon_block/ssz_snappy"
const (
testPeerID = "16Uiu2HAmLocalHost"
testNetwork = "mainnet"
testTopic = "/eth2/bba4da96/beacon_block/ssz_snappy"
)

expectedPeerIDKey := route.SeaHashInt64(testPeerID + testNetwork)

testfixture.AssertSnapshot(t, newlibp2pJoinBatch(), &xatu.DecoratedEvent{
Event: &xatu.Event{
Expand All @@ -21,9 +28,7 @@ func TestSnapshot_libp2p_join(t *testing.T) {
Meta: testfixture.MetaWithAdditional(&xatu.ClientMeta{
AdditionalData: &xatu.ClientMeta_Libp2PTraceJoin{
Libp2PTraceJoin: &xatu.ClientMeta_AdditionalLibP2PTraceJoinData{
Metadata: &libp2ppb.TraceEventMetadata{
PeerId: wrapperspb.String("16Uiu2HAmPeer1"),
},
LocalPeerId: testPeerID,
},
},
}),
Expand All @@ -33,9 +38,10 @@ func TestSnapshot_libp2p_join(t *testing.T) {
},
},
}, 1, map[string]any{
"topic_layer": "eth2",
"topic_fork_digest_value": "bba4da96",
"topic_name": "beacon_block",
"topic_encoding": "ssz_snappy",
"local_peer_id_unique_key": expectedPeerIDKey,
"topic_layer": "eth2",
"topic_fork_digest_value": "bba4da96",
"topic_name": "beacon_block",
"topic_encoding": "ssz_snappy",
})
}
8 changes: 4 additions & 4 deletions pkg/clickhouse/route/libp2p/libp2p_leave.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading