diff --git a/Cargo.lock b/Cargo.lock index 780a4ae..52d0cb0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -177,7 +177,7 @@ dependencies = [ "serde_json", "serde_repr", "serde_urlencoded", - "thiserror", + "thiserror 2.0.17", "tokio", "tokio-util", "tower-service", @@ -232,6 +232,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "chrono" version = "0.4.42" @@ -268,7 +274,7 @@ version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68578f196d2a33ff61b27fae256c3164f65e36382648e30666dde05b8cc9dfdf" dependencies = [ - "nom", + "nom 7.1.3", "pathdiff", "serde", "serde_json", @@ -281,6 +287,15 @@ version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" +[[package]] +name = "convert_case" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec182b0ca2f35d8fc196cf3404988fd8b8c739a4d270ff118a398feb0cbec1ca" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "core-foundation" version = "0.10.1" @@ -321,6 +336,21 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "critical-section" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "790eea4361631c5e7d22598ecd5723ff611904e3344ce8720784c93e3d83d40b" + +[[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-epoch" version = "0.9.18" @@ -406,6 +436,12 @@ dependencies = [ "syn", ] +[[package]] +name = "data-encoding" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" + [[package]] name = "der" version = "0.7.10" @@ -525,6 +561,18 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "enum-as-inner" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1e6a265c649f3f5979b601d26f1d05ada116434c87741c9493cb56218f76cbc" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "equivalent" version = "1.0.2" @@ -590,7 +638,7 @@ dependencies = [ "secrecy", "serde", "sqlx", - "thiserror", + "thiserror 2.0.17", "tokio-postgres", ] @@ -606,7 +654,7 @@ dependencies = [ "serde", "serde_json", "sqlx", - "thiserror", + "thiserror 2.0.17", "tokio", "tokio-postgres", "tracing", @@ -831,8 +879,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -842,9 +892,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" dependencies = [ "cfg-if", + "js-sys", "libc", "r-efi", "wasip2", + "wasm-bindgen", ] [[package]] @@ -932,6 +984,52 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hickory-proto" +version = "0.25.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8a6fe56c0038198998a6f217ca4e7ef3a5e51f46163bd6dd60b5c71ca6c6502" +dependencies = [ + "async-trait", + "cfg-if", + "data-encoding", + "enum-as-inner", + "futures-channel", + "futures-io", + "futures-util", + "idna", + "ipnet", + "once_cell", + "rand 0.9.2", + "ring", + "thiserror 2.0.17", + "tinyvec", + "tokio", + "tracing", + "url", +] + +[[package]] +name = "hickory-resolver" +version = "0.25.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc62a9a99b0bfb44d2ab95a7208ac952d31060efc16241c87eaf36406fecf87a" +dependencies = [ + "cfg-if", + "futures-util", + "hickory-proto", + "ipconfig", + "moka", + "once_cell", + "parking_lot", + "rand 0.9.2", + "resolv-conf", + "smallvec", + "thiserror 2.0.17", + "tokio", + "tracing", +] + [[package]] name = "hkdf" version = "0.12.4" @@ -1052,10 +1150,12 @@ dependencies = [ "hyper", "hyper-util", "rustls", + "rustls-native-certs", "rustls-pki-types", "tokio", "tokio-rustls", "tower-service", + "webpki-roots 1.0.4", ] [[package]] @@ -1064,6 +1164,7 @@ version = "0.1.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "727805d60e7938b76b826a6ef209eb70eaa1812794f9424d4a4e2d740662df5f" dependencies = [ + "base64 0.22.1", "bytes", "futures-channel", "futures-core", @@ -1071,7 +1172,9 @@ dependencies = [ "http", "http-body", "hyper", + "ipnet", "libc", + "percent-encoding", "pin-project-lite", "socket2 0.6.1", "tokio", @@ -1249,12 +1352,43 @@ dependencies = [ "serde_core", ] +[[package]] +name = "ipconfig" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b58db92f96b720de98181bbbe63c831e87005ab460c1bf306eb2622b4707997f" +dependencies = [ + "socket2 0.5.10", + "widestring", + "windows-sys 0.48.0", + "winreg", +] + [[package]] name = "ipnet" version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130" +[[package]] +name = "iri-string" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c91338f0783edbd6195decb37bae672fd3b165faffb89bf7b9e6942f8b1a731a" +dependencies = [ + "memchr", + "serde", +] + +[[package]] +name = "iso8601" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1082f0c48f143442a1ac6122f67e360ceee130b967af4d50996e5154a45df46" +dependencies = [ + "nom 8.0.0", +] + [[package]] name = "itoa" version = "1.0.15" @@ -1281,6 +1415,19 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jsonwebtoken" +version = "9.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a87cc7a48537badeae96744432de36f4be2b4a34a05a5ef32e9dd8a1c169dde" +dependencies = [ + "base64 0.22.1", + "js-sys", + "ring", + "serde", + "serde_json", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -1350,6 +1497,12 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +[[package]] +name = "lru-slab" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" + [[package]] name = "matchers" version = "0.2.0" @@ -1369,6 +1522,46 @@ dependencies = [ "digest", ] +[[package]] +name = "meilisearch-index-setting-macro" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "420f67f5943a0236eea7f199720cc465e806c48978d9b0fdc1fb62eceaee7556" +dependencies = [ + "convert_case", + "proc-macro2", + "quote", + "structmeta", + "syn", +] + +[[package]] +name = "meilisearch-sdk" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2325355c73c96667178c09675389cfa7afc2382d5aa0e0d34d0cf29793d89090" +dependencies = [ + "async-trait", + "bytes", + "either", + "futures", + "futures-io", + "iso8601", + "jsonwebtoken", + "log", + "meilisearch-index-setting-macro", + "pin-project-lite", + "reqwest", + "serde", + "serde_json", + "thiserror 1.0.69", + "time", + "uuid", + "wasm-bindgen-futures", + "web-sys", + "yaup", +] + [[package]] name = "memchr" version = "2.7.6" @@ -1400,7 +1593,7 @@ dependencies = [ "metrics", "metrics-util", "quanta", - "thiserror", + "thiserror 2.0.17", "tokio", "tracing", ] @@ -1421,6 +1614,12 @@ dependencies = [ "sketches-ddsketch", ] +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -1438,6 +1637,23 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "moka" +version = "0.12.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3dec6bd31b08944e08b58fd99373893a6c17054d6f3ea5006cc894f4f4eee2a" +dependencies = [ + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "equivalent", + "parking_lot", + "portable-atomic", + "smallvec", + "tagptr", + "uuid", +] + [[package]] name = "nom" version = "7.1.3" @@ -1448,6 +1664,15 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nom" +version = "8.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df9761775871bdef83bee530e60050f7e54b1105350d6884eb0fb4f46c2f9405" +dependencies = [ + "memchr", +] + [[package]] name = "num-bigint-dig" version = "0.8.6" @@ -1505,6 +1730,10 @@ name = "once_cell" version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +dependencies = [ + "critical-section", + "portable-atomic", +] [[package]] name = "openssl-probe" @@ -1733,6 +1962,7 @@ dependencies = [ "etl", "etl-postgres", "futures", + "meilisearch-sdk", "metrics", "metrics-exporter-prometheus", "pin-project-lite", @@ -1748,7 +1978,7 @@ dependencies = [ "tempfile", "testcontainers", "testcontainers-modules", - "thiserror", + "thiserror 2.0.17", "tikv-jemalloc-ctl", "tikv-jemallocator", "tokio", @@ -1822,6 +2052,61 @@ dependencies = [ "winapi", ] +[[package]] +name = "quinn" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20" +dependencies = [ + "bytes", + "cfg_aliases", + "pin-project-lite", + "quinn-proto", + "quinn-udp", + "rustc-hash", + "rustls", + "socket2 0.6.1", + "thiserror 2.0.17", + "tokio", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-proto" +version = "0.11.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31" +dependencies = [ + "bytes", + "getrandom 0.3.4", + "lru-slab", + "rand 0.9.2", + "ring", + "rustc-hash", + "rustls", + "rustls-pki-types", + "slab", + "thiserror 2.0.17", + "tinyvec", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-udp" +version = "0.5.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd" +dependencies = [ + "cfg_aliases", + "libc", + "once_cell", + "socket2 0.6.1", + "tracing", + "windows-sys 0.60.2", +] + [[package]] name = "quote" version = "1.0.42" @@ -1981,6 +2266,59 @@ version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58" +[[package]] +name = "reqwest" +version = "0.12.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147" +dependencies = [ + "base64 0.22.1", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2", + "hickory-resolver", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-rustls", + "hyper-util", + "js-sys", + "log", + "mime", + "once_cell", + "percent-encoding", + "pin-project-lite", + "quinn", + "rustls", + "rustls-native-certs", + "rustls-pki-types", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tokio-rustls", + "tokio-util", + "tower", + "tower-http", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "wasm-streams", + "web-sys", + "webpki-roots 1.0.4", +] + +[[package]] +name = "resolv-conf" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e061d1b48cb8d38042de4ae0a7a6401009d6143dc80d2e2d6f31f0bdd6470c7" + [[package]] name = "ring" version = "0.17.14" @@ -2015,6 +2353,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustc-hash" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" + [[package]] name = "rustix" version = "1.1.2" @@ -2071,6 +2415,7 @@ version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "708c0f9d5f54ba0272468c1d306a52c495b31fa155e91bc25371e6df7996908c" dependencies = [ + "web-time", "zeroize", ] @@ -2444,7 +2789,7 @@ dependencies = [ "serde_json", "sha2", "smallvec", - "thiserror", + "thiserror 2.0.17", "tokio", "tokio-stream", "tracing", @@ -2529,7 +2874,7 @@ dependencies = [ "smallvec", "sqlx-core", "stringprep", - "thiserror", + "thiserror 2.0.17", "tracing", "uuid", "whoami", @@ -2568,7 +2913,7 @@ dependencies = [ "smallvec", "sqlx-core", "stringprep", - "thiserror", + "thiserror 2.0.17", "tracing", "uuid", "whoami", @@ -2594,7 +2939,7 @@ dependencies = [ "serde", "serde_urlencoded", "sqlx-core", - "thiserror", + "thiserror 2.0.17", "tracing", "url", "uuid", @@ -2663,6 +3008,15 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" +dependencies = [ + "futures-core", +] + [[package]] name = "synstructure" version = "0.13.2" @@ -2674,6 +3028,12 @@ dependencies = [ "syn", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "temp-env" version = "0.3.6" @@ -2714,10 +3074,11 @@ dependencies = [ "memchr", "parse-display", "pin-project-lite", + "reqwest", "serde", "serde_json", "serde_with", - "thiserror", + "thiserror 2.0.17", "tokio", "tokio-stream", "tokio-tar", @@ -2731,16 +3092,37 @@ version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d43ed4e8f58424c3a2c6c56dbea6643c3c23e8666a34df13c54f0a184e6c707" dependencies = [ + "parse-display", "testcontainers", ] +[[package]] +name = "thiserror" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl 1.0.69", +] + [[package]] name = "thiserror" version = "2.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8" dependencies = [ - "thiserror-impl", + "thiserror-impl 2.0.17", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -2951,6 +3333,45 @@ dependencies = [ "tokio", ] +[[package]] +name = "tower" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper", + "tokio", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-http" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" +dependencies = [ + "bitflags 2.10.0", + "bytes", + "futures-util", + "http", + "http-body", + "iri-string", + "pin-project-lite", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + [[package]] name = "tower-service" version = "0.3.3" @@ -3043,6 +3464,12 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7df058c713841ad818f1dc5d3fd88063241cc61f49f5fbea4b951e8cf5a8d71d" +[[package]] +name = "unicode-segmentation" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" + [[package]] name = "unsafe-libyaml" version = "0.2.11" @@ -3139,6 +3566,19 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.56" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "836d9622d604feee9e5de25ac10e3ea5f2d65b41eac0d9ce72eb5deae707ce7c" +dependencies = [ + "cfg-if", + "js-sys", + "once_cell", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.106" @@ -3171,6 +3611,19 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "wasm-streams" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "web-sys" version = "0.3.83" @@ -3181,6 +3634,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki-roots" version = "0.26.11" @@ -3210,6 +3673,12 @@ dependencies = [ "web-sys", ] +[[package]] +name = "widestring" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72069c3113ab32ab29e5584db3c6ec55d416895e60715417b5b883a357c3e471" + [[package]] name = "winapi" version = "0.3.9" @@ -3513,6 +3982,16 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" +[[package]] +name = "winreg" +version = "0.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" +dependencies = [ + "cfg-if", + "windows-sys 0.48.0", +] + [[package]] name = "wit-bindgen" version = "0.46.0" @@ -3557,6 +4036,17 @@ dependencies = [ "hashlink 0.8.4", ] +[[package]] +name = "yaup" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0144f1a16a199846cb21024da74edd930b43443463292f536b7110b4855b5c6" +dependencies = [ + "form_urlencoded", + "serde", + "thiserror 1.0.69", +] + [[package]] name = "yoke" version = "0.8.1" diff --git a/Cargo.toml b/Cargo.toml index b53db85..b1a9e35 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,9 @@ version = "0.1.0" default = [] test-utils = ["dep:ctor", "dep:testcontainers", "dep:testcontainers-modules"] +# Sink features. +sink-meilisearch = ["dep:meilisearch-sdk"] + [dependencies] anyhow = { version = "1.0.98", default-features = false, features = ["std"] } chrono = { version = "0.4.41", default-features = false } @@ -54,9 +57,12 @@ tikv-jemallocator = { version = "0.6.1", default-features = false, features = [ ] } +# Optional sink dependencies. +meilisearch-sdk = { version = "0.28", optional = true } + ctor = { version = "0.4", optional = true } testcontainers = { version = "0.23", optional = true, features = ["blocking"] } -testcontainers-modules = { version = "0.11", optional = true, features = ["postgres", "blocking"] } +testcontainers-modules = { version = "0.11", optional = true, features = ["postgres", "meilisearch", "blocking"] } [dev-dependencies] temp-env = "0.3" diff --git a/src/config/sink.rs b/src/config/sink.rs index 3f16033..36b0572 100644 --- a/src/config/sink.rs +++ b/src/config/sink.rs @@ -1,5 +1,12 @@ +//! Sink configuration types. +//! +//! Defines configuration variants for different event destinations. + use serde::Deserialize; +#[cfg(feature = "sink-meilisearch")] +use crate::sink::meilisearch::MeilisearchSinkConfig; + /// Sink destination configuration. /// /// Determines where replicated events are sent. @@ -8,4 +15,8 @@ use serde::Deserialize; pub enum SinkConfig { /// In-memory sink for testing and development. Memory, + + /// Meilisearch sink for search engine indexing. + #[cfg(feature = "sink-meilisearch")] + Meilisearch(MeilisearchSinkConfig), } diff --git a/src/core.rs b/src/core.rs index 1304429..d02418c 100644 --- a/src/core.rs +++ b/src/core.rs @@ -77,6 +77,19 @@ async fn run_pipeline(config: &PipelineConfig) -> EtlResult<()> { // Create sink based on configuration. let sink = match &config.sink { SinkConfig::Memory => AnySink::Memory(MemorySink::new()), + + #[cfg(feature = "sink-meilisearch")] + SinkConfig::Meilisearch(cfg) => { + use crate::sink::meilisearch::MeilisearchSink; + let s = MeilisearchSink::new(cfg.clone()).await.map_err(|e| { + etl::etl_error!( + etl::error::ErrorKind::InvalidData, + "Failed to create Meilisearch sink", + e.to_string() + ) + })?; + AnySink::Meilisearch(s) + } }; // Create PgStream as an ETL destination @@ -122,6 +135,13 @@ fn log_sink_config(config: &SinkConfig) { SinkConfig::Memory => { debug!("using memory sink"); } + + #[cfg(feature = "sink-meilisearch")] + SinkConfig::Meilisearch(cfg) => { + use crate::sink::meilisearch::MeilisearchSinkConfigWithoutSecrets; + let safe_cfg: MeilisearchSinkConfigWithoutSecrets = cfg.into(); + debug!(config = ?safe_cfg, "using meilisearch sink"); + } } } diff --git a/src/sink/meilisearch.rs b/src/sink/meilisearch.rs new file mode 100644 index 0000000..44a271f --- /dev/null +++ b/src/sink/meilisearch.rs @@ -0,0 +1,244 @@ +//! Meilisearch sink for indexing events as searchable documents. +//! +//! Indexes each event's payload as a JSON document in the configured Meilisearch index. +//! The sink uses bulk document addition for efficient batch operations. +//! +//! # Dynamic Routing +//! +//! The target index can come from event metadata or sink config: +//! +//! ```sql +//! -- Via metadata_extensions (dynamic per-event) +//! metadata_extensions = '[{"json_path": "index", "expression": "new.index_name"}]' +//! +//! -- Via static metadata +//! metadata = '{"index": "products"}' +//! ``` +//! +//! Priority: event.metadata["index"] > config.index +//! +//! # Primary Key +//! +//! Meilisearch requires each document to have a primary key. Configure the primary +//! key field name in your Meilisearch index settings, or use `payload_extensions` +//! to add/transform the primary key field before sending. + +use std::collections::HashMap; +use std::sync::Arc; + +use etl::error::EtlResult; +use futures::future::try_join_all; +use meilisearch_sdk::client::Client; +use serde::{Deserialize, Serialize}; + +use crate::sink::Sink; +use crate::types::TriggeredEvent; + +/// Configuration for the Meilisearch sink. +/// +/// This intentionally does not implement [`Serialize`] to avoid accidentally +/// leaking sensitive information in serialized forms. +#[derive(Clone, Debug, Deserialize)] +pub struct MeilisearchSinkConfig { + /// Meilisearch URL (e.g., "http://localhost:7700"). + pub url: String, + + /// Index name for document storage. + /// Can be overridden per-event via metadata["index"]. + #[serde(default)] + pub index: Option, + + /// Optional API key for authentication. + #[serde(default)] + pub api_key: Option, +} + +/// Configuration for the Meilisearch sink without sensitive data. +/// +/// Safe to serialize and log. Use this for debugging and metrics. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct MeilisearchSinkConfigWithoutSecrets { + /// Meilisearch URL. + pub url: String, + + /// Index name for document storage. + pub index: Option, + + /// Whether an API key is configured. + pub has_api_key: bool, +} + +impl From for MeilisearchSinkConfigWithoutSecrets { + fn from(config: MeilisearchSinkConfig) -> Self { + Self { + url: config.url, + index: config.index, + has_api_key: config.api_key.is_some(), + } + } +} + +impl From<&MeilisearchSinkConfig> for MeilisearchSinkConfigWithoutSecrets { + fn from(config: &MeilisearchSinkConfig) -> Self { + Self { + url: config.url.clone(), + index: config.index.clone(), + has_api_key: config.api_key.is_some(), + } + } +} + +/// Sink that indexes events in Meilisearch. +/// +/// Events are serialized as JSON documents and batch indexed. +/// The sink handles connection management and task waiting. +#[derive(Clone)] +pub struct MeilisearchSink { + /// Meilisearch client. + client: Arc, + + /// Default index name from config (can be overridden per-event). + index: Option, +} + +impl MeilisearchSink { + /// Creates a new Meilisearch sink from configuration. + /// + /// # Errors + /// + /// Returns an error if the client cannot be created. + pub async fn new( + config: MeilisearchSinkConfig, + ) -> Result> { + let client = Client::new(&config.url, config.api_key.as_deref())?; + + Ok(Self { + client: Arc::new(client), + index: config.index, + }) + } + + /// Resolves the index name for an event. + /// + /// Priority: event.metadata["index"] > config.index + fn resolve_index<'a>(&'a self, event: &'a TriggeredEvent) -> Option<&'a str> { + // Check event metadata first. + if let Some(ref metadata) = event.metadata { + if let Some(index) = metadata.get("index").and_then(|v| v.as_str()) { + return Some(index); + } + } + // Fall back to config. + self.index.as_deref() + } +} + +impl Sink for MeilisearchSink { + fn name() -> &'static str { + "meilisearch" + } + + async fn publish_events(&self, events: Vec) -> EtlResult<()> { + if events.is_empty() { + return Ok(()); + } + + // Group events by target index (supports per-event routing). + let mut index_documents: HashMap> = HashMap::new(); + + for event in events { + let index_name = self.resolve_index(&event).ok_or_else(|| { + etl::etl_error!( + etl::error::ErrorKind::ConfigError, + "No index in config or event metadata" + ) + })?; + + index_documents + .entry(index_name.to_string()) + .or_default() + .push(event.payload); + } + + // Index documents to all target indexes concurrently. + try_join_all(index_documents.into_iter().map(|(index_name, documents)| { + let client = self.client.clone(); + async move { + let index = client.index(&index_name); + + let task = index.add_documents(&documents, None).await.map_err(|e| { + etl::etl_error!( + etl::error::ErrorKind::DestinationError, + "Failed to add documents to Meilisearch", + e.to_string() + ) + })?; + + // Wait for the task to complete and check status. + let completed_task = task + .wait_for_completion(&client, None, None) + .await + .map_err(|e| { + etl::etl_error!( + etl::error::ErrorKind::DestinationError, + "Failed to wait for Meilisearch task", + e.to_string() + ) + })?; + + // Check if task failed. + if completed_task.is_failure() { + let error = completed_task.unwrap_failure(); + return Err(etl::etl_error!( + etl::error::ErrorKind::DestinationError, + "Meilisearch task failed", + error.error_message + )); + } + + Ok::<_, etl::error::EtlError>(()) + } + })) + .await?; + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_sink_name() { + assert_eq!(MeilisearchSink::name(), "meilisearch"); + } + + #[test] + fn test_config_without_secrets() { + let config = MeilisearchSinkConfig { + url: "http://localhost:7700".to_string(), + index: Some("events".to_string()), + api_key: Some("secret-api-key".to_string()), + }; + + let without_secrets: MeilisearchSinkConfigWithoutSecrets = (&config).into(); + + assert_eq!(without_secrets.url, "http://localhost:7700"); + assert_eq!(without_secrets.index, Some("events".to_string())); + assert!(without_secrets.has_api_key); + } + + #[test] + fn test_config_without_secrets_no_api_key() { + let config = MeilisearchSinkConfig { + url: "http://localhost:7700".to_string(), + index: Some("events".to_string()), + api_key: None, + }; + + let without_secrets: MeilisearchSinkConfigWithoutSecrets = (&config).into(); + + assert!(!without_secrets.has_api_key); + } +} diff --git a/src/sink/mod.rs b/src/sink/mod.rs index 4f8ab01..42fabed 100644 --- a/src/sink/mod.rs +++ b/src/sink/mod.rs @@ -1,21 +1,35 @@ +//! Sink implementations for event publishing. +//! +//! Provides destinations for replicated PostgreSQL events. + mod base; pub mod memory; +#[cfg(feature = "sink-meilisearch")] +pub mod meilisearch; + pub use base::Sink; use etl::error::EtlResult; use memory::MemorySink; +#[cfg(feature = "sink-meilisearch")] +use meilisearch::MeilisearchSink; + use crate::types::TriggeredEvent; /// Wrapper enum for all supported sink types. /// -/// Enables runtime sink selection while maintaining static dispatch. -/// Each variant wraps a concrete sink implementation gated by its feature flag. +/// Enables runtime sink selection while maintaining static dispatch +/// for better performance. Each variant wraps a concrete sink type. #[derive(Clone)] pub enum AnySink { /// In-memory sink for testing and development. Memory(MemorySink), + + #[cfg(feature = "sink-meilisearch")] + /// Meilisearch sink for search engine indexing. + Meilisearch(MeilisearchSink), } impl Sink for AnySink { @@ -26,6 +40,9 @@ impl Sink for AnySink { async fn publish_events(&self, events: Vec) -> EtlResult<()> { match self { AnySink::Memory(sink) => sink.publish_events(events).await, + + #[cfg(feature = "sink-meilisearch")] + AnySink::Meilisearch(sink) => sink.publish_events(events).await, } } } diff --git a/src/test_utils/container.rs b/src/test_utils/container.rs index 82625ea..b24d319 100644 --- a/src/test_utils/container.rs +++ b/src/test_utils/container.rs @@ -2,6 +2,7 @@ use ctor::dtor; use etl::config::{PgConnectionConfig, TlsConfig}; use std::sync::{Mutex, OnceLock}; use testcontainers::{ContainerRequest, ImageExt, runners::SyncRunner}; +use testcontainers_modules::meilisearch::Meilisearch; use testcontainers_modules::postgres::Postgres; use uuid::Uuid; @@ -74,3 +75,46 @@ pub async fn test_pg_config() -> PgConnectionConfig { keepalive: None, } } + +static MEILISEARCH_PORT: OnceLock = OnceLock::new(); +/// Using Mutex> so we can take ownership for cleanup. +static MEILISEARCH_CONTAINER: OnceLock>>> = + OnceLock::new(); + +/// Cleanup function that runs at program exit to stop and remove the Meilisearch container. +#[dtor] +fn cleanup_meilisearch_container() { + if let Some(mutex) = MEILISEARCH_CONTAINER.get() { + if let Ok(mut guard) = mutex.lock() { + if let Some(container) = guard.take() { + // rm() stops and removes the container. + let _ = container.rm(); + } + } + } +} + +/// Ensures a Meilisearch container is running and returns its HTTP API port. +/// +/// The container is reused across tests. Used for testing Meilisearch sink. +pub async fn ensure_meilisearch() -> u16 { + *MEILISEARCH_PORT.get_or_init(|| { + std::thread::spawn(|| { + let container: ContainerRequest = Meilisearch::default().into(); + + let container = container + .start() + .expect("Failed to start Meilisearch container"); + + let port = container + .get_host_port_ipv4(7700) + .expect("Failed to get Meilisearch port"); + + let _ = MEILISEARCH_CONTAINER.set(Mutex::new(Some(container))); + + port + }) + .join() + .expect("Failed to join container startup thread") + }) +} diff --git a/tests/meilisearch_sink_tests.rs b/tests/meilisearch_sink_tests.rs new file mode 100644 index 0000000..4803558 --- /dev/null +++ b/tests/meilisearch_sink_tests.rs @@ -0,0 +1,242 @@ +//! Integration tests for Meilisearch sink. +//! +//! Uses Meilisearch testcontainer to test document indexing. + +#![cfg(feature = "sink-meilisearch")] + +use chrono::Utc; +use meilisearch_sdk::client::Client; +use postgres_stream::sink::Sink; +use postgres_stream::sink::meilisearch::{MeilisearchSink, MeilisearchSinkConfig}; +use postgres_stream::test_utils::ensure_meilisearch; +use postgres_stream::types::{EventIdentifier, PgLsn, StreamId, TriggeredEvent}; +use uuid::Uuid; + +/// Creates a test event with the given payload key. +/// Includes an `id` field in payload (as users would do via payload_extensions). +fn make_test_event(key: &str) -> TriggeredEvent { + let id = Uuid::new_v4().to_string(); + TriggeredEvent { + id: EventIdentifier::new(id.clone(), Utc::now()), + stream_id: StreamId::default(), + payload: serde_json::json!({ "id": id, "key": key, "value": "test_data" }), + metadata: Some(serde_json::json!({ "source": "test" })), + lsn: Some(PgLsn::from(12345u64)), + } +} + +/// Creates a Meilisearch client for testing. +fn create_test_client(port: u16) -> Client { + let url = format!("http://127.0.0.1:{port}"); + Client::new(&url, Option::<&str>::None).expect("Failed to create client") +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_meilisearch_sink_indexes_events() { + let port = ensure_meilisearch().await; + let index_name = format!("test-index-{}", Uuid::new_v4()); + + // Create sink. + let config = MeilisearchSinkConfig { + url: format!("http://127.0.0.1:{port}"), + index: Some(index_name.clone()), + api_key: None, + }; + + let sink = MeilisearchSink::new(config) + .await + .expect("Failed to create sink"); + + // Publish events. + let event1 = make_test_event("event1"); + let event2 = make_test_event("event2"); + let event1_id = event1.id.id.clone(); + let event2_id = event2.id.id.clone(); + + sink.publish_events(vec![event1, event2]) + .await + .expect("Failed to publish"); + + // Verify documents exist. + let client = create_test_client(port); + let index = client.index(&index_name); + + let doc1: serde_json::Value = index + .get_document(&event1_id) + .await + .expect("Failed to get document"); + assert_eq!(doc1["id"], event1_id); + + let doc2: serde_json::Value = index + .get_document(&event2_id) + .await + .expect("Failed to get document"); + assert_eq!(doc2["id"], event2_id); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_meilisearch_sink_handles_empty_batch() { + let port = ensure_meilisearch().await; + let index_name = format!("test-index-{}", Uuid::new_v4()); + + let config = MeilisearchSinkConfig { + url: format!("http://127.0.0.1:{port}"), + index: Some(index_name), + api_key: None, + }; + + let sink = MeilisearchSink::new(config) + .await + .expect("Failed to create sink"); + + // Publishing empty batch should succeed without making any API calls. + sink.publish_events(vec![]) + .await + .expect("Empty batch should succeed"); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_meilisearch_sink_indexes_only_payload() { + let port = ensure_meilisearch().await; + let index_name = format!("test-index-{}", Uuid::new_v4()); + + let config = MeilisearchSinkConfig { + url: format!("http://127.0.0.1:{port}"), + index: Some(index_name.clone()), + api_key: None, + }; + + let sink = MeilisearchSink::new(config) + .await + .expect("Failed to create sink"); + + // Create event with metadata. + // Include `id` in payload (as users would do via payload_extensions). + let event_id = Uuid::new_v4().to_string(); + let event = TriggeredEvent { + id: EventIdentifier::new(event_id.clone(), Utc::now()), + stream_id: StreamId::default(), + payload: serde_json::json!({ "id": event_id, "action": "created", "user": 456 }), + metadata: Some(serde_json::json!({ "source": "api" })), + lsn: Some(PgLsn::from(99999u64)), + }; + + sink.publish_events(vec![event]) + .await + .expect("Failed to publish"); + + // Retrieve document (with retry for eventual consistency). + let client = create_test_client(port); + let index = client.index(&index_name); + + let doc: serde_json::Value = index + .get_document(&event_id) + .await + .expect("Failed to get document"); + + // Only payload fields should be present (id from payload, not injected). + assert_eq!(doc["action"], "created"); + assert_eq!(doc["user"], 456); + assert_eq!(doc["id"], event_id); // From payload, not injected. + // No envelope fields. + assert!(doc.get("created_at").is_none()); + assert!(doc.get("metadata").is_none()); + assert!(doc.get("lsn").is_none()); + assert!(doc.get("stream_id").is_none()); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_meilisearch_sink_searchable() { + let port = ensure_meilisearch().await; + let index_name = format!("test-index-{}", Uuid::new_v4()); + + let config = MeilisearchSinkConfig { + url: format!("http://127.0.0.1:{port}"), + index: Some(index_name.clone()), + api_key: None, + }; + + let sink = MeilisearchSink::new(config) + .await + .expect("Failed to create sink"); + + // Publish events with different keys. + let events = vec![ + make_test_event("alpha"), + make_test_event("beta"), + make_test_event("gamma"), + ]; + + sink.publish_events(events) + .await + .expect("Failed to publish"); + + // Search for documents. + let client = create_test_client(port); + let index = client.index(&index_name); + + // Wait for indexing to complete. + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + + let results = index + .search() + .execute::() + .await + .expect("Failed to search"); + + assert_eq!( + results.hits.len(), + 3, + "Expected 3 documents, got {}", + results.hits.len() + ); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_meilisearch_sink_uses_index_from_metadata() { + let port = ensure_meilisearch().await; + let metadata_index = format!("metadata-index-{}", Uuid::new_v4()); + + // Create sink with NO default index. + let config = MeilisearchSinkConfig { + url: format!("http://127.0.0.1:{port}"), + index: None, + api_key: None, + }; + + let sink = MeilisearchSink::new(config) + .await + .expect("Failed to create sink"); + + // Create event with index in metadata. + // Include `id` in payload (as users would do via payload_extensions). + let event_id = Uuid::new_v4().to_string(); + let event = TriggeredEvent { + id: EventIdentifier::new(event_id.clone(), Utc::now()), + stream_id: StreamId::default(), + payload: serde_json::json!({ "id": event_id, "routed": true }), + metadata: Some(serde_json::json!({ "index": metadata_index })), + lsn: None, + }; + + sink.publish_events(vec![event]) + .await + .expect("Failed to publish"); + + // Verify document was indexed to metadata-specified index. + let client = create_test_client(port); + let index = client.index(&metadata_index); + + let doc: serde_json::Value = index + .get_document(&event_id) + .await + .expect("Failed to get document from metadata-specified index"); + + assert_eq!(doc["routed"], true); + assert_eq!(doc["id"], event_id); +} + +#[test] +fn test_sink_name() { + assert_eq!(MeilisearchSink::name(), "meilisearch"); +}