Skip to content
Closed
514 changes: 502 additions & 12 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ version = "0.1.0"
default = []
test-utils = ["dep:ctor", "dep:testcontainers", "dep:testcontainers-modules"]

# Sink features.
sink-meilisearch = ["dep:meilisearch-sdk"]

[dependencies]
anyhow = { version = "1.0.98", default-features = false, features = ["std"] }
chrono = { version = "0.4.41", default-features = false }
Expand Down Expand Up @@ -54,9 +57,12 @@ tikv-jemallocator = { version = "0.6.1", default-features = false, features = [
] }


# Optional sink dependencies.
meilisearch-sdk = { version = "0.28", optional = true }

ctor = { version = "0.4", optional = true }
testcontainers = { version = "0.23", optional = true, features = ["blocking"] }
testcontainers-modules = { version = "0.11", optional = true, features = ["postgres", "blocking"] }
testcontainers-modules = { version = "0.11", optional = true, features = ["postgres", "meilisearch", "blocking"] }

[dev-dependencies]
temp-env = "0.3"
Expand Down
11 changes: 11 additions & 0 deletions src/config/sink.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
//! Sink configuration types.
//!
//! Defines configuration variants for different event destinations.

use serde::Deserialize;

#[cfg(feature = "sink-meilisearch")]
use crate::sink::meilisearch::MeilisearchSinkConfig;

/// Sink destination configuration.
///
/// Determines where replicated events are sent.
Expand All @@ -8,4 +15,8 @@ use serde::Deserialize;
pub enum SinkConfig {
/// In-memory sink for testing and development.
Memory,

/// Meilisearch sink for search engine indexing.
#[cfg(feature = "sink-meilisearch")]
Meilisearch(MeilisearchSinkConfig),
}
20 changes: 20 additions & 0 deletions src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,19 @@ async fn run_pipeline(config: &PipelineConfig) -> EtlResult<()> {
// Create sink based on configuration.
let sink = match &config.sink {
SinkConfig::Memory => AnySink::Memory(MemorySink::new()),

#[cfg(feature = "sink-meilisearch")]
SinkConfig::Meilisearch(cfg) => {
use crate::sink::meilisearch::MeilisearchSink;
let s = MeilisearchSink::new(cfg.clone()).await.map_err(|e| {
etl::etl_error!(
etl::error::ErrorKind::InvalidData,
"Failed to create Meilisearch sink",
e.to_string()
)
})?;
AnySink::Meilisearch(s)
}
};

// Create PgStream as an ETL destination
Expand Down Expand Up @@ -122,6 +135,13 @@ fn log_sink_config(config: &SinkConfig) {
SinkConfig::Memory => {
debug!("using memory sink");
}

#[cfg(feature = "sink-meilisearch")]
SinkConfig::Meilisearch(cfg) => {
use crate::sink::meilisearch::MeilisearchSinkConfigWithoutSecrets;
let safe_cfg: MeilisearchSinkConfigWithoutSecrets = cfg.into();
debug!(config = ?safe_cfg, "using meilisearch sink");
}
}
}

Expand Down
244 changes: 244 additions & 0 deletions src/sink/meilisearch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
//! Meilisearch sink for indexing events as searchable documents.
//!
//! Indexes each event's payload as a JSON document in the configured Meilisearch index.
//! The sink uses bulk document addition for efficient batch operations.
//!
//! # Dynamic Routing
//!
//! The target index can come from event metadata or sink config:
//!
//! ```sql
//! -- Via metadata_extensions (dynamic per-event)
//! metadata_extensions = '[{"json_path": "index", "expression": "new.index_name"}]'
//!
//! -- Via static metadata
//! metadata = '{"index": "products"}'
//! ```
//!
//! Priority: event.metadata["index"] > config.index
//!
//! # Primary Key
//!
//! Meilisearch requires each document to have a primary key. Configure the primary
//! key field name in your Meilisearch index settings, or use `payload_extensions`
//! to add/transform the primary key field before sending.

use std::collections::HashMap;
use std::sync::Arc;

use etl::error::EtlResult;
use futures::future::try_join_all;
use meilisearch_sdk::client::Client;
use serde::{Deserialize, Serialize};

