Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ name = "postgres-stream"
version = "0.1.0"

[features]
default = []
test-utils = ["dep:ctor", "dep:testcontainers", "dep:testcontainers-modules"]
default = []
sink-redis-strings = ["dep:redis"]
test-utils = ["dep:ctor", "dep:testcontainers", "dep:testcontainers-modules"]

[dependencies]
anyhow = { version = "1.0.98", default-features = false, features = ["std"] }
Expand Down Expand Up @@ -46,6 +47,9 @@ etl = { git = "https://github.com/supabase/etl", rev = "dd2987a55efc16a
etl-postgres = { git = "https://github.com/supabase/etl", rev = "dd2987a55efc16aeb4402e4b06853c7731a6155a" }
uuid = { version = "1.19.0", default-features = false, features = ["v4"] }

# Optional sink dependencies.
redis = { version = "0.27", default-features = false, features = ["tokio-comp", "connection-manager"], optional = true }

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemalloc-ctl = { version = "0.6.0", default-features = false, features = ["stats"] }
tikv-jemallocator = { version = "0.6.1", default-features = false, features = [
Expand All @@ -56,7 +60,7 @@ tikv-jemallocator = { version = "0.6.1", default-features = false, features = [

ctor = { version = "0.4", optional = true }
testcontainers = { version = "0.23", optional = true, features = ["blocking"] }
testcontainers-modules = { version = "0.11", optional = true, features = ["postgres", "blocking"] }
testcontainers-modules = { version = "0.11", optional = true, features = ["postgres", "redis", "blocking"] }

[dev-dependencies]
temp-env = "0.3"
Expand Down
8 changes: 8 additions & 0 deletions src/config/sink.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use serde::Deserialize;

#[cfg(feature = "sink-redis-strings")]
use crate::sink::redis_strings::RedisStringsSinkConfig;

/// Sink destination configuration.
///
/// Determines where replicated events are sent.
Expand All @@ -8,4 +11,9 @@ use serde::Deserialize;
pub enum SinkConfig {
/// In-memory sink for testing and development.
Memory,

/// Redis strings sink for key-value storage.
#[cfg(feature = "sink-redis-strings")]
#[serde(rename = "redis-strings")]
RedisStrings(RedisStringsSinkConfig),
}
18 changes: 18 additions & 0 deletions src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,19 @@ async fn run_pipeline(config: &PipelineConfig) -> EtlResult<()> {
// Create sink based on configuration.
let sink = match &config.sink {
SinkConfig::Memory => AnySink::Memory(MemorySink::new()),

#[cfg(feature = "sink-redis-strings")]
SinkConfig::RedisStrings(cfg) => {
use crate::sink::redis_strings::RedisStringsSink;
let s = RedisStringsSink::new(cfg.clone()).await.map_err(|e| {
etl::etl_error!(
etl::error::ErrorKind::InvalidData,
"Failed to create Redis Strings sink",
e.to_string()
)
})?;
AnySink::RedisStrings(s)
}
};

// Create PgStream as an ETL destination
Expand Down Expand Up @@ -122,6 +135,11 @@ fn log_sink_config(config: &SinkConfig) {
SinkConfig::Memory => {
debug!("using memory sink");
}

#[cfg(feature = "sink-redis-strings")]
SinkConfig::RedisStrings(_cfg) => {
debug!("using redis-strings sink");
}
}
}

Expand Down
13 changes: 13 additions & 0 deletions src/sink/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
mod base;
pub mod memory;

#[cfg(feature = "sink-redis-strings")]
pub mod redis_strings;

pub use base::Sink;

use etl::error::EtlResult;
use memory::MemorySink;

#[cfg(feature = "sink-redis-strings")]
use redis_strings::RedisStringsSink;

use crate::types::TriggeredEvent;

/// Wrapper enum for all supported sink types.
Expand All @@ -16,6 +22,10 @@ use crate::types::TriggeredEvent;
pub enum AnySink {
/// In-memory sink for testing and development.
Memory(MemorySink),

/// Redis strings sink for key-value storage.
#[cfg(feature = "sink-redis-strings")]
RedisStrings(RedisStringsSink),
}

impl Sink for AnySink {
Expand All @@ -26,6 +36,9 @@ impl Sink for AnySink {
async fn publish_events(&self, events: Vec<TriggeredEvent>) -> EtlResult<()> {
match self {
AnySink::Memory(sink) => sink.publish_events(events).await,

#[cfg(feature = "sink-redis-strings")]
AnySink::RedisStrings(sink) => sink.publish_events(events).await,
}
}
}
153 changes: 153 additions & 0 deletions src/sink/redis_strings.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
//! Redis Strings sink for publishing events as key-value pairs.
//!
//! Stores each event's payload as a Redis string. The key is determined by:
//! 1. `key` in event metadata (from subscription's metadata/metadata_extensions)
//! 2. Fallback to event ID (optionally with key_prefix from config)
//!
//! # Dynamic Routing
//!
//! The Redis key can be configured per-event using metadata_extensions:
//!
//! ```sql
//! metadata_extensions = '[
//! {"json_path": "key", "expression": "''user:'' || (payload->''user_id'')::text"}
//! ]'
//! ```

use etl::error::EtlResult;
use redis::aio::ConnectionManager;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::Mutex;

use crate::sink::Sink;
use crate::types::TriggeredEvent;

/// Configuration for the Redis Strings sink.
///
/// This intentionally does not implement [`Serialize`] to avoid accidentally
/// leaking secrets (URL credentials) in serialized forms.
#[derive(Clone, Debug, Deserialize)]
pub struct RedisStringsSinkConfig {
/// Redis connection URL (e.g., "redis://localhost:6379").
/// Contains credentials and should be treated as sensitive.
pub url: String,

/// Optional prefix for all keys.
#[serde(default)]
pub key_prefix: Option<String>,
}

/// Configuration for the Redis Strings sink without sensitive data.
///
/// Safe to serialize and log. Use this for debugging and metrics.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RedisStringsSinkConfigWithoutSecrets {
/// Optional prefix for all keys.
pub key_prefix: Option<String>,
}

impl From<RedisStringsSinkConfig> for RedisStringsSinkConfigWithoutSecrets {
fn from(config: RedisStringsSinkConfig) -> Self {
Self {
key_prefix: config.key_prefix,
}
}
}

impl From<&RedisStringsSinkConfig> for RedisStringsSinkConfigWithoutSecrets {
fn from(config: &RedisStringsSinkConfig) -> Self {
Self {
key_prefix: config.key_prefix.clone(),
}
}
}

/// Sink that stores events as Redis string key-value pairs.
///
/// Each event's payload is stored with a dynamic or default key.
/// The sink uses a connection manager for automatic reconnection handling.
#[derive(Clone)]
pub struct RedisStringsSink {
/// Shared Redis connection manager.
connection: Arc<Mutex<ConnectionManager>>,

/// Optional prefix prepended to all keys.
key_prefix: Option<String>,
}

impl RedisStringsSink {
/// Creates a new Redis Strings sink from configuration.
///
/// # Errors
///
/// Returns an error if the Redis connection cannot be established.
pub async fn new(
config: RedisStringsSinkConfig,
) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
let client = redis::Client::open(config.url)?;
let connection = ConnectionManager::new(client).await?;

Ok(Self {
connection: Arc::new(Mutex::new(connection)),
key_prefix: config.key_prefix,
})
}

/// Resolves the Redis key for an event from metadata or default (event ID).
fn resolve_key(&self, event: &TriggeredEvent) -> String {
// First check event metadata for dynamic key.
if let Some(ref metadata) = event.metadata {
if let Some(key) = metadata.get("key").and_then(|v| v.as_str()) {
return key.to_string();
}
}
// Fall back to event ID with optional prefix.
match &self.key_prefix {
Some(prefix) => format!("{}:{}", prefix, event.id.id),
None => event.id.id.clone(),
}
}
}

impl Sink for RedisStringsSink {
fn name() -> &'static str {
"redis-strings"
}

async fn publish_events(&self, events: Vec<TriggeredEvent>) -> EtlResult<()> {
if events.is_empty() {
return Ok(());
}

let mut conn = self.connection.lock().await;

// Use pipeline for batch efficiency.
let mut pipe = redis::pipe();

for event in events {
let key = self.resolve_key(&event);
pipe.set(&key, event.payload.to_string());
}

pipe.query_async::<()>(&mut *conn).await.map_err(|e| {
etl::etl_error!(
etl::error::ErrorKind::DestinationError,
"Failed to publish events to Redis",
e.to_string()
)
})?;

Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_sink_name() {
assert_eq!(RedisStringsSink::name(), "redis-strings");
}
}
Loading