-
Notifications
You must be signed in to change notification settings - Fork 803
[#10737] fix(core): Avoid blocking dropCatalog on imported schemas #10738
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
3af686d
c3e9723
30e2dab
b0088ca
fd0a696
f780666
642c7e5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -46,12 +46,14 @@ | |
| import org.apache.gravitino.GravitinoEnv; | ||
| import org.apache.gravitino.NameIdentifier; | ||
| import org.apache.gravitino.Namespace; | ||
| import org.apache.gravitino.Schema; | ||
| import org.apache.gravitino.connector.capability.Capability; | ||
| import org.apache.gravitino.connector.capability.CapabilityResult; | ||
| import org.apache.gravitino.exceptions.CatalogAlreadyExistsException; | ||
| import org.apache.gravitino.exceptions.CatalogInUseException; | ||
| import org.apache.gravitino.exceptions.NoSuchCatalogException; | ||
| import org.apache.gravitino.exceptions.NoSuchMetalakeException; | ||
| import org.apache.gravitino.exceptions.NoSuchSchemaException; | ||
| import org.apache.gravitino.lock.LockManager; | ||
| import org.apache.gravitino.meta.AuditInfo; | ||
| import org.apache.gravitino.meta.BaseMetalake; | ||
|
|
@@ -589,6 +591,167 @@ public void testDropCatalog() throws Exception { | |
| Assertions.assertNull(catalogManager.getCatalogCache().getIfPresent(ident)); | ||
| } | ||
|
|
||
| @Test | ||
| public void testDropCatalogSkipsImportedSchemas() throws Exception { | ||
| NameIdentifier ident = NameIdentifier.of("metalake", "test41"); | ||
| Map<String, String> props = | ||
| ImmutableMap.of( | ||
| "provider", | ||
| "test", | ||
| PROPERTY_KEY1, | ||
| "value1", | ||
| PROPERTY_KEY2, | ||
| "value2", | ||
| PROPERTY_KEY5_PREFIX + "1", | ||
| "value3"); | ||
| String comment = "comment"; | ||
|
|
||
| Catalog catalog = | ||
| catalogManager.createCatalog(ident, Catalog.Type.RELATIONAL, provider, comment, props); | ||
| Mockito.doCallRealMethod().when(catalogManager).loadCatalogAndWrap(ident); | ||
| Assertions.assertDoesNotThrow(() -> catalogManager.disableCatalog(ident)); | ||
| CatalogEntity catalogEntity = entityStore.get(ident, EntityType.CATALOG, CatalogEntity.class); | ||
| FieldUtils.writeField(catalog, "entity", catalogEntity, true); | ||
|
|
||
| SchemaEntity importedSchemaEntity = | ||
| SchemaEntity.builder() | ||
| .withId(RandomIdGenerator.INSTANCE.nextId()) | ||
| .withName("imported_schema") | ||
| .withNamespace(Namespace.of("metalake", "test41")) | ||
| .withAuditInfo( | ||
| AuditInfo.builder() | ||
| .withCreator(PrincipalUtils.getCurrentPrincipal().getName()) | ||
| .withCreateTime(Instant.now()) | ||
| .build()) | ||
| .build(); | ||
| entityStore.put(importedSchemaEntity); | ||
|
|
||
| Schema importedSchema = Mockito.mock(Schema.class); | ||
| // Non-empty properties without StringIdentifier simulate an imported schema on a backend | ||
| // that supports property storage (e.g., Hive, Iceberg) but did not create this schema | ||
| // via Gravitino. | ||
| Mockito.doReturn(ImmutableMap.of("owner", "external")).when(importedSchema).properties(); | ||
| CatalogManager.CatalogWrapper wrapper = Mockito.mock(CatalogManager.CatalogWrapper.class); | ||
| Capability capability = Mockito.mock(Capability.class); | ||
| CapabilityResult unsupportedResult = CapabilityResult.unsupported("Not managed"); | ||
| Mockito.doReturn(wrapper).when(catalogManager).loadCatalogAndWrap(ident); | ||
| Mockito.doReturn(catalog).when(wrapper).catalog(); | ||
| Mockito.doReturn(capability).when(wrapper).capabilities(); | ||
| Mockito.doReturn(unsupportedResult).when(capability).managedStorage(any()); | ||
| Mockito.doReturn( | ||
| new NameIdentifier[] {NameIdentifier.of("metalake", "test41", "imported_schema")}) | ||
| .doReturn(importedSchema) | ||
| .when(wrapper) | ||
| .doWithSchemaOps(any()); | ||
|
|
||
| // Imported schema (no StringIdentifier in external catalog properties) should not block drop. | ||
| Assertions.assertTrue(catalogManager.dropCatalog(ident)); | ||
| } | ||
|
|
||
| @Test | ||
| public void testDropCatalogIgnoresMissingSchema() throws Exception { | ||
| NameIdentifier ident = NameIdentifier.of("metalake", "test41"); | ||
| Map<String, String> props = | ||
| ImmutableMap.of( | ||
| "provider", | ||
| "test", | ||
| PROPERTY_KEY1, | ||
| "value1", | ||
| PROPERTY_KEY2, | ||
| "value2", | ||
| PROPERTY_KEY5_PREFIX + "1", | ||
| "value3"); | ||
| String comment = "comment"; | ||
|
|
||
| Catalog catalog = | ||
| catalogManager.createCatalog(ident, Catalog.Type.RELATIONAL, provider, comment, props); | ||
| Mockito.doCallRealMethod().when(catalogManager).loadCatalogAndWrap(ident); | ||
| Assertions.assertDoesNotThrow(() -> catalogManager.disableCatalog(ident)); | ||
| CatalogEntity catalogEntity = entityStore.get(ident, EntityType.CATALOG, CatalogEntity.class); | ||
| FieldUtils.writeField(catalog, "entity", catalogEntity, true); | ||
|
|
||
| SchemaEntity schemaEntity = | ||
| SchemaEntity.builder() | ||
| .withId(RandomIdGenerator.INSTANCE.nextId()) | ||
| .withName("default") | ||
| .withNamespace(Namespace.of("metalake", "test41")) | ||
| .withAuditInfo( | ||
| AuditInfo.builder() | ||
| .withCreator(PrincipalUtils.getCurrentPrincipal().getName()) | ||
| .withCreateTime(Instant.now()) | ||
| .build()) | ||
| .build(); | ||
| entityStore.put(schemaEntity); | ||
|
|
||
| CatalogManager.CatalogWrapper wrapper = Mockito.mock(CatalogManager.CatalogWrapper.class); | ||
| Capability capability = Mockito.mock(Capability.class); | ||
| CapabilityResult unsupportedResult = CapabilityResult.unsupported("Not managed"); | ||
| Mockito.doReturn(wrapper).when(catalogManager).loadCatalogAndWrap(ident); | ||
| Mockito.doReturn(catalog).when(wrapper).catalog(); | ||
| Mockito.doReturn(capability).when(wrapper).capabilities(); | ||
| Mockito.doReturn(unsupportedResult).when(capability).managedStorage(any()); | ||
| Mockito.doReturn(new NameIdentifier[] {NameIdentifier.of("metalake", "test41", "default")}) | ||
| .doThrow(new NoSuchSchemaException("Schema not found")) | ||
| .when(wrapper) | ||
| .doWithSchemaOps(any()); | ||
|
|
||
| // Schema disappearing between listSchemas and loadSchema should not block drop. | ||
| Assertions.assertTrue(catalogManager.dropCatalog(ident)); | ||
| } | ||
|
Comment on lines
+652
to
+700
|
||
|
|
||
| @Test | ||
| public void testDropCatalogFailsOnSchemaClassificationError() throws Exception { | ||
| NameIdentifier ident = NameIdentifier.of("metalake", "test41"); | ||
| Map<String, String> props = | ||
| ImmutableMap.of( | ||
| "provider", | ||
| "test", | ||
| PROPERTY_KEY1, | ||
| "value1", | ||
| PROPERTY_KEY2, | ||
| "value2", | ||
| PROPERTY_KEY5_PREFIX + "1", | ||
| "value3"); | ||
| String comment = "comment"; | ||
|
|
||
| Catalog catalog = | ||
| catalogManager.createCatalog(ident, Catalog.Type.RELATIONAL, provider, comment, props); | ||
| Mockito.doCallRealMethod().when(catalogManager).loadCatalogAndWrap(ident); | ||
| Assertions.assertDoesNotThrow(() -> catalogManager.disableCatalog(ident)); | ||
| CatalogEntity catalogEntity = entityStore.get(ident, EntityType.CATALOG, CatalogEntity.class); | ||
| FieldUtils.writeField(catalog, "entity", catalogEntity, true); | ||
|
|
||
| SchemaEntity schemaEntity = | ||
| SchemaEntity.builder() | ||
| .withId(RandomIdGenerator.INSTANCE.nextId()) | ||
| .withName("test_schema1") | ||
| .withNamespace(Namespace.of("metalake", "test41")) | ||
| .withAuditInfo( | ||
| AuditInfo.builder() | ||
| .withCreator(PrincipalUtils.getCurrentPrincipal().getName()) | ||
| .withCreateTime(Instant.now()) | ||
| .build()) | ||
| .build(); | ||
| entityStore.put(schemaEntity); | ||
|
|
||
| CatalogManager.CatalogWrapper wrapper = Mockito.mock(CatalogManager.CatalogWrapper.class); | ||
| Capability capability = Mockito.mock(Capability.class); | ||
| CapabilityResult unsupportedResult = CapabilityResult.unsupported("Not managed"); | ||
| Mockito.doReturn(wrapper).when(catalogManager).loadCatalogAndWrap(ident); | ||
| Mockito.doReturn(catalog).when(wrapper).catalog(); | ||
| Mockito.doReturn(capability).when(wrapper).capabilities(); | ||
| Mockito.doReturn(unsupportedResult).when(capability).managedStorage(any()); | ||
| Mockito.doReturn(new NameIdentifier[] {NameIdentifier.of("metalake", "test41", "test_schema1")}) | ||
| .doThrow(new RuntimeException("Failed connect")) | ||
| .when(wrapper) | ||
| .doWithSchemaOps(any()); | ||
|
|
||
| // Unexpected errors during schema classification should propagate (fail-closed). | ||
| RuntimeException ex = | ||
| Assertions.assertThrows(RuntimeException.class, () -> catalogManager.dropCatalog(ident)); | ||
| Assertions.assertTrue(ex.getCause().getMessage().contains("Failed connect")); | ||
| } | ||
|
|
||
| @Test | ||
| public void testForceDropCatalog() throws Exception { | ||
| NameIdentifier ident = NameIdentifier.of("metalake", "test41"); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These tests stub
catalogManager.loadCatalogAndWrap(ident)on a static Mockito spy, but the@BeforeEach/@AfterEachreset only clears the entity store and doesn’t reset Mockito stubbings. Since multiple tests reuse the same catalog identifier (metalake.test41), stubs can leak across test methods and make ordering matter. Consider either using distinct catalog names per test or resetting the spy inreset()(e.g.,Mockito.reset(catalogManager)and then re-spy/re-stub any common behavior).