use crate::sink::Sink;
use crate::types::TriggeredEvent;

/// Configuration for the Meilisearch sink.
///
/// This intentionally does not implement [`Serialize`] to avoid accidentally
/// leaking sensitive information in serialized forms. For the same reason,
/// `Debug` is implemented by hand below so that the API key value is never
/// written to logs or panic messages.
#[derive(Clone, Deserialize)]
pub struct MeilisearchSinkConfig {
    /// Meilisearch URL (e.g., "http://localhost:7700").
    pub url: String,

    /// Index name for document storage.
    /// Can be overridden per-event via `metadata["index"]`.
    #[serde(default)]
    pub index: Option<String>,

    /// Optional API key for authentication.
    #[serde(default)]
    pub api_key: Option<String>,
}

impl std::fmt::Debug for MeilisearchSinkConfig {
    /// Formats the configuration with the API key redacted.
    ///
    /// `url` and `index` are printed as-is; `api_key` is printed as
    /// `Some("<redacted>")` or `None`, so only its presence is visible.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Keep the Some/None shape so the output still shows whether a key is set.
        let api_key = self.api_key.as_ref().map(|_| "<redacted>");

        f.debug_struct("MeilisearchSinkConfig")
            .field("url", &self.url)
            .field("index", &self.index)
            .field("api_key", &api_key)
            .finish()
    }
}

/// Configuration for the Meilisearch sink without sensitive data.
///
/// Mirrors [`MeilisearchSinkConfig`] but replaces the API key with a flag that
/// only records whether a key is present. Safe to serialize and log. Use this
/// for debugging and metrics.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MeilisearchSinkConfigWithoutSecrets {
/// Meilisearch URL, copied unchanged from the source configuration.
pub url: String,

/// Default index name for document storage, if one is configured.
pub index: Option<String>,

/// Whether an API key is configured. The key itself is never stored here.
pub has_api_key: bool,
}

impl From<MeilisearchSinkConfig> for MeilisearchSinkConfigWithoutSecrets {
fn from(config: MeilisearchSinkConfig) -> Self {
Self {
url: config.url,
index: config.index,
has_api_key: config.api_key.is_some(),
}
}
}

impl From<&MeilisearchSinkConfig> for MeilisearchSinkConfigWithoutSecrets {
fn from(config: &MeilisearchSinkConfig) -> Self {
Self {
url: config.url.clone(),
index: config.index.clone(),
has_api_key: config.api_key.is_some(),
}
}
}

/// Sink that indexes events in Meilisearch.
///
/// Events are serialized as JSON documents and batch indexed.
/// The sink handles connection management and task waiting.
///
/// The type is `Clone`; because the client is held behind an [`Arc`], clones
/// of the sink share the same underlying client instance.
#[derive(Clone)]
pub struct MeilisearchSink {
/// Meilisearch client, reference-counted so it can be handed to concurrent uploads.
client: Arc<Client>,

/// Default index name from config (can be overridden per-event).
/// `None` means every event must name its index in its own metadata.
index: Option<String>,
}

impl MeilisearchSink {
/// Creates a new Meilisearch sink from configuration.
///
/// # Errors
///
/// Returns an error if the client cannot be created.
pub async fn new(
config: MeilisearchSinkConfig,
) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
let client = Client::new(&config.url, config.api_key.as_deref())?;

Ok(Self {
client: Arc::new(client),
index: config.index,
})
}

/// Resolves the index name for an event.
///
/// Priority: event.metadata["index"] > config.index
fn resolve_index<'a>(&'a self, event: &'a TriggeredEvent) -> Option<&'a str> {
// Check event metadata first.
if let Some(ref metadata) = event.metadata {
if let Some(index) = metadata.get("index").and_then(|v| v.as_str()) {
return Some(index);
}
}
// Fall back to config.
self.index.as_deref()
}
}

impl Sink for MeilisearchSink {
fn name() -> &'static str {
"meilisearch"
}

