From 9c451fd1b44bbfd52c87cdde82927b39ecff0505 Mon Sep 17 00:00:00 2001 From: rmn-boiko Date: Tue, 5 Mar 2024 16:36:57 +0100 Subject: [PATCH 01/16] Make dataset alias comparisons case-insensitive --- .../src/identity/dataset_identity.rs | 63 +++++++++++++++++-- .../src/repos/dataset_repository_local_fs.rs | 9 ++- .../core/src/repos/dataset_repository_s3.rs | 4 +- .../core/src/utils/datasets_filtering.rs | 7 ++- 4 files changed, 73 insertions(+), 10 deletions(-) diff --git a/src/domain/opendatafabric/src/identity/dataset_identity.rs b/src/domain/opendatafabric/src/identity/dataset_identity.rs index 6f4656ea2e..ecd44dba29 100644 --- a/src/domain/opendatafabric/src/identity/dataset_identity.rs +++ b/src/domain/opendatafabric/src/identity/dataset_identity.rs @@ -8,6 +8,7 @@ // by the Apache License, Version 2.0. use std::convert::{AsRef, TryFrom}; +use std::hash::Hash; use std::sync::Arc; use std::{cmp, fmt, ops}; @@ -104,7 +105,7 @@ macro_rules! impl_serde { } pub(crate) use impl_serde; -use like::Like; +use like::ILike; //////////////////////////////////////////////////////////////////////////////// @@ -221,6 +222,10 @@ impl DatasetName { pub fn into_local_ref(self) -> DatasetRef { DatasetRef::Alias(DatasetAlias::new(None, self)) } + + fn lowercase_eq(&self, other: &Self) -> bool { + self.to_lowercase() == other.to_lowercase() + } } /////////////////////////////////////////////////////////////////////////////// @@ -233,7 +238,7 @@ newtype_str!( impl DatasetNamePattern { pub fn matches(&self, dataset_name: &DatasetName) -> bool { - Like::::like(dataset_name.as_str(), self).unwrap() + ILike::::ilike(dataset_name.as_str(), self).unwrap() } } @@ -305,13 +310,19 @@ newtype_str!( AccountNameSerdeVisitor ); +impl AccountName { + fn lowercase_eq(&self, other: &Self) -> bool { + self.to_lowercase() == other.to_lowercase() + } +} + //////////////////////////////////////////////////////////////////////////////// newtype_str!(RepoName, Grammar::match_repo_name, RepoNameSerdeVisitor); //////////////////////////////////////////////////////////////////////////////// -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[derive(Debug, Clone, Eq, PartialOrd, Ord)] pub struct DatasetAlias { pub account_name: Option, pub dataset_name: DatasetName, @@ -358,6 +369,27 @@ impl DatasetAlias { } } +impl PartialEq for DatasetAlias { + fn eq(&self, other: &Self) -> bool { + ((self.account_name.is_none() && other.account_name.is_none()) + || (self.account_name.is_some() + && other.account_name.is_some() + && self + .account_name + .as_ref() + .unwrap() + .lowercase_eq(other.account_name.as_ref().unwrap()))) + && self.dataset_name.lowercase_eq(&other.dataset_name) + } +} + +impl Hash for DatasetAlias { + fn hash(&self, state: &mut H) { + self.account_name.hash(state); + self.dataset_name.hash(state); + } +} + impl std::str::FromStr for DatasetAlias { type Err = ParseError; fn from_str(s: &str) -> Result { @@ -388,7 +420,7 @@ impl_serde!(DatasetAlias, DatasetAliasSerdeVisitor); //////////////////////////////////////////////////////////////////////////////// -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[derive(Debug, Clone, Eq, PartialOrd, Ord)] pub struct DatasetAliasRemote { pub repo_name: RepoName, pub account_name: Option, @@ -437,6 +469,29 @@ impl DatasetAliasRemote { } } +impl PartialEq for DatasetAliasRemote { + fn eq(&self, other: &Self) -> bool { + (self.repo_name == other.repo_name) + && ((self.account_name.is_some() + && other.account_name.is_some() + && self + .account_name + .as_ref() + .unwrap() + .lowercase_eq(other.account_name.as_ref().unwrap())) + || (self.account_name.is_none() && other.account_name.is_none())) + && self.dataset_name.lowercase_eq(&other.dataset_name) + } +} + +impl Hash for DatasetAliasRemote { + fn hash(&self, state: &mut H) { + self.repo_name.hash(state); + self.account_name.hash(state); + self.dataset_name.hash(state); + } +} + impl std::str::FromStr for DatasetAliasRemote { type Err = ParseError; fn from_str(s: &str) -> Result { diff --git a/src/infra/core/src/repos/dataset_repository_local_fs.rs b/src/infra/core/src/repos/dataset_repository_local_fs.rs index a6840487ea..e694f384b4 100644 --- a/src/infra/core/src/repos/dataset_repository_local_fs.rs +++ b/src/infra/core/src/repos/dataset_repository_local_fs.rs @@ -516,7 +516,7 @@ impl DatasetStorageStrategy for DatasetSingleTenantStorageStrategy { } fn get_datasets_by_owner(&self, account_name: AccountName) -> DatasetHandleStream<'_> { - if account_name == DEFAULT_ACCOUNT_NAME { + if account_name.to_lowercase() == DEFAULT_ACCOUNT_NAME { self.get_all_datasets() } else { Box::pin(futures::stream::empty()) @@ -529,7 +529,8 @@ impl DatasetStorageStrategy for DatasetSingleTenantStorageStrategy { ) -> Result { assert!( !dataset_alias.is_multi_tenant() - || dataset_alias.account_name.as_ref().unwrap() == DEFAULT_ACCOUNT_NAME, + || dataset_alias.account_name.as_ref().unwrap().to_lowercase() + == DEFAULT_ACCOUNT_NAME, "Multi-tenant refs shouldn't have reached down to here with earlier validations" ); @@ -809,7 +810,9 @@ impl DatasetStorageStrategy for DatasetMultiTenantStorageStrategy { ) .await?; - if candidate_dataset_alias.dataset_name == dataset_alias.dataset_name { + if candidate_dataset_alias.dataset_name.to_lowercase() + == dataset_alias.dataset_name.to_lowercase() + { return Ok(DatasetHandle::new(dataset_id, candidate_dataset_alias)); } } diff --git a/src/infra/core/src/repos/dataset_repository_s3.rs b/src/infra/core/src/repos/dataset_repository_s3.rs index e7ed7d4dce..9d6c6e72fb 100644 --- a/src/infra/core/src/repos/dataset_repository_s3.rs +++ b/src/infra/core/src/repos/dataset_repository_s3.rs @@ -197,13 +197,13 @@ impl DatasetRepository for DatasetRepositoryS3 { } fn get_datasets_by_owner(&self, account_name: AccountName) -> DatasetHandleStream<'_> { - if !self.is_multi_tenant() && account_name != DEFAULT_ACCOUNT_NAME { + if !self.is_multi_tenant() && account_name.to_lowercase() != DEFAULT_ACCOUNT_NAME { return Box::pin(futures::stream::empty()); } self.stream_datasets_if(move |dataset_alias| { if let Some(dataset_account_name) = &dataset_alias.account_name { - dataset_account_name == &account_name + dataset_account_name.to_lowercase() == account_name.to_lowercase() } else { true } diff --git a/src/infra/core/src/utils/datasets_filtering.rs b/src/infra/core/src/utils/datasets_filtering.rs index 19d950fb2f..d62162111b 100644 --- a/src/infra/core/src/utils/datasets_filtering.rs +++ b/src/infra/core/src/utils/datasets_filtering.rs @@ -161,7 +161,12 @@ pub fn matches_remote_ref_pattern( DatasetRefAnyPattern::PatternRemote(repo_name, account_name, dataset_name_pattern) => { repo_name == &dataset_alias_remote.repo_name && (dataset_alias_remote.account_name.is_some() - && account_name == dataset_alias_remote.account_name.as_ref().unwrap()) + && account_name.to_lowercase() + == dataset_alias_remote + .account_name + .as_ref() + .unwrap() + .to_lowercase()) && dataset_name_pattern.matches(&dataset_alias_remote.dataset_name) } } From 24ff5c4593c7d6b3deec3046484475aff7a1b422 Mon Sep 17 00:00:00 2001 From: rmn-boiko Date: Tue, 5 Mar 2024 18:34:12 +0100 Subject: [PATCH 02/16] Add unit tests --- .../src/identity/dataset_identity.rs | 13 +++-- .../tests/tests/test_dataset_identity.rs | 52 +++++++++++++++++++ .../tests/tests/test_dataset_refs.rs | 2 +- .../src/repos/dataset_repository_local_fs.rs | 5 +- .../core/src/utils/datasets_filtering.rs | 11 ++-- 5 files changed, 70 insertions(+), 13 deletions(-) diff --git a/src/domain/opendatafabric/src/identity/dataset_identity.rs b/src/domain/opendatafabric/src/identity/dataset_identity.rs index ecd44dba29..f9421033dc 100644 --- a/src/domain/opendatafabric/src/identity/dataset_identity.rs +++ b/src/domain/opendatafabric/src/identity/dataset_identity.rs @@ -223,7 +223,7 @@ impl DatasetName { DatasetRef::Alias(DatasetAlias::new(None, self)) } - fn lowercase_eq(&self, other: &Self) -> bool { + pub fn lowercase_eq(&self, other: &Self) -> bool { self.to_lowercase() == other.to_lowercase() } } @@ -262,7 +262,14 @@ impl DatasetAliasPattern { } pub fn matches(&self, dataset_handle: &DatasetHandle) -> bool { - (self.account_name.is_none() || self.account_name == dataset_handle.alias.account_name) + ((self.account_name.is_none() && dataset_handle.alias.account_name.is_none()) + || (self.account_name.is_some() + && dataset_handle.alias.account_name.is_some() + && self + .account_name + .as_ref() + .unwrap() + .lowercase_eq(dataset_handle.alias.account_name.as_ref().unwrap()))) && self .dataset_name_pattern .matches(&dataset_handle.alias.dataset_name) @@ -311,7 +318,7 @@ newtype_str!( ); impl AccountName { - fn lowercase_eq(&self, other: &Self) -> bool { + pub fn lowercase_eq(&self, other: &Self) -> bool { self.to_lowercase() == other.to_lowercase() } } diff --git a/src/domain/opendatafabric/tests/tests/test_dataset_identity.rs b/src/domain/opendatafabric/tests/tests/test_dataset_identity.rs index 2272df140f..a0149ab626 100644 --- a/src/domain/opendatafabric/tests/tests/test_dataset_identity.rs +++ b/src/domain/opendatafabric/tests/tests/test_dataset_identity.rs @@ -233,3 +233,55 @@ fn test_dataset_refs_conversions() { alias: DatasetAlias::try_from("bar").unwrap(), }); } + +#[test] +fn test_dataset_alias_eq() { + assert_eq!( + DatasetAlias::from_str("account/net.example.com").unwrap(), + DatasetAlias::from_str("aCCouNt/net.ExaMplE.coM").unwrap(), + ); + assert_eq!( + DatasetAlias::from_str("account/net.example.com").unwrap(), + DatasetAlias::from_str("account/net.example.com").unwrap(), + ); + assert_eq!( + DatasetAlias::from_str("net.example.com").unwrap(), + DatasetAlias::from_str("net.ExaMplE.coM").unwrap(), + ); + assert_ne!( + DatasetAlias::from_str("account/net.example.com").unwrap(), + DatasetAlias::from_str("aCCouNt1/net.eXamPle.cOm").unwrap(), + ); + assert_ne!( + DatasetAlias::from_str("account1/net.example.com").unwrap(), + DatasetAlias::from_str("account/net.example.com").unwrap(), + ); + assert_ne!( + DatasetAlias::from_str("net.example.com").unwrap(), + DatasetAlias::from_str("account/net.example.com").unwrap(), + ); +} + +#[test] +fn test_dataset_remote_alias_eq() { + assert_eq!( + DatasetAliasRemote::from_str("repository/net.example.com").unwrap(), + DatasetAliasRemote::from_str("repository/net.ExaMplE.coM").unwrap(), + ); + assert_eq!( + DatasetAliasRemote::from_str("repository/net.example.com").unwrap(), + DatasetAliasRemote::from_str("repository/net.example.com").unwrap(), + ); + assert_eq!( + DatasetAliasRemote::from_str("repository/account/net.example.com").unwrap(), + DatasetAliasRemote::from_str("repository/AccOuNt/net.ExaMplE.coM").unwrap(), + ); + assert_ne!( + DatasetAliasRemote::from_str("repository/net.example.com").unwrap(), + DatasetAliasRemote::from_str("rEpoSitOry/net.ExaMplE.coM").unwrap(), + ); + assert_ne!( + DatasetAliasRemote::from_str("repository/account/net.example.com").unwrap(), + DatasetAliasRemote::from_str("repository/net.example.com").unwrap(), + ); +} diff --git a/src/domain/opendatafabric/tests/tests/test_dataset_refs.rs b/src/domain/opendatafabric/tests/tests/test_dataset_refs.rs index 8359b59712..045c440b70 100644 --- a/src/domain/opendatafabric/tests/tests/test_dataset_refs.rs +++ b/src/domain/opendatafabric/tests/tests/test_dataset_refs.rs @@ -117,7 +117,7 @@ fn test_dataset_ref_pattern_match() { }, }; - assert!(pattern.matches(&dataset_handle)); + assert!(!pattern.matches(&dataset_handle)); let dataset_account = "account1"; let dataset_name_pattern = "net%"; diff --git a/src/infra/core/src/repos/dataset_repository_local_fs.rs b/src/infra/core/src/repos/dataset_repository_local_fs.rs index e694f384b4..8d4f347cee 100644 --- a/src/infra/core/src/repos/dataset_repository_local_fs.rs +++ b/src/infra/core/src/repos/dataset_repository_local_fs.rs @@ -810,8 +810,9 @@ impl DatasetStorageStrategy for DatasetMultiTenantStorageStrategy { ) .await?; - if candidate_dataset_alias.dataset_name.to_lowercase() - == dataset_alias.dataset_name.to_lowercase() + if candidate_dataset_alias + .dataset_name + .lowercase_eq(&dataset_alias.dataset_name) { return Ok(DatasetHandle::new(dataset_id, candidate_dataset_alias)); } diff --git a/src/infra/core/src/utils/datasets_filtering.rs b/src/infra/core/src/utils/datasets_filtering.rs index d62162111b..7c3171bfb4 100644 --- a/src/infra/core/src/utils/datasets_filtering.rs +++ b/src/infra/core/src/utils/datasets_filtering.rs @@ -161,12 +161,8 @@ pub fn matches_remote_ref_pattern( DatasetRefAnyPattern::PatternRemote(repo_name, account_name, dataset_name_pattern) => { repo_name == &dataset_alias_remote.repo_name && (dataset_alias_remote.account_name.is_some() - && account_name.to_lowercase() - == dataset_alias_remote - .account_name - .as_ref() - .unwrap() - .to_lowercase()) + && account_name + .lowercase_eq(dataset_alias_remote.account_name.as_ref().unwrap())) && dataset_name_pattern.matches(&dataset_alias_remote.dataset_name) } } @@ -202,7 +198,8 @@ pub fn matches_local_ref_pattern( } DatasetRefAnyPattern::PatternAmbiguous(account_name, dataset_name_pattern) => { let account_name = AccountName::from_str(&account_name.pattern).unwrap(); - Some(account_name) == dataset_handle.alias.account_name + (dataset_handle.alias.account_name.is_some() + && account_name.lowercase_eq(dataset_handle.alias.account_name.as_ref().unwrap())) && dataset_name_pattern.matches(&dataset_handle.alias.dataset_name) } } From 1e65a57c713abae8ebe661a7ce41bcc718bfd3c9 Mon Sep 17 00:00:00 2001 From: rmn-boiko Date: Thu, 7 Mar 2024 19:13:09 +0100 Subject: [PATCH 03/16] Incapsulate lowercase in str type --- .../src/services/accounts/account_service.rs | 9 +- .../src/identity/dataset_identity.rs | 107 ++++++------------ src/domain/opendatafabric/src/lib.rs | 1 + .../src/repos/dataset_repository_local_fs.rs | 10 +- .../core/src/repos/dataset_repository_s3.rs | 4 +- .../core/src/utils/datasets_filtering.rs | 10 +- .../repos/test_dataset_repository_local_fs.rs | 36 ++++++ .../tests/repos/test_dataset_repository_s3.rs | 16 +++ .../repos/test_dataset_repository_shared.rs | 60 ++++++++++ 9 files changed, 162 insertions(+), 91 deletions(-) diff --git a/src/app/cli/src/services/accounts/account_service.rs b/src/app/cli/src/services/accounts/account_service.rs index f3d869a8da..acf99a7926 100644 --- a/src/app/cli/src/services/accounts/account_service.rs +++ b/src/app/cli/src/services/accounts/account_service.rs @@ -122,7 +122,9 @@ impl AccountService { fn find_account_info_impl(&self, account_name: &String) -> Option { // The account might be predefined in the configuration - self.predefined_accounts.get(account_name).cloned() + self.predefined_accounts + .get(&account_name.to_ascii_lowercase()) + .cloned() } fn get_account_info_impl( @@ -130,7 +132,10 @@ impl AccountService { account_name: &String, ) -> Result { // The account might be predefined in the configuration - match self.predefined_accounts.get(account_name) { + match self + .predefined_accounts + .get(&account_name.to_ascii_lowercase()) + { // Use the predefined record Some(account_info) => Ok(account_info.clone()), diff --git a/src/domain/opendatafabric/src/identity/dataset_identity.rs b/src/domain/opendatafabric/src/identity/dataset_identity.rs index f9421033dc..c2cf8ecba0 100644 --- a/src/domain/opendatafabric/src/identity/dataset_identity.rs +++ b/src/domain/opendatafabric/src/identity/dataset_identity.rs @@ -7,6 +7,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::borrow::Cow; use std::convert::{AsRef, TryFrom}; use std::hash::Hash; use std::sync::Arc; @@ -109,14 +110,14 @@ use like::ILike; //////////////////////////////////////////////////////////////////////////////// -macro_rules! newtype_str { +macro_rules! newtype_istr { ($typ:ident, $parse:expr, $visitor:ident) => { - #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] + #[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)] pub struct $typ(Arc); impl $typ { pub fn new_unchecked + ?Sized>(s: &S) -> Self { - Self(Arc::from(s.as_ref())) + Self(Arc::from(Self::into_lowercase(s.as_ref()))) } pub fn as_str(&self) -> &str { @@ -130,6 +131,15 @@ macro_rules! newtype_str { pub fn from_inner_unchecked(s: Arc) -> Self { Self(s) } + + pub fn into_lowercase<'a>(s: &'a str) -> Cow<'a, str> { + let bytes = s.as_bytes(); + if !bytes.iter().any(u8::is_ascii_uppercase) { + Cow::Borrowed(s) + } else { + Cow::Owned(s.to_ascii_lowercase()) + } + } } impl From<$typ> for String { @@ -153,7 +163,7 @@ macro_rules! newtype_str { impl std::str::FromStr for $typ { type Err = ::multiformats::ParseError<$typ>; fn from_str(s: &str) -> Result { - match $parse(s) { + match $parse(&$typ::into_lowercase(s)) { Some((_, "")) => Ok(Self::new_unchecked(s)), _ => Err(ParseError::new(s)), } @@ -208,7 +218,7 @@ macro_rules! newtype_str { //////////////////////////////////////////////////////////////////////////////// -newtype_str!( +newtype_istr!( DatasetName, Grammar::match_dataset_name, DatasetNameSerdeVisitor @@ -222,15 +232,11 @@ impl DatasetName { pub fn into_local_ref(self) -> DatasetRef { DatasetRef::Alias(DatasetAlias::new(None, self)) } - - pub fn lowercase_eq(&self, other: &Self) -> bool { - self.to_lowercase() == other.to_lowercase() - } } /////////////////////////////////////////////////////////////////////////////// -newtype_str!( +newtype_istr!( DatasetNamePattern, Grammar::match_dataset_name_pattern, DatasetNamePatternSerdeVisitor @@ -262,17 +268,20 @@ impl DatasetAliasPattern { } pub fn matches(&self, dataset_handle: &DatasetHandle) -> bool { - ((self.account_name.is_none() && dataset_handle.alias.account_name.is_none()) - || (self.account_name.is_some() - && dataset_handle.alias.account_name.is_some() - && self - .account_name - .as_ref() - .unwrap() - .lowercase_eq(dataset_handle.alias.account_name.as_ref().unwrap()))) + if self.account_name.is_some() && dataset_handle.alias.account_name.is_none() + || self.account_name.is_none() && dataset_handle.alias.account_name.is_some() + { + return false; + } + if self.account_name.as_ref().unwrap() + == dataset_handle.alias.account_name.as_ref().unwrap() && self .dataset_name_pattern .matches(&dataset_handle.alias.dataset_name) + { + return true; + } + false } } @@ -311,25 +320,19 @@ pub const FAKE_ACCOUNT_ID: &str = "12345"; //////////////////////////////////////////////////////////////////////////////// -newtype_str!( +newtype_istr!( AccountName, Grammar::match_account_name, AccountNameSerdeVisitor ); -impl AccountName { - pub fn lowercase_eq(&self, other: &Self) -> bool { - self.to_lowercase() == other.to_lowercase() - } -} - //////////////////////////////////////////////////////////////////////////////// -newtype_str!(RepoName, Grammar::match_repo_name, RepoNameSerdeVisitor); +newtype_istr!(RepoName, Grammar::match_repo_name, RepoNameSerdeVisitor); //////////////////////////////////////////////////////////////////////////////// -#[derive(Debug, Clone, Eq, PartialOrd, Ord)] +#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)] pub struct DatasetAlias { pub account_name: Option, pub dataset_name: DatasetName, @@ -376,31 +379,10 @@ impl DatasetAlias { } } -impl PartialEq for DatasetAlias { - fn eq(&self, other: &Self) -> bool { - ((self.account_name.is_none() && other.account_name.is_none()) - || (self.account_name.is_some() - && other.account_name.is_some() - && self - .account_name - .as_ref() - .unwrap() - .lowercase_eq(other.account_name.as_ref().unwrap()))) - && self.dataset_name.lowercase_eq(&other.dataset_name) - } -} - -impl Hash for DatasetAlias { - fn hash(&self, state: &mut H) { - self.account_name.hash(state); - self.dataset_name.hash(state); - } -} - impl std::str::FromStr for DatasetAlias { type Err = ParseError; fn from_str(s: &str) -> Result { - match Grammar::match_dataset_alias(s) { + match Grammar::match_dataset_alias(&s.to_ascii_lowercase()) { Some((acc, ds, "")) => Ok(Self::new( acc.map(AccountName::new_unchecked), DatasetName::new_unchecked(ds), @@ -427,7 +409,7 @@ impl_serde!(DatasetAlias, DatasetAliasSerdeVisitor); //////////////////////////////////////////////////////////////////////////////// -#[derive(Debug, Clone, Eq, PartialOrd, Ord)] +#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)] pub struct DatasetAliasRemote { pub repo_name: RepoName, pub account_name: Option, @@ -476,33 +458,10 @@ impl DatasetAliasRemote { } } -impl PartialEq for DatasetAliasRemote { - fn eq(&self, other: &Self) -> bool { - (self.repo_name == other.repo_name) - && ((self.account_name.is_some() - && other.account_name.is_some() - && self - .account_name - .as_ref() - .unwrap() - .lowercase_eq(other.account_name.as_ref().unwrap())) - || (self.account_name.is_none() && other.account_name.is_none())) - && self.dataset_name.lowercase_eq(&other.dataset_name) - } -} - -impl Hash for DatasetAliasRemote { - fn hash(&self, state: &mut H) { - self.repo_name.hash(state); - self.account_name.hash(state); - self.dataset_name.hash(state); - } -} - impl std::str::FromStr for DatasetAliasRemote { type Err = ParseError; fn from_str(s: &str) -> Result { - match Grammar::match_dataset_alias_remote(s) { + match Grammar::match_dataset_alias_remote(&s.to_ascii_lowercase()) { Some((repo, acc, ds, "")) => Ok(Self::new( RepoName::new_unchecked(repo), acc.map(AccountName::new_unchecked), diff --git a/src/domain/opendatafabric/src/lib.rs b/src/domain/opendatafabric/src/lib.rs index 6c6ef631db..3c2dfceef2 100644 --- a/src/domain/opendatafabric/src/lib.rs +++ b/src/domain/opendatafabric/src/lib.rs @@ -8,6 +8,7 @@ // by the Apache License, Version 2.0. #![feature(error_generic_member_access)] +#![feature(let_chains)] pub mod dtos; pub use dtos::*; diff --git a/src/infra/core/src/repos/dataset_repository_local_fs.rs b/src/infra/core/src/repos/dataset_repository_local_fs.rs index 8d4f347cee..a6840487ea 100644 --- a/src/infra/core/src/repos/dataset_repository_local_fs.rs +++ b/src/infra/core/src/repos/dataset_repository_local_fs.rs @@ -516,7 +516,7 @@ impl DatasetStorageStrategy for DatasetSingleTenantStorageStrategy { } fn get_datasets_by_owner(&self, account_name: AccountName) -> DatasetHandleStream<'_> { - if account_name.to_lowercase() == DEFAULT_ACCOUNT_NAME { + if account_name == DEFAULT_ACCOUNT_NAME { self.get_all_datasets() } else { Box::pin(futures::stream::empty()) @@ -529,8 +529,7 @@ impl DatasetStorageStrategy for DatasetSingleTenantStorageStrategy { ) -> Result { assert!( !dataset_alias.is_multi_tenant() - || dataset_alias.account_name.as_ref().unwrap().to_lowercase() - == DEFAULT_ACCOUNT_NAME, + || dataset_alias.account_name.as_ref().unwrap() == DEFAULT_ACCOUNT_NAME, "Multi-tenant refs shouldn't have reached down to here with earlier validations" ); @@ -810,10 +809,7 @@ impl DatasetStorageStrategy for DatasetMultiTenantStorageStrategy { ) .await?; - if candidate_dataset_alias - .dataset_name - .lowercase_eq(&dataset_alias.dataset_name) - { + if candidate_dataset_alias.dataset_name == dataset_alias.dataset_name { return Ok(DatasetHandle::new(dataset_id, candidate_dataset_alias)); } } diff --git a/src/infra/core/src/repos/dataset_repository_s3.rs b/src/infra/core/src/repos/dataset_repository_s3.rs index 9d6c6e72fb..e7ed7d4dce 100644 --- a/src/infra/core/src/repos/dataset_repository_s3.rs +++ b/src/infra/core/src/repos/dataset_repository_s3.rs @@ -197,13 +197,13 @@ impl DatasetRepository for DatasetRepositoryS3 { } fn get_datasets_by_owner(&self, account_name: AccountName) -> DatasetHandleStream<'_> { - if !self.is_multi_tenant() && account_name.to_lowercase() != DEFAULT_ACCOUNT_NAME { + if !self.is_multi_tenant() && account_name != DEFAULT_ACCOUNT_NAME { return Box::pin(futures::stream::empty()); } self.stream_datasets_if(move |dataset_alias| { if let Some(dataset_account_name) = &dataset_alias.account_name { - dataset_account_name.to_lowercase() == account_name.to_lowercase() + dataset_account_name == &account_name } else { true } diff --git a/src/infra/core/src/utils/datasets_filtering.rs b/src/infra/core/src/utils/datasets_filtering.rs index 7c3171bfb4..21838447c4 100644 --- a/src/infra/core/src/utils/datasets_filtering.rs +++ b/src/infra/core/src/utils/datasets_filtering.rs @@ -8,7 +8,6 @@ // by the Apache License, Version 2.0. use std::pin::Pin; -use std::str::FromStr; use std::sync::Arc; use futures::{future, StreamExt, TryStreamExt}; @@ -154,15 +153,14 @@ pub fn matches_remote_ref_pattern( match remote_ref_pattern { DatasetRefAnyPattern::Ref(_) | DatasetRefAnyPattern::PatternLocal(_) => unreachable!(), DatasetRefAnyPattern::PatternAmbiguous(repo_name, dataset_name_pattern) => { - let repo_name = RepoName::from_str(&repo_name.pattern).unwrap(); + let repo_name = RepoName::new_unchecked(&repo_name.pattern); repo_name == dataset_alias_remote.repo_name && dataset_name_pattern.matches(&dataset_alias_remote.dataset_name) } DatasetRefAnyPattern::PatternRemote(repo_name, account_name, dataset_name_pattern) => { repo_name == &dataset_alias_remote.repo_name && (dataset_alias_remote.account_name.is_some() - && account_name - .lowercase_eq(dataset_alias_remote.account_name.as_ref().unwrap())) + && account_name == dataset_alias_remote.account_name.as_ref().unwrap()) && dataset_name_pattern.matches(&dataset_alias_remote.dataset_name) } } @@ -197,9 +195,9 @@ pub fn matches_local_ref_pattern( dataset_name_pattern.matches(&dataset_handle.alias.dataset_name) } DatasetRefAnyPattern::PatternAmbiguous(account_name, dataset_name_pattern) => { - let account_name = AccountName::from_str(&account_name.pattern).unwrap(); + let account_name = AccountName::new_unchecked(&account_name.pattern); (dataset_handle.alias.account_name.is_some() - && account_name.lowercase_eq(dataset_handle.alias.account_name.as_ref().unwrap())) + && &account_name == dataset_handle.alias.account_name.as_ref().unwrap()) && dataset_name_pattern.matches(&dataset_handle.alias.dataset_name) } } diff --git a/src/infra/core/tests/tests/repos/test_dataset_repository_local_fs.rs b/src/infra/core/tests/tests/repos/test_dataset_repository_local_fs.rs index 6ab53836ae..85ccbe3d42 100644 --- a/src/infra/core/tests/tests/repos/test_dataset_repository_local_fs.rs +++ b/src/infra/core/tests/tests/repos/test_dataset_repository_local_fs.rs @@ -299,3 +299,39 @@ async fn test_iterate_datasets_multi_tenant() { } ///////////////////////////////////////////////////////////////////////////////////////// + +#[tokio::test] +async fn test_create_and_get_case_insensetive_dataset() { + let tempdir = tempfile::tempdir().unwrap(); + let harness = LocalFsRepoHarness::create( + &tempdir, + auth::AlwaysHappyDatasetActionAuthorizer::new(), + false, + ); + + test_dataset_repository_shared::test_create_and_get_case_insensetive_dataset( + harness.dataset_repo.as_ref(), + None, + ) + .await; +} + +///////////////////////////////////////////////////////////////////////////////////////// + +#[tokio::test] +async fn test_create_and_get_case_insensetive_dataset_multi_tenant() { + let tempdir = tempfile::tempdir().unwrap(); + let harness = LocalFsRepoHarness::create( + &tempdir, + auth::AlwaysHappyDatasetActionAuthorizer::new(), + true, + ); + + test_dataset_repository_shared::test_create_and_get_case_insensetive_dataset( + harness.dataset_repo.as_ref(), + Some(AccountName::new_unchecked(auth::DEFAULT_ACCOUNT_NAME)), + ) + .await; +} + +///////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/infra/core/tests/tests/repos/test_dataset_repository_s3.rs b/src/infra/core/tests/tests/repos/test_dataset_repository_s3.rs index a16e9ff5d8..9e9a754e33 100644 --- a/src/infra/core/tests/tests/repos/test_dataset_repository_s3.rs +++ b/src/infra/core/tests/tests/repos/test_dataset_repository_s3.rs @@ -295,3 +295,19 @@ async fn test_iterate_datasets_multi_tenant() { } ///////////////////////////////////////////////////////////////////////////////////////// + +#[test_group::group(containerized)] +#[tokio::test] +async fn test_create_and_get_case_insensetive_dataset() { + let s3 = LocalS3Server::new().await; + let harness = + S3RepoHarness::create(&s3, auth::AlwaysHappyDatasetActionAuthorizer::new(), false).await; + + test_dataset_repository_shared::test_create_and_get_case_insensetive_dataset( + harness.dataset_repo.as_ref(), + None, + ) + .await; +} + +///////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/infra/core/tests/tests/repos/test_dataset_repository_shared.rs b/src/infra/core/tests/tests/repos/test_dataset_repository_shared.rs index 2140a07e95..57d1e0cd84 100644 --- a/src/infra/core/tests/tests/repos/test_dataset_repository_shared.rs +++ b/src/infra/core/tests/tests/repos/test_dataset_repository_shared.rs @@ -61,6 +61,66 @@ pub async fn test_create_dataset(repo: &dyn DatasetRepository, account_name: Opt ///////////////////////////////////////////////////////////////////////////////////////// +pub async fn test_create_and_get_case_insensetive_dataset( + repo: &dyn DatasetRepository, + account_name: Option, +) { + let dataset_alias_to_create = + DatasetAlias::new(account_name.clone(), DatasetName::new_unchecked("foo")); + + assert_matches!( + repo.get_dataset(&dataset_alias_to_create.as_local_ref()) + .await + .err() + .unwrap(), + GetDatasetError::NotFound(_) + ); + + let create_result = repo + .create_dataset( + &dataset_alias_to_create, + MetadataFactory::metadata_block(MetadataFactory::seed(DatasetKind::Root).build()) + .build_typed(), + ) + .await + .unwrap(); + + assert_eq!(create_result.dataset_handle.alias, dataset_alias_to_create); + + let account_name_uppercase = if account_name.is_some() { + Some(AccountName::new_unchecked( + &account_name.unwrap().to_ascii_uppercase(), + )) + } else { + None + }; + + let dataset_alias_in_another_registry = + DatasetAlias::new(account_name_uppercase, DatasetName::new_unchecked("foO")); + + // We should see the dataset + assert!(repo + .get_dataset(&dataset_alias_in_another_registry.as_local_ref()) + .await + .is_ok()); + + // Now test name collision + let create_result = repo + .create_dataset( + &dataset_alias_in_another_registry, + MetadataFactory::metadata_block(MetadataFactory::seed(DatasetKind::Root).build()) + .build_typed(), + ) + .await; + + assert_matches!( + create_result.err(), + Some(CreateDatasetError::NameCollision(_)) + ); +} + +///////////////////////////////////////////////////////////////////////////////////////// + pub async fn test_create_dataset_same_name_multiple_tenants(repo: &dyn DatasetRepository) { let dataset_alias_my = DatasetAlias::new( Some(AccountName::new_unchecked("my")), From c4788bab98339e182ff933b98ad6a44ad04d7814 Mon Sep 17 00:00:00 2001 From: rmn-boiko Date: Thu, 7 Mar 2024 19:38:58 +0100 Subject: [PATCH 04/16] Add tests --- .../graphql/tests/tests/test_accounts.rs | 32 +++++++++- .../graphql/tests/tests/test_gql_data.rs | 59 +++++++++++++++++++ src/adapter/http/tests/tests/test_routing.rs | 26 ++++++++ .../src/services/accounts/account_service.rs | 8 +-- .../src/identity/dataset_identity.rs | 20 ++----- .../tests/tests/test_dataset_identity.rs | 2 +- .../tests/tests/test_dataset_refs.rs | 12 ++++ .../tests/repos/test_dataset_repository_s3.rs | 16 +++++ .../repos/test_dataset_repository_shared.rs | 10 +--- 9 files changed, 156 insertions(+), 29 deletions(-) diff --git a/src/adapter/graphql/tests/tests/test_accounts.rs b/src/adapter/graphql/tests/tests/test_accounts.rs index 4e26fa49fc..355b07b447 100644 --- a/src/adapter/graphql/tests/tests/test_accounts.rs +++ b/src/adapter/graphql/tests/tests/test_accounts.rs @@ -76,7 +76,7 @@ async fn test_account_by_name() { "#, "unknown", )) - .data(cat), + .data(cat.clone()), ) .await; @@ -89,6 +89,36 @@ async fn test_account_by_name() { } }) ); + + let res = schema + .execute( + async_graphql::Request::new(format!( + r#" + query {{ + accounts {{ + byName (name: "{}") {{ + accountName + }} + }} + }} + "#, + DEFAULT_ACCOUNT_NAME.to_ascii_uppercase(), + )) + .data(cat), + ) + .await; + + assert!(res.is_ok(), "{res:?}"); + assert_eq!( + res.data, + value!({ + "accounts": { + "byName": { + "accountName": DEFAULT_ACCOUNT_NAME + } + } + }) + ); } //////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/adapter/graphql/tests/tests/test_gql_data.rs b/src/adapter/graphql/tests/tests/test_gql_data.rs index 27c5865c6c..7dca80d03d 100644 --- a/src/adapter/graphql/tests/tests/test_gql_data.rs +++ b/src/adapter/graphql/tests/tests/test_gql_data.rs @@ -158,6 +158,65 @@ async fn test_dataset_schema_local_fs() { ///////////////////////////////////////////////////////////////////////////////////////// +#[test_group::group(engine, datafusion)] +#[test_log::test(tokio::test)] +async fn test_dataset_case_insensetive_schema_local_fs() { + let tempdir = tempfile::tempdir().unwrap(); + let catalog = create_catalog_with_local_workspace(tempdir.path()); + create_test_dataset(&catalog, tempdir.path()).await; + + let schema = kamu_adapter_graphql::schema_quiet(); + let res = schema + .execute( + async_graphql::Request::new(indoc::indoc!( + r#" + { + datasets { + byOwnerAndName(accountName: "kAmU", datasetName: "FOo") { + name + data { + tail(limit: 1, schemaFormat: PARQUET_JSON, dataFormat: JSON) { + ... on DataQueryResultSuccess { + schema { content } + } + } + } + } + } + } + "# + )) + .data(catalog), + ) + .await; + assert!(res.is_ok(), "{res:?}"); + let json = serde_json::to_string(&res.data).unwrap(); + let json = serde_json::from_str::(&json).unwrap(); + let data_schema = &json["datasets"]["byOwnerAndName"]["data"]["tail"]["schema"]["content"]; + let data_schema = + serde_json::from_str::(data_schema.as_str().unwrap()).unwrap(); + assert_eq!( + data_schema, + serde_json::json!({ + "name": "arrow_schema", + "type": "struct", + "fields": [{ + "name": "offset", + "repetition": "REQUIRED", + "type": "INT64", + "logicalType": "INTEGER(64,false)" + }, { + "name": "blah", + "repetition": "REQUIRED", + "type": "BYTE_ARRAY", + "logicalType": "STRING" + }] + }) + ); +} + +///////////////////////////////////////////////////////////////////////////////////////// + #[test_group::group(engine, datafusion)] #[test_log::test(tokio::test)] async fn test_dataset_tail_local_fs() { diff --git a/src/adapter/http/tests/tests/test_routing.rs b/src/adapter/http/tests/tests/test_routing.rs index a0b0ce7b59..904e170810 100644 --- a/src/adapter/http/tests/tests/test_routing.rs +++ b/src/adapter/http/tests/tests/test_routing.rs @@ -209,6 +209,32 @@ async fn test_routing_dataset_name() { await_client_server_flow!(server, client); } +#[test_log::test(tokio::test)] +async fn test_routing_dataset_name_case_insensetive() { + let repo = setup_repo().await; + + let server = setup_server( + repo.catalog, + "/:dataset_name", + |Path(p): Path| DatasetAlias::new(None, p.dataset_name).into_local_ref(), + ); + + let dataset_url = url::Url::parse(&format!( + "http://{}/{}/", + server.local_addr(), + repo.created_dataset + .dataset_handle + .alias + .dataset_name + .to_ascii_uppercase() + )) + .unwrap(); + + let client = setup_client(dataset_url, repo.created_dataset.head); + + await_client_server_flow!(server, client); +} + ///////////////////////////////////////////////////////////////////////////////////////// #[allow(dead_code)] diff --git a/src/app/cli/src/services/accounts/account_service.rs b/src/app/cli/src/services/accounts/account_service.rs index acf99a7926..93dd447216 100644 --- a/src/app/cli/src/services/accounts/account_service.rs +++ b/src/app/cli/src/services/accounts/account_service.rs @@ -120,11 +120,9 @@ impl AccountService { RelatedAccountIndication::new(target_account) } - fn find_account_info_impl(&self, account_name: &String) -> Option { + fn find_account_info_impl(&self, account_name: &AccountName) -> Option { // The account might be predefined in the configuration - self.predefined_accounts - .get(&account_name.to_ascii_lowercase()) - .cloned() + self.predefined_accounts.get(account_name.as_str()).cloned() } fn get_account_info_impl( @@ -231,7 +229,7 @@ impl auth::AuthenticationProvider for AccountService { &'a self, account_name: &'a AccountName, ) -> Result, InternalError> { - Ok(self.find_account_info_impl(&account_name.into())) + Ok(self.find_account_info_impl(account_name)) } } diff --git a/src/domain/opendatafabric/src/identity/dataset_identity.rs b/src/domain/opendatafabric/src/identity/dataset_identity.rs index c2cf8ecba0..b148b389f0 100644 --- a/src/domain/opendatafabric/src/identity/dataset_identity.rs +++ b/src/domain/opendatafabric/src/identity/dataset_identity.rs @@ -112,7 +112,7 @@ use like::ILike; macro_rules! newtype_istr { ($typ:ident, $parse:expr, $visitor:ident) => { - #[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)] + #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct $typ(Arc); impl $typ { @@ -132,7 +132,7 @@ macro_rules! newtype_istr { Self(s) } - pub fn into_lowercase<'a>(s: &'a str) -> Cow<'a, str> { + pub fn into_lowercase(s: &str) -> Cow<'_, str> { let bytes = s.as_bytes(); if !bytes.iter().any(u8::is_ascii_uppercase) { Cow::Borrowed(s) @@ -268,20 +268,10 @@ impl DatasetAliasPattern { } pub fn matches(&self, dataset_handle: &DatasetHandle) -> bool { - if self.account_name.is_some() && dataset_handle.alias.account_name.is_none() - || self.account_name.is_none() && dataset_handle.alias.account_name.is_some() - { - return false; - } - if self.account_name.as_ref().unwrap() - == dataset_handle.alias.account_name.as_ref().unwrap() + self.account_name == dataset_handle.alias.account_name && self .dataset_name_pattern .matches(&dataset_handle.alias.dataset_name) - { - return true; - } - false } } @@ -332,7 +322,7 @@ newtype_istr!(RepoName, Grammar::match_repo_name, RepoNameSerdeVisitor); //////////////////////////////////////////////////////////////////////////////// -#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct DatasetAlias { pub account_name: Option, pub dataset_name: DatasetName, @@ -409,7 +399,7 @@ impl_serde!(DatasetAlias, DatasetAliasSerdeVisitor); //////////////////////////////////////////////////////////////////////////////// -#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct DatasetAliasRemote { pub repo_name: RepoName, pub account_name: Option, diff --git a/src/domain/opendatafabric/tests/tests/test_dataset_identity.rs b/src/domain/opendatafabric/tests/tests/test_dataset_identity.rs index a0149ab626..01901f7058 100644 --- a/src/domain/opendatafabric/tests/tests/test_dataset_identity.rs +++ b/src/domain/opendatafabric/tests/tests/test_dataset_identity.rs @@ -276,7 +276,7 @@ fn test_dataset_remote_alias_eq() { DatasetAliasRemote::from_str("repository/account/net.example.com").unwrap(), DatasetAliasRemote::from_str("repository/AccOuNt/net.ExaMplE.coM").unwrap(), ); - assert_ne!( + assert_eq!( DatasetAliasRemote::from_str("repository/net.example.com").unwrap(), DatasetAliasRemote::from_str("rEpoSitOry/net.ExaMplE.coM").unwrap(), ); diff --git a/src/domain/opendatafabric/tests/tests/test_dataset_refs.rs b/src/domain/opendatafabric/tests/tests/test_dataset_refs.rs index 045c440b70..e310de5833 100644 --- a/src/domain/opendatafabric/tests/tests/test_dataset_refs.rs +++ b/src/domain/opendatafabric/tests/tests/test_dataset_refs.rs @@ -169,6 +169,18 @@ fn test_dataset_ref_pattern_match() { }, }; assert!(pattern.matches(&dataset_handle)); + + let expression = "nEt.eXample%"; + let dataset_name = "net.example.com"; + let pattern = DatasetRefPattern::from_str(expression).unwrap(); + let dataset_handle = DatasetHandle { + id: default_dataset_id.clone(), + alias: DatasetAlias { + account_name: None, + dataset_name: DatasetName::from_str(dataset_name).unwrap(), + }, + }; + assert!(pattern.matches(&dataset_handle)); } #[test] diff --git a/src/infra/core/tests/tests/repos/test_dataset_repository_s3.rs b/src/infra/core/tests/tests/repos/test_dataset_repository_s3.rs index 9e9a754e33..80d55f9ae7 100644 --- a/src/infra/core/tests/tests/repos/test_dataset_repository_s3.rs +++ b/src/infra/core/tests/tests/repos/test_dataset_repository_s3.rs @@ -311,3 +311,19 @@ async fn test_create_and_get_case_insensetive_dataset() { } ///////////////////////////////////////////////////////////////////////////////////////// + +#[test_group::group(containerized)] +#[tokio::test] +async fn test_create_and_get_case_insensetive_dataset_multi_tenant() { + let s3 = LocalS3Server::new().await; + let harness = + S3RepoHarness::create(&s3, auth::AlwaysHappyDatasetActionAuthorizer::new(), true).await; + + test_dataset_repository_shared::test_create_and_get_case_insensetive_dataset( + harness.dataset_repo.as_ref(), + Some(AccountName::new_unchecked(auth::DEFAULT_ACCOUNT_NAME)), + ) + .await; +} + +///////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/infra/core/tests/tests/repos/test_dataset_repository_shared.rs b/src/infra/core/tests/tests/repos/test_dataset_repository_shared.rs index 57d1e0cd84..8d095f8736 100644 --- a/src/infra/core/tests/tests/repos/test_dataset_repository_shared.rs +++ b/src/infra/core/tests/tests/repos/test_dataset_repository_shared.rs @@ -87,13 +87,9 @@ pub async fn test_create_and_get_case_insensetive_dataset( assert_eq!(create_result.dataset_handle.alias, dataset_alias_to_create); - let account_name_uppercase = if account_name.is_some() { - Some(AccountName::new_unchecked( - &account_name.unwrap().to_ascii_uppercase(), - )) - } else { - None - }; + let account_name_uppercase = account_name.map(|account_name_value| { + AccountName::new_unchecked(&account_name_value.to_ascii_uppercase()) + }); let dataset_alias_in_another_registry = DatasetAlias::new(account_name_uppercase, DatasetName::new_unchecked("foO")); From 96771a24079a41075854d9d7d10ca2d6f630c2b6 Mon Sep 17 00:00:00 2001 From: rmn-boiko Date: Mon, 25 Mar 2024 16:23:40 +0100 Subject: [PATCH 05/16] Add case insensetive comparing --- .../graphql/src/queries/datasets/datasets.rs | 5 +- .../graphql/tests/tests/test_accounts.rs | 45 +++++---- .../graphql/tests/tests/test_gql_datasets.rs | 97 ++++++++++++++----- .../src/services/accounts/account_service.rs | 23 +++-- .../src/identity/dataset_identity.rs | 45 +++++++-- .../tests/repos/test_dataset_repository_s3.rs | 18 +++- 6 files changed, 164 insertions(+), 69 deletions(-) diff --git a/src/adapter/graphql/src/queries/datasets/datasets.rs b/src/adapter/graphql/src/queries/datasets/datasets.rs index a05de1ea4a..5382bf4d6b 100644 --- a/src/adapter/graphql/src/queries/datasets/datasets.rs +++ b/src/adapter/graphql/src/queries/datasets/datasets.rs @@ -39,15 +39,14 @@ impl Datasets { account_name: AccountName, dataset_name: DatasetName, ) -> Result> { - let account = Account::from_account_name(account_name.clone().into()); - let dataset_alias = odf::DatasetAlias::new(Some(account_name.into()), dataset_name.into()); let dataset_repo = from_catalog::(ctx).unwrap(); let hdl = dataset_repo .try_resolve_dataset_ref(&dataset_alias.into_local_ref()) .await?; - Ok(hdl.map(|h| Dataset::new(account, h))) + + Ok(hdl.map(|h| Dataset::new(Account::from_dataset_alias(ctx, &h.alias), h))) } #[graphql(skip)] diff --git a/src/adapter/graphql/tests/tests/test_accounts.rs b/src/adapter/graphql/tests/tests/test_accounts.rs index 355b07b447..61f44add89 100644 --- a/src/adapter/graphql/tests/tests/test_accounts.rs +++ b/src/adapter/graphql/tests/tests/test_accounts.rs @@ -17,20 +17,7 @@ use opendatafabric::AccountName; #[test_log::test(tokio::test)] async fn test_account_by_name() { - let mut mock_authentication_service = MockAuthenticationService::new(); - mock_authentication_service - .expect_find_account_info_by_name() - .with(eq(AccountName::new_unchecked(DEFAULT_ACCOUNT_NAME))) - .returning(|_| Ok(Some(AccountInfo::dummy()))); - mock_authentication_service - .expect_find_account_info_by_name() - .with(eq(AccountName::new_unchecked("unknown"))) - .returning(|_| Ok(None)); - - let cat = dill::CatalogBuilder::new() - .add_value(mock_authentication_service) - .bind::() - .build(); + let harness = GraphQLAccountsHarness::new(); let schema = kamu_adapter_graphql::schema_quiet(); let res = schema @@ -46,7 +33,7 @@ async fn test_account_by_name() { }} "#, )) - .data(cat.clone()), + .data(harness.catalog.clone()), ) .await; @@ -76,7 +63,7 @@ async fn test_account_by_name() { "#, "unknown", )) - .data(cat.clone()), + .data(harness.catalog.clone()), ) .await; @@ -104,7 +91,7 @@ async fn test_account_by_name() { "#, DEFAULT_ACCOUNT_NAME.to_ascii_uppercase(), )) - .data(cat), + .data(harness.catalog), ) .await; @@ -122,3 +109,27 @@ async fn test_account_by_name() { } //////////////////////////////////////////////////////////////////////////////////////// + +struct GraphQLAccountsHarness { + catalog: dill::Catalog, +} + +impl GraphQLAccountsHarness { + pub fn new() -> Self { + let mut mock_authentication_service = MockAuthenticationService::new(); + mock_authentication_service + .expect_find_account_info_by_name() + .with(eq(AccountName::new_unchecked(DEFAULT_ACCOUNT_NAME))) + .returning(|_| Ok(Some(AccountInfo::dummy()))); + mock_authentication_service + .expect_find_account_info_by_name() + .with(eq(AccountName::new_unchecked("unknown"))) + .returning(|_| Ok(None)); + let catalog = dill::CatalogBuilder::new() + .add_value(mock_authentication_service) + .bind::() + .build(); + + Self { catalog } + } +} diff --git a/src/adapter/graphql/tests/tests/test_gql_datasets.rs b/src/adapter/graphql/tests/tests/test_gql_datasets.rs index b704495ac7..544b182dba 100644 --- a/src/adapter/graphql/tests/tests/test_gql_datasets.rs +++ b/src/adapter/graphql/tests/tests/test_gql_datasets.rs @@ -24,7 +24,7 @@ use crate::utils::{authentication_catalogs, expect_anonymous_access_error}; #[test_log::test(tokio::test)] async fn dataset_by_id_does_not_exist() { - let harness = GraphQLDatasetsHarness::new(); + let harness = GraphQLDatasetsHarness::new(false); let res = harness.execute_anonymous_query(indoc!( r#" { @@ -52,10 +52,10 @@ async fn dataset_by_id_does_not_exist() { #[test_log::test(tokio::test)] async fn dataset_by_id() { - let harness = GraphQLDatasetsHarness::new(); + let harness = GraphQLDatasetsHarness::new(false); let foo_result = harness - .create_root_dataset(DatasetName::new_unchecked("foo")) + .create_root_dataset(None, DatasetName::new_unchecked("foo")) .await; let res = harness @@ -92,9 +92,56 @@ async fn dataset_by_id() { //////////////////////////////////////////////////////////////////////////////////////// +#[test_log::test(tokio::test)] +async fn dataset_by_account_and_name_case_insensetive() { + let harness = GraphQLDatasetsHarness::new(true); + + let account_name_str = "KaMu"; + harness + .create_root_dataset( + Some(AccountName::new_unchecked(account_name_str)), + DatasetName::new_unchecked("Foo"), + ) + .await; + + let res = harness + .execute_anonymous_query( + indoc!( + r#" + { + datasets { + byOwnerAndName(accountName: "kAmU", datasetName: "") { + name, + owner { accountName }, + } + } + } + "# + ) + .replace("", "FoO"), + ) + .await; + assert!(res.is_ok(), "{res:?}"); + assert_eq!( + res.data, + value!({ + "datasets": { + "byOwnerAndName": { + "name": "Foo", + "owner": { + "accountName": account_name_str, + } + } + } + }) + ); +} + +//////////////////////////////////////////////////////////////////////////////////////// + #[test_log::test(tokio::test)] async fn dataset_create_empty() { - let harness = GraphQLDatasetsHarness::new(); + let harness = GraphQLDatasetsHarness::new(false); let request_code = indoc::indoc!( r#" @@ -136,7 +183,7 @@ async fn dataset_create_empty() { #[test_log::test(tokio::test)] async fn dataset_create_from_snapshot() { - let harness = GraphQLDatasetsHarness::new(); + let harness = GraphQLDatasetsHarness::new(true); let snapshot = MetadataFactory::dataset_snapshot() .name("foo") @@ -192,7 +239,7 @@ async fn dataset_create_from_snapshot() { #[test_log::test(tokio::test)] async fn dataset_create_from_snapshot_malformed() { - let harness = GraphQLDatasetsHarness::new(); + let harness = GraphQLDatasetsHarness::new(false); let res = harness .execute_authorized_query(indoc!( @@ -226,10 +273,10 @@ async fn dataset_create_from_snapshot_malformed() { #[test_log::test(tokio::test)] async fn dataset_rename_success() { - let harness = GraphQLDatasetsHarness::new(); + let harness = GraphQLDatasetsHarness::new(false); let foo_result = harness - .create_root_dataset(DatasetName::new_unchecked("foo")) + .create_root_dataset(None, DatasetName::new_unchecked("foo")) .await; let request_code = indoc!( @@ -278,10 +325,10 @@ async fn dataset_rename_success() { #[test_log::test(tokio::test)] async fn dataset_rename_no_changes() { - let harness = GraphQLDatasetsHarness::new(); + let harness = GraphQLDatasetsHarness::new(false); let foo_result = harness - .create_root_dataset(DatasetName::new_unchecked("foo")) + .create_root_dataset(None, DatasetName::new_unchecked("foo")) .await; let res = harness @@ -328,13 +375,13 @@ async fn dataset_rename_no_changes() { #[test_log::test(tokio::test)] async fn dataset_rename_name_collision() { - let harness = GraphQLDatasetsHarness::new(); + let harness = GraphQLDatasetsHarness::new(false); let foo_result = harness - .create_root_dataset(DatasetName::new_unchecked("foo")) + .create_root_dataset(None, DatasetName::new_unchecked("foo")) .await; let _bar_result = harness - .create_root_dataset(DatasetName::new_unchecked("bar")) + .create_root_dataset(None, DatasetName::new_unchecked("bar")) .await; let res = harness @@ -381,11 +428,11 @@ async fn dataset_rename_name_collision() { #[test_log::test(tokio::test)] async fn dataset_delete_success() { - let harness = GraphQLDatasetsHarness::new(); + let harness = GraphQLDatasetsHarness::new(false); harness.init_dependencies_graph().await; let foo_result = harness - .create_root_dataset(DatasetName::new_unchecked("foo")) + .create_root_dataset(None, DatasetName::new_unchecked("foo")) .await; let request_code = indoc!( @@ -431,11 +478,11 @@ async fn dataset_delete_success() { #[test_log::test(tokio::test)] async fn dataset_delete_dangling_ref() { - let harness = GraphQLDatasetsHarness::new(); + let harness = GraphQLDatasetsHarness::new(false); harness.init_dependencies_graph().await; let foo_result = harness - .create_root_dataset(DatasetName::new_unchecked("foo")) + .create_root_dataset(None, DatasetName::new_unchecked("foo")) .await; let _bar_result = harness .create_derived_dataset( @@ -489,10 +536,10 @@ async fn dataset_delete_dangling_ref() { #[test_log::test(tokio::test)] async fn dataset_view_permissions() { - let harness = GraphQLDatasetsHarness::new(); + let harness = GraphQLDatasetsHarness::new(false); let foo_result = harness - .create_root_dataset(DatasetName::new_unchecked("foo")) + .create_root_dataset(None, DatasetName::new_unchecked("foo")) .await; let request_code = indoc!( @@ -544,7 +591,7 @@ struct GraphQLDatasetsHarness { } impl GraphQLDatasetsHarness { - pub fn new() -> Self { + pub fn new(is_multi_tenant: bool) -> Self { let tempdir = tempfile::tempdir().unwrap(); let datasets_dir = tempdir.path().join("datasets"); std::fs::create_dir(&datasets_dir).unwrap(); @@ -555,7 +602,7 @@ impl GraphQLDatasetsHarness { .add_builder( DatasetRepositoryLocalFs::builder() .with_root(datasets_dir) - .with_multi_tenant(false), + .with_multi_tenant(is_multi_tenant), ) .bind::() .add_value(kamu::testing::MockAuthenticationService::built_in()) @@ -588,7 +635,11 @@ impl GraphQLDatasetsHarness { .unwrap(); } - pub async fn create_root_dataset(&self, name: DatasetName) -> CreateDatasetResult { + pub async fn create_root_dataset( + &self, + account_name: Option, + name: DatasetName, + ) -> CreateDatasetResult { let dataset_repo = self .catalog_authorized .get_one::() @@ -596,7 +647,7 @@ impl GraphQLDatasetsHarness { dataset_repo .create_dataset_from_snapshot( MetadataFactory::dataset_snapshot() - .name(DatasetAlias::new(None, name)) + .name(DatasetAlias::new(account_name, name)) .kind(DatasetKind::Root) .push_event(MetadataFactory::set_polling_source().build()) .build(), diff --git a/src/app/cli/src/services/accounts/account_service.rs b/src/app/cli/src/services/accounts/account_service.rs index 93dd447216..cf2f085706 100644 --- a/src/app/cli/src/services/accounts/account_service.rs +++ b/src/app/cli/src/services/accounts/account_service.rs @@ -27,7 +27,7 @@ pub const LOGIN_METHOD_PASSWORD: &str = "password"; ///////////////////////////////////////////////////////////////////////////////////////// pub struct AccountService { - pub predefined_accounts: HashMap, + pub predefined_accounts: HashMap, pub allow_login_unknown: bool, } @@ -40,7 +40,7 @@ impl AccountService { for predefined_account in &users_config.predefined { predefined_accounts.insert( - predefined_account.account_name.to_string(), + predefined_account.account_name.clone(), predefined_account.clone(), ); } @@ -122,18 +122,15 @@ impl AccountService { fn find_account_info_impl(&self, account_name: &AccountName) -> Option { // The account might be predefined in the configuration - self.predefined_accounts.get(account_name.as_str()).cloned() + self.predefined_accounts.get(account_name).cloned() } fn get_account_info_impl( &self, - account_name: &String, + account_name: &AccountName, ) -> Result { // The account might be predefined in the configuration - match self - .predefined_accounts - .get(&account_name.to_ascii_lowercase()) - { + match self.predefined_accounts.get(account_name) { // Use the predefined record Some(account_info) => Ok(account_info.clone()), @@ -143,9 +140,9 @@ impl AccountService { if self.allow_login_unknown { Ok(AccountInfo { account_id: FAKE_ACCOUNT_ID.to_string(), - account_name: AccountName::new_unchecked(account_name), + account_name: account_name.clone(), account_type: AccountType::User, - display_name: account_name.clone(), + display_name: account_name.to_string(), avatar_url: None, is_admin: false, }) @@ -193,7 +190,9 @@ impl auth::AuthenticationProvider for AccountService { // The account might be predefined in the configuration let account_info = self - .get_account_info_impl(&password_login_credentials.login) + .get_account_info_impl(&AccountName::new_unchecked( + &password_login_credentials.login, + )) .map_err(auth::ProviderLoginError::RejectedCredentials)?; // Store login as provider credentials @@ -219,7 +218,7 @@ impl auth::AuthenticationProvider for AccountService { .int_err()?; let account_info = self - .get_account_info_impl(&provider_credentials.account_name.to_string()) + .get_account_info_impl(&provider_credentials.account_name) .int_err()?; Ok(account_info) diff --git a/src/domain/opendatafabric/src/identity/dataset_identity.rs b/src/domain/opendatafabric/src/identity/dataset_identity.rs index b148b389f0..00c3f8ea9a 100644 --- a/src/domain/opendatafabric/src/identity/dataset_identity.rs +++ b/src/domain/opendatafabric/src/identity/dataset_identity.rs @@ -110,14 +110,14 @@ use like::ILike; //////////////////////////////////////////////////////////////////////////////// -macro_rules! newtype_istr { +macro_rules! newtype_str { ($typ:ident, $parse:expr, $visitor:ident) => { - #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] + #[derive(Debug, Clone, Eq)] pub struct $typ(Arc); impl $typ { pub fn new_unchecked + ?Sized>(s: &S) -> Self { - Self(Arc::from(Self::into_lowercase(s.as_ref()))) + Self(Arc::from(s.as_ref())) } pub fn as_str(&self) -> &str { @@ -163,13 +163,38 @@ macro_rules! newtype_istr { impl std::str::FromStr for $typ { type Err = ::multiformats::ParseError<$typ>; fn from_str(s: &str) -> Result { - match $parse(&$typ::into_lowercase(s)) { + match $parse(s) { Some((_, "")) => Ok(Self::new_unchecked(s)), _ => Err(ParseError::new(s)), } } } + impl PartialEq for $typ { + fn eq(&self, other: &$typ) -> bool { + self.eq_ignore_ascii_case(other) + } + } + + impl Hash for $typ { + fn hash(&self, state: &mut H) { + Self::into_lowercase(&self.0).hash(state); + Self::into_lowercase(&self.0).hash(state); + } + } + + impl PartialOrd for $typ { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } + } + + impl Ord for $typ { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + Self::into_lowercase(self).cmp(&Self::into_lowercase(other)) + } + } + impl ops::Deref for $typ { type Target = str; @@ -218,7 +243,7 @@ macro_rules! newtype_istr { //////////////////////////////////////////////////////////////////////////////// -newtype_istr!( +newtype_str!( DatasetName, Grammar::match_dataset_name, DatasetNameSerdeVisitor @@ -236,7 +261,7 @@ impl DatasetName { /////////////////////////////////////////////////////////////////////////////// -newtype_istr!( +newtype_str!( DatasetNamePattern, Grammar::match_dataset_name_pattern, DatasetNamePatternSerdeVisitor @@ -310,7 +335,7 @@ pub const FAKE_ACCOUNT_ID: &str = "12345"; //////////////////////////////////////////////////////////////////////////////// -newtype_istr!( +newtype_str!( AccountName, Grammar::match_account_name, AccountNameSerdeVisitor @@ -318,7 +343,7 @@ newtype_istr!( //////////////////////////////////////////////////////////////////////////////// -newtype_istr!(RepoName, Grammar::match_repo_name, RepoNameSerdeVisitor); +newtype_str!(RepoName, Grammar::match_repo_name, RepoNameSerdeVisitor); //////////////////////////////////////////////////////////////////////////////// @@ -372,7 +397,7 @@ impl DatasetAlias { impl std::str::FromStr for DatasetAlias { type Err = ParseError; fn from_str(s: &str) -> Result { - match Grammar::match_dataset_alias(&s.to_ascii_lowercase()) { + match Grammar::match_dataset_alias(s) { Some((acc, ds, "")) => Ok(Self::new( acc.map(AccountName::new_unchecked), DatasetName::new_unchecked(ds), @@ -451,7 +476,7 @@ impl DatasetAliasRemote { impl std::str::FromStr for DatasetAliasRemote { type Err = ParseError; fn from_str(s: &str) -> Result { - match Grammar::match_dataset_alias_remote(&s.to_ascii_lowercase()) { + match Grammar::match_dataset_alias_remote(s) { Some((repo, acc, ds, "")) => Ok(Self::new( RepoName::new_unchecked(repo), acc.map(AccountName::new_unchecked), diff --git a/src/infra/core/tests/tests/repos/test_dataset_repository_s3.rs b/src/infra/core/tests/tests/repos/test_dataset_repository_s3.rs index e3f27aeaa7..1c5877c704 100644 --- a/src/infra/core/tests/tests/repos/test_dataset_repository_s3.rs +++ b/src/infra/core/tests/tests/repos/test_dataset_repository_s3.rs @@ -401,8 +401,13 @@ async fn test_iterate_datasets_multi_tenant() { #[tokio::test] async fn test_create_and_get_case_insensetive_dataset() { let s3 = LocalS3Server::new().await; - let harness = - S3RepoHarness::create(&s3, auth::AlwaysHappyDatasetActionAuthorizer::new(), false).await; + let harness = S3RepoHarness::create( + &s3, + auth::AlwaysHappyDatasetActionAuthorizer::new(), + false, + false, + ) + .await; test_dataset_repository_shared::test_create_and_get_case_insensetive_dataset( harness.dataset_repo.as_ref(), @@ -417,8 +422,13 @@ async fn test_create_and_get_case_insensetive_dataset() { #[tokio::test] async fn test_create_and_get_case_insensetive_dataset_multi_tenant() { let s3 = LocalS3Server::new().await; - let harness = - S3RepoHarness::create(&s3, auth::AlwaysHappyDatasetActionAuthorizer::new(), true).await; + let harness = S3RepoHarness::create( + &s3, + auth::AlwaysHappyDatasetActionAuthorizer::new(), + true, + false, + ) + .await; test_dataset_repository_shared::test_create_and_get_case_insensetive_dataset( harness.dataset_repo.as_ref(), From c4a6bcf46330e0bfe5ba39a6226144cd24d104e0 Mon Sep 17 00:00:00 2001 From: rmn-boiko Date: Mon, 25 Mar 2024 18:54:16 +0100 Subject: [PATCH 06/16] Fix tests --- src/adapter/graphql/tests/tests/test_gql_data.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/adapter/graphql/tests/tests/test_gql_data.rs b/src/adapter/graphql/tests/tests/test_gql_data.rs index 7dca80d03d..cef1e5dbcc 100644 --- a/src/adapter/graphql/tests/tests/test_gql_data.rs +++ b/src/adapter/graphql/tests/tests/test_gql_data.rs @@ -22,7 +22,7 @@ use opendatafabric::*; ///////////////////////////////////////////////////////////////////////////////////////// -fn create_catalog_with_local_workspace(tempdir: &Path) -> dill::Catalog { +fn create_catalog_with_local_workspace(tempdir: &Path, is_multitenant: bool) -> dill::Catalog { dill::CatalogBuilder::new() .add::() .add::() @@ -30,7 +30,7 @@ fn create_catalog_with_local_workspace(tempdir: &Path) -> dill::Catalog { DatasetRepositoryLocalFs::builder() .with_root(tempdir.join("datasets")) .with_current_account_subject(Arc::new(CurrentAccountSubject::new_test())) - .with_multi_tenant(false), + .with_multi_tenant(is_multitenant), ) .bind::() .add::() @@ -103,7 +103,7 @@ async fn create_test_dataset(catalog: &dill::Catalog, tempdir: &Path) { #[test_log::test(tokio::test)] async fn test_dataset_schema_local_fs() { let tempdir = tempfile::tempdir().unwrap(); - let catalog = create_catalog_with_local_workspace(tempdir.path()); + let catalog = create_catalog_with_local_workspace(tempdir.path(), false); create_test_dataset(&catalog, tempdir.path()).await; let schema = kamu_adapter_graphql::schema_quiet(); @@ -162,7 +162,7 @@ async fn test_dataset_schema_local_fs() { #[test_log::test(tokio::test)] async fn test_dataset_case_insensetive_schema_local_fs() { let tempdir = tempfile::tempdir().unwrap(); - let catalog = create_catalog_with_local_workspace(tempdir.path()); + let catalog = create_catalog_with_local_workspace(tempdir.path(), true); create_test_dataset(&catalog, tempdir.path()).await; let schema = kamu_adapter_graphql::schema_quiet(); @@ -221,7 +221,7 @@ async fn test_dataset_case_insensetive_schema_local_fs() { #[test_log::test(tokio::test)] async fn test_dataset_tail_local_fs() { let tempdir = tempfile::tempdir().unwrap(); - let catalog = create_catalog_with_local_workspace(tempdir.path()); + let catalog = create_catalog_with_local_workspace(tempdir.path(), false); create_test_dataset(&catalog, tempdir.path()).await; let schema = kamu_adapter_graphql::schema_quiet(); @@ -262,7 +262,7 @@ async fn test_dataset_tail_local_fs() { #[test_log::test(tokio::test)] async fn test_dataset_tail_empty_local_fs() { let tempdir = tempfile::tempdir().unwrap(); - let catalog = create_catalog_with_local_workspace(tempdir.path()); + let catalog = create_catalog_with_local_workspace(tempdir.path(), false); create_test_dataset(&catalog, tempdir.path()).await; let schema = kamu_adapter_graphql::schema_quiet(); From 61b674ad438e8d698575f3244b79ab34864eae56 Mon Sep 17 00:00:00 2001 From: rmn-boiko Date: Mon, 25 Mar 2024 20:12:54 +0100 Subject: [PATCH 07/16] Improve datasets lookup --- .../src/repos/dataset_repository_local_fs.rs | 59 +++++++++++++------ 1 file changed, 42 insertions(+), 17 deletions(-) diff --git a/src/infra/core/src/repos/dataset_repository_local_fs.rs b/src/infra/core/src/repos/dataset_repository_local_fs.rs index de08bebd65..575b6bab25 100644 --- a/src/infra/core/src/repos/dataset_repository_local_fs.rs +++ b/src/infra/core/src/repos/dataset_repository_local_fs.rs @@ -98,8 +98,9 @@ impl DatasetRepositoryLocalFs { } // TODO: Make dataset factory (and thus the hashing algo) configurable - fn get_dataset_impl(&self, dataset_handle: &DatasetHandle) -> Arc { - let layout = DatasetLayout::new(self.storage_strategy.get_dataset_path(dataset_handle)); + async fn get_dataset_impl(&self, dataset_handle: &DatasetHandle) -> Arc { + let layout = + DatasetLayout::new(self.storage_strategy.get_dataset_path(dataset_handle).await); Self::build_dataset(layout, self.event_bus.clone()) } @@ -111,7 +112,9 @@ impl DatasetRepositoryLocalFs { ) -> Result { let dataset_handle = self.resolve_dataset_ref(dataset_ref).await?; Ok(DatasetLayout::new( - self.storage_strategy.get_dataset_path(&dataset_handle), + self.storage_strategy + .get_dataset_path(&dataset_handle) + .await, )) } } @@ -122,7 +125,10 @@ impl DatasetRepositoryLocalFs { impl DatasetRegistry for DatasetRepositoryLocalFs { async fn get_dataset_url(&self, dataset_ref: &DatasetRef) -> Result { let dataset_handle = self.resolve_dataset_ref(dataset_ref).await?; - let dataset_path = self.storage_strategy.get_dataset_path(&dataset_handle); + let dataset_path = self + .storage_strategy + .get_dataset_path(&dataset_handle) + .await; Ok(Url::from_directory_path(dataset_path).unwrap()) } } @@ -186,7 +192,7 @@ impl DatasetRepository for DatasetRepositoryLocalFs { dataset_ref: &DatasetRef, ) -> Result, GetDatasetError> { let dataset_handle = self.resolve_dataset_ref(dataset_ref).await?; - let dataset = self.get_dataset_impl(&dataset_handle); + let dataset = self.get_dataset_impl(&dataset_handle).await; Ok(dataset) } @@ -242,7 +248,10 @@ impl DatasetRepository for DatasetRepositoryLocalFs { // It's okay to create a new dataset by this point let dataset_id = seed_block.event.dataset_id.clone(); let dataset_handle = DatasetHandle::new(dataset_id, dataset_alias.clone()); - let dataset_path = self.storage_strategy.get_dataset_path(&dataset_handle); + let dataset_path = self + .storage_strategy + .get_dataset_path(&dataset_handle) + .await; let layout = DatasetLayout::create(&dataset_path).int_err()?; let dataset = Self::build_dataset(layout, self.event_bus.clone()); @@ -384,7 +393,10 @@ impl DatasetRepository for DatasetRepositoryLocalFs { // repo_info.datasets.remove(index); // self.write_repo_info(repo_info).await?; - let dataset_dir = self.storage_strategy.get_dataset_path(&dataset_handle); + let dataset_dir = self + .storage_strategy + .get_dataset_path(&dataset_handle) + .await; tokio::fs::remove_dir_all(dataset_dir).await.int_err()?; self.event_bus @@ -403,7 +415,7 @@ impl DatasetRepository for DatasetRepositoryLocalFs { trait DatasetStorageStrategy: Sync + Send { fn is_multi_tenant(&self) -> bool; - fn get_dataset_path(&self, dataset_handle: &DatasetHandle) -> PathBuf; + async fn get_dataset_path(&self, dataset_handle: &DatasetHandle) -> PathBuf; fn get_all_datasets(&self) -> DatasetHandleStream<'_>; @@ -467,9 +479,22 @@ impl DatasetSingleTenantStorageStrategy { &dataset_alias.dataset_name } - fn dataset_path_impl(&self, dataset_alias: &DatasetAlias) -> PathBuf { + async fn dataset_path_impl(&self, dataset_alias: &DatasetAlias) -> PathBuf { let dataset_name = self.dataset_name(dataset_alias); - self.root.join(dataset_name) + let path = self.root.join(dataset_name); + + if !path.exists() { + use tokio_stream::StreamExt; + + let mut all_datasets_stream = self.get_all_datasets(); + + while let Some(dataset_handle) = all_datasets_stream.try_next().await.unwrap() { + if &dataset_handle.alias == dataset_alias { + return self.root.join(&dataset_alias.dataset_name); + } + } + } + path } async fn attempt_resolving_summary_via_path( @@ -500,8 +525,8 @@ impl DatasetStorageStrategy for DatasetSingleTenantStorageStrategy { false } - fn get_dataset_path(&self, dataset_handle: &DatasetHandle) -> PathBuf { - self.dataset_path_impl(&dataset_handle.alias) + async fn get_dataset_path(&self, dataset_handle: &DatasetHandle) -> PathBuf { + self.dataset_path_impl(&dataset_handle.alias).await } fn get_all_datasets(&self) -> DatasetHandleStream<'_> { @@ -544,7 +569,7 @@ impl DatasetStorageStrategy for DatasetSingleTenantStorageStrategy { "Multi-tenant refs shouldn't have reached down to here with earlier validations" ); - let dataset_path = self.dataset_path_impl(dataset_alias); + let dataset_path = self.dataset_path_impl(dataset_alias).await; if !dataset_path.exists() { return Err(ResolveDatasetError::NotFound(DatasetNotFoundError { dataset_ref: dataset_alias.as_local_ref(), @@ -576,7 +601,7 @@ impl DatasetStorageStrategy for DatasetSingleTenantStorageStrategy { DatasetName::try_from(&dataset_dir_entry.file_name()).int_err()?, ); - let dataset_path = self.dataset_path_impl(&alias); + let dataset_path = self.dataset_path_impl(&alias).await; let summary = self .attempt_resolving_summary_via_path(&dataset_path, &alias) @@ -606,7 +631,7 @@ impl DatasetStorageStrategy for DatasetSingleTenantStorageStrategy { dataset_handle: &DatasetHandle, new_name: &DatasetName, ) -> Result<(), InternalError> { - let old_dataset_path = self.get_dataset_path(dataset_handle); + let old_dataset_path = self.get_dataset_path(dataset_handle).await; let new_dataset_path = old_dataset_path.parent().unwrap().join(new_name); std::fs::rename(old_dataset_path, new_dataset_path).int_err()?; Ok(()) @@ -735,7 +760,7 @@ impl DatasetStorageStrategy for DatasetMultiTenantStorageStrategy { true } - fn get_dataset_path(&self, dataset_handle: &DatasetHandle) -> PathBuf { + async fn get_dataset_path(&self, dataset_handle: &DatasetHandle) -> PathBuf { let account_name = self.effective_account_name(&dataset_handle.alias); self.root @@ -880,7 +905,7 @@ impl DatasetStorageStrategy for DatasetMultiTenantStorageStrategy { dataset_handle: &DatasetHandle, new_name: &DatasetName, ) -> Result<(), InternalError> { - let dataset_path = self.get_dataset_path(dataset_handle); + let dataset_path = self.get_dataset_path(dataset_handle).await; let layout = DatasetLayout::new(dataset_path); let dataset = DatasetRepositoryLocalFs::build_dataset(layout, self.event_bus.clone()); From 56e094ffa44e650002fe1c21b85b93c50a71d09b Mon Sep 17 00:00:00 2001 From: rmn-boiko Date: Wed, 27 Mar 2024 14:05:42 +0100 Subject: [PATCH 08/16] Fix tests --- src/adapter/graphql/tests/tests/test_gql_data.rs | 5 ++++- .../graphql/tests/tests/test_gql_dataset_flow_configs.rs | 4 +++- .../graphql/tests/tests/test_gql_dataset_flow_runs.rs | 4 +++- src/adapter/graphql/tests/tests/test_gql_metadata.rs | 4 +++- src/adapter/graphql/tests/tests/test_gql_metadata_chain.rs | 4 +++- src/adapter/graphql/tests/tests/test_gql_search.rs | 4 +++- .../http/tests/tests/test_dataset_authorization_layer.rs | 4 +++- src/adapter/odata/tests/tests/test_handlers.rs | 4 +++- src/infra/core/tests/tests/engine/test_engine_transform.rs | 4 +++- src/infra/core/tests/tests/ingest/test_polling_ingest.rs | 4 +++- src/infra/core/tests/tests/ingest/test_push_ingest.rs | 4 +++- src/infra/core/tests/tests/ingest/test_writer.rs | 5 ++++- .../tests/tests/repos/test_dataset_repository_local_fs.rs | 5 ++++- src/infra/core/tests/tests/test_query_service_impl.rs | 5 ++++- src/infra/core/tests/tests/test_reset_service_impl.rs | 4 +++- src/infra/core/tests/tests/test_search_service_impl.rs | 4 +++- src/infra/core/tests/tests/test_sync_service_impl.rs | 5 ++++- src/infra/core/tests/tests/test_transform_service_impl.rs | 4 +++- src/infra/core/tests/tests/test_verification_service_impl.rs | 4 +++- 19 files changed, 62 insertions(+), 19 deletions(-) diff --git a/src/adapter/graphql/tests/tests/test_gql_data.rs b/src/adapter/graphql/tests/tests/test_gql_data.rs index cef1e5dbcc..ae78ac2b76 100644 --- a/src/adapter/graphql/tests/tests/test_gql_data.rs +++ b/src/adapter/graphql/tests/tests/test_gql_data.rs @@ -23,12 +23,15 @@ use opendatafabric::*; ///////////////////////////////////////////////////////////////////////////////////////// fn create_catalog_with_local_workspace(tempdir: &Path, is_multitenant: bool) -> dill::Catalog { + let datasets_dir = tempdir.join("datasets"); + std::fs::create_dir(&datasets_dir).unwrap(); + dill::CatalogBuilder::new() .add::() .add::() .add_builder( DatasetRepositoryLocalFs::builder() - .with_root(tempdir.join("datasets")) + .with_root(datasets_dir) .with_current_account_subject(Arc::new(CurrentAccountSubject::new_test())) .with_multi_tenant(is_multitenant), ) diff --git a/src/adapter/graphql/tests/tests/test_gql_dataset_flow_configs.rs b/src/adapter/graphql/tests/tests/test_gql_dataset_flow_configs.rs index 9127a0c540..0162239a2d 100644 --- a/src/adapter/graphql/tests/tests/test_gql_dataset_flow_configs.rs +++ b/src/adapter/graphql/tests/tests/test_gql_dataset_flow_configs.rs @@ -1099,12 +1099,14 @@ struct FlowConfigHarness { impl FlowConfigHarness { fn new() -> Self { let tempdir = tempfile::tempdir().unwrap(); + let datasets_dir = tempdir.path().join("datasets"); + std::fs::create_dir(&datasets_dir).unwrap(); let catalog_base = dill::CatalogBuilder::new() .add::() .add_builder( DatasetRepositoryLocalFs::builder() - .with_root(tempdir.path().join("datasets")) + .with_root(datasets_dir) .with_multi_tenant(false), ) .bind::() diff --git a/src/adapter/graphql/tests/tests/test_gql_dataset_flow_runs.rs b/src/adapter/graphql/tests/tests/test_gql_dataset_flow_runs.rs index b52e61c424..c67a1f9f0b 100644 --- a/src/adapter/graphql/tests/tests/test_gql_dataset_flow_runs.rs +++ b/src/adapter/graphql/tests/tests/test_gql_dataset_flow_runs.rs @@ -1701,6 +1701,8 @@ impl FlowRunsHarness { let tempdir = tempfile::tempdir().unwrap(); let run_info_dir = tempdir.path().join("run"); let cache_dir = tempdir.path().join("cache"); + let datasets_dir = tempdir.path().join("datasets"); + std::fs::create_dir(&datasets_dir).unwrap(); std::fs::create_dir(&run_info_dir).unwrap(); std::fs::create_dir(&cache_dir).unwrap(); @@ -1711,7 +1713,7 @@ impl FlowRunsHarness { .add::() .add_builder( DatasetRepositoryLocalFs::builder() - .with_root(tempdir.path().join("datasets")) + .with_root(datasets_dir) .with_multi_tenant(false), ) .bind::() diff --git a/src/adapter/graphql/tests/tests/test_gql_metadata.rs b/src/adapter/graphql/tests/tests/test_gql_metadata.rs index eced5d86b8..9355727983 100644 --- a/src/adapter/graphql/tests/tests/test_gql_metadata.rs +++ b/src/adapter/graphql/tests/tests/test_gql_metadata.rs @@ -23,12 +23,14 @@ use crate::utils::authentication_catalogs; #[test_log::test(tokio::test)] async fn test_current_push_sources() { let tempdir = tempfile::tempdir().unwrap(); + let datasets_dir = tempdir.path().join("datasets"); + std::fs::create_dir(&datasets_dir).unwrap(); let base_catalog = CatalogBuilder::new() .add::() .add_builder( DatasetRepositoryLocalFs::builder() - .with_root(tempdir.path().join("datasets")) + .with_root(datasets_dir) .with_multi_tenant(false), ) .bind::() diff --git a/src/adapter/graphql/tests/tests/test_gql_metadata_chain.rs b/src/adapter/graphql/tests/tests/test_gql_metadata_chain.rs index 399df2b44c..53f74ea426 100644 --- a/src/adapter/graphql/tests/tests/test_gql_metadata_chain.rs +++ b/src/adapter/graphql/tests/tests/test_gql_metadata_chain.rs @@ -26,13 +26,15 @@ use crate::utils::{authentication_catalogs, expect_anonymous_access_error}; #[test_log::test(tokio::test)] async fn test_metadata_chain_events() { let tempdir = tempfile::tempdir().unwrap(); + let datasets_dir = tempdir.path().join("datasets"); + std::fs::create_dir(&datasets_dir).unwrap(); let base_catalog = dill::CatalogBuilder::new() .add::() .add::() .add_builder( DatasetRepositoryLocalFs::builder() - .with_root(tempdir.path().join("datasets")) + .with_root(datasets_dir) .with_multi_tenant(false), ) .bind::() diff --git a/src/adapter/graphql/tests/tests/test_gql_search.rs b/src/adapter/graphql/tests/tests/test_gql_search.rs index 8fa72e9606..e024930df1 100644 --- a/src/adapter/graphql/tests/tests/test_gql_search.rs +++ b/src/adapter/graphql/tests/tests/test_gql_search.rs @@ -18,6 +18,8 @@ use opendatafabric::*; #[tokio::test] async fn query() { let tempdir = tempfile::tempdir().unwrap(); + let datasets_dir = tempdir.path().join("datasets"); + std::fs::create_dir(&datasets_dir).unwrap(); let cat = dill::CatalogBuilder::new() .add::() @@ -26,7 +28,7 @@ async fn query() { .add::() .add_builder( DatasetRepositoryLocalFs::builder() - .with_root(tempdir.path().join("datasets")) + .with_root(datasets_dir) .with_multi_tenant(false), ) .bind::() diff --git a/src/adapter/http/tests/tests/test_dataset_authorization_layer.rs b/src/adapter/http/tests/tests/test_dataset_authorization_layer.rs index a50dee80b1..0ec30e0bb6 100644 --- a/src/adapter/http/tests/tests/test_dataset_authorization_layer.rs +++ b/src/adapter/http/tests/tests/test_dataset_authorization_layer.rs @@ -211,6 +211,8 @@ impl ServerHarness { dataset_action_authorizer: MockDatasetActionAuthorizer, ) -> Self { let temp_dir = tempfile::TempDir::new().unwrap(); + let datasets_dir = temp_dir.path().join("datasets"); + std::fs::create_dir(&datasets_dir).unwrap(); let mut catalog_builder = dill::CatalogBuilder::new(); catalog_builder.add::(); @@ -227,7 +229,7 @@ impl ServerHarness { .add_builder( DatasetRepositoryLocalFs::builder() .with_multi_tenant(false) - .with_root(temp_dir.path().join("datasets")), + .with_root(datasets_dir), ) .bind::(); diff --git a/src/adapter/odata/tests/tests/test_handlers.rs b/src/adapter/odata/tests/tests/test_handlers.rs index ec0fa2f8da..cef2b22e78 100644 --- a/src/adapter/odata/tests/tests/test_handlers.rs +++ b/src/adapter/odata/tests/tests/test_handlers.rs @@ -261,8 +261,10 @@ impl TestHarness { let temp_dir = tempfile::tempdir().unwrap(); let run_info_dir = temp_dir.path().join("run"); let cache_dir = temp_dir.path().join("cache"); + let datasets_dir = temp_dir.path().join("datasets"); std::fs::create_dir(&run_info_dir).unwrap(); std::fs::create_dir(cache_dir).unwrap(); + std::fs::create_dir(&datasets_dir).unwrap(); let catalog = dill::CatalogBuilder::new() .add::() @@ -274,7 +276,7 @@ impl TestHarness { .bind::() .add_builder( DatasetRepositoryLocalFs::builder() - .with_root(temp_dir.path().join("datasets")) + .with_root(datasets_dir) .with_multi_tenant(false), ) .bind::() diff --git a/src/infra/core/tests/tests/engine/test_engine_transform.rs b/src/infra/core/tests/tests/engine/test_engine_transform.rs index ae796943cc..c6558fadfd 100644 --- a/src/infra/core/tests/tests/engine/test_engine_transform.rs +++ b/src/infra/core/tests/tests/engine/test_engine_transform.rs @@ -211,8 +211,10 @@ async fn test_transform_common(transform: Transform) { let tempdir = tempfile::tempdir().unwrap(); let run_info_dir = tempdir.path().join("run"); let cache_dir = tempdir.path().join("cache"); + let datasets_dir = tempdir.path().join("datasets"); std::fs::create_dir(&run_info_dir).unwrap(); std::fs::create_dir(&cache_dir).unwrap(); + std::fs::create_dir(&datasets_dir).unwrap(); let catalog = dill::CatalogBuilder::new() .add_value(ContainerRuntimeConfig::default()) @@ -223,7 +225,7 @@ async fn test_transform_common(transform: Transform) { .add_value(CurrentAccountSubject::new_test()) .add_builder( DatasetRepositoryLocalFs::builder() - .with_root(tempdir.path().join("datasets")) + .with_root(datasets_dir) .with_multi_tenant(false), ) .bind::() diff --git a/src/infra/core/tests/tests/ingest/test_polling_ingest.rs b/src/infra/core/tests/tests/ingest/test_polling_ingest.rs index 91f6427a4b..42291a271f 100644 --- a/src/infra/core/tests/tests/ingest/test_polling_ingest.rs +++ b/src/infra/core/tests/tests/ingest/test_polling_ingest.rs @@ -1085,8 +1085,10 @@ impl IngestTestHarness { let temp_dir = tempfile::tempdir().unwrap(); let run_info_dir = temp_dir.path().join("run"); let cache_dir = temp_dir.path().join("cache"); + let datasets_dir = temp_dir.path().join("datasets"); std::fs::create_dir(&run_info_dir).unwrap(); std::fs::create_dir(&cache_dir).unwrap(); + std::fs::create_dir(&datasets_dir).unwrap(); let catalog = dill::CatalogBuilder::new() .add_value(ContainerRuntimeConfig::default()) @@ -1100,7 +1102,7 @@ impl IngestTestHarness { .bind::() .add_builder( DatasetRepositoryLocalFs::builder() - .with_root(temp_dir.path().join("datasets")) + .with_root(datasets_dir) .with_multi_tenant(false), ) .bind::() diff --git a/src/infra/core/tests/tests/ingest/test_push_ingest.rs b/src/infra/core/tests/tests/ingest/test_push_ingest.rs index 4d0b806bbf..e6b6674a3b 100644 --- a/src/infra/core/tests/tests/ingest/test_push_ingest.rs +++ b/src/infra/core/tests/tests/ingest/test_push_ingest.rs @@ -473,8 +473,10 @@ impl IngestTestHarness { let temp_dir = tempfile::tempdir().unwrap(); let run_info_dir = temp_dir.path().join("run"); let cache_dir = temp_dir.path().join("cache"); + let datasets_dir = temp_dir.path().join("datasets"); std::fs::create_dir(&run_info_dir).unwrap(); std::fs::create_dir(cache_dir).unwrap(); + std::fs::create_dir(&datasets_dir).unwrap(); let catalog = dill::CatalogBuilder::new() .add::() @@ -484,7 +486,7 @@ impl IngestTestHarness { .bind::() .add_builder( DatasetRepositoryLocalFs::builder() - .with_root(temp_dir.path().join("datasets")) + .with_root(datasets_dir) .with_multi_tenant(false), ) .bind::() diff --git a/src/infra/core/tests/tests/ingest/test_writer.rs b/src/infra/core/tests/tests/ingest/test_writer.rs index 546d530e48..5037b33122 100644 --- a/src/infra/core/tests/tests/ingest/test_writer.rs +++ b/src/infra/core/tests/tests/ingest/test_writer.rs @@ -933,6 +933,9 @@ struct Harness { impl Harness { async fn new(dataset_events: Vec) -> Self { let temp_dir = tempfile::tempdir().unwrap(); + let datasets_dir = temp_dir.path().join("datasets"); + std::fs::create_dir(&datasets_dir).unwrap(); + let system_time = Utc.with_ymd_and_hms(2010, 1, 1, 12, 0, 0).unwrap(); let catalog = dill::CatalogBuilder::new() @@ -942,7 +945,7 @@ impl Harness { .add_value(CurrentAccountSubject::new_test()) .add_builder( DatasetRepositoryLocalFs::builder() - .with_root(temp_dir.path().join("datasets")) + .with_root(datasets_dir) .with_multi_tenant(false), ) .bind::() diff --git a/src/infra/core/tests/tests/repos/test_dataset_repository_local_fs.rs b/src/infra/core/tests/tests/repos/test_dataset_repository_local_fs.rs index 85ccbe3d42..8abd7ed918 100644 --- a/src/infra/core/tests/tests/repos/test_dataset_repository_local_fs.rs +++ b/src/infra/core/tests/tests/repos/test_dataset_repository_local_fs.rs @@ -32,6 +32,9 @@ impl LocalFsRepoHarness { dataset_action_authorizer: TDatasetActionAuthorizer, multi_tenant: bool, ) -> Self { + let datasets_dir = tempdir.path().join("datasets"); + std::fs::create_dir(&datasets_dir).unwrap(); + let catalog = dill::CatalogBuilder::new() .add::() .add::() @@ -40,7 +43,7 @@ impl LocalFsRepoHarness { .bind::() .add_builder( DatasetRepositoryLocalFs::builder() - .with_root(tempdir.path().join("datasets")) + .with_root(datasets_dir) .with_multi_tenant(multi_tenant), ) .bind::() diff --git a/src/infra/core/tests/tests/test_query_service_impl.rs b/src/infra/core/tests/tests/test_query_service_impl.rs index 4ae92ea718..1e4fc42134 100644 --- a/src/infra/core/tests/tests/test_query_service_impl.rs +++ b/src/infra/core/tests/tests/test_query_service_impl.rs @@ -115,12 +115,15 @@ fn create_catalog_with_local_workspace( tempdir: &Path, dataset_action_authorizer: MockDatasetActionAuthorizer, ) -> dill::Catalog { + let datasets_dir = tempdir.join("datasets"); + std::fs::create_dir(&datasets_dir).unwrap(); + dill::CatalogBuilder::new() .add::() .add::() .add_builder( DatasetRepositoryLocalFs::builder() - .with_root(tempdir.join("datasets")) + .with_root(datasets_dir) .with_multi_tenant(false), ) .bind::() diff --git a/src/infra/core/tests/tests/test_reset_service_impl.rs b/src/infra/core/tests/tests/test_reset_service_impl.rs index 92f0edd4ef..776ace7e17 100644 --- a/src/infra/core/tests/tests/test_reset_service_impl.rs +++ b/src/infra/core/tests/tests/test_reset_service_impl.rs @@ -111,6 +111,8 @@ struct ResetTestHarness { impl ResetTestHarness { fn new() -> Self { let tempdir = tempfile::tempdir().unwrap(); + let datasets_dir = tempdir.path().join("datasets"); + std::fs::create_dir(&datasets_dir).unwrap(); let catalog = dill::CatalogBuilder::new() .add::() @@ -120,7 +122,7 @@ impl ResetTestHarness { .bind::() .add_builder( DatasetRepositoryLocalFs::builder() - .with_root(tempdir.path().join("datasets")) + .with_root(datasets_dir) .with_multi_tenant(false), ) .bind::() diff --git a/src/infra/core/tests/tests/test_search_service_impl.rs b/src/infra/core/tests/tests/test_search_service_impl.rs index abae1de3e7..4f108bc3ba 100644 --- a/src/infra/core/tests/tests/test_search_service_impl.rs +++ b/src/infra/core/tests/tests/test_search_service_impl.rs @@ -22,6 +22,8 @@ async fn do_test_search(tmp_workspace_dir: &Path, repo_url: Url) { let dataset_local_alias = DatasetAlias::new(None, DatasetName::new_unchecked("foo")); let repo_name = RepoName::new_unchecked("repo"); let dataset_remote_alias = DatasetAliasRemote::try_from("repo/bar").unwrap(); + let datasets_dir = tmp_workspace_dir.join("datasets"); + std::fs::create_dir(&datasets_dir).unwrap(); let catalog = dill::CatalogBuilder::new() .add::() @@ -29,7 +31,7 @@ async fn do_test_search(tmp_workspace_dir: &Path, repo_url: Url) { .add_value(CurrentAccountSubject::new_test()) .add_builder( DatasetRepositoryLocalFs::builder() - .with_root(tmp_workspace_dir.join("datasets")) + .with_root(datasets_dir) .with_multi_tenant(false), ) .bind::() diff --git a/src/infra/core/tests/tests/test_sync_service_impl.rs b/src/infra/core/tests/tests/test_sync_service_impl.rs index e69f7f216e..abaaf73b99 100644 --- a/src/infra/core/tests/tests/test_sync_service_impl.rs +++ b/src/infra/core/tests/tests/test_sync_service_impl.rs @@ -83,6 +83,9 @@ async fn do_test_sync( &dataset_alias_2, ); + let datasets_dir = tmp_workspace_dir.join("datasets"); + std::fs::create_dir(&datasets_dir).unwrap(); + let catalog = dill::CatalogBuilder::new() .add::() .add::() @@ -93,7 +96,7 @@ async fn do_test_sync( .bind::() .add_builder( DatasetRepositoryLocalFs::builder() - .with_root(tmp_workspace_dir.join("datasets")) + .with_root(datasets_dir) .with_multi_tenant(false), ) .bind::() diff --git a/src/infra/core/tests/tests/test_transform_service_impl.rs b/src/infra/core/tests/tests/test_transform_service_impl.rs index dfdea79ad3..87207b7f41 100644 --- a/src/infra/core/tests/tests/test_transform_service_impl.rs +++ b/src/infra/core/tests/tests/test_transform_service_impl.rs @@ -40,6 +40,8 @@ impl TransformTestHarness { engine_provisioner: TEngineProvisioner, ) -> Self { let tempdir = tempfile::tempdir().unwrap(); + let datasets_dir = tempdir.path().join("datasets"); + std::fs::create_dir(&datasets_dir).unwrap(); let catalog = dill::CatalogBuilder::new() .add::() @@ -51,7 +53,7 @@ impl TransformTestHarness { .bind::() .add_builder( DatasetRepositoryLocalFs::builder() - .with_root(tempdir.path().join("datasets")) + .with_root(datasets_dir) .with_multi_tenant(false), ) .add::() diff --git a/src/infra/core/tests/tests/test_verification_service_impl.rs b/src/infra/core/tests/tests/test_verification_service_impl.rs index 0c86be8552..adb7620c6c 100644 --- a/src/infra/core/tests/tests/test_verification_service_impl.rs +++ b/src/infra/core/tests/tests/test_verification_service_impl.rs @@ -25,6 +25,8 @@ use super::test_pull_service_impl::TestTransformService; #[tokio::test] async fn test_verify_data_consistency() { let tempdir = tempfile::tempdir().unwrap(); + let datasets_dir = tempdir.path().join("datasets"); + std::fs::create_dir(&datasets_dir).unwrap(); let dataset_alias = DatasetAlias::new(None, DatasetName::new_unchecked("bar")); @@ -38,7 +40,7 @@ async fn test_verify_data_consistency() { .bind::() .add_builder( DatasetRepositoryLocalFs::builder() - .with_root(tempdir.path().join("datasets")) + .with_root(datasets_dir) .with_multi_tenant(false), ) .bind::() From 37dbbd26253c809d8175cfa68e18e0011395764a Mon Sep 17 00:00:00 2001 From: rmn-boiko Date: Wed, 27 Mar 2024 15:31:08 +0100 Subject: [PATCH 09/16] Fix datasets lookup logic --- .../graphql/tests/tests/test_gql_data.rs | 23 ++- .../tests/tests/test_gql_metadata_chain.rs | 110 +++++++------- .../src/identity/dataset_identity.rs | 2 +- .../tests/tests/test_dataset_refs.rs | 2 +- .../src/repos/dataset_repository_local_fs.rs | 142 ++++++++++++------ .../core/tests/tests/engine/test_engine_io.rs | 4 +- 6 files changed, 168 insertions(+), 115 deletions(-) diff --git a/src/adapter/graphql/tests/tests/test_gql_data.rs b/src/adapter/graphql/tests/tests/test_gql_data.rs index ae78ac2b76..48a902219d 100644 --- a/src/adapter/graphql/tests/tests/test_gql_data.rs +++ b/src/adapter/graphql/tests/tests/test_gql_data.rs @@ -45,12 +45,16 @@ fn create_catalog_with_local_workspace(tempdir: &Path, is_multitenant: bool) -> ///////////////////////////////////////////////////////////////////////////////////////// -async fn create_test_dataset(catalog: &dill::Catalog, tempdir: &Path) { +async fn create_test_dataset( + catalog: &dill::Catalog, + tempdir: &Path, + account_name: Option, +) { let dataset_repo = catalog.get_one::().unwrap(); let dataset = dataset_repo .create_dataset( - &DatasetAlias::new(None, DatasetName::new_unchecked("foo")), + &DatasetAlias::new(account_name, DatasetName::new_unchecked("foo")), MetadataFactory::metadata_block(MetadataFactory::seed(DatasetKind::Root).build()) .build_typed(), ) @@ -107,7 +111,7 @@ async fn create_test_dataset(catalog: &dill::Catalog, tempdir: &Path) { async fn test_dataset_schema_local_fs() { let tempdir = tempfile::tempdir().unwrap(); let catalog = create_catalog_with_local_workspace(tempdir.path(), false); - create_test_dataset(&catalog, tempdir.path()).await; + create_test_dataset(&catalog, tempdir.path(), None).await; let schema = kamu_adapter_graphql::schema_quiet(); let res = schema @@ -161,12 +165,17 @@ async fn test_dataset_schema_local_fs() { ///////////////////////////////////////////////////////////////////////////////////////// -#[test_group::group(engine, datafusion)] +// #[test_group::group(engine, datafusion)] #[test_log::test(tokio::test)] async fn test_dataset_case_insensetive_schema_local_fs() { let tempdir = tempfile::tempdir().unwrap(); let catalog = create_catalog_with_local_workspace(tempdir.path(), true); - create_test_dataset(&catalog, tempdir.path()).await; + create_test_dataset( + &catalog, + tempdir.path(), + Some(AccountName::new_unchecked("KaMu")), + ) + .await; let schema = kamu_adapter_graphql::schema_quiet(); let res = schema @@ -225,7 +234,7 @@ async fn test_dataset_case_insensetive_schema_local_fs() { async fn test_dataset_tail_local_fs() { let tempdir = tempfile::tempdir().unwrap(); let catalog = create_catalog_with_local_workspace(tempdir.path(), false); - create_test_dataset(&catalog, tempdir.path()).await; + create_test_dataset(&catalog, tempdir.path(), None).await; let schema = kamu_adapter_graphql::schema_quiet(); let res = schema @@ -266,7 +275,7 @@ async fn test_dataset_tail_local_fs() { async fn test_dataset_tail_empty_local_fs() { let tempdir = tempfile::tempdir().unwrap(); let catalog = create_catalog_with_local_workspace(tempdir.path(), false); - create_test_dataset(&catalog, tempdir.path()).await; + create_test_dataset(&catalog, tempdir.path(), None).await; let schema = kamu_adapter_graphql::schema_quiet(); let res = schema diff --git a/src/adapter/graphql/tests/tests/test_gql_metadata_chain.rs b/src/adapter/graphql/tests/tests/test_gql_metadata_chain.rs index 53f74ea426..501da4abd2 100644 --- a/src/adapter/graphql/tests/tests/test_gql_metadata_chain.rs +++ b/src/adapter/graphql/tests/tests/test_gql_metadata_chain.rs @@ -25,25 +25,11 @@ use crate::utils::{authentication_catalogs, expect_anonymous_access_error}; #[test_log::test(tokio::test)] async fn test_metadata_chain_events() { - let tempdir = tempfile::tempdir().unwrap(); - let datasets_dir = tempdir.path().join("datasets"); - std::fs::create_dir(&datasets_dir).unwrap(); - - let base_catalog = dill::CatalogBuilder::new() - .add::() - .add::() - .add_builder( - DatasetRepositoryLocalFs::builder() - .with_root(datasets_dir) - .with_multi_tenant(false), - ) - .bind::() - .add::() - .build(); + let harness = GraphQLMetadataChainHarness::new(false); // Init dataset - let (_, catalog_authorized) = authentication_catalogs(&base_catalog); - let dataset_repo = catalog_authorized + let dataset_repo = harness + .catalog_authorized .get_one::() .unwrap(); let create_result = dataset_repo @@ -126,7 +112,7 @@ async fn test_metadata_chain_events() { let schema = kamu_adapter_graphql::schema_quiet(); let res = schema - .execute(async_graphql::Request::new(request_code.clone()).data(catalog_authorized)) + .execute(async_graphql::Request::new(request_code.clone()).data(harness.catalog_authorized)) .await; assert!(res.is_ok(), "{res:?}"); @@ -179,23 +165,10 @@ async fn test_metadata_chain_events() { #[test_log::test(tokio::test)] async fn metadata_chain_append_event() { - let tempdir = tempfile::tempdir().unwrap(); - - let base_catalog = dill::CatalogBuilder::new() - .add::() - .add::() - .add_builder( - DatasetRepositoryLocalFs::builder() - .with_root(tempdir.path().join("datasets")) - .with_multi_tenant(false), - ) - .bind::() - .add::() - .build(); + let harness = GraphQLMetadataChainHarness::new(false); - let (catalog_anonymous, catalog_authorized) = authentication_catalogs(&base_catalog); - - let dataset_repo = catalog_authorized + let dataset_repo = harness + .catalog_authorized .get_one::() .unwrap(); let create_result = dataset_repo @@ -245,12 +218,12 @@ async fn metadata_chain_append_event() { let schema = kamu_adapter_graphql::schema_quiet(); let res = schema - .execute(async_graphql::Request::new(request_code.clone()).data(catalog_anonymous)) + .execute(async_graphql::Request::new(request_code.clone()).data(harness.catalog_anonymous)) .await; expect_anonymous_access_error(res); let res = schema - .execute(async_graphql::Request::new(request_code.clone()).data(catalog_authorized)) + .execute(async_graphql::Request::new(request_code.clone()).data(harness.catalog_authorized)) .await; assert!(res.is_ok(), "{res:?}"); assert_eq!( @@ -275,23 +248,10 @@ async fn metadata_chain_append_event() { #[test_log::test(tokio::test)] async fn metadata_update_readme_new() { - let tempdir = tempfile::tempdir().unwrap(); - - let base_catalog = dill::CatalogBuilder::new() - .add::() - .add::() - .add_builder( - DatasetRepositoryLocalFs::builder() - .with_root(tempdir.path().join("datasets")) - .with_multi_tenant(false), - ) - .bind::() - .add::() - .build(); + let harness = GraphQLMetadataChainHarness::new(false); - let (catalog_anonymous, catalog_authorized) = authentication_catalogs(&base_catalog); - - let dataset_repo = catalog_authorized + let dataset_repo = harness + .catalog_authorized .get_one::() .unwrap(); let create_result = dataset_repo @@ -331,14 +291,16 @@ async fn metadata_update_readme_new() { let res = schema .execute( - async_graphql::Request::new(new_readme_request_code.clone()).data(catalog_anonymous), + async_graphql::Request::new(new_readme_request_code.clone()) + .data(harness.catalog_anonymous), ) .await; expect_anonymous_access_error(res); let res = schema .execute( - async_graphql::Request::new(new_readme_request_code).data(catalog_authorized.clone()), + async_graphql::Request::new(new_readme_request_code) + .data(harness.catalog_authorized.clone()), ) .await; @@ -399,7 +361,7 @@ async fn metadata_update_readme_new() { ) .replace("", &create_result.dataset_handle.id.to_string()), ) - .data(catalog_authorized.clone()), + .data(harness.catalog_authorized.clone()), ) .await; @@ -437,7 +399,7 @@ async fn metadata_update_readme_new() { ) .replace("", &create_result.dataset_handle.id.to_string()), ) - .data(catalog_authorized.clone()), + .data(harness.catalog_authorized.clone()), ) .await; @@ -490,7 +452,7 @@ async fn metadata_update_readme_new() { ) .replace("", &create_result.dataset_handle.id.to_string()), ) - .data(catalog_authorized), + .data(harness.catalog_authorized), ) .await; @@ -535,3 +497,37 @@ async fn assert_attachments_eq(dataset: Arc, expected: SetAttachmen } //////////////////////////////////////////////////////////////////////////////////////// + +struct GraphQLMetadataChainHarness { + _tempdir: tempfile::TempDir, + catalog_authorized: dill::Catalog, + catalog_anonymous: dill::Catalog, +} + +impl GraphQLMetadataChainHarness { + fn new(is_multi_tenant: bool) -> Self { + let tempdir = tempfile::tempdir().unwrap(); + let datasets_dir = tempdir.path().join("datasets"); + std::fs::create_dir(&datasets_dir).unwrap(); + + let base_catalog = dill::CatalogBuilder::new() + .add::() + .add::() + .add_builder( + DatasetRepositoryLocalFs::builder() + .with_root(datasets_dir) + .with_multi_tenant(is_multi_tenant), + ) + .bind::() + .add::() + .build(); + + let (catalog_anonymous, catalog_authorized) = authentication_catalogs(&base_catalog); + + Self { + _tempdir: tempdir, + catalog_anonymous, + catalog_authorized, + } + } +} diff --git a/src/domain/opendatafabric/src/identity/dataset_identity.rs b/src/domain/opendatafabric/src/identity/dataset_identity.rs index 00c3f8ea9a..75823c59df 100644 --- a/src/domain/opendatafabric/src/identity/dataset_identity.rs +++ b/src/domain/opendatafabric/src/identity/dataset_identity.rs @@ -293,7 +293,7 @@ impl DatasetAliasPattern { } pub fn matches(&self, dataset_handle: &DatasetHandle) -> bool { - self.account_name == dataset_handle.alias.account_name + (self.account_name.is_none() || self.account_name == dataset_handle.alias.account_name) && self .dataset_name_pattern .matches(&dataset_handle.alias.dataset_name) diff --git a/src/domain/opendatafabric/tests/tests/test_dataset_refs.rs b/src/domain/opendatafabric/tests/tests/test_dataset_refs.rs index e310de5833..ed72ed4119 100644 --- a/src/domain/opendatafabric/tests/tests/test_dataset_refs.rs +++ b/src/domain/opendatafabric/tests/tests/test_dataset_refs.rs @@ -117,7 +117,7 @@ fn test_dataset_ref_pattern_match() { }, }; - assert!(!pattern.matches(&dataset_handle)); + assert!(pattern.matches(&dataset_handle)); let dataset_account = "account1"; let dataset_name_pattern = "net%"; diff --git a/src/infra/core/src/repos/dataset_repository_local_fs.rs b/src/infra/core/src/repos/dataset_repository_local_fs.rs index 575b6bab25..3e106a9b7a 100644 --- a/src/infra/core/src/repos/dataset_repository_local_fs.rs +++ b/src/infra/core/src/repos/dataset_repository_local_fs.rs @@ -98,9 +98,8 @@ impl DatasetRepositoryLocalFs { } // TODO: Make dataset factory (and thus the hashing algo) configurable - async fn get_dataset_impl(&self, dataset_handle: &DatasetHandle) -> Arc { - let layout = - DatasetLayout::new(self.storage_strategy.get_dataset_path(dataset_handle).await); + fn get_dataset_impl(&self, dataset_handle: &DatasetHandle) -> Arc { + let layout = DatasetLayout::new(self.storage_strategy.get_dataset_path(dataset_handle)); Self::build_dataset(layout, self.event_bus.clone()) } @@ -112,9 +111,7 @@ impl DatasetRepositoryLocalFs { ) -> Result { let dataset_handle = self.resolve_dataset_ref(dataset_ref).await?; Ok(DatasetLayout::new( - self.storage_strategy - .get_dataset_path(&dataset_handle) - .await, + self.storage_strategy.get_dataset_path(&dataset_handle), )) } } @@ -125,10 +122,7 @@ impl DatasetRepositoryLocalFs { impl DatasetRegistry for DatasetRepositoryLocalFs { async fn get_dataset_url(&self, dataset_ref: &DatasetRef) -> Result { let dataset_handle = self.resolve_dataset_ref(dataset_ref).await?; - let dataset_path = self - .storage_strategy - .get_dataset_path(&dataset_handle) - .await; + let dataset_path = self.storage_strategy.get_dataset_path(&dataset_handle); Ok(Url::from_directory_path(dataset_path).unwrap()) } } @@ -192,7 +186,7 @@ impl DatasetRepository for DatasetRepositoryLocalFs { dataset_ref: &DatasetRef, ) -> Result, GetDatasetError> { let dataset_handle = self.resolve_dataset_ref(dataset_ref).await?; - let dataset = self.get_dataset_impl(&dataset_handle).await; + let dataset = self.get_dataset_impl(&dataset_handle); Ok(dataset) } @@ -248,10 +242,7 @@ impl DatasetRepository for DatasetRepositoryLocalFs { // It's okay to create a new dataset by this point let dataset_id = seed_block.event.dataset_id.clone(); let dataset_handle = DatasetHandle::new(dataset_id, dataset_alias.clone()); - let dataset_path = self - .storage_strategy - .get_dataset_path(&dataset_handle) - .await; + let dataset_path = self.storage_strategy.get_dataset_path(&dataset_handle); let layout = DatasetLayout::create(&dataset_path).int_err()?; let dataset = Self::build_dataset(layout, self.event_bus.clone()); @@ -393,10 +384,7 @@ impl DatasetRepository for DatasetRepositoryLocalFs { // repo_info.datasets.remove(index); // self.write_repo_info(repo_info).await?; - let dataset_dir = self - .storage_strategy - .get_dataset_path(&dataset_handle) - .await; + let dataset_dir = self.storage_strategy.get_dataset_path(&dataset_handle); tokio::fs::remove_dir_all(dataset_dir).await.int_err()?; self.event_bus @@ -415,7 +403,7 @@ impl DatasetRepository for DatasetRepositoryLocalFs { trait DatasetStorageStrategy: Sync + Send { fn is_multi_tenant(&self) -> bool; - async fn get_dataset_path(&self, dataset_handle: &DatasetHandle) -> PathBuf; + fn get_dataset_path(&self, dataset_handle: &DatasetHandle) -> PathBuf; fn get_all_datasets(&self) -> DatasetHandleStream<'_>; @@ -479,22 +467,9 @@ impl DatasetSingleTenantStorageStrategy { &dataset_alias.dataset_name } - async fn dataset_path_impl(&self, dataset_alias: &DatasetAlias) -> PathBuf { + fn dataset_path_impl(&self, dataset_alias: &DatasetAlias) -> PathBuf { let dataset_name = self.dataset_name(dataset_alias); - let path = self.root.join(dataset_name); - - if !path.exists() { - use tokio_stream::StreamExt; - - let mut all_datasets_stream = self.get_all_datasets(); - - while let Some(dataset_handle) = all_datasets_stream.try_next().await.unwrap() { - if &dataset_handle.alias == dataset_alias { - return self.root.join(&dataset_alias.dataset_name); - } - } - } - path + self.root.join(dataset_name) } async fn attempt_resolving_summary_via_path( @@ -517,6 +492,39 @@ impl DatasetSingleTenantStorageStrategy { } }) } + + async fn attempt_resolve_dataset_alias( + &self, + dataset_alias: &DatasetAlias, + ) -> Result { + assert!( + !dataset_alias.is_multi_tenant() + || dataset_alias.account_name.as_ref().unwrap() == DEFAULT_ACCOUNT_NAME, + "Multi-tenant refs shouldn't have reached down to here with earlier validations" + ); + + let dataset_path = self.dataset_path_impl(dataset_alias); + if !dataset_path.exists() { + return Err(ResolveDatasetError::NotFound(DatasetNotFoundError { + dataset_ref: dataset_alias.as_local_ref(), + })); + } + + self.resolve_dataset_handle(&dataset_path, dataset_alias) + .await + } + + async fn resolve_dataset_handle( + &self, + dataset_path: &PathBuf, + dataset_alias: &DatasetAlias, + ) -> Result { + let summary = self + .attempt_resolving_summary_via_path(dataset_path, dataset_alias) + .await?; + + Ok(DatasetHandle::new(summary.id, dataset_alias.clone())) + } } #[async_trait] @@ -525,8 +533,8 @@ impl DatasetStorageStrategy for DatasetSingleTenantStorageStrategy { false } - async fn get_dataset_path(&self, dataset_handle: &DatasetHandle) -> PathBuf { - self.dataset_path_impl(&dataset_handle.alias).await + fn get_dataset_path(&self, dataset_handle: &DatasetHandle) -> PathBuf { + self.dataset_path_impl(&dataset_handle.alias) } fn get_all_datasets(&self) -> DatasetHandleStream<'_> { @@ -542,7 +550,7 @@ impl DatasetStorageStrategy for DatasetSingleTenantStorageStrategy { } let dataset_name = DatasetName::try_from(&dataset_dir_entry.file_name()).int_err()?; let dataset_alias = DatasetAlias::new(None, dataset_name); - match self.resolve_dataset_alias(&dataset_alias).await { + match self.attempt_resolve_dataset_alias(&dataset_alias).await { Ok(hdl) => { yield hdl; Ok(()) } Err(ResolveDatasetError::NotFound(_)) => Ok(()), Err(e) => Err(e.int_err()) @@ -569,17 +577,29 @@ impl DatasetStorageStrategy for DatasetSingleTenantStorageStrategy { "Multi-tenant refs shouldn't have reached down to here with earlier validations" ); - let dataset_path = self.dataset_path_impl(dataset_alias).await; + let dataset_path = self.dataset_path_impl(dataset_alias); if !dataset_path.exists() { + use tokio_stream::StreamExt; + + let mut all_datasets_stream = self.get_all_datasets(); + + while let Some(dataset_handle) = all_datasets_stream.try_next().await.unwrap() { + if &dataset_handle.alias == dataset_alias { + return self + .resolve_dataset_handle( + &self.root.join(&dataset_handle.alias.dataset_name), + &dataset_handle.alias, + ) + .await; + } + } return Err(ResolveDatasetError::NotFound(DatasetNotFoundError { dataset_ref: dataset_alias.as_local_ref(), })); } - let summary = self - .attempt_resolving_summary_via_path(&dataset_path, dataset_alias) - .await?; - Ok(DatasetHandle::new(summary.id, dataset_alias.clone())) + self.resolve_dataset_handle(&dataset_path, dataset_alias) + .await } async fn resolve_dataset_id( @@ -601,7 +621,7 @@ impl DatasetStorageStrategy for DatasetSingleTenantStorageStrategy { DatasetName::try_from(&dataset_dir_entry.file_name()).int_err()?, ); - let dataset_path = self.dataset_path_impl(&alias).await; + let dataset_path = self.dataset_path_impl(&alias); let summary = self .attempt_resolving_summary_via_path(&dataset_path, &alias) @@ -631,7 +651,7 @@ impl DatasetStorageStrategy for DatasetSingleTenantStorageStrategy { dataset_handle: &DatasetHandle, new_name: &DatasetName, ) -> Result<(), InternalError> { - let old_dataset_path = self.get_dataset_path(dataset_handle).await; + let old_dataset_path = self.get_dataset_path(dataset_handle); let new_dataset_path = old_dataset_path.parent().unwrap().join(new_name); std::fs::rename(old_dataset_path, new_dataset_path).int_err()?; Ok(()) @@ -752,6 +772,32 @@ impl DatasetMultiTenantStorageStrategy { } }) } + + fn resolve_account_dir( + &self, + account_name: &AccountName, + ) -> Result { + let account_dataset_dir_path = self.root.join(account_name); + + if !account_dataset_dir_path.is_dir() { + let read_accout_dirs = std::fs::read_dir(self.root.as_path()).int_err()?; + + for read_accout_dir in read_accout_dirs { + let account_dir_name = AccountName::new_unchecked( + read_accout_dir + .int_err()? + .file_name() + .to_str() + .unwrap_or(""), + ); + if account_name == &account_dir_name { + return Ok(self.root.join(account_dir_name)); + } + } + } + + Ok(account_dataset_dir_path) + } } #[async_trait] @@ -760,7 +806,7 @@ impl DatasetStorageStrategy for DatasetMultiTenantStorageStrategy { true } - async fn get_dataset_path(&self, dataset_handle: &DatasetHandle) -> PathBuf { + fn get_dataset_path(&self, dataset_handle: &DatasetHandle) -> PathBuf { let account_name = self.effective_account_name(&dataset_handle.alias); self.root @@ -816,8 +862,8 @@ impl DatasetStorageStrategy for DatasetMultiTenantStorageStrategy { dataset_alias: &DatasetAlias, ) -> Result { let effective_account_name = self.effective_account_name(dataset_alias); + let account_dataset_dir_path = self.resolve_account_dir(effective_account_name)?; - let account_dataset_dir_path = self.root.join(effective_account_name); if account_dataset_dir_path.is_dir() { let read_dataset_dir = std::fs::read_dir(account_dataset_dir_path).int_err()?; @@ -905,7 +951,7 @@ impl DatasetStorageStrategy for DatasetMultiTenantStorageStrategy { dataset_handle: &DatasetHandle, new_name: &DatasetName, ) -> Result<(), InternalError> { - let dataset_path = self.get_dataset_path(dataset_handle).await; + let dataset_path = self.get_dataset_path(dataset_handle); let layout = DatasetLayout::new(dataset_path); let dataset = DatasetRepositoryLocalFs::build_dataset(layout, self.event_bus.clone()); diff --git a/src/infra/core/tests/tests/engine/test_engine_io.rs b/src/infra/core/tests/tests/engine/test_engine_io.rs index 088d5f8318..343ee57c38 100644 --- a/src/infra/core/tests/tests/engine/test_engine_io.rs +++ b/src/infra/core/tests/tests/engine/test_engine_io.rs @@ -223,6 +223,8 @@ async fn test_engine_io_local_file_mount() { let tempdir = tempfile::tempdir().unwrap(); let run_info_dir = tempdir.path().join("run"); let cache_dir = tempdir.path().join("cache"); + let datasets_dir = tempdir.path().join("datasets"); + std::fs::create_dir(&datasets_dir).unwrap(); std::fs::create_dir(&run_info_dir).unwrap(); std::fs::create_dir(&cache_dir).unwrap(); @@ -233,7 +235,7 @@ async fn test_engine_io_local_file_mount() { .add_value(CurrentAccountSubject::new_test()) .add_builder( DatasetRepositoryLocalFs::builder() - .with_root(tempdir.path().join("datasets")) + .with_root(datasets_dir) .with_multi_tenant(false), ) .bind::() From 0284ba6a3d1c41ff55489ddcab22877ac9ee89a7 Mon Sep 17 00:00:00 2001 From: rmn-boiko Date: Wed, 27 Mar 2024 22:27:12 +0100 Subject: [PATCH 10/16] Add repo support --- .../src/remote_repository_registry_impl.rs | 60 +++++++++++-------- 1 file changed, 35 insertions(+), 25 deletions(-) diff --git a/src/infra/core/src/remote_repository_registry_impl.rs b/src/infra/core/src/remote_repository_registry_impl.rs index 6a758ea886..40af96483d 100644 --- a/src/infra/core/src/remote_repository_registry_impl.rs +++ b/src/infra/core/src/remote_repository_registry_impl.rs @@ -36,6 +36,22 @@ impl RemoteRepositoryRegistryImpl { std::fs::create_dir_all(&repos_dir)?; Ok(Self::new(repos_dir)) } + + pub fn get_repository_file_path(&self, repo_name: &RepoName) -> Option { + let file_path = self.repos_dir.join(repo_name); + + if !file_path.exists() { + let all_repositories_stream = self.get_all_repositories(); + for repository_name in all_repositories_stream { + if &repository_name == repo_name { + return Some(self.repos_dir.join(repository_name)); + } + } + return None; + } + + Some(file_path) + } } //////////////////////////////////////////////////////////////////////////////////////// @@ -54,25 +70,22 @@ impl RemoteRepositoryRegistry for RemoteRepositoryRegistryImpl { } fn get_repository(&self, repo_name: &RepoName) -> Result { - let file_path = self.repos_dir.join(repo_name); - - if !file_path.exists() { - return Err(RepositoryNotFoundError { - repo_name: repo_name.clone(), - } - .into()); + if let Some(file_path) = self.get_repository_file_path(repo_name) { + let file = std::fs::File::open(file_path).int_err()?; + let manifest: Manifest = + serde_yaml::from_reader(&file).int_err()?; + assert_eq!(manifest.kind, "Repository"); + return Ok(manifest.content); } - let file = std::fs::File::open(&file_path).int_err()?; - let manifest: Manifest = serde_yaml::from_reader(&file).int_err()?; - assert_eq!(manifest.kind, "Repository"); - Ok(manifest.content) + Err(RepositoryNotFoundError { + repo_name: repo_name.clone(), + } + .into()) } fn add_repository(&self, repo_name: &RepoName, mut url: Url) -> Result<(), AddRepoError> { - let file_path = self.repos_dir.join(repo_name); - - if file_path.exists() { + if self.get_repository_file_path(repo_name).is_some() { return Err(RepositoryAlreadyExistsError { repo_name: repo_name.clone(), } @@ -90,23 +103,20 @@ impl RemoteRepositoryRegistry for RemoteRepositoryRegistryImpl { content: RepositoryAccessInfo { url }, }; - let file = std::fs::File::create(&file_path).int_err()?; + let file = std::fs::File::create(self.repos_dir.join(repo_name)).int_err()?; serde_yaml::to_writer(file, &manifest).int_err()?; Ok(()) } fn delete_repository(&self, repo_name: &RepoName) -> Result<(), DeleteRepoError> { - let file_path = self.repos_dir.join(repo_name); - - if !file_path.exists() { - return Err(RepositoryNotFoundError { - repo_name: repo_name.clone(), - } - .into()); + if let Some(file_path) = self.get_repository_file_path(repo_name) { + std::fs::remove_file(file_path).int_err()?; + return Ok(()); } - - std::fs::remove_file(&file_path).int_err()?; - Ok(()) + Err(RepositoryNotFoundError { + repo_name: repo_name.clone(), + } + .into()) } } From b6571679ce4cf6af206a96216d2b746aada3a4a6 Mon Sep 17 00:00:00 2001 From: rmn-boiko Date: Sun, 31 Mar 2024 17:27:42 +0200 Subject: [PATCH 11/16] Fix tests --- .../tests/tests/test_compact_service_impl.rs | 39 ++++++++----------- 1 file changed, 17 insertions(+), 22 deletions(-) diff --git a/src/infra/core/tests/tests/test_compact_service_impl.rs b/src/infra/core/tests/tests/test_compact_service_impl.rs index c5d4cd4b35..7871ce58db 100644 --- a/src/infra/core/tests/tests/test_compact_service_impl.rs +++ b/src/infra/core/tests/tests/test_compact_service_impl.rs @@ -231,7 +231,7 @@ async fn test_dataset_compact_watermark_only_blocks() { None, None, CommitOpts { - system_time: Some(harness.current_date_tame), + system_time: Some(harness.current_date_time), ..CommitOpts::default() }, ) @@ -268,7 +268,7 @@ async fn test_dataset_compact_watermark_only_blocks() { None, None, CommitOpts { - system_time: Some(harness.current_date_tame), + system_time: Some(harness.current_date_time), ..CommitOpts::default() }, ) @@ -696,24 +696,18 @@ async fn test_dataset_compact_derive_error() { let created = harness .create_dataset( MetadataFactory::dataset_snapshot() - .name("derive-foo") + .name("derive.foo") .kind(DatasetKind::Derivative) .push_event(MetadataFactory::set_data_schema().build()) .build(), ) .await; - let dataset_handle = harness - .dataset_repo - .resolve_dataset_ref(&created.dataset_handle.as_local_ref()) - .await - .unwrap(); - assert_matches!( harness .compact_svc .compact_dataset( - &dataset_handle, + &created.dataset_handle, MAX_SLICE_SIZE, MAX_SLICE_RECORDS, Some(Arc::new(NullCompactionMultiListener {})) @@ -841,40 +835,40 @@ async fn test_large_dataset_compact() { ///////////////////////////////////////////////////////////////////////////////////////// struct CompactTestHarness { + _temp_dir: tempfile::TempDir, dataset_repo: Arc, compact_svc: Arc, push_ingest_svc: Arc, verification_svc: Arc, - current_date_tame: DateTime, + current_date_time: DateTime, ctx: SessionContext, } impl CompactTestHarness { fn new() -> Self { - Self::new_local_with_authorizer(kamu_core::auth::AlwaysHappyDatasetActionAuthorizer::new()) + Self::new_local() } - fn new_local_with_authorizer( - dataset_action_authorizer: TDatasetAuthorizer, - ) -> Self { + fn new_local() -> Self { let temp_dir = tempfile::tempdir().unwrap(); let run_info_dir = temp_dir.path().join("run"); + let datasets_dir = temp_dir.path().join("datasets"); std::fs::create_dir(&run_info_dir).unwrap(); - let current_date_tame = Utc.with_ymd_and_hms(2050, 1, 1, 12, 0, 0).unwrap(); + std::fs::create_dir(&datasets_dir).unwrap(); + let current_date_time = Utc.with_ymd_and_hms(2050, 1, 1, 12, 0, 0).unwrap(); let catalog = dill::CatalogBuilder::new() .add::() .add::() .add_value(CurrentAccountSubject::new_test()) - .add_value(dataset_action_authorizer) - .bind::() .add_builder( DatasetRepositoryLocalFs::builder() - .with_root(temp_dir.path().join("datasets")) + .with_root(datasets_dir) .with_multi_tenant(false), ) .bind::() - .add_value(SystemTimeSourceStub::new_set(current_date_tame)) + .add::() + .add_value(SystemTimeSourceStub::new_set(current_date_time)) .bind::() .add::() .add_builder(CompactServiceImpl::builder().with_run_info_dir(run_info_dir.clone())) @@ -899,11 +893,12 @@ impl CompactTestHarness { let verification_svc = catalog.get_one::().unwrap(); Self { + _temp_dir: temp_dir, dataset_repo, compact_svc, push_ingest_svc, verification_svc, - current_date_tame, + current_date_time, ctx: SessionContext::new_with_config(SessionConfig::new().with_target_partitions(1)), } } @@ -1018,7 +1013,7 @@ impl CompactTestHarness { event.into(), CommitOpts { block_ref: &BlockRef::Head, - system_time: Some(self.current_date_tame), + system_time: Some(self.current_date_time), prev_block_hash: Some(Some(head)), check_object_refs: false, update_block_ref: true, From 1994b9acc94e945bf3631c63c3c6da8810546c18 Mon Sep 17 00:00:00 2001 From: rmn-boiko Date: Sun, 31 Mar 2024 20:02:50 +0200 Subject: [PATCH 12/16] Update changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 261501d928..2e4ff90879 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Support `ArrowJson` schema output format in QGL API and CLI commands - New `kamu system compact ` command that compacts dataslices for the given dataset +### Changed +- Case insensetive comparisons of s, s and s ## [0.170.0] - 2024-03-29 ### Added From 7a2eb009b72214f3bda969e332f5ed0d61686496 Mon Sep 17 00:00:00 2001 From: rmn-boiko Date: Tue, 2 Apr 2024 19:03:28 +0200 Subject: [PATCH 13/16] Fix review comments Iter 1 - add account normalization --- .../graphql/tests/tests/test_gql_data.rs | 70 +------- .../src/remote_repository_registry_impl.rs | 1 + .../src/repos/dataset_repository_local_fs.rs | 158 +++++++++++------- .../repos/test_dataset_repository_shared.rs | 31 +++- 4 files changed, 134 insertions(+), 126 deletions(-) diff --git a/src/adapter/graphql/tests/tests/test_gql_data.rs b/src/adapter/graphql/tests/tests/test_gql_data.rs index 48a902219d..56fc746533 100644 --- a/src/adapter/graphql/tests/tests/test_gql_data.rs +++ b/src/adapter/graphql/tests/tests/test_gql_data.rs @@ -110,7 +110,7 @@ async fn create_test_dataset( #[test_log::test(tokio::test)] async fn test_dataset_schema_local_fs() { let tempdir = tempfile::tempdir().unwrap(); - let catalog = create_catalog_with_local_workspace(tempdir.path(), false); + let catalog = create_catalog_with_local_workspace(tempdir.path(), true); create_test_dataset(&catalog, tempdir.path(), None).await; let schema = kamu_adapter_graphql::schema_quiet(); @@ -165,75 +165,11 @@ async fn test_dataset_schema_local_fs() { ///////////////////////////////////////////////////////////////////////////////////////// -// #[test_group::group(engine, datafusion)] -#[test_log::test(tokio::test)] -async fn test_dataset_case_insensetive_schema_local_fs() { - let tempdir = tempfile::tempdir().unwrap(); - let catalog = create_catalog_with_local_workspace(tempdir.path(), true); - create_test_dataset( - &catalog, - tempdir.path(), - Some(AccountName::new_unchecked("KaMu")), - ) - .await; - - let schema = kamu_adapter_graphql::schema_quiet(); - let res = schema - .execute( - async_graphql::Request::new(indoc::indoc!( - r#" - { - datasets { - byOwnerAndName(accountName: "kAmU", datasetName: "FOo") { - name - data { - tail(limit: 1, schemaFormat: PARQUET_JSON, dataFormat: JSON) { - ... on DataQueryResultSuccess { - schema { content } - } - } - } - } - } - } - "# - )) - .data(catalog), - ) - .await; - assert!(res.is_ok(), "{res:?}"); - let json = serde_json::to_string(&res.data).unwrap(); - let json = serde_json::from_str::(&json).unwrap(); - let data_schema = &json["datasets"]["byOwnerAndName"]["data"]["tail"]["schema"]["content"]; - let data_schema = - serde_json::from_str::(data_schema.as_str().unwrap()).unwrap(); - assert_eq!( - data_schema, - serde_json::json!({ - "name": "arrow_schema", - "type": "struct", - "fields": [{ - "name": "offset", - "repetition": "REQUIRED", - "type": "INT64", - "logicalType": "INTEGER(64,false)" - }, { - "name": "blah", - "repetition": "REQUIRED", - "type": "BYTE_ARRAY", - "logicalType": "STRING" - }] - }) - ); -} - -///////////////////////////////////////////////////////////////////////////////////////// - #[test_group::group(engine, datafusion)] #[test_log::test(tokio::test)] async fn test_dataset_tail_local_fs() { let tempdir = tempfile::tempdir().unwrap(); - let catalog = create_catalog_with_local_workspace(tempdir.path(), false); + let catalog = create_catalog_with_local_workspace(tempdir.path(), true); create_test_dataset(&catalog, tempdir.path(), None).await; let schema = kamu_adapter_graphql::schema_quiet(); @@ -274,7 +210,7 @@ async fn test_dataset_tail_local_fs() { #[test_log::test(tokio::test)] async fn test_dataset_tail_empty_local_fs() { let tempdir = tempfile::tempdir().unwrap(); - let catalog = create_catalog_with_local_workspace(tempdir.path(), false); + let catalog = create_catalog_with_local_workspace(tempdir.path(), true); create_test_dataset(&catalog, tempdir.path(), None).await; let schema = kamu_adapter_graphql::schema_quiet(); diff --git a/src/infra/core/src/remote_repository_registry_impl.rs b/src/infra/core/src/remote_repository_registry_impl.rs index 40af96483d..90786bca96 100644 --- a/src/infra/core/src/remote_repository_registry_impl.rs +++ b/src/infra/core/src/remote_repository_registry_impl.rs @@ -41,6 +41,7 @@ impl RemoteRepositoryRegistryImpl { let file_path = self.repos_dir.join(repo_name); if !file_path.exists() { + // run full scan to support case-insensetive matches let all_repositories_stream = self.get_all_repositories(); for repository_name in all_repositories_stream { if &repository_name == repo_name { diff --git a/src/infra/core/src/repos/dataset_repository_local_fs.rs b/src/infra/core/src/repos/dataset_repository_local_fs.rs index 3e106a9b7a..520b7cacf9 100644 --- a/src/infra/core/src/repos/dataset_repository_local_fs.rs +++ b/src/infra/core/src/repos/dataset_repository_local_fs.rs @@ -7,7 +7,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::sync::Arc; use async_trait::async_trait; @@ -114,6 +114,18 @@ impl DatasetRepositoryLocalFs { self.storage_strategy.get_dataset_path(&dataset_handle), )) } + + fn get_canonical_path_param(dataset_path: &Path) -> Result<(PathBuf, String), InternalError> { + let canonical_dataset_path = std::fs::canonicalize(dataset_path).int_err()?; + let dataset_name_str = canonical_dataset_path + .file_name() + .unwrap() + .to_str() + .unwrap() + .to_string(); + + Ok((canonical_dataset_path, dataset_name_str)) + } } ///////////////////////////////////////////////////////////////////////////////////////// @@ -241,7 +253,21 @@ impl DatasetRepository for DatasetRepositoryLocalFs { // It's okay to create a new dataset by this point let dataset_id = seed_block.event.dataset_id.clone(); - let dataset_handle = DatasetHandle::new(dataset_id, dataset_alias.clone()); + let dataset_handle = if let Some(account_name) = &dataset_alias.account_name { + let (_, canonical_account_name) = self + .storage_strategy + .resolve_account_dir(account_name) + .int_err()?; + let canonical_dataset_alias = DatasetAlias::new( + Some(canonical_account_name), + dataset_alias.dataset_name.clone(), + ); + + DatasetHandle::new(dataset_id, canonical_dataset_alias) + } else { + DatasetHandle::new(dataset_id, dataset_alias.clone()) + }; + let dataset_path = self.storage_strategy.get_dataset_path(&dataset_handle); let layout = DatasetLayout::create(&dataset_path).int_err()?; let dataset = Self::build_dataset(layout, self.event_bus.clone()); @@ -430,6 +456,11 @@ trait DatasetStorageStrategy: Sync + Send { dataset_handle: &DatasetHandle, new_name: &DatasetName, ) -> Result<(), InternalError>; + + fn resolve_account_dir( + &self, + account_name: &AccountName, + ) -> Result<(PathBuf, AccountName), ResolveDatasetError>; } #[derive(thiserror::Error, Debug)] @@ -476,10 +507,11 @@ impl DatasetSingleTenantStorageStrategy { &self, dataset_path: &PathBuf, dataset_alias: &DatasetAlias, - ) -> Result { + ) -> Result<(DatasetSummary, DatasetAlias), ResolveDatasetError> { let layout = DatasetLayout::new(dataset_path); let dataset = DatasetRepositoryLocalFs::build_dataset(layout, self.event_bus.clone()); - dataset + + let dataset_summary = dataset .get_summary(GetSummaryOpts::default()) .await .map_err(|e| { @@ -490,28 +522,16 @@ impl DatasetSingleTenantStorageStrategy { } else { ResolveDatasetError::Internal(e.int_err()) } - }) - } + })?; - async fn attempt_resolve_dataset_alias( - &self, - dataset_alias: &DatasetAlias, - ) -> Result { - assert!( - !dataset_alias.is_multi_tenant() - || dataset_alias.account_name.as_ref().unwrap() == DEFAULT_ACCOUNT_NAME, - "Multi-tenant refs shouldn't have reached down to here with earlier validations" - ); - - let dataset_path = self.dataset_path_impl(dataset_alias); - if !dataset_path.exists() { - return Err(ResolveDatasetError::NotFound(DatasetNotFoundError { - dataset_ref: dataset_alias.as_local_ref(), - })); - } + let (_, canonical_dataset_name) = + DatasetRepositoryLocalFs::get_canonical_path_param(dataset_path)?; + let canonical_dataset_alias = DatasetAlias { + dataset_name: DatasetName::new_unchecked(canonical_dataset_name.as_str()), + account_name: None, + }; - self.resolve_dataset_handle(&dataset_path, dataset_alias) - .await + Ok((dataset_summary, canonical_dataset_alias)) } async fn resolve_dataset_handle( @@ -519,11 +539,14 @@ impl DatasetSingleTenantStorageStrategy { dataset_path: &PathBuf, dataset_alias: &DatasetAlias, ) -> Result { - let summary = self + let (summary, canonical_dataset_alias) = self .attempt_resolving_summary_via_path(dataset_path, dataset_alias) .await?; - Ok(DatasetHandle::new(summary.id, dataset_alias.clone())) + Ok(DatasetHandle::new( + summary.id, + canonical_dataset_alias.clone(), + )) } } @@ -550,7 +573,7 @@ impl DatasetStorageStrategy for DatasetSingleTenantStorageStrategy { } let dataset_name = DatasetName::try_from(&dataset_dir_entry.file_name()).int_err()?; let dataset_alias = DatasetAlias::new(None, dataset_name); - match self.attempt_resolve_dataset_alias(&dataset_alias).await { + match self.resolve_dataset_handle(&dataset_dir_entry.path(), &dataset_alias).await { Ok(hdl) => { yield hdl; Ok(()) } Err(ResolveDatasetError::NotFound(_)) => Ok(()), Err(e) => Err(e.int_err()) @@ -623,12 +646,12 @@ impl DatasetStorageStrategy for DatasetSingleTenantStorageStrategy { let dataset_path = self.dataset_path_impl(&alias); - let summary = self + let (summary, canonical_dataset_alias) = self .attempt_resolving_summary_via_path(&dataset_path, &alias) .await?; if summary.id == *dataset_id { - return Ok(DatasetHandle::new(summary.id, alias)); + return Ok(DatasetHandle::new(summary.id, canonical_dataset_alias)); } } @@ -656,6 +679,16 @@ impl DatasetStorageStrategy for DatasetSingleTenantStorageStrategy { std::fs::rename(old_dataset_path, new_dataset_path).int_err()?; Ok(()) } + + fn resolve_account_dir( + &self, + _account_name: &AccountName, + ) -> Result<(PathBuf, AccountName), ResolveDatasetError> { + Ok(( + self.root.join(DEFAULT_ACCOUNT_NAME), + AccountName::new_unchecked(DEFAULT_ACCOUNT_NAME), + )) + } } ///////////////////////////////////////////////////////////////////////////////////////// @@ -772,32 +805,6 @@ impl DatasetMultiTenantStorageStrategy { } }) } - - fn resolve_account_dir( - &self, - account_name: &AccountName, - ) -> Result { - let account_dataset_dir_path = self.root.join(account_name); - - if !account_dataset_dir_path.is_dir() { - let read_accout_dirs = std::fs::read_dir(self.root.as_path()).int_err()?; - - for read_accout_dir in read_accout_dirs { - let account_dir_name = AccountName::new_unchecked( - read_accout_dir - .int_err()? - .file_name() - .to_str() - .unwrap_or(""), - ); - if account_name == &account_dir_name { - return Ok(self.root.join(account_dir_name)); - } - } - } - - Ok(account_dataset_dir_path) - } } #[async_trait] @@ -862,10 +869,14 @@ impl DatasetStorageStrategy for DatasetMultiTenantStorageStrategy { dataset_alias: &DatasetAlias, ) -> Result { let effective_account_name = self.effective_account_name(dataset_alias); - let account_dataset_dir_path = self.resolve_account_dir(effective_account_name)?; + let (account_dataset_dir_path, _) = self.resolve_account_dir(effective_account_name)?; if account_dataset_dir_path.is_dir() { - let read_dataset_dir = std::fs::read_dir(account_dataset_dir_path).int_err()?; + let read_dataset_dir = std::fs::read_dir(account_dataset_dir_path).map_err(|_| { + ResolveDatasetError::NotFound(DatasetNotFoundError { + dataset_ref: dataset_alias.as_local_ref(), + }) + })?; for r_dataset_dir in read_dataset_dir { let dataset_dir_entry = r_dataset_dir.int_err()?; @@ -962,6 +973,39 @@ impl DatasetStorageStrategy for DatasetMultiTenantStorageStrategy { Ok(()) } + + fn resolve_account_dir( + &self, + account_name: &AccountName, + ) -> Result<(PathBuf, AccountName), ResolveDatasetError> { + let account_dataset_dir_path = self.root.join(account_name); + + if !account_dataset_dir_path.is_dir() { + let read_accout_dirs = std::fs::read_dir(self.root.as_path()).int_err()?; + + for read_accout_dir in read_accout_dirs { + let account_dir_name = AccountName::new_unchecked( + read_accout_dir + .int_err()? + .file_name() + .to_str() + .unwrap_or(""), + ); + if account_name == &account_dir_name { + return Ok((self.root.join(&account_dir_name), account_dir_name)); + } + } + return Ok((account_dataset_dir_path, account_name.clone())); + } + + let (canonical_account_dataset_dir_path, canonical_account_name) = + DatasetRepositoryLocalFs::get_canonical_path_param(&account_dataset_dir_path)?; + + Ok(( + canonical_account_dataset_dir_path, + AccountName::new_unchecked(canonical_account_name.as_str()), + )) + } } ///////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/infra/core/tests/tests/repos/test_dataset_repository_shared.rs b/src/infra/core/tests/tests/repos/test_dataset_repository_shared.rs index 839cb589c0..2afa5d706f 100644 --- a/src/infra/core/tests/tests/repos/test_dataset_repository_shared.rs +++ b/src/infra/core/tests/tests/repos/test_dataset_repository_shared.rs @@ -66,7 +66,7 @@ pub async fn test_create_and_get_case_insensetive_dataset( account_name: Option, ) { let dataset_alias_to_create = - DatasetAlias::new(account_name.clone(), DatasetName::new_unchecked("foo")); + DatasetAlias::new(account_name.clone(), DatasetName::new_unchecked("Foo")); assert_matches!( repo.get_dataset(&dataset_alias_to_create.as_local_ref()) @@ -87,7 +87,7 @@ pub async fn test_create_and_get_case_insensetive_dataset( assert_eq!(create_result.dataset_handle.alias, dataset_alias_to_create); - let account_name_uppercase = account_name.map(|account_name_value| { + let account_name_uppercase = account_name.clone().map(|account_name_value| { AccountName::new_unchecked(&account_name_value.to_ascii_uppercase()) }); @@ -100,6 +100,33 @@ pub async fn test_create_and_get_case_insensetive_dataset( .await .is_ok()); + // Test creation another dataset for existing account with different symbols + // registry + let new_dataset_alias_to_create = DatasetAlias::new( + account_name + .clone() + .map(|a| AccountName::new_unchecked(a.to_uppercase().as_str())), + DatasetName::new_unchecked("BaR"), + ); + + let snapshot = MetadataFactory::dataset_snapshot() + .name(new_dataset_alias_to_create.clone()) + .kind(DatasetKind::Root) + .push_event(MetadataFactory::set_polling_source().build()) + .build(); + + let create_result = repo.create_dataset_from_snapshot(snapshot).await.unwrap(); + + // Assert dataset_name eq to new alias and account_name eq to old existing one + assert_eq!( + create_result.dataset_handle.alias.dataset_name, + new_dataset_alias_to_create.dataset_name + ); + assert_eq!( + create_result.dataset_handle.alias.account_name, + dataset_alias_to_create.account_name + ); + // Now test name collision let create_result = repo .create_dataset( From 97905fcb2d6ef0c32af239dc93221ba3e96c7d9b Mon Sep 17 00:00:00 2001 From: rmn-boiko Date: Wed, 3 Apr 2024 15:01:12 +0200 Subject: [PATCH 14/16] Fix review comments Iter 1 - remove duplicates and marks --- CHANGELOG.md | 2 +- src/domain/opendatafabric/src/identity/dataset_identity.rs | 1 - src/infra/core/src/repos/dataset_repository_local_fs.rs | 6 +++--- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2e4ff90879..cb25ed732b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Support `ArrowJson` schema output format in QGL API and CLI commands - New `kamu system compact ` command that compacts dataslices for the given dataset ### Changed -- Case insensetive comparisons of s, s and s +- Case insensitive comparisons of `dataset`s, `account`s and `repo`s ## [0.170.0] - 2024-03-29 ### Added diff --git a/src/domain/opendatafabric/src/identity/dataset_identity.rs b/src/domain/opendatafabric/src/identity/dataset_identity.rs index 75823c59df..403e652aba 100644 --- a/src/domain/opendatafabric/src/identity/dataset_identity.rs +++ b/src/domain/opendatafabric/src/identity/dataset_identity.rs @@ -179,7 +179,6 @@ macro_rules! newtype_str { impl Hash for $typ { fn hash(&self, state: &mut H) { Self::into_lowercase(&self.0).hash(state); - Self::into_lowercase(&self.0).hash(state); } } diff --git a/src/infra/core/src/repos/dataset_repository_local_fs.rs b/src/infra/core/src/repos/dataset_repository_local_fs.rs index 520b7cacf9..802384c40a 100644 --- a/src/infra/core/src/repos/dataset_repository_local_fs.rs +++ b/src/infra/core/src/repos/dataset_repository_local_fs.rs @@ -981,11 +981,11 @@ impl DatasetStorageStrategy for DatasetMultiTenantStorageStrategy { let account_dataset_dir_path = self.root.join(account_name); if !account_dataset_dir_path.is_dir() { - let read_accout_dirs = std::fs::read_dir(self.root.as_path()).int_err()?; + let read_account_dirs = std::fs::read_dir(self.root.as_path()).int_err()?; - for read_accout_dir in read_accout_dirs { + for read_account_dir in read_account_dirs { let account_dir_name = AccountName::new_unchecked( - read_accout_dir + read_account_dir .int_err()? .file_name() .to_str() From 1f519820ed6d5ee4479ee90cf684e68a21ecebcf Mon Sep 17 00:00:00 2001 From: rmn-boiko Date: Wed, 3 Apr 2024 15:04:58 +0200 Subject: [PATCH 15/16] Fix review comments Iter 2 - make resolve acc dir unreacheble --- src/infra/core/src/repos/dataset_repository_local_fs.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/infra/core/src/repos/dataset_repository_local_fs.rs b/src/infra/core/src/repos/dataset_repository_local_fs.rs index 802384c40a..c9a61a8bc2 100644 --- a/src/infra/core/src/repos/dataset_repository_local_fs.rs +++ b/src/infra/core/src/repos/dataset_repository_local_fs.rs @@ -684,10 +684,7 @@ impl DatasetStorageStrategy for DatasetSingleTenantStorageStrategy { &self, _account_name: &AccountName, ) -> Result<(PathBuf, AccountName), ResolveDatasetError> { - Ok(( - self.root.join(DEFAULT_ACCOUNT_NAME), - AccountName::new_unchecked(DEFAULT_ACCOUNT_NAME), - )) + unreachable!() } } From d022f4f7ac94d158f2549ba3d90fff0db18b4226 Mon Sep 17 00:00:00 2001 From: Sergei Zaychenko Date: Wed, 3 Apr 2024 12:22:28 -0700 Subject: [PATCH 16/16] Reshaped canonical alias operation --- .../src/repos/dataset_repository_local_fs.rs | 107 +++++++++--------- 1 file changed, 55 insertions(+), 52 deletions(-) diff --git a/src/infra/core/src/repos/dataset_repository_local_fs.rs b/src/infra/core/src/repos/dataset_repository_local_fs.rs index c9a61a8bc2..d7efd0eee7 100644 --- a/src/infra/core/src/repos/dataset_repository_local_fs.rs +++ b/src/infra/core/src/repos/dataset_repository_local_fs.rs @@ -252,21 +252,12 @@ impl DatasetRepository for DatasetRepositoryLocalFs { } // It's okay to create a new dataset by this point - let dataset_id = seed_block.event.dataset_id.clone(); - let dataset_handle = if let Some(account_name) = &dataset_alias.account_name { - let (_, canonical_account_name) = self - .storage_strategy - .resolve_account_dir(account_name) - .int_err()?; - let canonical_dataset_alias = DatasetAlias::new( - Some(canonical_account_name), - dataset_alias.dataset_name.clone(), - ); - - DatasetHandle::new(dataset_id, canonical_dataset_alias) - } else { - DatasetHandle::new(dataset_id, dataset_alias.clone()) - }; + let dataset_handle = DatasetHandle::new( + seed_block.event.dataset_id.clone(), + self.storage_strategy + .canonical_dataset_alias(dataset_alias) + .int_err()?, + ); let dataset_path = self.storage_strategy.get_dataset_path(&dataset_handle); let layout = DatasetLayout::create(&dataset_path).int_err()?; @@ -457,10 +448,10 @@ trait DatasetStorageStrategy: Sync + Send { new_name: &DatasetName, ) -> Result<(), InternalError>; - fn resolve_account_dir( + fn canonical_dataset_alias( &self, - account_name: &AccountName, - ) -> Result<(PathBuf, AccountName), ResolveDatasetError>; + raw_alias: &DatasetAlias, + ) -> Result; } #[derive(thiserror::Error, Debug)] @@ -680,11 +671,11 @@ impl DatasetStorageStrategy for DatasetSingleTenantStorageStrategy { Ok(()) } - fn resolve_account_dir( + fn canonical_dataset_alias( &self, - _account_name: &AccountName, - ) -> Result<(PathBuf, AccountName), ResolveDatasetError> { - unreachable!() + raw_alias: &DatasetAlias, + ) -> Result { + Ok(raw_alias.clone()) } } @@ -802,6 +793,39 @@ impl DatasetMultiTenantStorageStrategy { } }) } + + fn resolve_account_dir( + &self, + account_name: &AccountName, + ) -> Result<(PathBuf, AccountName), ResolveDatasetError> { + let account_dataset_dir_path = self.root.join(account_name); + + if !account_dataset_dir_path.is_dir() { + let read_account_dirs = std::fs::read_dir(self.root.as_path()).int_err()?; + + for read_account_dir in read_account_dirs { + let account_dir_name = AccountName::new_unchecked( + read_account_dir + .int_err()? + .file_name() + .to_str() + .unwrap_or(""), + ); + if account_name == &account_dir_name { + return Ok((self.root.join(&account_dir_name), account_dir_name)); + } + } + return Ok((account_dataset_dir_path, account_name.clone())); + } + + let (canonical_account_dataset_dir_path, canonical_account_name) = + DatasetRepositoryLocalFs::get_canonical_path_param(&account_dataset_dir_path)?; + + Ok(( + canonical_account_dataset_dir_path, + AccountName::new_unchecked(canonical_account_name.as_str()), + )) + } } #[async_trait] @@ -971,37 +995,16 @@ impl DatasetStorageStrategy for DatasetMultiTenantStorageStrategy { Ok(()) } - fn resolve_account_dir( + fn canonical_dataset_alias( &self, - account_name: &AccountName, - ) -> Result<(PathBuf, AccountName), ResolveDatasetError> { - let account_dataset_dir_path = self.root.join(account_name); - - if !account_dataset_dir_path.is_dir() { - let read_account_dirs = std::fs::read_dir(self.root.as_path()).int_err()?; - - for read_account_dir in read_account_dirs { - let account_dir_name = AccountName::new_unchecked( - read_account_dir - .int_err()? - .file_name() - .to_str() - .unwrap_or(""), - ); - if account_name == &account_dir_name { - return Ok((self.root.join(&account_dir_name), account_dir_name)); - } - } - return Ok((account_dataset_dir_path, account_name.clone())); - } - - let (canonical_account_dataset_dir_path, canonical_account_name) = - DatasetRepositoryLocalFs::get_canonical_path_param(&account_dataset_dir_path)?; - - Ok(( - canonical_account_dataset_dir_path, - AccountName::new_unchecked(canonical_account_name.as_str()), - )) + raw_alias: &DatasetAlias, + ) -> Result { + Ok(if let Some(account_name) = &raw_alias.account_name { + let (_, canonical_account_name) = self.resolve_account_dir(account_name).int_err()?; + DatasetAlias::new(Some(canonical_account_name), raw_alias.dataset_name.clone()) + } else { + raw_alias.clone() + }) } }