Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1272,6 +1272,104 @@ void testDeleteProducerOffsetsAuthorization() throws Exception {
guestAdmin.deleteProducerOffsets(producerId).get();
}

@Test
void testDatabaseExistsAuthorization() throws Exception {
String testDbName = "test_db_exists_auth";

// 1. Create database as root (super user can do everything)
rootAdmin.createDatabase(testDbName, DatabaseDescriptor.EMPTY, false).get();

// 2. Guest user WITHOUT DESCRIBE permission should get false (hiding existence)
assertThat(guestAdmin.databaseExists(testDbName).get())
.as("Guest without DESCRIBE permission should not see database exists")
.isFalse();

// 3. Non-existent database should also return false
assertThat(guestAdmin.databaseExists("non_existent_db").get())
.as("Non-existent database should return false")
.isFalse();

// 4. Root user (super user) should see database exists
assertThat(rootAdmin.databaseExists(testDbName).get())
.as("Root user should always see database exists")
.isTrue();

// 5. Grant DESCRIBE permission to guest on the database
List<AclBinding> aclBindings =
Collections.singletonList(
new AclBinding(
Resource.database(testDbName),
new AccessControlEntry(
guestPrincipal,
"*",
OperationType.DESCRIBE,
PermissionType.ALLOW)));
rootAdmin.createAcls(aclBindings).all().get();
FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true);

// 6. Now guest WITH DESCRIBE permission should see database exists
assertThat(guestAdmin.databaseExists(testDbName).get())
.as("Guest with DESCRIBE permission should see database exists")
.isTrue();

// 7. Verify non-existent database still returns false (not an authorization issue)
assertThat(guestAdmin.databaseExists("non_existent_db").get())
.as("Non-existent database should still return false even with permissions")
.isFalse();

// Cleanup
rootAdmin.dropDatabase(testDbName, true, true).get();
}

@Test
void testTableExistsAuthorization() throws Exception {
TablePath testTablePath =
TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), "test_table_exists_auth");

// 1. Create table as root
rootAdmin.createTable(testTablePath, DATA1_TABLE_DESCRIPTOR_PK, false).get();
FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(
rootAdmin.getTableInfo(testTablePath).get().getTableId());

// 2. Guest WITHOUT permission should get false
assertThat(guestAdmin.tableExists(testTablePath).get())
.as("Guest without permission should not see table exists")
.isFalse();

// 3. Root user should see table exists
assertThat(rootAdmin.tableExists(testTablePath).get())
.as("Root user should see table exists")
.isTrue();

// 4. Grant DESCRIBE permission on the table
List<AclBinding> aclBindings =
Collections.singletonList(
new AclBinding(
Resource.table(testTablePath),
new AccessControlEntry(
guestPrincipal,
"*",
OperationType.DESCRIBE,
PermissionType.ALLOW)));
rootAdmin.createAcls(aclBindings).all().get();
FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true);

// 5. Guest WITH permission should now see table exists
assertThat(guestAdmin.tableExists(testTablePath).get())
.as("Guest with DESCRIBE permission should see table exists")
.isTrue();

// 6. Verify non-existent table returns false
TablePath nonExistentTable =
TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), "non_existent_table");
assertThat(guestAdmin.tableExists(nonExistentTable).get())
.as("Non-existent table should return false")
.isFalse();

// Cleanup
rootAdmin.dropTable(testTablePath, true).get();
}

private static Configuration initConfig() {
Configuration conf = new Configuration();
conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,25 @@ public CompletableFuture<GetDatabaseInfoResponse> getDatabaseInfo(

@Override
public CompletableFuture<DatabaseExistsResponse> databaseExists(DatabaseExistsRequest request) {
// By design: database exists not need to check database authorization.
String databaseName = request.getDatabaseName();
DatabaseExistsResponse response = new DatabaseExistsResponse();
boolean exists = metadataManager.databaseExists(request.getDatabaseName());
boolean exists = metadataManager.databaseExists(databaseName);

// For security, return false for both non-existent and unauthorized databases
if (exists) {
try {
authorizeDatabase(OperationType.DESCRIBE, databaseName);
// If we get here, user is authorized
} catch (Exception e) {
// Not authorized - hide database existence
LOG.debug(
"User {} not authorized to access database '{}', returning false",
currentSession().getPrincipal(),
databaseName);
exists = false;
}
}

response.setExists(exists);
return CompletableFuture.completedFuture(response);
}
Expand Down Expand Up @@ -308,9 +324,25 @@ public CompletableFuture<GetTableSchemaResponse> getTableSchema(GetTableSchemaRe

@Override
public CompletableFuture<TableExistsResponse> tableExists(TableExistsRequest request) {
// By design: table exists not need to check table authorization.
TablePath tablePath = toTablePath(request.getTablePath());
TableExistsResponse response = new TableExistsResponse();
boolean exists = metadataManager.tableExists(toTablePath(request.getTablePath()));
boolean exists = metadataManager.tableExists(tablePath);

// For security, return false for both non-existent and unauthorized tables
if (exists) {
try {
authorizeTable(OperationType.DESCRIBE, tablePath);
// If we get here, user is authorized
} catch (Exception e) {
// Not authorized - hide table existence
LOG.debug(
"User {} not authorized to access table '{}', returning false",
currentSession().getPrincipal(),
tablePath);
exists = false;
}
}

response.setExists(exists);
return CompletableFuture.completedFuture(response);
}
Expand Down
Loading