Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamE
stats::delete_stats(&stream_name, "json")
.unwrap_or_else(|e| warn!("failed to delete stats for stream {}: {:?}", stream_name, e));


// clear filters associated to the deleted logstream
if let Err(e) = PARSEABLE.metastore.delete_zombie_filters(&stream_name).await {
warn!("failed to delete zombie filters associated to stream {}: {:?}", stream_name, e);
}

Ok((format!("log stream {stream_name} deleted"), StatusCode::OK))
}

Expand Down
1 change: 1 addition & 0 deletions src/metastore/metastore_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ pub trait Metastore: std::fmt::Debug + Send + Sync {
async fn get_filters(&self) -> Result<Vec<Filter>, MetastoreError>;
async fn put_filter(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>;
async fn delete_filter(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>;
async fn delete_zombie_filters(&self, stream_name: &str) -> Result<bool, MetastoreError>;

/// correlations
async fn get_correlations(&self) -> Result<Vec<Bytes>, MetastoreError>;
Expand Down
34 changes: 33 additions & 1 deletion src/metastore/metastores/object_store_metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use crate::{
parseable_json_path, schema_path, stream_json_path, to_bytes,
},
},
users::filters::{Filter, migrate_v1_v2},
users::filters::{FILTERS, Filter, migrate_v1_v2},
};

/// Using PARSEABLE's storage as a metastore (default)
Expand Down Expand Up @@ -546,6 +546,38 @@ impl Metastore for ObjectStoreMetastore {
.await?)
}

// clear filters associated to a deleted stream
async fn delete_zombie_filters(&self, stream_name: &str) -> Result<bool, MetastoreError> {
// stream should not exist in order to have zombie filters
if PARSEABLE.check_stream_exists(stream_name) {
warn!("no zombie filters cleared for [undeleted] stream {}", stream_name);
return Ok(false);
}

let all_filters = match self.get_filters().await {
Ok(all_f) => all_f,
Err(e) => {
return Err(e);
}
};

// collect filters associated with the logstream being deleted
let filters_for_stream: Vec<Filter> = all_filters
.into_iter()
.filter(|filter| filter.stream_name == stream_name)
.collect();

for filter in filters_for_stream.iter() {
self.delete_filter(filter).await?;

if let Some(filter_id) = filter.filter_id.as_ref() {
FILTERS.delete_filter(filter_id).await;
}
}

return Ok(true);
}

/// Get all correlations
async fn get_correlations(&self) -> Result<Vec<Bytes>, MetastoreError> {
let mut correlations = Vec::new();
Expand Down
5 changes: 5 additions & 0 deletions src/parseable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,11 @@ impl Parseable {
.unwrap_or_default()
}

// check if a stream exists
pub fn check_stream_exists(&self, stream_name: &str) -> bool {
self.streams.contains(stream_name)
}

// validate the storage, if the proper path for staging directory is provided
// if the proper data directory is provided, or s3 bucket is provided etc
pub async fn validate_storage(&self) -> Result<Option<Bytes>, ObjectStorageError> {
Expand Down
Loading