async fn publish_events(&self, events: Vec<TriggeredEvent>) -> EtlResult<()> {
if events.is_empty() {
return Ok(());
}

// Group events by target index (supports per-event routing).
let mut index_documents: HashMap<String, Vec<serde_json::Value>> = HashMap::new();

for event in events {
let index_name = self.resolve_index(&event).ok_or_else(|| {
etl::etl_error!(
etl::error::ErrorKind::ConfigError,
"No index in config or event metadata"
)
})?;

index_documents
.entry(index_name.to_string())
.or_default()
.push(event.payload);
}

// Index documents to all target indexes concurrently.
try_join_all(index_documents.into_iter().map(|(index_name, documents)| {
let client = self.client.clone();
async move {
let index = client.index(&index_name);

let task = index.add_documents(&documents, None).await.map_err(|e| {
etl::etl_error!(
etl::error::ErrorKind::DestinationError,
"Failed to add documents to Meilisearch",
e.to_string()
)
})?;

// Wait for the task to complete and check status.
let completed_task = task
.wait_for_completion(&client, None, None)
.await
.map_err(|e| {
etl::etl_error!(
etl::error::ErrorKind::DestinationError,
"Failed to wait for Meilisearch task",
e.to_string()
)
})?;

// Check if task failed.
if completed_task.is_failure() {
let error = completed_task.unwrap_failure();
return Err(etl::etl_error!(
etl::error::ErrorKind::DestinationError,
"Meilisearch task failed",
error.error_message
));
}

Ok::<_, etl::error::EtlError>(())
}
}))
.await?;

Ok(())
}
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a config for a local instance with the `events` index and the given API key.
    fn sample_config(api_key: Option<&str>) -> MeilisearchSinkConfig {
        MeilisearchSinkConfig {
            url: String::from("http://localhost:7700"),
            index: Some(String::from("events")),
            api_key: api_key.map(String::from),
        }
    }

    /// The sink reports its fixed identifier.
    #[test]
    fn test_sink_name() {
        assert_eq!(MeilisearchSink::name(), "meilisearch");
    }

    /// Converting a config with a key keeps url/index and only flags the key's presence.
    #[test]
    fn test_config_without_secrets() {
        let config = sample_config(Some("secret-api-key"));

        let redacted = MeilisearchSinkConfigWithoutSecrets::from(&config);

        assert_eq!(redacted.url, "http://localhost:7700");
        assert_eq!(redacted.index, Some("events".to_string()));
        assert!(redacted.has_api_key);
    }

    /// Converting a config without a key reports that no key is configured.
    #[test]
    fn test_config_without_secrets_no_api_key() {
        let config = sample_config(None);

        let redacted = MeilisearchSinkConfigWithoutSecrets::from(&config);

        assert!(!redacted.has_api_key);
    }
}
21 changes: 19 additions & 2 deletions src/sink/mod.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,35 @@
//! Sink implementations for event publishing.
//!
//! Provides destinations for replicated PostgreSQL events.

mod base;
pub mod memory;

#[cfg(feature = "sink-meilisearch")]
pub mod meilisearch;

pub use base::Sink;

use etl::error::EtlResult;
use memory::MemorySink;

#[cfg(feature = "sink-meilisearch")]
use meilisearch::MeilisearchSink;

use crate::types::TriggeredEvent;

/// Wrapper enum for all supported sink types.
///
/// Enables runtime sink selection while maintaining static dispatch
/// for better performance. Each variant wraps a concrete sink
/// implementation; optional sinks are gated by their Cargo feature flag.
#[derive(Clone)]
pub enum AnySink {
/// In-memory sink for testing and development.
Memory(MemorySink),

/// Meilisearch sink for search engine indexing.
///
/// Only compiled in when the `sink-meilisearch` feature is enabled.
#[cfg(feature = "sink-meilisearch")]
Meilisearch(MeilisearchSink),
}

impl Sink for AnySink {
Expand All @@ -26,6 +40,9 @@ impl Sink for AnySink {
async fn publish_events(&self, events: Vec<TriggeredEvent>) -> EtlResult<()> {
match self {
AnySink::Memory(sink) => sink.publish_events(events).await,

#[cfg(feature = "sink-meilisearch")]
AnySink::Meilisearch(sink) => sink.publish_events(events).await,
}
}
}
Loading