diff --git a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java index 8c301c89a2..b0aa384541 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java @@ -1272,6 +1272,104 @@ void testDeleteProducerOffsetsAuthorization() throws Exception { guestAdmin.deleteProducerOffsets(producerId).get(); } + @Test + void testDatabaseExistsAuthorization() throws Exception { + String testDbName = "test_db_exists_auth"; + + // 1. Create database as root (super user can do everything) + rootAdmin.createDatabase(testDbName, DatabaseDescriptor.EMPTY, false).get(); + + // 2. Guest user WITHOUT DESCRIBE permission should get false (hiding existence) + assertThat(guestAdmin.databaseExists(testDbName).get()) + .as("Guest without DESCRIBE permission should not see database exists") + .isFalse(); + + // 3. Non-existent database should also return false + assertThat(guestAdmin.databaseExists("non_existent_db").get()) + .as("Non-existent database should return false") + .isFalse(); + + // 4. Root user (super user) should see database exists + assertThat(rootAdmin.databaseExists(testDbName).get()) + .as("Root user should always see database exists") + .isTrue(); + + // 5. Grant DESCRIBE permission to guest on the database + List aclBindings = + Collections.singletonList( + new AclBinding( + Resource.database(testDbName), + new AccessControlEntry( + guestPrincipal, + "*", + OperationType.DESCRIBE, + PermissionType.ALLOW))); + rootAdmin.createAcls(aclBindings).all().get(); + FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true); + + // 6. Now guest WITH DESCRIBE permission should see database exists + assertThat(guestAdmin.databaseExists(testDbName).get()) + .as("Guest with DESCRIBE permission should see database exists") + .isTrue(); + + // 7. Verify non-existent database still returns false (not an authorization issue) + assertThat(guestAdmin.databaseExists("non_existent_db").get()) + .as("Non-existent database should still return false even with permissions") + .isFalse(); + + // Cleanup + rootAdmin.dropDatabase(testDbName, true, true).get(); + } + + @Test + void testTableExistsAuthorization() throws Exception { + TablePath testTablePath = + TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), "test_table_exists_auth"); + + // 1. Create table as root + rootAdmin.createTable(testTablePath, DATA1_TABLE_DESCRIPTOR_PK, false).get(); + FLUSS_CLUSTER_EXTENSION.waitUntilTableReady( + rootAdmin.getTableInfo(testTablePath).get().getTableId()); + + // 2. Guest WITHOUT permission should get false + assertThat(guestAdmin.tableExists(testTablePath).get()) + .as("Guest without permission should not see table exists") + .isFalse(); + + // 3. Root user should see table exists + assertThat(rootAdmin.tableExists(testTablePath).get()) + .as("Root user should see table exists") + .isTrue(); + + // 4. Grant DESCRIBE permission on the table + List aclBindings = + Collections.singletonList( + new AclBinding( + Resource.table(testTablePath), + new AccessControlEntry( + guestPrincipal, + "*", + OperationType.DESCRIBE, + PermissionType.ALLOW))); + rootAdmin.createAcls(aclBindings).all().get(); + FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true); + + // 5. Guest WITH permission should now see table exists + assertThat(guestAdmin.tableExists(testTablePath).get()) + .as("Guest with DESCRIBE permission should see table exists") + .isTrue(); + + // 6. Verify non-existent table returns false + TablePath nonExistentTable = + TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), "non_existent_table"); + assertThat(guestAdmin.tableExists(nonExistentTable).get()) + .as("Non-existent table should return false") + .isFalse(); + + // Cleanup + rootAdmin.dropTable(testTablePath, true).get(); + } + private static Configuration initConfig() { Configuration conf = new Configuration(); conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java index 2946fb93bc..bfc2fa58d5 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java @@ -245,9 +245,25 @@ public CompletableFuture getDatabaseInfo( @Override public CompletableFuture databaseExists(DatabaseExistsRequest request) { - // By design: database exists not need to check database authorization. + String databaseName = request.getDatabaseName(); DatabaseExistsResponse response = new DatabaseExistsResponse(); - boolean exists = metadataManager.databaseExists(request.getDatabaseName()); + boolean exists = metadataManager.databaseExists(databaseName); + + // For security, return false for both non-existent and unauthorized databases + if (exists) { + try { + authorizeDatabase(OperationType.DESCRIBE, databaseName); + // If we get here, user is authorized + } catch (Exception e) { + // Not authorized - hide database existence + LOG.debug( + "User {} not authorized to access database '{}', returning false", + currentSession().getPrincipal(), + databaseName); + exists = false; + } + } + response.setExists(exists); return CompletableFuture.completedFuture(response); } @@ -308,9 +324,25 @@ public CompletableFuture getTableSchema(GetTableSchemaRe @Override public CompletableFuture tableExists(TableExistsRequest request) { - // By design: table exists not need to check table authorization. + TablePath tablePath = toTablePath(request.getTablePath()); TableExistsResponse response = new TableExistsResponse(); - boolean exists = metadataManager.tableExists(toTablePath(request.getTablePath())); + boolean exists = metadataManager.tableExists(tablePath); + + // For security, return false for both non-existent and unauthorized tables + if (exists) { + try { + authorizeTable(OperationType.DESCRIBE, tablePath); + // If we get here, user is authorized + } catch (Exception e) { + // Not authorized - hide table existence + LOG.debug( + "User {} not authorized to access table '{}', returning false", + currentSession().getPrincipal(), + tablePath); + exists = false; + } + } + response.setExists(exists); return CompletableFuture.completedFuture(response); }