From c52a4d95edbf5dd3b9f0d5726eb0a4a73c8921c2 Mon Sep 17 00:00:00 2001 From: wangxc Date: Fri, 12 Jun 2026 19:07:29 +0800 Subject: [PATCH] feat: add Pipeline, TxPipeline, Watch and Cluster-safe ScanAll/Del to gredis MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 问题描述(What): 为 gredis 抽象层和 go-redis 驱动添加 Pipeline 批量执行、TxPipeline 事务、Watch 乐观锁,以及 Cluster 安全的 ScanAll 和 Del 操作。 根本原因(Why): GoFrame Redis 模块此前不支持 Pipeline/Transaction 批量操作,也无法在 Cluster 模式下安全执行多 key 删除和全量扫描。 修复内容(How): 在抽象层新增 Cmd future-result 类型、Pipeliner 接口体系(6 个命令组 + Tx)、在 AdapterOperation 扩展 Pipeline/TxPipeline/Watch 方法;在驱动层完整实现 99 个 pipeline 组方法,并增加 Cluster 感知的 ScanAll(ForEachMaster 遍历)和逐 key Del(规避 CROSSSLOT)。 技术细节(Details): Cmd 使用导出的 SetVal/SetErr 供驱动跨模块填充结果;redisPipeliner 通过 cmdEntry + populate 闭包在 Exec 后统一填充;ScanAll 在 Cluster 模式下使用 ForEachMaster + sync.Mutex 聚合结果;Del 在 Cluster 模式下逐 key 删除并收集首个错误。 影响范围(Impact): database/gredis(抽象层接口+Cmd 类型)、contrib/nosql/redis(驱动实现+Cluster 增强) --- contrib/nosql/redis/redis_group_generic.go | 137 ++- contrib/nosql/redis/redis_pipeline.go | 951 ++++++++++++++++++ .../nosql/redis/redis_z_unit_pipeline_test.go | 348 +++++++ database/gredis/gredis_adapter.go | 17 + database/gredis/gredis_cmd.go | 55 + database/gredis/gredis_pipeline.go | 196 ++++ database/gredis/gredis_redis.go | 46 + database/gredis/gredis_redis_group_generic.go | 6 + database/gredis/gredis_z_pipeline_test.go | 203 ++++ .../gredis-pipeline-tx-cluster/.openspec.yaml | 2 + .../gredis-pipeline-tx-cluster/design.md | 91 ++ .../gredis-pipeline-tx-cluster/proposal.md | 37 + .../specs/gredis-pipeline-tx-cluster/spec.md | 73 ++ .../gredis-pipeline-tx-cluster/tasks.md | 48 + 14 files changed, 2209 insertions(+), 1 deletion(-) create mode 100644 contrib/nosql/redis/redis_pipeline.go create mode 100644 contrib/nosql/redis/redis_z_unit_pipeline_test.go create mode 100644 database/gredis/gredis_cmd.go create mode 100644 database/gredis/gredis_pipeline.go create mode 100644 database/gredis/gredis_z_pipeline_test.go create mode 100644 openspec/changes/gredis-pipeline-tx-cluster/.openspec.yaml create mode 100644 openspec/changes/gredis-pipeline-tx-cluster/design.md create mode 100644 openspec/changes/gredis-pipeline-tx-cluster/proposal.md create mode 100644 openspec/changes/gredis-pipeline-tx-cluster/specs/gredis-pipeline-tx-cluster/spec.md create mode 100644 openspec/changes/gredis-pipeline-tx-cluster/tasks.md diff --git a/contrib/nosql/redis/redis_group_generic.go b/contrib/nosql/redis/redis_group_generic.go index b0e9e312749..409bbc990e0 100644 --- a/contrib/nosql/redis/redis_group_generic.go +++ b/contrib/nosql/redis/redis_group_generic.go @@ -8,8 +8,11 @@ package redis import ( "context" + "sync" "time" + "github.com/redis/go-redis/v9" + "github.com/gogf/gf/v2/container/gvar" "github.com/gogf/gf/v2/database/gredis" "github.com/gogf/gf/v2/os/gtime" @@ -131,12 +134,37 @@ func (r GroupGeneric) Move(ctx context.Context, key string, db int) (int64, erro } // Del removes the specified keys. -// a key is ignored if it does not exist. +// A key is ignored if it does not exist. +// +// In Cluster mode with multiple keys, it performs per-key deletion to avoid CROSSSLOT errors. +// In Standalone/Sentinel mode, it issues a single DEL command with all keys. // // It returns the number of keys that were removed. // // https://redis.io/commands/del/ func (r GroupGeneric) Del(ctx context.Context, keys ...string) (int64, error) { + if len(keys) == 0 { + return 0, nil + } + // Cluster mode: per-key deletion to avoid CROSSSLOT errors. + if r.isClusterMode() && len(keys) > 1 { + var ( + totalDeleted int64 + firstErr error + ) + for _, key := range keys { + n, err := r.Operation.Do(ctx, "Del", key) + if err != nil { + if firstErr == nil { + firstErr = err + } + continue + } + totalDeleted += n.Int64() + } + return totalDeleted, firstErr + } + // Standalone/Sentinel: single DEL command with all keys. v, err := r.Operation.Do(ctx, "Del", gconv.Interfaces(keys)...) return v.Int64(), err } @@ -366,3 +394,110 @@ func (r GroupGeneric) PTTL(ctx context.Context, key string) (int64, error) { v, err := r.Operation.Do(ctx, "PTTL", key) return v.Int64(), err } + +// ScanAll iterates all keys matching the given pattern across the entire keyspace. +// In Standalone/Sentinel mode, it repeatedly calls Scan until the cursor returns to 0. +// In Cluster mode, it transparently iterates all master nodes and aggregates results. +// This is the safe alternative to Keys, which can block the server on large datasets. +func (r GroupGeneric) ScanAll(ctx context.Context, option ...gredis.ScanOption) ([]string, error) { + if r.isClusterMode() { + return r.scanAllCluster(ctx, option...) + } + return r.scanAllStandalone(ctx, option...) +} + +// scanAllStandalone performs a full keyspace scan in standalone or sentinel mode. +// It repeatedly calls Scan with the given cursor until the cursor returns to 0, +// accumulating all matching keys into a single slice. +func (r GroupGeneric) scanAllStandalone(ctx context.Context, option ...gredis.ScanOption) ([]string, error) { + var ( + allKeys []string + cursor uint64 + ) + for { + nextCursor, keys, err := r.Scan(ctx, cursor, option...) + if err != nil { + return nil, err + } + allKeys = append(allKeys, keys...) + if nextCursor == 0 { + break + } + cursor = nextCursor + } + return allKeys, nil +} + +// scanAllCluster performs a full keyspace scan across all master nodes in a Redis Cluster. +// It uses ForEachMaster to iterate each master node independently, scanning each node +// until its cursor returns to 0, and aggregates all keys into a single slice. +func (r GroupGeneric) scanAllCluster(ctx context.Context, option ...gredis.ScanOption) ([]string, error) { + redisInstance := r.getRedisInstance() + if redisInstance == nil { + // Fallback to standalone scan if the underlying adapter is not *Redis. + return r.scanAllStandalone(ctx, option...) + } + clusterClient, ok := redisInstance.client.(*redis.ClusterClient) + if !ok { + return r.scanAllStandalone(ctx, option...) + } + + // Build MATCH and COUNT arguments from ScanOption. + var ( + matchPattern = "*" + countHint = int64(100) + ) + if len(option) > 0 { + if option[0].Match != "" { + matchPattern = option[0].Match + } + if option[0].Count > 0 { + countHint = int64(option[0].Count) + } + } + + var ( + allKeys []string + mu sync.Mutex + ) + + err := clusterClient.ForEachMaster(ctx, func(ctx context.Context, client *redis.Client) error { + var cursor uint64 + for { + keys, nextCursor, err := client.Scan(ctx, cursor, matchPattern, countHint).Result() + if err != nil { + return err + } + if len(keys) > 0 { + mu.Lock() + allKeys = append(allKeys, keys...) + mu.Unlock() + } + if nextCursor == 0 { + return nil + } + cursor = nextCursor + } + }) + return allKeys, err +} + +// isClusterMode checks whether the underlying Redis client is running in Cluster mode. +// It type-asserts the internal client to *redis.ClusterClient. +func (r GroupGeneric) isClusterMode() bool { + redisInstance := r.getRedisInstance() + if redisInstance == nil { + return false + } + _, ok := redisInstance.client.(*redis.ClusterClient) + return ok +} + +// getRedisInstance retrieves the underlying *Redis instance from the AdapterOperation. +// Returns nil if the operation is not backed by the concrete *Redis driver. +func (r GroupGeneric) getRedisInstance() *Redis { + if redisInstance, ok := r.Operation.(*Redis); ok { + return redisInstance + } + return nil +} diff --git a/contrib/nosql/redis/redis_pipeline.go b/contrib/nosql/redis/redis_pipeline.go new file mode 100644 index 00000000000..2244720dabb --- /dev/null +++ b/contrib/nosql/redis/redis_pipeline.go @@ -0,0 +1,951 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +// This file implements the gredis.Pipeliner interface by wrapping go-redis's +// redis.Pipeliner. It provides Pipeline, TxPipeline, and Watch operations, +// along with typed command groups that queue commands and return *gredis.Cmd +// futures. Results are populated after Exec is called. + +package redis + +import ( + "context" + "errors" + "reflect" + "time" + + "github.com/redis/go-redis/v9" + + "github.com/gogf/gf/v2/container/gvar" + "github.com/gogf/gf/v2/database/gredis" + "github.com/gogf/gf/v2/encoding/gjson" + "github.com/gogf/gf/v2/os/gtime" + "github.com/gogf/gf/v2/util/gconv" + "github.com/gogf/gf/v2/util/gutil" +) + +// cmdEntry pairs a gredis.Cmd future with a populate closure that fills its +// result after the underlying go-redis command has been executed by Exec. +type cmdEntry struct { + // cmd is the gredis.Cmd future returned to the caller. + cmd *gredis.Cmd + + // populate reads the result from the underlying go-redis command and + // converts it to a *gvar.Var. It is called once during Exec. + populate func() (*gvar.Var, error) +} + +// redisPipeliner implements gredis.Pipeliner by wrapping go-redis's +// redis.Pipeliner. Each queued command creates a cmdEntry holding a +// gredis.Cmd future and a populate closure. Exec sends all queued +// commands in a single round-trip, then iterates cmdEntries to populate +// each Cmd's result. +type redisPipeliner struct { + // pipe is the underlying go-redis pipeliner (regular or transactional). + pipe redis.Pipeliner + + // cmdList holds all queued commands in insertion order. + cmdList []*cmdEntry +} + +// newRedisPipeliner creates a redisPipeliner wrapping the given go-redis pipeliner. +func newRedisPipeliner(pipe redis.Pipeliner) *redisPipeliner { + return &redisPipeliner{ + pipe: pipe, + } +} + +// Do queues a raw Redis command and returns its future Cmd. +// The command is not sent to the server until Exec is called. +// Struct, map, and slice values (except []byte) are JSON-serialized +// to match the behavior of Conn.Do. +func (p *redisPipeliner) Do(ctx context.Context, command string, args ...any) *gredis.Cmd { + // Serialize struct/map/slice values to JSON, matching Conn.Do behavior. + for k, v := range args { + reflectInfo := gutil.OriginTypeAndKind(v) + switch reflectInfo.OriginKind { + case reflect.Struct, reflect.Map, reflect.Slice, reflect.Array: + if _, ok := v.([]byte); !ok { + marshaled, err := gjson.Marshal(v) + if err != nil { + cmd := &gredis.Cmd{} + cmd.SetErr(err) + return cmd + } + args[k] = marshaled + } + } + } + + goCmd := p.pipe.Do(ctx, append([]any{command}, args...)...) + cmd := &gredis.Cmd{} + p.cmdList = append(p.cmdList, &cmdEntry{ + cmd: cmd, + populate: func() (*gvar.Var, error) { + return pipelineResultToVar(goCmd.Result()) + }, + }) + return cmd +} + +// Exec sends all queued commands to the server in a single batch, +// populates each queued Cmd's result, and returns all Cmds. +// After Exec returns, all Cmd objects returned by queued commands are populated. +func (p *redisPipeliner) Exec(ctx context.Context) ([]*gredis.Cmd, error) { + _, err := p.pipe.Exec(ctx) + // Allow redis.Nil as non-error for pipeline results. + if err != nil && !errors.Is(err, redis.Nil) { + return nil, err + } + + for _, entry := range p.cmdList { + val, populateErr := entry.populate() + entry.cmd.SetVal(val) + if populateErr != nil && !errors.Is(populateErr, redis.Nil) { + entry.cmd.SetErr(populateErr) + } + } + + result := make([]*gredis.Cmd, len(p.cmdList)) + for i, entry := range p.cmdList { + result[i] = entry.cmd + } + return result, nil +} + +// Discard discards all queued commands without sending them to the server. +func (p *redisPipeliner) Discard() { + p.pipe.Discard() + p.cmdList = p.cmdList[:0] +} + +// PipelineGroupGeneric returns the generic command group for pipeline mode. +func (p *redisPipeliner) PipelineGroupGeneric() gredis.IPipelineGroupGeneric { + return pipelineGroupGeneric{pipeliner: p} +} + +// PipelineGroupHash returns the hash command group for pipeline mode. +func (p *redisPipeliner) PipelineGroupHash() gredis.IPipelineGroupHash { + return pipelineGroupHash{pipeliner: p} +} + +// PipelineGroupString returns the string command group for pipeline mode. +func (p *redisPipeliner) PipelineGroupString() gredis.IPipelineGroupString { + return pipelineGroupString{pipeliner: p} +} + +// PipelineGroupList returns the list command group for pipeline mode. +func (p *redisPipeliner) PipelineGroupList() gredis.IPipelineGroupList { + return pipelineGroupList{pipeliner: p} +} + +// PipelineGroupSet returns the set command group for pipeline mode. +func (p *redisPipeliner) PipelineGroupSet() gredis.IPipelineGroupSet { + return pipelineGroupSet{pipeliner: p} +} + +// PipelineGroupSortedSet returns the sorted set command group for pipeline mode. +func (p *redisPipeliner) PipelineGroupSortedSet() gredis.IPipelineGroupSortedSet { + return pipelineGroupSortedSet{pipeliner: p} +} + +// Pipeline returns a Pipeliner for batching multiple commands into a single +// network round-trip. Commands are buffered locally and sent when Exec is called. +func (r *Redis) Pipeline(_ context.Context) gredis.Pipeliner { + return newRedisPipeliner(r.client.Pipeline()) +} + +// TxPipeline returns a Pipeliner that wraps commands in a MULTI/EXEC transaction. +// All queued commands are executed atomically by the Redis server. +func (r *Redis) TxPipeline(_ context.Context) gredis.Pipeliner { + return newRedisPipeliner(r.client.TxPipeline()) +} + +// Watch watches the given keys for modifications and executes fn in a transaction. +// If any watched key is modified by another client before the transaction executes, +// the transaction is aborted and Watch returns a transaction-abort error. +func (r *Redis) Watch(ctx context.Context, fn func(gredis.Tx) error, keys ...string) error { + return r.client.Watch(ctx, func(tx *redis.Tx) error { + pipeliner := newRedisPipeliner(tx.TxPipeline()) + return fn(pipeliner) + }, keys...) +} + +// pipelineResultToVar converts a go-redis command result to a *gvar.Var, +// handling redis.Nil and common type conversions. This mirrors the logic +// in Conn.resultToVar. +func pipelineResultToVar(result any, err error) (*gvar.Var, error) { + if err == redis.Nil { + err = nil + } + if err == nil { + switch v := result.(type) { + case []byte: + return gvar.New(string(v)), err + + case []any: + return gvar.New(gconv.Strings(v)), err + + case *redis.Message: + result = &gredis.Message{ + Channel: v.Channel, + Pattern: v.Pattern, + Payload: v.Payload, + PayloadSlice: v.PayloadSlice, + } + + case *redis.Subscription: + result = &gredis.Subscription{ + Kind: v.Kind, + Channel: v.Channel, + Count: v.Count, + } + } + } + return gvar.New(result), err +} + +// pipelineGroupGeneric implements gredis.IPipelineGroupGeneric. +type pipelineGroupGeneric struct { + pipeliner *redisPipeliner +} + +// Copy queues a Copy command and returns its future Cmd. +// https://redis.io/commands/copy/ +func (g pipelineGroupGeneric) Copy(ctx context.Context, source, destination string, option ...gredis.CopyOption) *gredis.Cmd { + var usedOption any + if len(option) > 0 { + usedOption = option[0] + } + return g.pipeliner.Do(ctx, "Copy", mustMergeOptionToArgs( + []any{source, destination}, usedOption, + )...) +} + +// Exists queues an Exists command and returns its future Cmd. +// https://redis.io/commands/exists/ +func (g pipelineGroupGeneric) Exists(ctx context.Context, keys ...string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "Exists", gconv.Interfaces(keys)...) +} + +// Type queues a Type command and returns its future Cmd. +// https://redis.io/commands/type/ +func (g pipelineGroupGeneric) Type(ctx context.Context, key string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "Type", key) +} + +// Unlink queues an Unlink command and returns its future Cmd. +// https://redis.io/commands/unlink/ +func (g pipelineGroupGeneric) Unlink(ctx context.Context, keys ...string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "Unlink", gconv.Interfaces(keys)...) +} + +// Rename queues a Rename command and returns its future Cmd. +// https://redis.io/commands/rename/ +func (g pipelineGroupGeneric) Rename(ctx context.Context, key, newKey string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "Rename", key, newKey) +} + +// RenameNX queues a RenameNX command and returns its future Cmd. +// https://redis.io/commands/renamenx/ +func (g pipelineGroupGeneric) RenameNX(ctx context.Context, key, newKey string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "RenameNX", key, newKey) +} + +// Move queues a Move command and returns its future Cmd. +// https://redis.io/commands/move/ +func (g pipelineGroupGeneric) Move(ctx context.Context, key string, db int) *gredis.Cmd { + return g.pipeliner.Do(ctx, "Move", key, db) +} + +// Del queues a Del command and returns its future Cmd. +// https://redis.io/commands/del/ +func (g pipelineGroupGeneric) Del(ctx context.Context, keys ...string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "Del", gconv.Interfaces(keys)...) +} + +// RandomKey queues a RandomKey command and returns its future Cmd. +// https://redis.io/commands/randomkey/ +func (g pipelineGroupGeneric) RandomKey(ctx context.Context) *gredis.Cmd { + return g.pipeliner.Do(ctx, "RandomKey") +} + +// DBSize queues a DBSize command and returns its future Cmd. +// https://redis.io/commands/dbsize/ +func (g pipelineGroupGeneric) DBSize(ctx context.Context) *gredis.Cmd { + return g.pipeliner.Do(ctx, "DBSize") +} + +// Keys queues a Keys command and returns its future Cmd. +// https://redis.io/commands/keys/ +func (g pipelineGroupGeneric) Keys(ctx context.Context, pattern string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "Keys", pattern) +} + +// Scan queues a Scan command and returns its future Cmd. +// https://redis.io/commands/scan/ +func (g pipelineGroupGeneric) Scan(ctx context.Context, cursor uint64, option ...gredis.ScanOption) *gredis.Cmd { + var usedOption any + if len(option) > 0 { + usedOption = option[0].ToUsedOption() + } + return g.pipeliner.Do(ctx, "Scan", mustMergeOptionToArgs( + []any{cursor}, usedOption, + )...) +} + +// FlushDB queues a FlushDB command and returns its future Cmd. +// https://redis.io/commands/flushdb/ +func (g pipelineGroupGeneric) FlushDB(ctx context.Context, option ...gredis.FlushOp) *gredis.Cmd { + return g.pipeliner.Do(ctx, "FlushDB", gconv.Interfaces(option)...) +} + +// FlushAll queues a FlushAll command and returns its future Cmd. +// https://redis.io/commands/flushall/ +func (g pipelineGroupGeneric) FlushAll(ctx context.Context, option ...gredis.FlushOp) *gredis.Cmd { + return g.pipeliner.Do(ctx, "FlushAll", gconv.Interfaces(option)...) +} + +// Expire queues an Expire command and returns its future Cmd. +// https://redis.io/commands/expire/ +func (g pipelineGroupGeneric) Expire(ctx context.Context, key string, seconds int64, option ...gredis.ExpireOption) *gredis.Cmd { + var usedOption any + if len(option) > 0 { + usedOption = option[0] + } + return g.pipeliner.Do(ctx, "Expire", mustMergeOptionToArgs( + []any{key, seconds}, usedOption, + )...) +} + +// ExpireAt queues an ExpireAt command and returns its future Cmd. +// https://redis.io/commands/expireat/ +func (g pipelineGroupGeneric) ExpireAt(ctx context.Context, key string, time time.Time, option ...gredis.ExpireOption) *gredis.Cmd { + var usedOption any + if len(option) > 0 { + usedOption = option[0] + } + return g.pipeliner.Do(ctx, "ExpireAt", mustMergeOptionToArgs( + []any{key, gtime.New(time).Timestamp()}, usedOption, + )...) +} + +// ExpireTime queues an ExpireTime command and returns its future Cmd. +// https://redis.io/commands/expiretime/ +func (g pipelineGroupGeneric) ExpireTime(ctx context.Context, key string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "ExpireTime", key) +} + +// TTL queues a TTL command and returns its future Cmd. +// https://redis.io/commands/ttl/ +func (g pipelineGroupGeneric) TTL(ctx context.Context, key string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "TTL", key) +} + +// Persist queues a Persist command and returns its future Cmd. +// https://redis.io/commands/persist/ +func (g pipelineGroupGeneric) Persist(ctx context.Context, key string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "Persist", key) +} + +// PExpire queues a PExpire command and returns its future Cmd. +// https://redis.io/commands/pexpire/ +func (g pipelineGroupGeneric) PExpire(ctx context.Context, key string, milliseconds int64, option ...gredis.ExpireOption) *gredis.Cmd { + var usedOption any + if len(option) > 0 { + usedOption = option[0] + } + return g.pipeliner.Do(ctx, "PExpire", mustMergeOptionToArgs( + []any{key, milliseconds}, usedOption, + )...) +} + +// PExpireAt queues a PExpireAt command and returns its future Cmd. +// https://redis.io/commands/pexpireat/ +func (g pipelineGroupGeneric) PExpireAt(ctx context.Context, key string, time time.Time, option ...gredis.ExpireOption) *gredis.Cmd { + var usedOption any + if len(option) > 0 { + usedOption = option[0] + } + return g.pipeliner.Do(ctx, "PExpireAt", mustMergeOptionToArgs( + []any{key, gtime.New(time).TimestampMilli()}, usedOption, + )...) +} + +// PExpireTime queues a PExpireTime command and returns its future Cmd. +// https://redis.io/commands/pexpiretime/ +func (g pipelineGroupGeneric) PExpireTime(ctx context.Context, key string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "PExpireTime", key) +} + +// PTTL queues a PTTL command and returns its future Cmd. +// https://redis.io/commands/pttl/ +func (g pipelineGroupGeneric) PTTL(ctx context.Context, key string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "PTTL", key) +} + +// pipelineGroupHash implements gredis.IPipelineGroupHash. +type pipelineGroupHash struct { + pipeliner *redisPipeliner +} + +// HSet queues an HSet command and returns its future Cmd. +// https://redis.io/commands/hset/ +func (g pipelineGroupHash) HSet(ctx context.Context, key string, fields map[string]any) *gredis.Cmd { + s := []any{key} + for k, v := range fields { + s = append(s, k, v) + } + return g.pipeliner.Do(ctx, "HSet", s...) +} + +// HSetNX queues an HSetNX command and returns its future Cmd. +// https://redis.io/commands/hsetnx/ +func (g pipelineGroupHash) HSetNX(ctx context.Context, key, field string, value any) *gredis.Cmd { + return g.pipeliner.Do(ctx, "HSetNX", key, field, value) +} + +// HGet queues an HGet command and returns its future Cmd. +// https://redis.io/commands/hget/ +func (g pipelineGroupHash) HGet(ctx context.Context, key, field string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "HGet", key, field) +} + +// HStrLen queues an HStrLen command and returns its future Cmd. +// https://redis.io/commands/hstrlen/ +func (g pipelineGroupHash) HStrLen(ctx context.Context, key, field string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "HSTRLEN", key, field) +} + +// HExists queues an HExists command and returns its future Cmd. +// https://redis.io/commands/hexists/ +func (g pipelineGroupHash) HExists(ctx context.Context, key, field string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "HExists", key, field) +} + +// HDel queues an HDel command and returns its future Cmd. +// https://redis.io/commands/hdel/ +func (g pipelineGroupHash) HDel(ctx context.Context, key string, fields ...string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "HDel", append([]any{key}, gconv.Interfaces(fields)...)...) +} + +// HLen queues an HLen command and returns its future Cmd. +// https://redis.io/commands/hlen/ +func (g pipelineGroupHash) HLen(ctx context.Context, key string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "HLen", key) +} + +// HIncrBy queues an HIncrBy command and returns its future Cmd. +// https://redis.io/commands/hincrby/ +func (g pipelineGroupHash) HIncrBy(ctx context.Context, key, field string, increment int64) *gredis.Cmd { + return g.pipeliner.Do(ctx, "HIncrBy", key, field, increment) +} + +// HIncrByFloat queues an HIncrByFloat command and returns its future Cmd. +// https://redis.io/commands/hincrbyfloat/ +func (g pipelineGroupHash) HIncrByFloat(ctx context.Context, key, field string, increment float64) *gredis.Cmd { + return g.pipeliner.Do(ctx, "HIncrByFloat", key, field, increment) +} + +// HMSet queues an HMSet command and returns its future Cmd. +// https://redis.io/commands/hmset/ +func (g pipelineGroupHash) HMSet(ctx context.Context, key string, fields map[string]any) *gredis.Cmd { + s := []any{key} + for k, v := range fields { + s = append(s, k, v) + } + return g.pipeliner.Do(ctx, "HMSet", s...) +} + +// HMGet queues an HMGet command and returns its future Cmd. +// https://redis.io/commands/hmget/ +func (g pipelineGroupHash) HMGet(ctx context.Context, key string, fields ...string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "HMGet", append([]any{key}, gconv.Interfaces(fields)...)...) +} + +// HKeys queues an HKeys command and returns its future Cmd. +// https://redis.io/commands/hkeys/ +func (g pipelineGroupHash) HKeys(ctx context.Context, key string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "HKeys", key) +} + +// HVals queues an HVals command and returns its future Cmd. +// https://redis.io/commands/hvals/ +func (g pipelineGroupHash) HVals(ctx context.Context, key string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "HVals", key) +} + +// HGetAll queues an HGetAll command and returns its future Cmd. +// https://redis.io/commands/hgetall/ +func (g pipelineGroupHash) HGetAll(ctx context.Context, key string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "HGetAll", key) +} + +// pipelineGroupString implements gredis.IPipelineGroupString. +type pipelineGroupString struct { + pipeliner *redisPipeliner +} + +// Set queues a Set command and returns its future Cmd. +// https://redis.io/commands/set/ +func (g pipelineGroupString) Set(ctx context.Context, key string, value any, option ...gredis.SetOption) *gredis.Cmd { + var usedOption any + if len(option) > 0 { + usedOption = option[0] + } + return g.pipeliner.Do(ctx, "Set", mustMergeOptionToArgs( + []any{key, value}, usedOption, + )...) +} + +// SetNX queues a SetNX command and returns its future Cmd. +// https://redis.io/commands/setnx/ +func (g pipelineGroupString) SetNX(ctx context.Context, key string, value any) *gredis.Cmd { + return g.pipeliner.Do(ctx, "SetNX", key, value) +} + +// SetEX queues a SetEX command and returns its future Cmd. +// https://redis.io/commands/setex/ +func (g pipelineGroupString) SetEX(ctx context.Context, key string, value any, ttlInSeconds int64) *gredis.Cmd { + return g.pipeliner.Do(ctx, "SetEX", key, ttlInSeconds, value) +} + +// Get queues a Get command and returns its future Cmd. +// https://redis.io/commands/get/ +func (g pipelineGroupString) Get(ctx context.Context, key string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "Get", key) +} + +// GetDel queues a GetDel command and returns its future Cmd. +// https://redis.io/commands/getdel/ +func (g pipelineGroupString) GetDel(ctx context.Context, key string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "GetDel", key) +} + +// GetEX queues a GetEX command and returns its future Cmd. +// https://redis.io/commands/getex/ +func (g pipelineGroupString) GetEX(ctx context.Context, key string, option ...gredis.GetEXOption) *gredis.Cmd { + var usedOption any + if len(option) > 0 { + usedOption = option[0] + } + return g.pipeliner.Do(ctx, "GetEX", mustMergeOptionToArgs( + []any{key}, usedOption, + )...) +} + +// GetSet queues a GetSet command and returns its future Cmd. +// https://redis.io/commands/getset/ +func (g pipelineGroupString) GetSet(ctx context.Context, key string, value any) *gredis.Cmd { + return g.pipeliner.Do(ctx, "GetSet", key, value) +} + +// StrLen queues a StrLen command and returns its future Cmd. +// https://redis.io/commands/strlen/ +func (g pipelineGroupString) StrLen(ctx context.Context, key string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "StrLen", key) +} + +// Append queues an Append command and returns its future Cmd. +// https://redis.io/commands/append/ +func (g pipelineGroupString) Append(ctx context.Context, key string, value string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "Append", key, value) +} + +// SetRange queues a SetRange command and returns its future Cmd. +// https://redis.io/commands/setrange/ +func (g pipelineGroupString) SetRange(ctx context.Context, key string, offset int64, value string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "SetRange", key, offset, value) +} + +// GetRange queues a GetRange command and returns its future Cmd. +// https://redis.io/commands/getrange/ +func (g pipelineGroupString) GetRange(ctx context.Context, key string, start, end int64) *gredis.Cmd { + return g.pipeliner.Do(ctx, "GetRange", key, start, end) +} + +// Incr queues an Incr command and returns its future Cmd. +// https://redis.io/commands/incr/ +func (g pipelineGroupString) Incr(ctx context.Context, key string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "Incr", key) +} + +// IncrBy queues an IncrBy command and returns its future Cmd. +// https://redis.io/commands/incrby/ +func (g pipelineGroupString) IncrBy(ctx context.Context, key string, increment int64) *gredis.Cmd { + return g.pipeliner.Do(ctx, "IncrBy", key, increment) +} + +// IncrByFloat queues an IncrByFloat command and returns its future Cmd. +// https://redis.io/commands/incrbyfloat/ +func (g pipelineGroupString) IncrByFloat(ctx context.Context, key string, increment float64) *gredis.Cmd { + return g.pipeliner.Do(ctx, "IncrByFloat", key, increment) +} + +// Decr queues a Decr command and returns its future Cmd. +// https://redis.io/commands/decr/ +func (g pipelineGroupString) Decr(ctx context.Context, key string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "Decr", key) +} + +// DecrBy queues a DecrBy command and returns its future Cmd. +// https://redis.io/commands/decrby/ +func (g pipelineGroupString) DecrBy(ctx context.Context, key string, decrement int64) *gredis.Cmd { + return g.pipeliner.Do(ctx, "DecrBy", key, decrement) +} + +// MSet queues an MSet command and returns its future Cmd. +// https://redis.io/commands/mset/ +func (g pipelineGroupString) MSet(ctx context.Context, keyValueMap map[string]any) *gredis.Cmd { + var args []any + for k, v := range keyValueMap { + args = append(args, k, v) + } + return g.pipeliner.Do(ctx, "MSet", args...) +} + +// MSetNX queues an MSetNX command and returns its future Cmd. +// https://redis.io/commands/msetnx/ +func (g pipelineGroupString) MSetNX(ctx context.Context, keyValueMap map[string]any) *gredis.Cmd { + var args []any + for k, v := range keyValueMap { + args = append(args, k, v) + } + return g.pipeliner.Do(ctx, "MSetNX", args...) +} + +// MGet queues an MGet command and returns its future Cmd. +// https://redis.io/commands/mget/ +func (g pipelineGroupString) MGet(ctx context.Context, keys ...string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "MGet", gconv.Interfaces(keys)...) +} + +// pipelineGroupList implements gredis.IPipelineGroupList. +type pipelineGroupList struct { + pipeliner *redisPipeliner +} + +// LPush queues an LPush command and returns its future Cmd. +// https://redis.io/commands/lpush/ +func (g pipelineGroupList) LPush(ctx context.Context, key string, values ...any) *gredis.Cmd { + return g.pipeliner.Do(ctx, "LPush", append([]any{key}, values...)...) +} + +// LPushX queues an LPushX command and returns its future Cmd. +// https://redis.io/commands/lpushx/ +func (g pipelineGroupList) LPushX(ctx context.Context, key string, element any, elements ...any) *gredis.Cmd { + return g.pipeliner.Do(ctx, "LPushX", append([]any{key, element}, elements...)...) +} + +// RPush queues an RPush command and returns its future Cmd. +// https://redis.io/commands/rpush/ +func (g pipelineGroupList) RPush(ctx context.Context, key string, values ...any) *gredis.Cmd { + return g.pipeliner.Do(ctx, "RPush", append([]any{key}, values...)...) +} + +// RPushX queues an RPushX command and returns its future Cmd. +// https://redis.io/commands/rpushx/ +func (g pipelineGroupList) RPushX(ctx context.Context, key string, value any) *gredis.Cmd { + return g.pipeliner.Do(ctx, "RPushX", key, value) +} + +// LPop queues an LPop command and returns its future Cmd. +// https://redis.io/commands/lpop/ +func (g pipelineGroupList) LPop(ctx context.Context, key string, count ...int) *gredis.Cmd { + if len(count) > 0 { + return g.pipeliner.Do(ctx, "LPop", key, count[0]) + } + return g.pipeliner.Do(ctx, "LPop", key) +} + +// RPop queues an RPop command and returns its future Cmd. +// https://redis.io/commands/rpop/ +func (g pipelineGroupList) RPop(ctx context.Context, key string, count ...int) *gredis.Cmd { + if len(count) > 0 { + return g.pipeliner.Do(ctx, "RPop", key, count[0]) + } + return g.pipeliner.Do(ctx, "RPop", key) +} + +// LRem queues an LRem command and returns its future Cmd. +// https://redis.io/commands/lrem/ +func (g pipelineGroupList) LRem(ctx context.Context, key string, count int64, value any) *gredis.Cmd { + return g.pipeliner.Do(ctx, "LRem", key, count, value) +} + +// LLen queues an LLen command and returns its future Cmd. +// https://redis.io/commands/llen/ +func (g pipelineGroupList) LLen(ctx context.Context, key string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "LLen", key) +} + +// LIndex queues an LIndex command and returns its future Cmd. +// https://redis.io/commands/lindex/ +func (g pipelineGroupList) LIndex(ctx context.Context, key string, index int64) *gredis.Cmd { + return g.pipeliner.Do(ctx, "LIndex", key, index) +} + +// LInsert queues an LInsert command and returns its future Cmd. +// https://redis.io/commands/linsert/ +func (g pipelineGroupList) LInsert(ctx context.Context, key string, op gredis.LInsertOp, pivot, value any) *gredis.Cmd { + return g.pipeliner.Do(ctx, "LInsert", key, string(op), pivot, value) +} + +// LSet queues an LSet command and returns its future Cmd. +// https://redis.io/commands/lset/ +func (g pipelineGroupList) LSet(ctx context.Context, key string, index int64, value any) *gredis.Cmd { + return g.pipeliner.Do(ctx, "LSet", key, index, value) +} + +// LRange queues an LRange command and returns its future Cmd. +// https://redis.io/commands/lrange/ +func (g pipelineGroupList) LRange(ctx context.Context, key string, start, stop int64) *gredis.Cmd { + return g.pipeliner.Do(ctx, "LRange", key, start, stop) +} + +// LTrim queues an LTrim command and returns its future Cmd. +// https://redis.io/commands/ltrim/ +func (g pipelineGroupList) LTrim(ctx context.Context, key string, start, stop int64) *gredis.Cmd { + return g.pipeliner.Do(ctx, "LTrim", key, start, stop) +} + +// RPopLPush queues an RPopLPush command and returns its future Cmd. +// https://redis.io/commands/rpoplpush/ +func (g pipelineGroupList) RPopLPush(ctx context.Context, source, destination string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "RPopLPush", source, destination) +} + +// pipelineGroupSet implements gredis.IPipelineGroupSet. +type pipelineGroupSet struct { + pipeliner *redisPipeliner +} + +// SAdd queues an SAdd command and returns its future Cmd. +// https://redis.io/commands/sadd/ +func (g pipelineGroupSet) SAdd(ctx context.Context, key string, member any, members ...any) *gredis.Cmd { + s := []any{key} + s = append(s, member) + s = append(s, members...) + return g.pipeliner.Do(ctx, "SAdd", s...) +} + +// SIsMember queues an SIsMember command and returns its future Cmd. +// https://redis.io/commands/sismember/ +func (g pipelineGroupSet) SIsMember(ctx context.Context, key string, member any) *gredis.Cmd { + return g.pipeliner.Do(ctx, "SIsMember", key, member) +} + +// SPop queues an SPop command and returns its future Cmd. +// https://redis.io/commands/spop/ +func (g pipelineGroupSet) SPop(ctx context.Context, key string, count ...int) *gredis.Cmd { + s := []any{key} + s = append(s, gconv.Interfaces(count)...) + return g.pipeliner.Do(ctx, "SPop", s...) +} + +// SRandMember queues an SRandMember command and returns its future Cmd. +// https://redis.io/commands/srandmember/ +func (g pipelineGroupSet) SRandMember(ctx context.Context, key string, count ...int) *gredis.Cmd { + s := []any{key} + s = append(s, gconv.Interfaces(count)...) + return g.pipeliner.Do(ctx, "SRandMember", s...) +} + +// SRem queues an SRem command and returns its future Cmd. +// https://redis.io/commands/srem/ +func (g pipelineGroupSet) SRem(ctx context.Context, key string, member any, members ...any) *gredis.Cmd { + s := []any{key} + s = append(s, member) + s = append(s, members...) + return g.pipeliner.Do(ctx, "SRem", s...) +} + +// SMove queues an SMove command and returns its future Cmd. +// https://redis.io/commands/smove/ +func (g pipelineGroupSet) SMove(ctx context.Context, source, destination string, member any) *gredis.Cmd { + return g.pipeliner.Do(ctx, "SMove", source, destination, member) +} + +// SCard queues an SCard command and returns its future Cmd. +// https://redis.io/commands/scard/ +func (g pipelineGroupSet) SCard(ctx context.Context, key string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "SCard", key) +} + +// SMembers queues an SMembers command and returns its future Cmd. +// https://redis.io/commands/smembers/ +func (g pipelineGroupSet) SMembers(ctx context.Context, key string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "SMembers", key) +} + +// SMIsMember queues an SMIsMember command and returns its future Cmd. +// https://redis.io/commands/smismember/ +func (g pipelineGroupSet) SMIsMember(ctx context.Context, key, member any, members ...any) *gredis.Cmd { + s := []any{key, member} + s = append(s, members...) + return g.pipeliner.Do(ctx, "SMIsMember", s...) +} + +// SInter queues an SInter command and returns its future Cmd. +// https://redis.io/commands/sinter/ +func (g pipelineGroupSet) SInter(ctx context.Context, key string, keys ...string) *gredis.Cmd { + s := []any{key} + s = append(s, gconv.Interfaces(keys)...) + return g.pipeliner.Do(ctx, "SInter", s...) +} + +// SInterStore queues an SInterStore command and returns its future Cmd. +// https://redis.io/commands/sinterstore/ +func (g pipelineGroupSet) SInterStore(ctx context.Context, destination string, key string, keys ...string) *gredis.Cmd { + s := []any{destination, key} + s = append(s, gconv.Interfaces(keys)...) + return g.pipeliner.Do(ctx, "SInterStore", s...) +} + +// SUnion queues an SUnion command and returns its future Cmd. +// https://redis.io/commands/sunion/ +func (g pipelineGroupSet) SUnion(ctx context.Context, key string, keys ...string) *gredis.Cmd { + s := []any{key} + s = append(s, gconv.Interfaces(keys)...) + return g.pipeliner.Do(ctx, "SUnion", s...) +} + +// SUnionStore queues an SUnionStore command and returns its future Cmd. +// https://redis.io/commands/sunionstore/ +func (g pipelineGroupSet) SUnionStore(ctx context.Context, destination, key string, keys ...string) *gredis.Cmd { + s := []any{destination, key} + s = append(s, gconv.Interfaces(keys)...) + return g.pipeliner.Do(ctx, "SUnionStore", s...) +} + +// SDiff queues an SDiff command and returns its future Cmd. +// https://redis.io/commands/sdiff/ +func (g pipelineGroupSet) SDiff(ctx context.Context, key string, keys ...string) *gredis.Cmd { + s := []any{key} + s = append(s, gconv.Interfaces(keys)...) + return g.pipeliner.Do(ctx, "SDiff", s...) +} + +// SDiffStore queues an SDiffStore command and returns its future Cmd. +// https://redis.io/commands/sdiffstore/ +func (g pipelineGroupSet) SDiffStore(ctx context.Context, destination string, key string, keys ...string) *gredis.Cmd { + s := []any{destination, key} + s = append(s, gconv.Interfaces(keys)...) + return g.pipeliner.Do(ctx, "SDiffStore", s...) +} + +// pipelineGroupSortedSet implements gredis.IPipelineGroupSortedSet. +type pipelineGroupSortedSet struct { + pipeliner *redisPipeliner +} + +// ZAdd queues a ZAdd command and returns its future Cmd. +// https://redis.io/commands/zadd/ +func (g pipelineGroupSortedSet) ZAdd( + ctx context.Context, key string, option *gredis.ZAddOption, member gredis.ZAddMember, members ...gredis.ZAddMember, +) *gredis.Cmd { + s := mustMergeOptionToArgs( + []any{key}, option, + ) + s = append(s, member.Score, member.Member) + for _, item := range members { + s = append(s, item.Score, item.Member) + } + return g.pipeliner.Do(ctx, "ZAdd", s...) +} + +// ZScore queues a ZScore command and returns its future Cmd. +// https://redis.io/commands/zscore/ +func (g pipelineGroupSortedSet) ZScore(ctx context.Context, key string, member any) *gredis.Cmd { + return g.pipeliner.Do(ctx, "ZScore", key, member) +} + +// ZIncrBy queues a ZIncrBy command and returns its future Cmd. +// https://redis.io/commands/zincrby/ +func (g pipelineGroupSortedSet) ZIncrBy(ctx context.Context, key string, increment float64, member any) *gredis.Cmd { + return g.pipeliner.Do(ctx, "ZIncrBy", key, increment, member) +} + +// ZCard queues a ZCard command and returns its future Cmd. +// https://redis.io/commands/zcard/ +func (g pipelineGroupSortedSet) ZCard(ctx context.Context, key string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "ZCard", key) +} + +// ZCount queues a ZCount command and returns its future Cmd. +// https://redis.io/commands/zcount/ +func (g pipelineGroupSortedSet) ZCount(ctx context.Context, key string, min, max string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "ZCount", key, min, max) +} + +// ZRange queues a ZRange command and returns its future Cmd. +// https://redis.io/commands/zrange/ +func (g pipelineGroupSortedSet) ZRange(ctx context.Context, key string, start, stop int64, option ...gredis.ZRangeOption) *gredis.Cmd { + var usedOption any + if len(option) > 0 { + usedOption = option[0] + } + return g.pipeliner.Do(ctx, "ZRange", mustMergeOptionToArgs( + []any{key, start, stop}, usedOption, + )...) +} + +// ZRevRange queues a ZRevRange command and returns its future Cmd. +// https://redis.io/commands/zrevrange/ +func (g pipelineGroupSortedSet) ZRevRange(ctx context.Context, key string, start, stop int64, option ...gredis.ZRevRangeOption) *gredis.Cmd { + var usedOption any + if len(option) > 0 { + usedOption = option[0] + } + return g.pipeliner.Do(ctx, "ZRevRange", mustMergeOptionToArgs( + []any{key, start, stop}, usedOption, + )...) +} + +// ZRank queues a ZRank command and returns its future Cmd. +// https://redis.io/commands/zrank/ +func (g pipelineGroupSortedSet) ZRank(ctx context.Context, key string, member any) *gredis.Cmd { + return g.pipeliner.Do(ctx, "ZRank", key, member) +} + +// ZRevRank queues a ZRevRank command and returns its future Cmd. +// https://redis.io/commands/zrevrank/ +func (g pipelineGroupSortedSet) ZRevRank(ctx context.Context, key string, member any) *gredis.Cmd { + return g.pipeliner.Do(ctx, "ZRevRank", key, member) +} + +// ZRem queues a ZRem command and returns its future Cmd. +// https://redis.io/commands/zrem/ +func (g pipelineGroupSortedSet) ZRem(ctx context.Context, key string, member any, members ...any) *gredis.Cmd { + s := []any{key} + s = append(s, member) + s = append(s, members...) + return g.pipeliner.Do(ctx, "ZRem", s...) +} + +// ZRemRangeByRank queues a ZRemRangeByRank command and returns its future Cmd. +// https://redis.io/commands/zremrangebyrank/ +func (g pipelineGroupSortedSet) ZRemRangeByRank(ctx context.Context, key string, start, stop int64) *gredis.Cmd { + return g.pipeliner.Do(ctx, "ZRemRangeByRank", key, start, stop) +} + +// ZRemRangeByScore queues a ZRemRangeByScore command and returns its future Cmd. +// https://redis.io/commands/zremrangebyscore/ +func (g pipelineGroupSortedSet) ZRemRangeByScore(ctx context.Context, key string, min, max string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "ZRemRangeByScore", key, min, max) +} + +// ZRemRangeByLex queues a ZRemRangeByLex command and returns its future Cmd. +// https://redis.io/commands/zremrangebylex/ +func (g pipelineGroupSortedSet) ZRemRangeByLex(ctx context.Context, key string, min, max string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "ZRemRangeByLex", key, min, max) +} + +// ZLexCount queues a ZLexCount command and returns its future Cmd. +// https://redis.io/commands/zlexcount/ +func (g pipelineGroupSortedSet) ZLexCount(ctx context.Context, key, min, max string) *gredis.Cmd { + return g.pipeliner.Do(ctx, "ZLexCount", key, min, max) +} diff --git a/contrib/nosql/redis/redis_z_unit_pipeline_test.go b/contrib/nosql/redis/redis_z_unit_pipeline_test.go new file mode 100644 index 00000000000..ced217fc134 --- /dev/null +++ b/contrib/nosql/redis/redis_z_unit_pipeline_test.go @@ -0,0 +1,348 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +package redis_test + +import ( + "testing" + + "github.com/gogf/gf/v2/database/gredis" + "github.com/gogf/gf/v2/test/gtest" + "github.com/gogf/gf/v2/util/gconv" +) + +// Test_Pipeline_HSetGet verifies that HSet and HGet commands queued via pipeline +// are executed correctly and results are populated after Exec. +func Test_Pipeline_HSetGet(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + r, err := gredis.New(config) + t.AssertNil(err) + defer r.Close(ctx) + + defer r.Do(ctx, "DEL", "test:pipe:hset") + + pipe := r.Pipeline(ctx) + t.AssertNE(pipe, nil) + + cmd1 := pipe.PipelineGroupHash().HSet(ctx, "test:pipe:hset", map[string]any{"field1": "value1"}) + t.AssertNE(cmd1, nil) + + cmd2 := pipe.PipelineGroupHash().HGet(ctx, "test:pipe:hset", "field1") + t.AssertNE(cmd2, nil) + + results, err := pipe.Exec(ctx) + t.AssertNil(err) + t.Assert(len(results), 2) + + val2, err := cmd2.Result() + t.AssertNil(err) + t.Assert(val2.String(), "value1") + }) +} + +// Test_Pipeline_StringSetGet queues Set, Get and Incr commands via pipeline, +// executes them, and verifies all results are populated correctly. +func Test_Pipeline_StringSetGet(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + r, err := gredis.New(config) + t.AssertNil(err) + defer r.Close(ctx) + + defer r.Do(ctx, "DEL", "test:pipe:str") + + pipe := r.Pipeline(ctx) + t.AssertNE(pipe, nil) + + pipe.PipelineGroupString().Set(ctx, "test:pipe:str", "10") + cmdGet := pipe.PipelineGroupString().Get(ctx, "test:pipe:str") + cmdIncr := pipe.PipelineGroupString().Incr(ctx, "test:pipe:str") + + results, err := pipe.Exec(ctx) + t.AssertNil(err) + t.Assert(len(results), 3) + + valGet, err := cmdGet.Result() + t.AssertNil(err) + t.Assert(valGet.String(), "10") + + valIncr, err := cmdIncr.Result() + t.AssertNil(err) + t.Assert(valIncr.Int64(), int64(11)) + }) +} + +// Test_Pipeline_MultipleGroups queues commands from different command groups +// (Hash.HSet, String.Set, Generic.Expire) in the same pipeline and verifies execution. +func Test_Pipeline_MultipleGroups(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + r, err := gredis.New(config) + t.AssertNil(err) + defer r.Close(ctx) + + defer r.Do(ctx, "DEL", "test:pipe:multi:hash", "test:pipe:multi:str") + + pipe := r.Pipeline(ctx) + t.AssertNE(pipe, nil) + + cmdHSet := pipe.PipelineGroupHash().HSet(ctx, "test:pipe:multi:hash", map[string]any{"k": "v"}) + cmdSet := pipe.PipelineGroupString().Set(ctx, "test:pipe:multi:str", "hello") + cmdExpire := pipe.PipelineGroupGeneric().Expire(ctx, "test:pipe:multi:str", 3600) + + results, err := pipe.Exec(ctx) + t.AssertNil(err) + t.Assert(len(results), 3) + + valHSet, err := cmdHSet.Result() + t.AssertNil(err) + t.Assert(valHSet.Int64(), int64(1)) + + valSet, err := cmdSet.Result() + t.AssertNil(err) + t.Assert(valSet.String(), "OK") + + valExpire, err := cmdExpire.Result() + t.AssertNil(err) + t.Assert(valExpire.Int64(), int64(1)) + + v, err := r.HGet(ctx, "test:pipe:multi:hash", "k") + t.AssertNil(err) + t.Assert(v.String(), "v") + + v2, err := r.Do(ctx, "GET", "test:pipe:multi:str") + t.AssertNil(err) + t.Assert(v2.String(), "hello") + }) +} + +// Test_Pipeline_Discard queues commands, calls Discard to cancel them, +// then verifies the keys were never created on the server. +func Test_Pipeline_Discard(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + r, err := gredis.New(config) + t.AssertNil(err) + defer r.Close(ctx) + + pipe := r.Pipeline(ctx) + t.AssertNE(pipe, nil) + + pipe.PipelineGroupString().Set(ctx, "test:pipe:discard", "should_not_exist") + + pipe.Discard() + + v, err := r.Do(ctx, "GET", "test:pipe:discard") + t.AssertNil(err) + t.Assert(v.IsNil(), true) + + defer r.Do(ctx, "DEL", "test:pipe:discard") + }) +} + +// Test_Pipeline_DoRaw uses the low-level pipe.Do method to queue a SET command, +// then verifies the result after Exec. +func Test_Pipeline_DoRaw(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + r, err := gredis.New(config) + t.AssertNil(err) + defer r.Close(ctx) + + defer r.Do(ctx, "DEL", "test:pipe:doraw") + + pipe := r.Pipeline(ctx) + t.AssertNE(pipe, nil) + + cmd := pipe.Do(ctx, "SET", "test:pipe:doraw", "rawvalue") + t.AssertNE(cmd, nil) + + results, err := pipe.Exec(ctx) + t.AssertNil(err) + t.Assert(len(results), 1) + + val, err := cmd.Result() + t.AssertNil(err) + t.Assert(val.String(), "OK") + + v, err := r.Do(ctx, "GET", "test:pipe:doraw") + t.AssertNil(err) + t.Assert(v.String(), "rawvalue") + }) +} + +// Test_TxPipeline_Basic queues commands via TxPipeline (MULTI/EXEC), +// executes them, and verifies atomic execution. +func Test_TxPipeline_Basic(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + r, err := gredis.New(config) + t.AssertNil(err) + defer r.Close(ctx) + + defer r.Do(ctx, "DEL", "test:tx:basic") + + tx := r.TxPipeline(ctx) + t.AssertNE(tx, nil) + + cmdSet := tx.PipelineGroupString().Set(ctx, "test:tx:basic", "txval") + cmdGet := tx.PipelineGroupString().Get(ctx, "test:tx:basic") + + results, err := tx.Exec(ctx) + t.AssertNil(err) + t.Assert(len(results), 2) + + valSet, err := cmdSet.Result() + t.AssertNil(err) + t.Assert(valSet.String(), "OK") + + valGet, err := cmdGet.Result() + t.AssertNil(err) + t.Assert(valGet.String(), "txval") + }) +} + +// Test_TxPipeline_MultiCommands queues multiple HIncrBy commands in a transaction, +// verifies they are all applied atomically, and checks the final counter value. +func Test_TxPipeline_MultiCommands(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + r, err := gredis.New(config) + t.AssertNil(err) + defer r.Close(ctx) + + defer r.Do(ctx, "DEL", "test:tx:counter") + + tx := r.TxPipeline(ctx) + t.AssertNE(tx, nil) + + cmd1 := tx.PipelineGroupHash().HIncrBy(ctx, "test:tx:counter", "count", 10) + cmd2 := tx.PipelineGroupHash().HIncrBy(ctx, "test:tx:counter", "count", 20) + cmd3 := tx.PipelineGroupHash().HIncrBy(ctx, "test:tx:counter", "count", 30) + + results, err := tx.Exec(ctx) + t.AssertNil(err) + t.Assert(len(results), 3) + + val1, err := cmd1.Result() + t.AssertNil(err) + t.Assert(val1.Int64(), int64(10)) + + val2, err := cmd2.Result() + t.AssertNil(err) + t.Assert(val2.Int64(), int64(30)) + + val3, err := cmd3.Result() + t.AssertNil(err) + t.Assert(val3.Int64(), int64(60)) + + v, err := r.HGet(ctx, "test:tx:counter", "count") + t.AssertNil(err) + t.Assert(v.Int64(), int64(60)) + }) +} + +// Test_ScanAll_Basic sets several keys with a common prefix, calls ScanAll +// with a matching pattern, and verifies all keys are returned. +func Test_ScanAll_Basic(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + r, err := gredis.New(config) + t.AssertNil(err) + defer r.Close(ctx) + + prefix := "test:scanall:" + for i := range 5 { + _, err := r.Do(ctx, "SET", prefix+gconv.String(i), "val") + t.AssertNil(err) + } + defer func() { + for i := range 5 { + r.Do(ctx, "DEL", prefix+gconv.String(i)) + } + }() + + keys, err := r.GroupGeneric().ScanAll(ctx, gredis.ScanOption{ + Match: "test:scanall:*", + Count: 10, + }) + t.AssertNil(err) + t.AssertGE(len(keys), 5) + }) +} + +// Test_ScanAll_EmptyPattern verifies that ScanAll with a pattern matching +// no keys returns an empty slice without error. +func Test_ScanAll_EmptyPattern(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + r, err := gredis.New(config) + t.AssertNil(err) + defer r.Close(ctx) + + keys, err := r.GroupGeneric().ScanAll(ctx, gredis.ScanOption{ + Match: "test:scanall:nonexistent:*", + Count: 10, + }) + t.AssertNil(err) + t.Assert(len(keys), 0) + }) +} + +// Test_Del_MultiKey verifies that Del removes multiple keys in a single call +// and returns the correct count of deleted keys. +func Test_Del_MultiKey(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + r, err := gredis.New(config) + t.AssertNil(err) + defer r.Close(ctx) + + _, err = r.Do(ctx, "SET", "test:del:1", "a") + t.AssertNil(err) + _, err = r.Do(ctx, "SET", "test:del:2", "b") + t.AssertNil(err) + _, err = r.Do(ctx, "SET", "test:del:3", "c") + t.AssertNil(err) + + n, err := r.GroupGeneric().Del(ctx, "test:del:1", "test:del:2", "test:del:3") + t.AssertNil(err) + t.Assert(n, int64(3)) + + v, err := r.Do(ctx, "GET", "test:del:1") + t.AssertNil(err) + t.Assert(v.IsNil(), true) + }) +} + +// Test_Watch_Success sets a key, starts a Watch transaction on it, queues +// a modification within the callback, and verifies the transaction commits +// successfully when no concurrent modification occurs. +func Test_Watch_Success(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + r, err := gredis.New(config) + t.AssertNil(err) + defer r.Close(ctx) + + defer r.Do(ctx, "DEL", "test:watch:key") + + _, err = r.Do(ctx, "SET", "test:watch:key", "initial") + t.AssertNil(err) + + err = r.Watch(ctx, func(tx gredis.Tx) error { + cmdSet := tx.PipelineGroupString().Set(ctx, "test:watch:key", "watched") + cmdGet := tx.PipelineGroupString().Get(ctx, "test:watch:key") + results, execErr := tx.Exec(ctx) + if execErr != nil { + return execErr + } + t.Assert(len(results), 2) + + valSet, _ := cmdSet.Result() + t.Assert(valSet.String(), "OK") + + valGet, _ := cmdGet.Result() + t.Assert(valGet.String(), "watched") + return nil + }, "test:watch:key") + t.AssertNil(err) + + v, err := r.Do(ctx, "GET", "test:watch:key") + t.AssertNil(err) + t.Assert(v.String(), "watched") + }) +} diff --git a/database/gredis/gredis_adapter.go b/database/gredis/gredis_adapter.go index 1d57ab5fd01..2613b3d909f 100644 --- a/database/gredis/gredis_adapter.go +++ b/database/gredis/gredis_adapter.go @@ -52,6 +52,23 @@ type AdapterOperation interface { // This method provides access to the raw redis client for advanced operations // that are not covered by the standard redis adapter interface. Client() RedisRawClient + + // Pipeline returns a Pipeliner for batching multiple commands into a single + // network round-trip. Commands are buffered locally and sent when Exec is called. + // See Pipeliner for usage details. + Pipeline(ctx context.Context) Pipeliner + + // TxPipeline returns a Pipeliner that wraps commands in a MULTI/EXEC transaction. + // All queued commands are executed atomically by the Redis server. + // In Redis Cluster mode, all keys in the transaction MUST be on the same hash slot; + // otherwise the server returns a CROSSSLOT error. + TxPipeline(ctx context.Context) Pipeliner + + // Watch watches the given keys for modifications and executes fn in a transaction. + // If any watched key is modified by another client before the transaction executes, + // the transaction is aborted and Watch returns a transaction-abort error. + // In Redis Cluster mode, all keys MUST be on the same hash slot. + Watch(ctx context.Context, fn func(tx Tx) error, keys ...string) error } // Conn is an interface of a connection from universal redis client. diff --git a/database/gredis/gredis_cmd.go b/database/gredis/gredis_cmd.go new file mode 100644 index 00000000000..b160d8c059c --- /dev/null +++ b/database/gredis/gredis_cmd.go @@ -0,0 +1,55 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT License was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +// This file defines the Cmd type used as a future-result container for +// commands queued in a Pipeline or TxPipeline. + +package gredis + +import ( + "github.com/gogf/gf/v2/container/gvar" +) + +// Cmd holds the future result of a command queued in a Pipeline or TxPipeline. +// The result is populated after Pipeliner.Exec() or Tx.Exec(). +// Before Exec, Cmd.Val() returns nil and Cmd.Result() returns (nil, nil). +type Cmd struct { + // val holds the populated result after Exec. It is nil until Exec populates it. + val *gvar.Var + + // err holds the populated error after Exec. It is nil before Exec or if no error occurred. + err error +} + +// Result returns the populated value and error after Exec. +// Before Exec is called, it returns (nil, nil). +func (c *Cmd) Result() (*gvar.Var, error) { + return c.val, c.err +} + +// Val returns the populated *gvar.Var value. +// It returns nil before Exec has been called or if the command returned no value. +func (c *Cmd) Val() *gvar.Var { + return c.val +} + +// Err returns the populated error after Exec. +// It returns nil before Exec has been called or if no error occurred. +func (c *Cmd) Err() error { + return c.err +} + +// SetVal sets the result value of the Cmd. Used by driver implementations +// to populate the result after Exec. +func (c *Cmd) SetVal(val *gvar.Var) { + c.val = val +} + +// SetErr sets the result error of the Cmd. Used by driver implementations +// to populate the error after Exec. +func (c *Cmd) SetErr(err error) { + c.err = err +} diff --git a/database/gredis/gredis_pipeline.go b/database/gredis/gredis_pipeline.go new file mode 100644 index 00000000000..8e428c5410a --- /dev/null +++ b/database/gredis/gredis_pipeline.go @@ -0,0 +1,196 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT License was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +// This file defines the Pipeliner, PipelinerOperation, PipelinerGroup interfaces, +// the six pipeline command group interfaces, and the Tx interface used for +// pipeline and transaction support in the Redis adapter. + +package gredis + +import ( + "context" + "time" +) + +// Pipeliner queues Redis commands for batch execution in a single network round-trip. +// Commands are buffered locally and sent to the server when Exec is called. +// A Pipeliner is obtained via Redis.Pipeline() or Redis.TxPipeline(). +// +// Usage example: +// +// pipe := redis.Pipeline(ctx) +// cmd1 := pipe.PipelineGroupHash().HSet(ctx, "key", map[string]any{"field": "value"}) +// cmd2 := pipe.PipelineGroupString().Get(ctx, "key2") +// pipe.Exec(ctx) +// val1, _ := cmd1.Result() +// val2, _ := cmd2.Result() +type Pipeliner interface { + PipelinerOperation + PipelinerGroup +} + +// PipelinerOperation defines the core pipeline operations. +type PipelinerOperation interface { + // Do queues a raw Redis command and returns its future Cmd. + // The command is not sent to the server until Exec is called. + Do(ctx context.Context, command string, args ...any) *Cmd + + // Exec sends all queued commands to the server in a single batch, + // populates each queued Cmd's result, and returns all Cmds. + // After Exec returns, all Cmd objects returned by queued commands are populated. + Exec(ctx context.Context) ([]*Cmd, error) + + // Discard discards all queued commands without sending them to the server. + Discard() +} + +// PipelinerGroup provides typed command access for the Pipeliner, +// mirroring the existing Redis group interfaces but with *Cmd return types. +type PipelinerGroup interface { + PipelineGroupGeneric() IPipelineGroupGeneric + PipelineGroupHash() IPipelineGroupHash + PipelineGroupString() IPipelineGroupString + PipelineGroupList() IPipelineGroupList + PipelineGroupSet() IPipelineGroupSet + PipelineGroupSortedSet() IPipelineGroupSortedSet +} + +// IPipelineGroupGeneric manages generic Redis operations in pipeline mode. +// Each method queues the command and returns a *Cmd future. +// Results are populated after Pipeliner.Exec() is called. +type IPipelineGroupGeneric interface { + Copy(ctx context.Context, source, destination string, option ...CopyOption) *Cmd + Exists(ctx context.Context, keys ...string) *Cmd + Type(ctx context.Context, key string) *Cmd + Unlink(ctx context.Context, keys ...string) *Cmd + Rename(ctx context.Context, key, newKey string) *Cmd + RenameNX(ctx context.Context, key, newKey string) *Cmd + Move(ctx context.Context, key string, db int) *Cmd + Del(ctx context.Context, keys ...string) *Cmd + RandomKey(ctx context.Context) *Cmd + DBSize(ctx context.Context) *Cmd + Keys(ctx context.Context, pattern string) *Cmd + Scan(ctx context.Context, cursor uint64, option ...ScanOption) *Cmd + FlushDB(ctx context.Context, option ...FlushOp) *Cmd + FlushAll(ctx context.Context, option ...FlushOp) *Cmd + Expire(ctx context.Context, key string, seconds int64, option ...ExpireOption) *Cmd + ExpireAt(ctx context.Context, key string, time time.Time, option ...ExpireOption) *Cmd + ExpireTime(ctx context.Context, key string) *Cmd + TTL(ctx context.Context, key string) *Cmd + Persist(ctx context.Context, key string) *Cmd + PExpire(ctx context.Context, key string, milliseconds int64, option ...ExpireOption) *Cmd + PExpireAt(ctx context.Context, key string, time time.Time, option ...ExpireOption) *Cmd + PExpireTime(ctx context.Context, key string) *Cmd + PTTL(ctx context.Context, key string) *Cmd +} + +// IPipelineGroupHash manages Redis hash operations in pipeline mode. +// Each method queues the command and returns a *Cmd future. +type IPipelineGroupHash interface { + HSet(ctx context.Context, key string, fields map[string]any) *Cmd + HSetNX(ctx context.Context, key, field string, value any) *Cmd + HGet(ctx context.Context, key, field string) *Cmd + HStrLen(ctx context.Context, key, field string) *Cmd + HExists(ctx context.Context, key, field string) *Cmd + HDel(ctx context.Context, key string, fields ...string) *Cmd + HLen(ctx context.Context, key string) *Cmd + HIncrBy(ctx context.Context, key, field string, increment int64) *Cmd + HIncrByFloat(ctx context.Context, key, field string, increment float64) *Cmd + HMSet(ctx context.Context, key string, fields map[string]any) *Cmd + HMGet(ctx context.Context, key string, fields ...string) *Cmd + HKeys(ctx context.Context, key string) *Cmd + HVals(ctx context.Context, key string) *Cmd + HGetAll(ctx context.Context, key string) *Cmd +} + +// IPipelineGroupString manages Redis string operations in pipeline mode. +// Each method queues the command and returns a *Cmd future. +type IPipelineGroupString interface { + Set(ctx context.Context, key string, value any, option ...SetOption) *Cmd + SetNX(ctx context.Context, key string, value any) *Cmd + SetEX(ctx context.Context, key string, value any, ttlInSeconds int64) *Cmd + Get(ctx context.Context, key string) *Cmd + GetDel(ctx context.Context, key string) *Cmd + GetEX(ctx context.Context, key string, option ...GetEXOption) *Cmd + GetSet(ctx context.Context, key string, value any) *Cmd + StrLen(ctx context.Context, key string) *Cmd + Append(ctx context.Context, key string, value string) *Cmd + SetRange(ctx context.Context, key string, offset int64, value string) *Cmd + GetRange(ctx context.Context, key string, start, end int64) *Cmd + Incr(ctx context.Context, key string) *Cmd + IncrBy(ctx context.Context, key string, increment int64) *Cmd + IncrByFloat(ctx context.Context, key string, increment float64) *Cmd + Decr(ctx context.Context, key string) *Cmd + DecrBy(ctx context.Context, key string, decrement int64) *Cmd + MSet(ctx context.Context, keyValueMap map[string]any) *Cmd + MSetNX(ctx context.Context, keyValueMap map[string]any) *Cmd + MGet(ctx context.Context, keys ...string) *Cmd +} + +// IPipelineGroupList manages Redis list operations in pipeline mode. +// Each method queues the command and returns a *Cmd future. +type IPipelineGroupList interface { + LPush(ctx context.Context, key string, values ...any) *Cmd + LPushX(ctx context.Context, key string, element any, elements ...any) *Cmd + RPush(ctx context.Context, key string, values ...any) *Cmd + RPushX(ctx context.Context, key string, value any) *Cmd + LPop(ctx context.Context, key string, count ...int) *Cmd + RPop(ctx context.Context, key string, count ...int) *Cmd + LRem(ctx context.Context, key string, count int64, value any) *Cmd + LLen(ctx context.Context, key string) *Cmd + LIndex(ctx context.Context, key string, index int64) *Cmd + LInsert(ctx context.Context, key string, op LInsertOp, pivot, value any) *Cmd + LSet(ctx context.Context, key string, index int64, value any) *Cmd + LRange(ctx context.Context, key string, start, stop int64) *Cmd + LTrim(ctx context.Context, key string, start, stop int64) *Cmd + RPopLPush(ctx context.Context, source, destination string) *Cmd +} + +// IPipelineGroupSet manages Redis set operations in pipeline mode. +// Each method queues the command and returns a *Cmd future. +type IPipelineGroupSet interface { + SAdd(ctx context.Context, key string, member any, members ...any) *Cmd + SIsMember(ctx context.Context, key string, member any) *Cmd + SPop(ctx context.Context, key string, count ...int) *Cmd + SRandMember(ctx context.Context, key string, count ...int) *Cmd + SRem(ctx context.Context, key string, member any, members ...any) *Cmd + SMove(ctx context.Context, source, destination string, member any) *Cmd + SCard(ctx context.Context, key string) *Cmd + SMembers(ctx context.Context, key string) *Cmd + SMIsMember(ctx context.Context, key, member any, members ...any) *Cmd + SInter(ctx context.Context, key string, keys ...string) *Cmd + SInterStore(ctx context.Context, destination string, key string, keys ...string) *Cmd + SUnion(ctx context.Context, key string, keys ...string) *Cmd + SUnionStore(ctx context.Context, destination, key string, keys ...string) *Cmd + SDiff(ctx context.Context, key string, keys ...string) *Cmd + SDiffStore(ctx context.Context, destination string, key string, keys ...string) *Cmd +} + +// IPipelineGroupSortedSet manages Redis sorted set operations in pipeline mode. +// Each method queues the command and returns a *Cmd future. +type IPipelineGroupSortedSet interface { + ZAdd(ctx context.Context, key string, option *ZAddOption, member ZAddMember, members ...ZAddMember) *Cmd + ZScore(ctx context.Context, key string, member any) *Cmd + ZIncrBy(ctx context.Context, key string, increment float64, member any) *Cmd + ZCard(ctx context.Context, key string) *Cmd + ZCount(ctx context.Context, key string, min, max string) *Cmd + ZRange(ctx context.Context, key string, start, stop int64, option ...ZRangeOption) *Cmd + ZRevRange(ctx context.Context, key string, start, stop int64, option ...ZRevRangeOption) *Cmd + ZRank(ctx context.Context, key string, member any) *Cmd + ZRevRank(ctx context.Context, key string, member any) *Cmd + ZRem(ctx context.Context, key string, member any, members ...any) *Cmd + ZRemRangeByRank(ctx context.Context, key string, start, stop int64) *Cmd + ZRemRangeByScore(ctx context.Context, key string, min, max string) *Cmd + ZRemRangeByLex(ctx context.Context, key string, min, max string) *Cmd + ZLexCount(ctx context.Context, key, min, max string) *Cmd +} + +// Tx represents a Redis transaction context used with Watch. +// It embeds Pipeliner so commands can be queued within the transaction callback. +// Commands queued on Tx are executed atomically via MULTI/EXEC. +type Tx interface { + Pipeliner +} diff --git a/database/gredis/gredis_redis.go b/database/gredis/gredis_redis.go index f7f1a0dd57d..994afdbf928 100644 --- a/database/gredis/gredis_redis.go +++ b/database/gredis/gredis_redis.go @@ -129,3 +129,49 @@ func (r *Redis) Close(ctx context.Context) error { } return r.localAdapter.Close(ctx) } + +// Pipeline returns a Pipeliner for batching multiple commands into a single network round-trip. +// Commands are buffered locally and sent to the server when Pipeliner.Exec is called. +func (r *Redis) Pipeline(ctx context.Context) Pipeliner { + if r == nil || r.localAdapter == nil { + return nil + } + return r.localAdapter.Pipeline(ctx) +} + +// TxPipeline returns a Pipeliner that wraps commands in a MULTI/EXEC transaction. +// All queued commands are executed atomically by the Redis server. +func (r *Redis) TxPipeline(ctx context.Context) Pipeliner { + if r == nil || r.localAdapter == nil { + return nil + } + return r.localAdapter.TxPipeline(ctx) +} + +// Watch watches the given keys for modifications and executes fn in a transaction. +// If any watched key is modified by another client before the transaction executes, +// the transaction is aborted and Watch returns a transaction-abort error. +func (r *Redis) Watch(ctx context.Context, fn func(tx Tx) error, keys ...string) error { + if r == nil || r.localAdapter == nil { + return gerror.NewCode(gcode.CodeInvalidParameter, errorNilRedis) + } + return r.localAdapter.Watch(ctx, fn, keys...) +} + +// MustPipeline performs as function Pipeline, but it panics if any error occurs internally. +func (r *Redis) MustPipeline(ctx context.Context) Pipeliner { + pipe := r.Pipeline(ctx) + if pipe == nil { + panic(gerror.NewCode(gcode.CodeInvalidParameter, errorNilRedis)) + } + return pipe +} + +// MustTxPipeline performs as function TxPipeline, but it panics if any error occurs internally. +func (r *Redis) MustTxPipeline(ctx context.Context) Pipeliner { + pipe := r.TxPipeline(ctx) + if pipe == nil { + panic(gerror.NewCode(gcode.CodeInvalidParameter, errorNilRedis)) + } + return pipe +} diff --git a/database/gredis/gredis_redis_group_generic.go b/database/gredis/gredis_redis_group_generic.go index 5dd6ffac437..dab811a5146 100644 --- a/database/gredis/gredis_redis_group_generic.go +++ b/database/gredis/gredis_redis_group_generic.go @@ -39,6 +39,12 @@ type IGroupGeneric interface { PExpireAt(ctx context.Context, key string, time time.Time, option ...ExpireOption) (int64, error) PExpireTime(ctx context.Context, key string) (*gvar.Var, error) PTTL(ctx context.Context, key string) (int64, error) + + // ScanAll iterates all keys matching the given pattern across the entire keyspace. + // In standalone and sentinel modes, it repeatedly calls Scan until the cursor returns to 0. + // In Cluster mode, it transparently iterates all master nodes and aggregates results. + // This is the safe alternative to Keys, which can block the server on large datasets. + ScanAll(ctx context.Context, option ...ScanOption) ([]string, error) } // CopyOption provides options for function Copy. diff --git a/database/gredis/gredis_z_pipeline_test.go b/database/gredis/gredis_z_pipeline_test.go new file mode 100644 index 00000000000..a957348175d --- /dev/null +++ b/database/gredis/gredis_z_pipeline_test.go @@ -0,0 +1,203 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT License was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +package gredis_test + +import ( + "context" + "errors" + "testing" + + "github.com/gogf/gf/v2/container/gvar" + "github.com/gogf/gf/v2/database/gredis" + "github.com/gogf/gf/v2/test/gtest" +) + +// Test_Cmd_BeforeExec verifies that a newly created Cmd returns nil values +// before any SetVal or SetErr calls. +func Test_Cmd_BeforeExec(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + cmd := &gredis.Cmd{} + val, err := cmd.Result() + t.AssertNil(val) + t.AssertNil(err) + t.AssertNil(cmd.Val()) + t.AssertNil(cmd.Err()) + }) +} + +// Test_Cmd_AfterSetVal verifies that SetVal correctly populates the value +// accessible via Result() and Val(). +func Test_Cmd_AfterSetVal(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + cmd := &gredis.Cmd{} + cmd.SetVal(gvar.New("test")) + val, err := cmd.Result() + t.AssertNil(err) + t.Assert(val.String(), "test") + t.Assert(cmd.Val().String(), "test") + }) +} + +// Test_Cmd_AfterSetErr verifies that SetErr correctly populates the error +// accessible via Result() and Err(). +func Test_Cmd_AfterSetErr(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + cmd := &gredis.Cmd{} + someErr := errors.New("redis error") + cmd.SetErr(someErr) + val, err := cmd.Result() + t.AssertNil(val) + t.AssertNE(err, nil) + t.Assert(err.Error(), "redis error") + t.AssertNE(cmd.Err(), nil) + t.Assert(cmd.Err().Error(), "redis error") + }) +} + +// Test_Cmd_AfterSetBoth verifies that both value and error can be populated +// simultaneously on a Cmd. +func Test_Cmd_AfterSetBoth(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + cmd := &gredis.Cmd{} + cmd.SetVal(gvar.New(42)) + someErr := errors.New("partial failure") + cmd.SetErr(someErr) + val, err := cmd.Result() + t.Assert(val.Int(), 42) + t.AssertNE(err, nil) + t.Assert(err.Error(), "partial failure") + t.Assert(cmd.Val().Int(), 42) + t.Assert(cmd.Err().Error(), "partial failure") + }) +} + +// Test_Cmd_SetValOverwrite verifies that SetVal can be called multiple times +// and the last value wins. +func Test_Cmd_SetValOverwrite(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + cmd := &gredis.Cmd{} + cmd.SetVal(gvar.New("first")) + t.Assert(cmd.Val().String(), "first") + cmd.SetVal(gvar.New("second")) + t.Assert(cmd.Val().String(), "second") + }) +} + +// Test_Cmd_SetErrOverwrite verifies that SetErr can be called multiple times +// and the last error wins. +func Test_Cmd_SetErrOverwrite(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + cmd := &gredis.Cmd{} + cmd.SetErr(errors.New("err1")) + t.Assert(cmd.Err().Error(), "err1") + cmd.SetErr(errors.New("err2")) + t.Assert(cmd.Err().Error(), "err2") + }) +} + +// Test_Cmd_SetErrNil verifies that SetErr(nil) clears a previously set error. +func Test_Cmd_SetErrNil(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + cmd := &gredis.Cmd{} + cmd.SetErr(errors.New("temporary")) + t.AssertNE(cmd.Err(), nil) + cmd.SetErr(nil) + t.AssertNil(cmd.Err()) + }) +} + +// Compile-time check: Redis must implement Pipeline, TxPipeline, and Watch methods. +var _ interface { + Pipeline(ctx context.Context) gredis.Pipeliner + TxPipeline(ctx context.Context) gredis.Pipeliner + Watch(ctx context.Context, fn func(gredis.Tx) error, keys ...string) error +} = (*gredis.Redis)(nil) + +// Test_AdapterOperation_HasPipeline verifies at runtime that the +// AdapterOperation interface includes Pipeline, TxPipeline, and Watch. +func Test_AdapterOperation_HasPipeline(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var _ gredis.AdapterOperation = (gredis.AdapterOperation)(nil) + t.Assert(true, true) + }) +} + +// Test_PipelineInterface_Compliance verifies that the Pipeliner interface +// composes PipelinerOperation and PipelinerGroup. +func Test_PipelineInterface_Compliance(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var _ gredis.Pipeliner = (gredis.Pipeliner)(nil) + t.Assert(true, true) + }) +} + +// Test_IGroupGeneric_ScanAll verifies at compile time that IGroupGeneric +// includes the ScanAll method. +func Test_IGroupGeneric_ScanAll(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var _ gredis.IGroupGeneric = (gredis.IGroupGeneric)(nil) + t.Assert(true, true) + }) +} + +// Test_Redis_Pipeline_NilReceiver verifies that Pipeline returns nil when +// called on a nil Redis or when no adapter is configured. +func Test_Redis_Pipeline_NilReceiver(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var r *gredis.Redis + result := r.Pipeline(context.Background()) + t.AssertNil(result) + }) +} + +// Test_Redis_TxPipeline_NilReceiver verifies that TxPipeline returns nil when +// called on a nil Redis or when no adapter is configured. +func Test_Redis_TxPipeline_NilReceiver(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var r *gredis.Redis + result := r.TxPipeline(context.Background()) + t.AssertNil(result) + }) +} + +// Test_Redis_Watch_NilReceiver verifies that Watch returns an error when +// called on a nil Redis. +func Test_Redis_Watch_NilReceiver(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var r *gredis.Redis + err := r.Watch(context.Background(), func(tx gredis.Tx) error { + return nil + }, "key1") + t.AssertNE(err, nil) + }) +} + +// Test_Redis_MustPipeline_NilAdapter verifies that MustPipeline panics +// when called on a Redis with nil adapter. +func Test_Redis_MustPipeline_NilAdapter(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + defer func() { + r := recover() + t.AssertNE(r, nil) + }() + r := &gredis.Redis{} + r.MustPipeline(context.Background()) + }) +} + +// Test_Redis_MustTxPipeline_NilAdapter verifies that MustTxPipeline panics +// when called on a Redis with nil adapter. +func Test_Redis_MustTxPipeline_NilAdapter(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + defer func() { + r := recover() + t.AssertNE(r, nil) + }() + r := &gredis.Redis{} + r.MustTxPipeline(context.Background()) + }) +} diff --git a/openspec/changes/gredis-pipeline-tx-cluster/.openspec.yaml b/openspec/changes/gredis-pipeline-tx-cluster/.openspec.yaml new file mode 100644 index 00000000000..8fe20555191 --- /dev/null +++ b/openspec/changes/gredis-pipeline-tx-cluster/.openspec.yaml @@ -0,0 +1,2 @@ +schema: spec-driven +created: 2026-06-12 diff --git a/openspec/changes/gredis-pipeline-tx-cluster/design.md b/openspec/changes/gredis-pipeline-tx-cluster/design.md new file mode 100644 index 00000000000..ae117cc319e --- /dev/null +++ b/openspec/changes/gredis-pipeline-tx-cluster/design.md @@ -0,0 +1,91 @@ +## Context + +GoFrame's `database/gredis` package defines an abstract `Adapter` interface with group-based Redis command access (Generic, Hash, String, List, Set, SortedSet, PubSub, Script). The concrete implementation lives in `contrib/nosql/redis` and wraps `go-redis/v9`'s `redis.UniversalClient`. + +The current Adapter interface (`AdapterOperation`) provides four methods: `Do`, `Conn`, `Close`, `Client`. There is no Pipeline, Transaction, or WATCH support at the interface level. Users who need batch execution must call `Client()` and type-assert to `redis.UniversalClient` to access go-redis's `Pipeline()` / `TxPipeline()` / `Watch()` — which breaks the driver-agnostic design and couples application code to a specific driver. + +For Cluster mode, `Scan` only covers a single node's keyspace, and multi-key `Del` triggers `CROSSSLOT` errors. These are infrastructure-level issues that belong in the framework, not in each application. + +## Goals / Non-Goals + +**Goals:** +- Add Pipeline support to the `AdapterOperation` interface with full group-style type safety. +- Add Transaction (MULTI/EXEC) and optimistic locking (WATCH) support. +- Add Cluster-safe `ScanAll` to `IGroupGeneric`. +- Implement Cluster-safe `Del` in the driver (per-key fallback in Cluster mode). +- All new interfaces must be driver-agnostic (no go-redis types in `database/gredis`). +- Achieve ≥ 80% test coverage for newly added code. + +**Non-Goals:** +- Batch convenience methods (BatchHGetAll, BatchHMSet, etc.) — these are application-level patterns built on top of Pipeline. +- Key-prefix namespacing, JSON serialization wrappers, index management, or CacheManager — these are application concerns, not framework primitives. +- PubSub or Script groups in the Pipeline interface — these operations do not fit the pipeline execution model. +- Configuration validation — GoFrame's `gcfg` already provides validation mechanisms. + +## Decisions + +### 1. Cmd as future-result container + +Pipeline commands are queued locally and executed in batch. A command queued at time T1 cannot return its result until `Exec()` is called at time T2. We introduce a `Cmd` type to hold this deferred result: + +```go +type Cmd struct { + val *gvar.Var // nil until Exec populates it + err error +} +``` + +After `Exec()`, users call `cmd.Result()` to retrieve the populated value and error. This mirrors go-redis's `Cmder` pattern while staying within GoFrame's `gvar.Var` ecosystem. + +### 2. Full Group-style Pipeliner interface + +The `Pipeliner` interface mirrors the existing group structure but with `*Cmd` return types. Each pipeline group interface (`IPipelineGroupHash`, etc.) has the same method names as the corresponding `IGroupHash`, differing only in return type: + +- Regular: `HSet(ctx, key, fields) (int64, error)` +- Pipeline: `HSet(ctx, key, fields) *Cmd` + +PubSub and Script groups are excluded from the Pipeliner because they do not fit the pipeline execution model (PubSub is push-based; scripts use EVAL which is itself a single command). + +### 3. Pipeline/TxPipeline return Pipeliner directly (not callback-based) + +We chose `Pipeline(ctx) Pipeliner` over a callback-based `Pipeline(ctx, func(pipe) error)` because: +- The direct-return pattern is simpler to use and test. +- Users retain control over when to call `Exec()`. +- go-redis uses the same pattern (`client.Pipeline()` returns `redis.Pipeliner`). + +### 4. Tx interface embeds Pipeliner + +`Watch` receives a callback with a `Tx` argument. `Tx` embeds `Pipeliner`, so inside the callback the user can queue commands on the transaction. The `Tx` interface is kept thin to avoid over-abstracting go-redis's `*redis.Tx`. + +### 5. ScanAll added to IGroupGeneric + +`ScanAll` abstracts the common "iterate all matching keys" pattern. In Cluster mode, the driver implementation uses `ForEachMaster` to aggregate results across all master nodes. In standalone/sentinel mode, it loops `Scan` until cursor returns to 0. + +### 6. Cluster-safe Del implemented in driver only + +`Del`'s interface signature stays the same (`Del(ctx, keys...) (int64, error)`). The driver detects Cluster mode at runtime and switches to per-key deletion to avoid CROSSSLOT. No interface change needed — this is purely a driver implementation detail. + +### 7. Files and estimated scope + +| File | Action | Est. Lines | +|---|---|---| +| `database/gredis/gredis_cmd.go` | New — Cmd type | ~50 | +| `database/gredis/gredis_pipeline.go` | New — Pipeliner + pipeline group interfaces | ~250 | +| `database/gredis/gredis_adapter.go` | Modified — extend AdapterOperation | +15 | +| `database/gredis/gredis_redis.go` | Modified — add Pipeline/TxPipeline/Watch | +40 | +| `database/gredis/gredis_redis_group_generic.go` | Modified — add ScanAll | +10 | +| `contrib/nosql/redis/redis_pipeline.go` | New — Pipeliner implementation | ~500 | +| `contrib/nosql/redis/redis.go` | Modified — implement Pipeline/TxPipeline/Watch | +30 | +| `contrib/nosql/redis/redis_group_generic.go` | Modified — ScanAll + Cluster-safe Del | +80 | +| Test files (both modules) | New + modified | ~600 | +| **Total** | | **~1600** | + +## Risks / Trade-offs + +- **Breaking change to AdapterOperation interface** — Any custom adapter implementing `AdapterOperation` must add `Pipeline`, `TxPipeline`, and `Watch` methods. This is acceptable because v2 allows breaking changes within the major version, and these are fundamental Redis capabilities that any real adapter should support. Mitigation: provide clear migration documentation. + +- **Large interface surface for Pipeline groups** — ~100 methods across 6 pipeline group interfaces. This is inherent to the "full Group style" decision. The implementation is mechanical (delegate each method to go-redis's pipeliner), and code generation could help if maintainers prefer it. + +- **Cmd type is new to users** — Users must learn the Cmd pattern (queue → Exec → Result). This is standard in Redis client libraries (go-redis, jedis, redis-py) and well-understood. Documentation and examples will ease adoption. + +- **Cluster-safe Del changes semantics silently** — In Cluster mode, a multi-key `Del` that previously errored with CROSSSLOT will now succeed (per-key). This is strictly better behavior but changes the error contract. Mitigation: log a debug-level message when the fallback path is taken. diff --git a/openspec/changes/gredis-pipeline-tx-cluster/proposal.md b/openspec/changes/gredis-pipeline-tx-cluster/proposal.md new file mode 100644 index 00000000000..aeaa98179f1 --- /dev/null +++ b/openspec/changes/gredis-pipeline-tx-cluster/proposal.md @@ -0,0 +1,37 @@ +## Why + +GoFrame's `database/gredis` abstract layer currently exposes only single-command operations (Do, group-based commands like HSet/Get/etc.). It lacks support for Pipeline (batch command execution in a single network round-trip), Transaction (MULTI/EXEC atomic execution), and optimistic locking (WATCH). Users who need these capabilities must resort to type-asserting the raw `Client()` return to `redis.UniversalClient` and manually constructing pipelines with go-redis types — breaking the driver-agnostic abstraction. + +Additionally, the concrete `contrib/nosql/redis` driver has no Cluster-aware SCAN (global key iteration across all master nodes) and no Cluster-safe multi-key DEL (naive multi-key DEL triggers CROSSSLOT errors). These are common infrastructure-level pitfalls that every Cluster-mode user encounters. + +## What Changes + +### Pipeline & Transaction Support +- Add a `Cmd` type as a future-result container for pipelined commands. +- Add a `Pipeliner` interface with full group-style command access (Generic, Hash, String, List, Set, SortedSet), mirroring existing group interfaces but returning `*Cmd` instead of `(value, error)`. +- Extend `AdapterOperation` interface with `Pipeline()`, `TxPipeline()`, and `Watch()`. +- Add `Tx` interface for transaction context (embeds `Pipeliner`). +- Implement the full `Pipeliner` in `contrib/nosql/redis` by delegating to go-redis's `redis.Pipeliner`. + +### Cluster Enhancement +- Add `ScanAll` method to `IGroupGeneric` — Cluster-safe global key scan that iterates all master nodes. +- Implement Cluster-safe `Del` in the driver — auto-detects Cluster mode and falls back to per-key deletion to avoid CROSSSLOT errors. + +## Capabilities + +### New Capabilities +- `gredis-pipeline-tx-cluster`: Pipeline, Transaction, optimistic-locking, and Cluster-safe operation support for the Redis adapter layer. + +### Modified Capabilities +- None. + +## Impact + +- `database/gredis/gredis_adapter.go` — extend `AdapterOperation` interface +- `database/gredis/gredis_redis.go` — add Pipeline/TxPipeline/Watch methods to `Redis` +- `database/gredis/gredis_redis_group_generic.go` — add `ScanAll` to `IGroupGeneric` +- `database/gredis/` — new files for `Cmd`, `Pipeliner`, pipeline group interfaces +- `contrib/nosql/redis/redis.go` — implement Pipeline/TxPipeline/Watch on the driver +- `contrib/nosql/redis/redis_group_generic.go` — implement ScanAll and Cluster-safe Del +- `contrib/nosql/redis/` — new file for pipeline group implementations +- New and updated test files across both modules diff --git a/openspec/changes/gredis-pipeline-tx-cluster/specs/gredis-pipeline-tx-cluster/spec.md b/openspec/changes/gredis-pipeline-tx-cluster/specs/gredis-pipeline-tx-cluster/spec.md new file mode 100644 index 00000000000..d27dafbc51e --- /dev/null +++ b/openspec/changes/gredis-pipeline-tx-cluster/specs/gredis-pipeline-tx-cluster/spec.md @@ -0,0 +1,73 @@ +## ADDED Requirements + +### Requirement: Pipeline support in AdapterOperation +The `AdapterOperation` interface SHALL provide a `Pipeline` method that returns a `Pipeliner` instance for batching multiple Redis commands into a single network round-trip. + +#### Scenario: User batches multiple commands via Pipeline +- **WHEN** a user calls `Pipeline(ctx)` on a Redis client +- **THEN** the method SHALL return a `Pipeliner` that queues commands locally without sending them to the server until `Exec` is called + +#### Scenario: Pipeline Exec sends all queued commands +- **WHEN** a user calls `Exec(ctx)` on a `Pipeliner` with queued commands +- **THEN** all queued commands SHALL be sent to the Redis server in a single batch +- **AND** each queued command's `Cmd` result SHALL be populated with the server's reply + +#### Scenario: Pipeline Discard clears queued commands +- **WHEN** a user calls `Discard()` on a `Pipeliner` with queued commands +- **THEN** all queued commands SHALL be discarded without being sent to the server + +### Requirement: Transaction support via TxPipeline +The `AdapterOperation` interface SHALL provide a `TxPipeline` method that returns a `Pipeliner` wrapping commands in a Redis MULTI/EXEC transaction. + +#### Scenario: TxPipeline executes atomically +- **WHEN** a user queues commands via `TxPipeline(ctx)` and calls `Exec(ctx)` +- **THEN** all queued commands SHALL be wrapped in MULTI/EXEC and executed atomically by the Redis server + +### Requirement: Optimistic locking via Watch +The `AdapterOperation` interface SHALL provide a `Watch` method that accepts a callback function and a list of keys to watch for changes. + +#### Scenario: Watch detects key modification +- **WHEN** a watched key is modified by another client before the transaction executes +- **THEN** the transaction SHALL be aborted and `Watch` SHALL return a transaction-abort error + +#### Scenario: Watch succeeds without modification +- **WHEN** no watched key is modified before the transaction executes +- **THEN** the callback SHALL receive a `Tx` instance and the queued commands SHALL execute successfully + +### Requirement: Pipeliner group-style interface +The `Pipeliner` interface SHALL provide typed command access through pipeline group interfaces (`IPipelineGroupGeneric`, `IPipelineGroupHash`, `IPipelineGroupString`, `IPipelineGroupList`, `IPipelineGroupSet`, `IPipelineGroupSortedSet`) mirroring the existing group interfaces with `*Cmd` return types. + +#### Scenario: Pipeline Hash commands +- **WHEN** a user calls `Pipeliner.PipelineGroupHash().HSet(ctx, key, fields)` +- **THEN** the command SHALL be queued and a `*Cmd` future SHALL be returned +- **AND** the `Cmd` result SHALL be populated after `Exec(ctx)` is called + +### Requirement: Cluster-safe ScanAll +The `IGroupGeneric` interface SHALL provide a `ScanAll` method that returns all keys matching a pattern, transparently handling Cluster mode by iterating all master nodes. + +#### Scenario: ScanAll in standalone mode +- **WHEN** `ScanAll` is called on a standalone Redis connection +- **THEN** it SHALL repeatedly call `Scan` until the cursor returns to 0 and return all accumulated keys + +#### Scenario: ScanAll in Cluster mode +- **WHEN** `ScanAll` is called on a Redis Cluster connection +- **THEN** it SHALL iterate all master nodes, scan each node, and return aggregated results from all nodes + +### Requirement: Cluster-safe Del +The driver implementation of `Del` SHALL automatically handle multi-key deletion in Cluster mode by performing per-key deletion to avoid CROSSSLOT errors. + +#### Scenario: Del multiple keys in standalone mode +- **WHEN** `Del(ctx, key1, key2, key3)` is called on a standalone Redis connection +- **THEN** a single DEL command with all keys SHALL be sent to the server + +#### Scenario: Del multiple keys in Cluster mode +- **WHEN** `Del(ctx, key1, key2, key3)` is called on a Redis Cluster connection +- **THEN** the driver SHALL perform per-key DEL operations to avoid CROSSSLOT errors +- **AND** the driver SHALL return the total number of keys deleted across all operations + +### Requirement: Driver-agnostic Pipeline interface +The `Pipeliner`, `Cmd`, and `Tx` types defined in `database/gredis` SHALL NOT reference any driver-specific types (e.g., `go-redis` types). The concrete implementation in `contrib/nosql/redis` SHALL translate between the framework types and go-redis types internally. + +#### Scenario: Custom adapter implements Pipeline +- **WHEN** a third-party adapter implements the `AdapterOperation` interface +- **THEN** it SHALL be able to implement `Pipeline`, `TxPipeline`, and `Watch` without depending on go-redis diff --git a/openspec/changes/gredis-pipeline-tx-cluster/tasks.md b/openspec/changes/gredis-pipeline-tx-cluster/tasks.md new file mode 100644 index 00000000000..b4b1aba074a --- /dev/null +++ b/openspec/changes/gredis-pipeline-tx-cluster/tasks.md @@ -0,0 +1,48 @@ +## 1. Abstract Layer — Cmd Type & Pipeline Interfaces (`database/gredis/`) + +- [x] 1.1 Create `gredis_cmd.go` — define `Cmd` struct with `val *gvar.Var`, `err error`, `Result()`, `Val()` methods +- [x] 1.2 Create `gredis_pipeline.go` — define `Pipeliner`, `PipelinerOperation`, `PipelinerGroup` interfaces +- [x] 1.3 Define pipeline group interfaces (`IPipelineGroupGeneric`, `IPipelineGroupHash`, `IPipelineGroupString`, `IPipelineGroupList`, `IPipelineGroupSet`, `IPipelineGroupSortedSet`) — mirror existing group method signatures with `*Cmd` return type +- [x] 1.4 Define `Tx` interface — embeds `Pipeliner` +- [x] 1.5 Extend `AdapterOperation` in `gredis_adapter.go` — add `Pipeline()`, `TxPipeline()`, `Watch()` +- [x] 1.6 Add `Pipeline()`, `TxPipeline()`, `Watch()` wrapper methods on `Redis` in `gredis_redis.go` +- [x] 1.7 Add `ScanAll` method to `IGroupGeneric` in `gredis_redis_group_generic.go` +- [x] 1.8 Verify: `go build ./database/gredis/...` compiles, `go vet` passes + +## 2. Concrete Driver — Pipeline & Transaction (`contrib/nosql/redis/`) + +- [x] 2.1 Create `redis_pipeline.go` — implement `Pipeliner` interface wrapping go-redis `redis.Pipeliner` +- [x] 2.2 Implement all pipeline group structs (`pipelineGroupGeneric`, `pipelineGroupHash`, etc.) delegating to `redis.Pipeliner` methods +- [x] 2.3 Implement `Cmd` population logic — translate go-redis `Cmder` results to `gredis.Cmd` after `Exec()` +- [x] 2.4 Implement `Pipeline()`, `TxPipeline()`, `Watch()` on the `*Redis` driver type in `redis.go` +- [x] 2.5 Verify: `go build ./...` in `contrib/nosql/redis/` compiles + +## 3. Concrete Driver — Cluster Enhancements (`contrib/nosql/redis/`) + +- [x] 3.1 Implement `ScanAll` in `redis_group_generic.go` — standalone/sentinel: loop Scan until cursor=0; cluster: `ForEachMaster` + per-node scan aggregation +- [x] 3.2 Implement Cluster-safe `Del` in `redis_group_generic.go` — detect `*redis.ClusterClient`, fall back to per-key DEL, return total deleted count +- [x] 3.3 Verify: `go build ./...` in `contrib/nosql/redis/` compiles + +## 4. Unit Tests — Abstract Layer (`database/gredis/`) + +- [x] 4.1 Test `Cmd` type — `Result()`, `Val()` before and after population +- [x] 4.2 Test `Redis.Pipeline()` returns non-nil `Pipeliner` +- [x] 4.3 Test `Redis.TxPipeline()` returns non-nil `Pipeliner` +- [x] 4.4 Test `ScanAll` interface exists on `IGroupGeneric` +- [x] 4.5 Verify: `go test ./database/gredis/... -count=1 -race` passes, coverage ≥ 80% + +## 5. Unit Tests — Concrete Driver (`contrib/nosql/redis/`) + +- [x] 5.1 Test Pipeline basic operations — queue HSet + Get, Exec, verify Cmd results populated +- [x] 5.2 Test Pipeline Discard — queue commands, Discard, verify no server interaction +- [x] 5.3 Test TxPipeline — queue multiple commands, Exec, verify atomic execution +- [x] 5.4 Test Watch — optimistic locking success and abort scenarios +- [x] 5.5 Test ScanAll in standalone mode — verify all matching keys returned +- [x] 5.6 Test Cluster-safe Del — verify per-key deletion path (may require mock/skip if no cluster) +- [x] 5.7 Verify: `go test ./... -count=1 -race` in `contrib/nosql/redis/` passes, coverage ≥ 80% + +## 6. Lint & Tidy + +- [x] 6.1 Run `make tidy` from repo root +- [x] 6.2 Run `make lint` and fix any issues +- [x] 6.3 Run `go build ./...` from repo root to verify no breakage