Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,5 @@ private ConfigConstants() {}
public static final String VERSION_1_3_0 = "1.3.0";

/** The current version of backend storage initialization script. */
public static final String CURRENT_SCRIPT_VERSION = VERSION_1_2_0;
public static final String CURRENT_SCRIPT_VERSION = VERSION_1_3_0;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.storage.relational.mapper;

import java.util.List;
import org.apache.gravitino.storage.relational.po.auth.EntityChangeRecord;
import org.apache.ibatis.annotations.DeleteProvider;
import org.apache.ibatis.annotations.InsertProvider;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.SelectProvider;

/**
* A MyBatis Mapper for entity_change_log table operations.
*
* <p>This append-only log tracks structural changes to entities (create, alter, drop) and is used
* by the entity change poller to drive targeted invalidation of the metadataIdCache on HA peer
* nodes.
*/
public interface EntityChangeLogMapper {

String ENTITY_CHANGE_LOG_TABLE_NAME = "entity_change_log";

@SelectProvider(type = EntityChangeLogSQLProviderFactory.class, method = "selectEntityChanges")
List<EntityChangeRecord> selectChanges(
@Param("createdAtAfter") long createdAtAfter, @Param("maxRows") int maxRows);

@InsertProvider(type = EntityChangeLogSQLProviderFactory.class, method = "insertEntityChange")
void insertChange(
@Param("metalakeName") String metalakeName,
@Param("entityType") String entityType,
@Param("fullName") String fullName,
@Param("operateType") String operateType,
@Param("createdAt") long createdAt);

@DeleteProvider(type = EntityChangeLogSQLProviderFactory.class, method = "pruneOldEntityChanges")
void pruneOldEntries(@Param("before") long before);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.storage.relational.mapper;

import static org.apache.gravitino.storage.relational.mapper.EntityChangeLogMapper.ENTITY_CHANGE_LOG_TABLE_NAME;

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
import org.apache.gravitino.storage.relational.mapper.provider.base.EntityChangeLogBaseSQLProvider;
import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
import org.apache.ibatis.annotations.Param;

public class EntityChangeLogSQLProviderFactory {

private static final Map<JDBCBackendType, EntityChangeLogBaseSQLProvider>
ENTITY_CHANGE_LOG_SQL_PROVIDER_MAP =
ImmutableMap.of(
JDBCBackendType.MYSQL, new EntityChangeLogMySQLProvider(),
JDBCBackendType.H2, new EntityChangeLogH2Provider(),
JDBCBackendType.POSTGRESQL, new EntityChangeLogPostgreSQLProvider());
Comment on lines +32 to +37

public static EntityChangeLogBaseSQLProvider getProvider() {
String databaseId =
SqlSessionFactoryHelper.getInstance()
.getSqlSessionFactory()
.getConfiguration()
.getDatabaseId();
JDBCBackendType jdbcBackendType = JDBCBackendType.fromString(databaseId);
return ENTITY_CHANGE_LOG_SQL_PROVIDER_MAP.get(jdbcBackendType);
}

static class EntityChangeLogMySQLProvider extends EntityChangeLogBaseSQLProvider {}

static class EntityChangeLogH2Provider extends EntityChangeLogBaseSQLProvider {}

static class EntityChangeLogPostgreSQLProvider extends EntityChangeLogBaseSQLProvider {
@Override
public String pruneOldEntityChanges(@Param("before") long before) {
return "DELETE FROM "
+ ENTITY_CHANGE_LOG_TABLE_NAME
+ " WHERE id IN (SELECT id FROM "
+ ENTITY_CHANGE_LOG_TABLE_NAME
+ " WHERE created_at < #{before} LIMIT 1000)";
}
}

public static String selectEntityChanges(
@Param("createdAtAfter") long createdAtAfter, @Param("maxRows") int maxRows) {
return getProvider().selectEntityChanges(createdAtAfter, maxRows);
}

public static String insertEntityChange(
@Param("metalakeName") String metalakeName,
@Param("entityType") String entityType,
@Param("fullName") String fullName,
@Param("operateType") String operateType,
@Param("createdAt") long createdAt) {
return getProvider()
.insertEntityChange(metalakeName, entityType, fullName, operateType, createdAt);
}

public static String pruneOldEntityChanges(@Param("before") long before) {
return getProvider().pruneOldEntityChanges(before);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import org.apache.gravitino.storage.relational.po.ExtendedGroupPO;
import org.apache.gravitino.storage.relational.po.GroupPO;
import org.apache.gravitino.storage.relational.po.auth.GroupAuthInfo;
import org.apache.ibatis.annotations.DeleteProvider;
import org.apache.ibatis.annotations.InsertProvider;
import org.apache.ibatis.annotations.Param;
Expand Down Expand Up @@ -88,4 +89,10 @@ Integer updateGroupMeta(
method = "deleteGroupMetasByLegacyTimeline")
Integer deleteGroupMetasByLegacyTimeline(
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);

@UpdateProvider(type = GroupMetaSQLProviderFactory.class, method = "touchGroupUpdatedAt")
void touchUpdatedAt(@Param("groupId") long groupId, @Param("now") long now);

@SelectProvider(type = GroupMetaSQLProviderFactory.class, method = "getGroupInfoByUserId")
List<GroupAuthInfo> getGroupInfoByUserId(@Param("userId") long userId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,12 @@ public static String deleteGroupMetasByLegacyTimeline(
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
return getProvider().deleteGroupMetasByLegacyTimeline(legacyTimeline, limit);
}

public static String touchGroupUpdatedAt(@Param("groupId") long groupId, @Param("now") long now) {
return getProvider().touchGroupUpdatedAt(groupId, now);
}

public static String getGroupInfoByUserId(@Param("userId") long userId) {
return getProvider().getGroupInfoByUserId(userId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.gravitino.storage.relational.po.OwnerRelPO;
import org.apache.gravitino.storage.relational.po.UserOwnerRelPO;
import org.apache.gravitino.storage.relational.po.UserPO;
import org.apache.gravitino.storage.relational.po.auth.ChangedOwnerInfo;
import org.apache.gravitino.storage.relational.po.auth.OwnerInfo;
import org.apache.ibatis.annotations.InsertProvider;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.SelectProvider;
Expand Down Expand Up @@ -95,4 +97,12 @@ void softDeleteOwnerRelByOwnerIdAndType(
method = "deleteOwnerMetasByLegacyTimeline")
Integer deleteOwnerMetasByLegacyTimeline(
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);

@SelectProvider(
type = OwnerMetaSQLProviderFactory.class,
method = "selectOwnerByMetadataObjectId")
OwnerInfo selectOwnerByMetadataObjectId(@Param("metadataObjectId") long metadataObjectId);

@SelectProvider(type = OwnerMetaSQLProviderFactory.class, method = "selectChangedOwners")
List<ChangedOwnerInfo> selectChangedOwners(@Param("updatedAtAfter") long updatedAtAfter);
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,13 @@ public static String batchSelectUserOwnerMetaByMetadataObjectIdAndType(
return getProvider()
.batchSelectUserOwnerMetaByMetadataObjectIdAndType(metadataObjectIds, metadataObjectType);
}

public static String selectOwnerByMetadataObjectId(
@Param("metadataObjectId") long metadataObjectId) {
return getProvider().selectOwnerByMetadataObjectId(metadataObjectId);
}

public static String selectChangedOwners(@Param("updatedAtAfter") long updatedAtAfter) {
return getProvider().selectChangedOwners(updatedAtAfter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.List;
import org.apache.gravitino.storage.relational.po.RolePO;
import org.apache.gravitino.storage.relational.po.auth.RoleUpdatedAt;
import org.apache.ibatis.annotations.DeleteProvider;
import org.apache.ibatis.annotations.InsertProvider;
import org.apache.ibatis.annotations.Param;
Expand Down Expand Up @@ -93,4 +94,10 @@ Integer updateRoleMeta(
method = "deleteRoleMetasByLegacyTimeline")
Integer deleteRoleMetasByLegacyTimeline(
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);

@UpdateProvider(type = RoleMetaSQLProviderFactory.class, method = "touchRoleUpdatedAt")
void touchUpdatedAt(@Param("roleId") long roleId, @Param("now") long now);

@SelectProvider(type = RoleMetaSQLProviderFactory.class, method = "batchGetRoleUpdatedAt")
List<RoleUpdatedAt> batchGetUpdatedAt(@Param("roleIds") List<Long> roleIds);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.gravitino.storage.relational.mapper;

import com.google.common.collect.ImmutableMap;
import java.util.List;
import java.util.Map;
import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
import org.apache.gravitino.storage.relational.mapper.provider.base.RoleMetaBaseSQLProvider;
Expand Down Expand Up @@ -101,4 +102,12 @@ public static String deleteRoleMetasByLegacyTimeline(
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
return getProvider().deleteRoleMetasByLegacyTimeline(legacyTimeline, limit);
}

public static String touchRoleUpdatedAt(@Param("roleId") long roleId, @Param("now") long now) {
return getProvider().touchRoleUpdatedAt(roleId, now);
}

public static String batchGetRoleUpdatedAt(@Param("roleIds") List<Long> roleIds) {
return getProvider().batchGetRoleUpdatedAt(roleIds);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import org.apache.gravitino.storage.relational.po.ExtendedUserPO;
import org.apache.gravitino.storage.relational.po.UserPO;
import org.apache.gravitino.storage.relational.po.auth.UserAuthInfo;
import org.apache.ibatis.annotations.DeleteProvider;
import org.apache.ibatis.annotations.InsertProvider;
import org.apache.ibatis.annotations.Param;
Expand Down Expand Up @@ -88,4 +89,11 @@ Integer updateUserMeta(
method = "deleteUserMetasByLegacyTimeline")
Integer deleteUserMetasByLegacyTimeline(
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);

@UpdateProvider(type = UserMetaSQLProviderFactory.class, method = "touchUserUpdatedAt")
void touchUpdatedAt(@Param("userId") long userId, @Param("now") long now);

@SelectProvider(type = UserMetaSQLProviderFactory.class, method = "getUserInfo")
UserAuthInfo getUserInfo(
@Param("metalakeName") String metalakeName, @Param("userName") String userName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,13 @@ public static String deleteUserMetasByLegacyTimeline(
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
return getProvider().deleteUserMetasByLegacyTimeline(legacyTimeline, limit);
}

public static String touchUserUpdatedAt(@Param("userId") long userId, @Param("now") long now) {
return getProvider().touchUserUpdatedAt(userId, now);
}

public static String getUserInfo(
@Param("metalakeName") String metalakeName, @Param("userName") String userName) {
return getProvider().getUserInfo(metalakeName, userName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.ImmutableList;
import java.util.List;
import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
import org.apache.gravitino.storage.relational.mapper.EntityChangeLogMapper;
import org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper;
import org.apache.gravitino.storage.relational.mapper.FilesetVersionMapper;
import org.apache.gravitino.storage.relational.mapper.FunctionMetaMapper;
Expand Down Expand Up @@ -58,6 +59,7 @@ public class DefaultMapperPackageProvider implements MapperPackageProvider {
public List<Class<?>> getMapperClasses() {
return ImmutableList.of(
CatalogMetaMapper.class,
EntityChangeLogMapper.class,
FilesetMetaMapper.class,
FilesetVersionMapper.class,
FunctionMetaMapper.class,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.storage.relational.mapper.provider.base;

import static org.apache.gravitino.storage.relational.mapper.EntityChangeLogMapper.ENTITY_CHANGE_LOG_TABLE_NAME;

import org.apache.ibatis.annotations.Param;

public class EntityChangeLogBaseSQLProvider {

public String selectEntityChanges(
@Param("createdAtAfter") long createdAtAfter, @Param("maxRows") int maxRows) {
return "SELECT metalake_name as metalakeName, entity_type as entityType,"
+ " full_name as fullName, operate_type as operateType, created_at as createdAt"
+ " FROM "
+ ENTITY_CHANGE_LOG_TABLE_NAME
+ " WHERE created_at > #{createdAtAfter} ORDER BY created_at LIMIT #{maxRows}";
}

public String insertEntityChange(
@Param("metalakeName") String metalakeName,
@Param("entityType") String entityType,
@Param("fullName") String fullName,
@Param("operateType") String operateType,
@Param("createdAt") long createdAt) {
return "INSERT INTO "
+ ENTITY_CHANGE_LOG_TABLE_NAME
+ " (metalake_name, entity_type, full_name, operate_type, created_at)"
+ " VALUES (#{metalakeName}, #{entityType}, #{fullName}, #{operateType}, #{createdAt})";
}

public String pruneOldEntityChanges(@Param("before") long before) {
return "DELETE FROM "
+ ENTITY_CHANGE_LOG_TABLE_NAME
+ " WHERE created_at < #{before} LIMIT 1000";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,4 +182,17 @@ public String deleteGroupMetasByLegacyTimeline(
+ GROUP_TABLE_NAME
+ " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}";
}

public String touchGroupUpdatedAt(@Param("groupId") long groupId, @Param("now") long now) {
return "UPDATE " + GROUP_TABLE_NAME + " SET updated_at = #{now} WHERE group_id = #{groupId}";
}

public String getGroupInfoByUserId(@Param("userId") long userId) {
return "SELECT gm.group_id as groupId, gm.updated_at as updatedAt"
+ " FROM "
+ GROUP_TABLE_NAME
+ " gm"
+ " JOIN group_user_rel gu ON gm.group_id = gu.group_id AND gu.deleted_at = 0"
+ " WHERE gu.user_id = #{userId} AND gm.deleted_at = 0";
}
}
Loading
Loading