Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.ServiceLoader;
Expand Down Expand Up @@ -715,7 +714,6 @@ public Catalog alterCatalog(NameIdentifier ident, CatalogChange... changes)
nameIdentifierForLock,
LockType.WRITE,
() -> {
catalogCache.invalidate(ident);
try {
CatalogEntity updatedCatalog =
store.update(
Expand All @@ -734,15 +732,20 @@ public Catalog alterCatalog(NameIdentifier ident, CatalogChange... changes)

return newCatalogBuilder.build();
});
// Invalidate after store.update() so that any background thread that tries to reload
// the old catalog identifier from the store (after the invalidate) will get
// NoSuchCatalogException instead of stale data. Invalidating before the update creates
// a window where the background thread repopulates the cache with the old entity.
catalogCache.invalidate(ident);
// The old fileset catalog's provider is "hadoop", whereas the new fileset catalog's
// provider is "fileset", still using "hadoop" will lead to catalog loading issue. So
// after reading the catalog entity, we convert it to the new fileset catalog entity.
CatalogEntity convertedCatalog = convertFilesetCatalogEntity(updatedCatalog);
return Objects.requireNonNull(
catalogCache.get(
convertedCatalog.nameIdentifier(),
id -> createCatalogWrapper(convertedCatalog, null)))
.catalog;
// Use put() instead of get() to force the updated wrapper into the cache, preventing
// a background thread from overwriting it with stale data between invalidate and put.
CatalogWrapper newWrapper = createCatalogWrapper(convertedCatalog, null);
catalogCache.put(convertedCatalog.nameIdentifier(), newWrapper);
return newWrapper.catalog;

} catch (NoSuchEntityException ne) {
LOG.warn("Catalog {} does not exist", ident, ne);
Expand Down Expand Up @@ -804,8 +807,11 @@ public boolean dropCatalog(NameIdentifier ident, boolean force)
}

// Finally, delete the catalog entity as well as all its sub-entities from the store.
// Invalidate after store.delete() to prevent a background thread from repopulating
// the cache with stale data between invalidate and delete.
boolean deleted = store.delete(ident, EntityType.CATALOG, true);
catalogCache.invalidate(ident);
return store.delete(ident, EntityType.CATALOG, true);
return deleted;

} catch (NoSuchMetalakeException | NoSuchCatalogException ignored) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,9 @@ public <E extends Entity & HasIdentifier> void put(E e, boolean overwritten)
public <E extends Entity & HasIdentifier> E update(
NameIdentifier ident, Class<E> type, Entity.EntityType entityType, Function<E, E> updater)
throws IOException, NoSuchEntityException, EntityAlreadyExistsException {
E updatedEntity = backend.update(ident, entityType, updater);
cache.invalidate(ident, entityType);
return backend.update(ident, entityType, updater);
return updatedEntity;
}

