Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 136 additions & 1 deletion contrib/nosql/redis/redis_group_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ package redis

import (
"context"
"sync"
"time"

"github.com/redis/go-redis/v9"

"github.com/gogf/gf/v2/container/gvar"
"github.com/gogf/gf/v2/database/gredis"
"github.com/gogf/gf/v2/os/gtime"
Expand Down Expand Up @@ -131,12 +134,37 @@ func (r GroupGeneric) Move(ctx context.Context, key string, db int) (int64, erro
}

// Del removes the specified keys.
// a key is ignored if it does not exist.
// A key is ignored if it does not exist.
//
// In Cluster mode with multiple keys, it performs per-key deletion to avoid CROSSSLOT errors.
// In Standalone/Sentinel mode, it issues a single DEL command with all keys.
//
// It returns the number of keys that were removed.
//
// https://redis.io/commands/del/
func (r GroupGeneric) Del(ctx context.Context, keys ...string) (int64, error) {
if len(keys) == 0 {
return 0, nil
}
// Cluster mode: per-key deletion to avoid CROSSSLOT errors.
if r.isClusterMode() && len(keys) > 1 {
var (
totalDeleted int64
firstErr error
)
for _, key := range keys {
n, err := r.Operation.Do(ctx, "Del", key)
if err != nil {
if firstErr == nil {
firstErr = err
}
continue
}
totalDeleted += n.Int64()
}
return totalDeleted, firstErr
}
// Standalone/Sentinel: single DEL command with all keys.
v, err := r.Operation.Do(ctx, "Del", gconv.Interfaces(keys)...)
return v.Int64(), err
}
Expand Down Expand Up @@ -366,3 +394,110 @@ func (r GroupGeneric) PTTL(ctx context.Context, key string) (int64, error) {
v, err := r.Operation.Do(ctx, "PTTL", key)
return v.Int64(), err
}

// ScanAll iterates all keys matching the given pattern across the entire keyspace.
// In Standalone/Sentinel mode, it repeatedly calls Scan until the cursor returns to 0.
// In Cluster mode, it transparently iterates all master nodes and aggregates results.
// This is the safe alternative to Keys, which can block the server on large datasets.
func (r GroupGeneric) ScanAll(ctx context.Context, option ...gredis.ScanOption) ([]string, error) {
if r.isClusterMode() {
return r.scanAllCluster(ctx, option...)
}
return r.scanAllStandalone(ctx, option...)
}

// scanAllStandalone performs a full keyspace scan in standalone or sentinel mode.
// It repeatedly calls Scan with the given cursor until the cursor returns to 0,
// accumulating all matching keys into a single slice.
func (r GroupGeneric) scanAllStandalone(ctx context.Context, option ...gredis.ScanOption) ([]string, error) {
var (
allKeys []string
cursor uint64
)
for {
nextCursor, keys, err := r.Scan(ctx, cursor, option...)
if err != nil {
return nil, err
}
allKeys = append(allKeys, keys...)
if nextCursor == 0 {
break
}
cursor = nextCursor
}
return allKeys, nil
}

// scanAllCluster performs a full keyspace scan across all master nodes in a Redis Cluster.
// It uses ForEachMaster to iterate each master node independently, scanning each node
// until its cursor returns to 0, and aggregates all keys into a single slice.
func (r GroupGeneric) scanAllCluster(ctx context.Context, option ...gredis.ScanOption) ([]string, error) {
redisInstance := r.getRedisInstance()
if redisInstance == nil {
// Fallback to standalone scan if the underlying adapter is not *Redis.
return r.scanAllStandalone(ctx, option...)
}
clusterClient, ok := redisInstance.client.(*redis.ClusterClient)
if !ok {
return r.scanAllStandalone(ctx, option...)
}

// Build MATCH and COUNT arguments from ScanOption.
var (
matchPattern = "*"
countHint = int64(100)
)
if len(option) > 0 {
if option[0].Match != "" {
matchPattern = option[0].Match
}
if option[0].Count > 0 {
countHint = int64(option[0].Count)
}
}

var (
allKeys []string
mu sync.Mutex
)

err := clusterClient.ForEachMaster(ctx, func(ctx context.Context, client *redis.Client) error {
var cursor uint64
for {
keys, nextCursor, err := client.Scan(ctx, cursor, matchPattern, countHint).Result()
if err != nil {
return err
}
if len(keys) > 0 {
mu.Lock()
allKeys = append(allKeys, keys...)
mu.Unlock()
}
if nextCursor == 0 {
return nil
}
cursor = nextCursor
}
})
return allKeys, err
}

// isClusterMode checks whether the underlying Redis client is running in Cluster mode.
// It type-asserts the internal client to *redis.ClusterClient.
func (r GroupGeneric) isClusterMode() bool {
redisInstance := r.getRedisInstance()
if redisInstance == nil {
return false
}
_, ok := redisInstance.client.(*redis.ClusterClient)
return ok
}

// getRedisInstance retrieves the underlying *Redis instance from the AdapterOperation.
// Returns nil if the operation is not backed by the concrete *Redis driver.
func (r GroupGeneric) getRedisInstance() *Redis {
if redisInstance, ok := r.Operation.(*Redis); ok {
return redisInstance
}
return nil
}
Loading
Loading