Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ uuid = { version = "^1", default-features = false, optional = true }
ouroboros = { version = "0.15", default-features = false }
url = { version = "^2.2", default-features = false }
thiserror = { version = "^1", default-features = false }
tokio = { version = "^1.6", default-features = false, features = ["sync"] }
Comment thread
tyt2y3 marked this conversation as resolved.
Outdated
async-channel = { version = "^1.7", default-features = false }
Comment thread
tyt2y3 marked this conversation as resolved.
Outdated

[dev-dependencies]
smol = { version = "^1.2" }
Expand All @@ -59,6 +61,7 @@ pretty_assertions = { version = "^0.7" }
time = { version = "^0.3", features = ["macros"] }
uuid = { version = "^1", features = ["v4"] }
once_cell = "1.8"
async-channel = { version = "^1.7" }

[features]
debug-print = []
Expand Down
13 changes: 11 additions & 2 deletions src/database/db_connection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
error::*, ConnectionTrait, DatabaseTransaction, ExecResult, QueryResult, Statement,
StatementBuilder, StreamTrait, TransactionError, TransactionTrait,
error::*, ConnectionTrait, DatabaseTransaction, EventStream, ExecResult, QueryResult,
Statement, StatementBuilder, StreamTrait, TransactionError, TransactionTrait,
};
use sea_query::{MysqlQueryBuilder, PostgresQueryBuilder, QueryBuilder, SqliteQueryBuilder};
use std::{future::Future, pin::Pin};
Expand Down Expand Up @@ -283,6 +283,15 @@ impl DatabaseConnection {
_ => {}
}
}

pub fn set_event_stream<E>(&mut self, event_stream: E) -> E::Receiver
where
E: EventStream,
{
let (sender, receiver) = event_stream.subscribe();
// TODO: Save the `sender` in `DatabaseConnection`
receiver
}
}

impl DbBackend {
Expand Down
81 changes: 81 additions & 0 deletions src/database/event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use crate::DbErr;

pub trait EventStream {
type Sender: EventSender;
type Receiver: EventReceiver;

fn subscribe(self) -> (Self::Sender, Self::Receiver);
}

#[async_trait::async_trait]
pub trait EventSender {
async fn send(&self, event: Event) -> Result<(), DbErr>;
}

#[async_trait::async_trait]
pub trait EventReceiver {
async fn recv(&mut self) -> Result<Event, DbErr>;
}

#[derive(Debug, Clone)]
pub enum Event {
Insert,
Update,
Delete,
}

mod event_stream_tokio {
use super::*;
use tokio::sync::broadcast::{Receiver, Sender};

impl EventStream for (Sender<Event>, Receiver<Event>) {
type Sender = Sender<Event>;
type Receiver = Receiver<Event>;

fn subscribe(self) -> (Self::Sender, Self::Receiver) {
self
}
}

#[async_trait::async_trait]
impl EventSender for Sender<Event> {
async fn send(&self, event: Event) -> Result<(), DbErr> {
self.send(event).map(|_| ()).map_err(|e| todo!())
}
}

#[async_trait::async_trait]
impl EventReceiver for Receiver<Event> {
async fn recv(&mut self) -> Result<Event, DbErr> {
self.recv().await.map_err(|e| todo!())
}
}
}

mod event_stream_async_channel {
use super::*;
use async_channel::{Receiver, Sender};

impl EventStream for (Sender<Event>, Receiver<Event>) {
type Sender = Sender<Event>;
type Receiver = Receiver<Event>;

fn subscribe(self) -> (Self::Sender, Self::Receiver) {
self
}
}

#[async_trait::async_trait]
impl EventSender for Sender<Event> {
async fn send(&self, event: Event) -> Result<(), DbErr> {
self.send(event).await.map_err(|e| todo!())
}
}

#[async_trait::async_trait]
impl EventReceiver for Receiver<Event> {
async fn recv(&mut self) -> Result<Event, DbErr> {
self.recv().await.map_err(|e| todo!())
}
}
}
2 changes: 2 additions & 0 deletions src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::time::Duration;

mod connection;
mod db_connection;
mod event;
#[cfg(feature = "mock")]
mod mock;
mod statement;
Expand All @@ -10,6 +11,7 @@ mod transaction;

pub use connection::*;
pub use db_connection::*;
pub use event::*;
#[cfg(feature = "mock")]
pub use mock::*;
pub use statement::*;
Expand Down
7 changes: 6 additions & 1 deletion tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@ pub use sea_orm::{entity::*, error::*, query::*, sea_query, tests_cfg::*, Databa
async fn main() -> Result<(), DbErr> {
let base_url = std::env::var("DATABASE_URL").unwrap_or_else(|_| "sqlite::memory:".to_owned());

let db: DbConn = Database::connect(&base_url).await?;
let mut db: DbConn = Database::connect(&base_url).await?;

let tokio_receiver = db.set_event_stream(tokio::sync::broadcast::channel(10));

let async_channel_receiver = db.set_event_stream(async_channel::bounded(10));

setup_schema(&db).await?;
crud_cake(&db).await?;

Expand Down