@Override
Expand Down Expand Up @@ -183,8 +184,9 @@ public <E extends Entity & HasIdentifier> List<E> batchGet(
public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolean cascade)
throws IOException {
try {
boolean deleted = backend.delete(ident, entityType, cascade);
cache.invalidate(ident, entityType);
return backend.delete(ident, entityType, cascade);
return deleted;
} catch (NoSuchEntityException e) {
Comment on lines 186 to 189
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If backend.delete(...) throws NoSuchEntityException, this method returns false without invalidating the cache entry. That can leave a stale cached entity even though the backend reports it missing. Invalidate the cache in the exception path as well (or use a finally).

Copilot uses AI. Check for mistakes.
return false;
}
Expand Down Expand Up @@ -319,9 +321,9 @@ public void insertRelation(
Entity.EntityType dstType,
boolean override)
throws IOException {
backend.insertRelation(relType, srcIdentifier, srcType, dstIdentifier, dstType, override);
cache.invalidate(srcIdentifier, srcType, relType);
cache.invalidate(dstIdentifier, dstType, relType);
backend.insertRelation(relType, srcIdentifier, srcType, dstIdentifier, dstType, override);
}

@Override
Expand All @@ -333,11 +335,12 @@ public <E extends Entity & HasIdentifier> List<E> updateEntityRelations(
NameIdentifier[] destEntitiesToRemove)
throws IOException, NoSuchEntityException, EntityAlreadyExistsException {

// We need to clear the cache of the source entity and all destination entities being added or
// removed. This ensures that any subsequent reads will fetch the updated relations from the
// backend. For example, if we are adding a tag to table, we need to invalidate the cache for
// that table and the tag being added or removed. Otherwise, we might return stale data if we
// list all tags for that table or all tables for that tag.
// Invalidate after the backend write, not before. Invalidating before creates a window where
// a concurrent read can repopulate the cache with stale pre-commit data.
List<E> result =
backend.updateEntityRelations(
relType, srcEntityIdent, srcEntityType, destEntitiesToAdd, destEntitiesToRemove);

cache.invalidate(srcEntityIdent, srcEntityType, relType);
for (NameIdentifier destToAdd : destEntitiesToAdd) {
cache.invalidate(destToAdd, srcEntityType, relType);
Expand All @@ -347,8 +350,7 @@ public <E extends Entity & HasIdentifier> List<E> updateEntityRelations(
cache.invalidate(destToRemove, srcEntityType, relType);
}

return backend.updateEntityRelations(
relType, srcEntityIdent, srcEntityType, destEntitiesToAdd, destEntitiesToRemove);
return result;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.apache.gravitino.TestCatalog.PROPERTY_KEY6_PREFIX;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
Expand All @@ -36,6 +37,7 @@
import java.time.Instant;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.CatalogChange;
Expand Down Expand Up @@ -69,6 +71,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;

public class TestCatalogManager {

Expand Down Expand Up @@ -540,8 +543,8 @@ public void testAlterCatalog() {
}

@Test
public void testDropCatalog() throws Exception {
NameIdentifier ident = NameIdentifier.of("metalake", "test41");
void testAlterCatalogRefreshesCacheAfterStoreUpdate() throws Exception {
NameIdentifier ident = NameIdentifier.of("metalake", "cache_race_test");
Map<String, String> props =
ImmutableMap.of(
"provider",
Expand All @@ -552,46 +555,63 @@ public void testDropCatalog() throws Exception {
"value2",
PROPERTY_KEY5_PREFIX + "1",
"value3");
String comment = "comment";

Catalog catalog =
catalogManager.createCatalog(ident, Catalog.Type.RELATIONAL, provider, comment, props);

// Test drop catalog
Exception exception =
Assertions.assertThrows(
CatalogInUseException.class, () -> catalogManager.dropCatalog(ident));
Assertions.assertTrue(exception.getMessage().contains("Catalog metalake.test41 is in use"));

Assertions.assertDoesNotThrow(() -> catalogManager.disableCatalog(ident));

CatalogEntity oldEntity = entityStore.get(ident, EntityType.CATALOG, CatalogEntity.class);
FieldUtils.writeField(catalog, "entity", oldEntity, true);

CatalogManager.CatalogWrapper catalogWrapper =
Mockito.mock(CatalogManager.CatalogWrapper.class);
Capability capability = Mockito.mock(Capability.class);
CapabilityResult unsupportedResult = CapabilityResult.unsupported("Not managed");
Mockito.doReturn(catalogWrapper).when(catalogManager).loadCatalogAndWrap(ident);
Mockito.doReturn(catalog).when(catalogWrapper).catalog();
Mockito.doReturn(capability).when(catalogWrapper).capabilities();
Mockito.doReturn(unsupportedResult).when(capability).managedStorage(any());

boolean dropped = catalogManager.dropCatalog(ident);
Assertions.assertTrue(dropped);

// Test drop non-existed catalog
NameIdentifier ident1 = NameIdentifier.of("metalake", "test42");
boolean dropped1 = catalogManager.dropCatalog(ident1);
Assertions.assertFalse(dropped1);

// Drop operation will update the cache
catalogManager.createCatalog(ident, Catalog.Type.RELATIONAL, provider, "comment", props);
CatalogEntity originalEntity = entityStore.get(ident, EntityType.CATALOG, CatalogEntity.class);
FieldUtils.writeField(catalog, "entity", originalEntity, true);

CatalogManager.CatalogWrapper staleWrapper =
Mockito.mock(CatalogManager.CatalogWrapper.class, Mockito.RETURNS_DEEP_STUBS);
Mockito.doReturn(catalog).when(staleWrapper).catalog();

CatalogManager.CatalogWrapper freshWrapper =
Mockito.mock(CatalogManager.CatalogWrapper.class, Mockito.RETURNS_DEEP_STUBS);
Catalog freshCatalog = Mockito.mock(Catalog.class);
CatalogEntity freshEntity =
CatalogEntity.builder()
.withId(originalEntity.id())
.withName("cache_race_test_renamed")
.withNamespace(originalEntity.namespace())
.withType(originalEntity.getType())
.withProvider(originalEntity.getProvider())
.withComment(originalEntity.getComment())
.withProperties(originalEntity.getProperties())
.withAuditInfo(originalEntity.auditInfo())
.build();
FieldUtils.writeField(freshCatalog, "entity", freshEntity, true);
Mockito.doReturn(freshCatalog).when(freshWrapper).catalog();

AtomicBoolean staleInserted = new AtomicBoolean(false);
Answer<CatalogManager.CatalogWrapper> insertStaleWrapper =
invocation -> {
if (staleInserted.compareAndSet(false, true)) {
catalogManager.getCatalogCache().put(NameIdentifier.of("metalake", "cache_race_test_renamed"), staleWrapper);
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The long put(...) call exceeds typical Google Java Style line length and also reduces readability in tests. Please wrap the arguments onto multiple lines consistent with the rest of this file’s formatting.

Suggested change
catalogManager.getCatalogCache().put(NameIdentifier.of("metalake", "cache_race_test_renamed"), staleWrapper);
catalogManager
.getCatalogCache()
.put(
NameIdentifier.of("metalake", "cache_race_test_renamed"), staleWrapper);

Copilot uses AI. Check for mistakes.
}
return null;
};
Comment on lines +579 to +581
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This Answer returns null, which would cause an NPE if the stub were actually used as the return value of createCatalogWrapper(...). Ensure the stub returns a valid CatalogWrapper (and trigger the stale-cache insertion as a side effect if needed).

Copilot generated this review using guidance from repository custom instructions.
Mockito.doAnswer(insertStaleWrapper)
.when(catalogManager)
.createCatalogWrapper(any(CatalogEntity.class), eq(null));
Mockito.doReturn(freshWrapper)
.when(catalogManager)
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test stubs CatalogManager#createCatalogWrapper(...), but the method is private in CatalogManager, so this code will not compile (and cannot be stubbed with plain Mockito). Introduce a non-private seam for wrapper creation (e.g., injectable factory) or adjust the test to stub public/protected methods instead.

Copilot uses AI. Check for mistakes.
.createCatalogWrapper(any(CatalogEntity.class), eq(null));
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The doReturn(freshWrapper) stubbing here uses the same matchers as the preceding doAnswer(...), which overrides the earlier stub. As a result, the intended race simulation never executes. Use sequential stubbing (doAnswer(...).doReturn(...)) or consolidate into a single Answer.

Copilot generated this review using guidance from repository custom instructions.

Catalog alteredCatalog =
catalogManager.alterCatalog(ident, CatalogChange.rename("cache_race_test_renamed"));

Assertions.assertEquals("cache_race_test_renamed", alteredCatalog.name());
CatalogManager.CatalogWrapper cachedWrapper =
catalogManager
.getCatalogCache()
.getIfPresent(NameIdentifier.of("metalake", "cache_race_test_renamed"));
Assertions.assertSame(freshWrapper, cachedWrapper);
Assertions.assertNull(catalogManager.getCatalogCache().getIfPresent(ident));
}

@Test
public void testForceDropCatalog() throws Exception {
NameIdentifier ident = NameIdentifier.of("metalake", "test41");
void testDropCatalogInvalidatesCacheAfterStoreDelete() throws Exception {
NameIdentifier ident = NameIdentifier.of("metalake", "cache_drop_test");
Map<String, String> props =
ImmutableMap.of(
"provider",
Expand All @@ -602,35 +622,29 @@ public void testForceDropCatalog() throws Exception {
"value2",
PROPERTY_KEY5_PREFIX + "1",
"value3");
String comment = "comment";

Catalog catalog =
catalogManager.createCatalog(ident, Catalog.Type.RELATIONAL, provider, comment, props);
SchemaEntity schemaEntity =
SchemaEntity.builder()
.withId(RandomIdGenerator.INSTANCE.nextId())
.withName("test_schema1")
.withNamespace(Namespace.of("metalake", "test41"))
.withAuditInfo(
AuditInfo.builder()
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
.withCreateTime(Instant.now())
.build())
.build();
entityStore.put(schemaEntity);
catalogManager.createCatalog(ident, Catalog.Type.RELATIONAL, provider, "comment", props);
Assertions.assertDoesNotThrow(() -> catalogManager.disableCatalog(ident));
CatalogEntity entity = entityStore.get(ident, EntityType.CATALOG, CatalogEntity.class);
FieldUtils.writeField(catalog, "entity", entity, true);

CatalogManager.CatalogWrapper catalogWrapper =
Mockito.mock(CatalogManager.CatalogWrapper.class);
Mockito.mock(CatalogManager.CatalogWrapper.class, Mockito.RETURNS_DEEP_STUBS);
Capability capability = Mockito.mock(Capability.class);
CapabilityResult unsupportedResult = CapabilityResult.unsupported("Not managed");
Mockito.doReturn(catalogWrapper).when(catalogManager).loadCatalogAndWrap(ident);
Mockito.doReturn(catalog).when(catalogWrapper).catalog();
Mockito.doReturn(capability).when(catalogWrapper).capabilities();
Mockito.doReturn(unsupportedResult).when(capability).managedStorage(any());
Mockito.doReturn(catalog).when(catalogWrapper).catalog();
Mockito.doThrow(new RuntimeException("Failed connect"))
.when(catalogWrapper)
.doWithSchemaOps(any());
Assertions.assertTrue(catalogManager.dropCatalog(ident, true));
}

catalogManager.getCatalogCache().put(ident, catalogWrapper);
boolean dropped = catalogManager.dropCatalog(ident);

Assertions.assertTrue(dropped);
Assertions.assertFalse(entityStore.exists(ident, EntityType.CATALOG));
Assertions.assertNull(catalogManager.getCatalogCache().getIfPresent(ident));
}
@Test
void testAlterMutableProperties() {
NameIdentifier ident = NameIdentifier.of("metalake", "test51");
Expand Down
Loading
Loading