diff --git a/.changeset/seven-bottles-study.md b/.changeset/seven-bottles-study.md new file mode 100644 index 000000000..43d00fa6e --- /dev/null +++ b/.changeset/seven-bottles-study.md @@ -0,0 +1,5 @@ +--- +"@voltagent/redis": patch +--- + +feat: add @voltagent/redis memory storage adapter diff --git a/packages/redis/package.json b/packages/redis/package.json new file mode 100644 index 000000000..9173f76d1 --- /dev/null +++ b/packages/redis/package.json @@ -0,0 +1,52 @@ +{ + "name": "@voltagent/redis", + "description": "VoltAgent Redis - Redis Memory provider integration for VoltAgent", + "version": "0.1.0", + "dependencies": { + "@voltagent/internal": "^1.0.2", + "ioredis": "^5.6.1" + }, + "devDependencies": { + "@vitest/coverage-v8": "^3.2.4", + "@voltagent/core": "^2.4.4", + "ai": "^6.0.0" + }, + "exports": { + ".": { + "import": { + "types": "./dist/index.d.mts", + "default": "./dist/index.mjs" + }, + "require": { + "types": "./dist/index.d.ts", + "default": "./dist/index.js" + } + } + }, + "files": [ + "dist" + ], + "license": "MIT", + "main": "dist/index.js", + "module": "dist/index.mjs", + "peerDependencies": { + "@voltagent/core": "^2.0.0", + "ai": "^6.0.0" + }, + "repository": { + "type": "git", + "url": "https://github.com/VoltAgent/voltagent.git", + "directory": "packages/redis" + }, + "scripts": { + "attw": "attw --pack", + "build": "tsup", + "dev": "tsup --watch", + "lint": "biome check .", + "lint:fix": "biome check . --write", + "publint": "publint --strict", + "test": "vitest", + "test:coverage": "vitest run --coverage" + }, + "types": "dist/index.d.ts" +} diff --git a/packages/redis/src/index.ts b/packages/redis/src/index.ts new file mode 100644 index 000000000..2672568fe --- /dev/null +++ b/packages/redis/src/index.ts @@ -0,0 +1 @@ +export { RedisMemoryAdapter, type RedisMemoryOptions } from "./memory-adapter"; diff --git a/packages/redis/src/memory-adapter.spec.ts b/packages/redis/src/memory-adapter.spec.ts new file mode 100644 index 000000000..cb49045fc --- /dev/null +++ b/packages/redis/src/memory-adapter.spec.ts @@ -0,0 +1,304 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { RedisMemoryAdapter } from "./memory-adapter"; + +// Mock ioredis +const mockPipeline = { + set: vi.fn().mockReturnThis(), + get: vi.fn().mockReturnThis(), + del: vi.fn().mockReturnThis(), + zadd: vi.fn().mockReturnThis(), + zrem: vi.fn().mockReturnThis(), + sadd: vi.fn().mockReturnThis(), + srem: vi.fn().mockReturnThis(), + exec: vi.fn().mockResolvedValue([]), +}; + +const mockRedis = { + get: vi.fn(), + set: vi.fn(), + del: vi.fn(), + exists: vi.fn(), + zadd: vi.fn(), + zrange: vi.fn().mockResolvedValue([]), + zrevrange: vi.fn().mockResolvedValue([]), + zrangebyscore: vi.fn().mockResolvedValue([]), + zrem: vi.fn(), + sadd: vi.fn(), + srem: vi.fn(), + smembers: vi.fn().mockResolvedValue([]), + pipeline: vi.fn(() => mockPipeline), + quit: vi.fn().mockResolvedValue("OK"), +}; + +vi.mock("ioredis", () => ({ + default: vi.fn(() => mockRedis), +})); + +describe("RedisMemoryAdapter", () => { + let adapter: RedisMemoryAdapter; + + beforeEach(() => { + vi.clearAllMocks(); + adapter = new RedisMemoryAdapter({ + connection: "redis://localhost:6379", + keyPrefix: "test", + }); + }); + + // ── Conversation tests ─────────────────────────────────────────────── + + describe("createConversation", () => { + it("creates a conversation and indexes it", async () => { + mockRedis.exists.mockResolvedValue(0); + + const result = await adapter.createConversation({ + id: "conv-1", + resourceId: "agent-1", + userId: "user-1", + title: "Test Conversation", + metadata: {}, + }); + + expect(result.id).toBe("conv-1"); + expect(result.resourceId).toBe("agent-1"); + expect(result.userId).toBe("user-1"); + expect(result.title).toBe("Test Conversation"); + expect(result.createdAt).toBeDefined(); + + expect(mockPipeline.set).toHaveBeenCalledWith("test:conv:conv-1", expect.any(String)); + expect(mockPipeline.zadd).toHaveBeenCalledWith( + "test:convs:resource:agent-1", + expect.any(Number), + "conv-1", + ); + expect(mockPipeline.zadd).toHaveBeenCalledWith( + "test:convs:user:user-1", + expect.any(Number), + "conv-1", + ); + expect(mockPipeline.exec).toHaveBeenCalled(); + }); + + it("throws ConversationAlreadyExistsError for duplicate IDs", async () => { + mockRedis.exists.mockResolvedValue(1); + + await expect( + adapter.createConversation({ + id: "conv-1", + resourceId: "agent-1", + userId: "user-1", + title: "Test", + metadata: {}, + }), + ).rejects.toThrow(); + }); + }); + + describe("getConversation", () => { + it("returns a conversation by ID", async () => { + const conv = { + id: "conv-1", + resourceId: "agent-1", + userId: "user-1", + title: "Test", + metadata: {}, + createdAt: "2026-01-01T00:00:00.000Z", + updatedAt: "2026-01-01T00:00:00.000Z", + }; + mockRedis.get.mockResolvedValue(JSON.stringify(conv)); + + const result = await adapter.getConversation("conv-1"); + expect(result).toEqual(conv); + expect(mockRedis.get).toHaveBeenCalledWith("test:conv:conv-1"); + }); + + it("returns null for nonexistent conversation", async () => { + mockRedis.get.mockResolvedValue(null); + const result = await adapter.getConversation("nonexistent"); + expect(result).toBeNull(); + }); + }); + + describe("updateConversation", () => { + it("updates title and updatedAt", async () => { + const existing = { + id: "conv-1", + resourceId: "agent-1", + userId: "user-1", + title: "Old Title", + metadata: {}, + createdAt: "2026-01-01T00:00:00.000Z", + updatedAt: "2026-01-01T00:00:00.000Z", + }; + mockRedis.get.mockResolvedValue(JSON.stringify(existing)); + + const result = await adapter.updateConversation("conv-1", { title: "New Title" }); + expect(result.title).toBe("New Title"); + expect(result.createdAt).toBe(existing.createdAt); + expect(result.updatedAt).not.toBe(existing.updatedAt); + }); + + it("throws ConversationNotFoundError for missing conversation", async () => { + mockRedis.get.mockResolvedValue(null); + await expect(adapter.updateConversation("nonexistent", { title: "X" })).rejects.toThrow(); + }); + }); + + describe("deleteConversation", () => { + it("deletes conversation and all related data", async () => { + const conv = { + id: "conv-1", + resourceId: "agent-1", + userId: "user-1", + title: "Test", + metadata: {}, + createdAt: "2026-01-01T00:00:00.000Z", + updatedAt: "2026-01-01T00:00:00.000Z", + }; + mockRedis.get.mockResolvedValue(JSON.stringify(conv)); + + await adapter.deleteConversation("conv-1"); + + expect(mockPipeline.del).toHaveBeenCalledWith("test:conv:conv-1"); + expect(mockPipeline.del).toHaveBeenCalledWith("test:msgs:conv-1"); + expect(mockPipeline.del).toHaveBeenCalledWith("test:steps:conv-1"); + expect(mockPipeline.zrem).toHaveBeenCalledWith("test:convs:resource:agent-1", "conv-1"); + expect(mockPipeline.zrem).toHaveBeenCalledWith("test:convs:user:user-1", "conv-1"); + }); + }); + + // ── Message tests ──────────────────────────────────────────────────── + + describe("addMessage", () => { + it("adds a message to the conversation sorted set", async () => { + await adapter.addMessage( + { id: "msg-1", role: "user", parts: [{ type: "text", text: "hello" }] } as UIMessage, + "user-1", + "conv-1", + ); + + expect(mockRedis.zadd).toHaveBeenCalledWith( + "test:msgs:conv-1", + expect.any(Number), + expect.stringContaining("msg-1"), + ); + }); + }); + + describe("getMessages", () => { + it("returns messages from the sorted set", async () => { + const msg = { + id: "msg-1", + role: "user", + parts: [{ type: "text", text: "hi" }], + createdAt: "2026-01-01T00:00:00.000Z", + }; + mockRedis.zrange.mockResolvedValue([JSON.stringify(msg)]); + + const result = await adapter.getMessages("user-1", "conv-1"); + expect(result).toHaveLength(1); + expect(result[0].id).toBe("msg-1"); + expect(result[0].createdAt).toBeInstanceOf(Date); + }); + + it("applies limit option", async () => { + const messages = Array.from({ length: 5 }, (_, i) => + JSON.stringify({ + id: `msg-${i}`, + role: "user", + parts: [], + createdAt: new Date(2026, 0, 1, 0, i).toISOString(), + }), + ); + mockRedis.zrange.mockResolvedValue(messages); + + const result = await adapter.getMessages("user-1", "conv-1", { limit: 2 }); + expect(result).toHaveLength(2); + }); + }); + + describe("clearMessages", () => { + it("clears messages for a specific conversation", async () => { + await adapter.clearMessages("user-1", "conv-1"); + expect(mockRedis.del).toHaveBeenCalledWith("test:msgs:conv-1"); + }); + }); + + // ── Working memory tests ───────────────────────────────────────────── + + describe("workingMemory", () => { + it("sets and gets conversation-scoped working memory", async () => { + mockRedis.get.mockResolvedValue("memory content"); + + const result = await adapter.getWorkingMemory({ + conversationId: "conv-1", + scope: "conversation", + }); + + expect(result).toBe("memory content"); + expect(mockRedis.get).toHaveBeenCalledWith("test:wm:conv:conv-1"); + }); + + it("sets user-scoped working memory", async () => { + await adapter.setWorkingMemory({ + userId: "user-1", + content: "user memory", + scope: "user", + }); + + expect(mockRedis.set).toHaveBeenCalledWith("test:wm:user:user-1", "user memory"); + }); + + it("deletes working memory", async () => { + await adapter.deleteWorkingMemory({ + conversationId: "conv-1", + scope: "conversation", + }); + + expect(mockRedis.del).toHaveBeenCalledWith("test:wm:conv:conv-1"); + }); + }); + + // ── Workflow state tests ───────────────────────────────────────────── + + describe("workflowState", () => { + it("stores and retrieves workflow state", async () => { + const state = { + id: "exec-1", + workflowId: "wf-1", + workflowName: "Test Workflow", + status: "running" as const, + createdAt: new Date("2026-01-01"), + updatedAt: new Date("2026-01-01"), + }; + + mockRedis.get.mockResolvedValue( + JSON.stringify({ + ...state, + createdAt: state.createdAt.toISOString(), + updatedAt: state.updatedAt.toISOString(), + }), + ); + + const result = await adapter.getWorkflowState("exec-1"); + expect(result?.id).toBe("exec-1"); + expect(result?.workflowId).toBe("wf-1"); + expect(result?.createdAt).toBeInstanceOf(Date); + }); + + it("returns null for nonexistent workflow state", async () => { + mockRedis.get.mockResolvedValue(null); + const result = await adapter.getWorkflowState("nonexistent"); + expect(result).toBeNull(); + }); + }); + + // ── Disconnect ─────────────────────────────────────────────────────── + + describe("disconnect", () => { + it("calls quit on the Redis client", async () => { + await adapter.disconnect(); + expect(mockRedis.quit).toHaveBeenCalled(); + }); + }); +}); diff --git a/packages/redis/src/memory-adapter.ts b/packages/redis/src/memory-adapter.ts new file mode 100644 index 000000000..2507a68dc --- /dev/null +++ b/packages/redis/src/memory-adapter.ts @@ -0,0 +1,524 @@ +/** + * Redis Storage Adapter for VoltAgent Memory + * Provides fast, in-memory persistence for conversations, messages, + * working memory, and workflow state using Redis. + */ + +import { ConversationAlreadyExistsError, ConversationNotFoundError } from "@voltagent/core"; +import type { + Conversation, + ConversationQueryOptions, + ConversationStepRecord, + CreateConversationInput, + GetConversationStepsOptions, + GetMessagesOptions, + OperationContext, + StorageAdapter, + WorkflowRunQuery, + WorkflowStateEntry, + WorkingMemoryScope, +} from "@voltagent/core"; +import { safeStringify } from "@voltagent/internal"; +import type { UIMessage } from "ai"; +import Redis, { type RedisOptions } from "ioredis"; + +// ============================================================================ +// Configuration +// ============================================================================ + +export interface RedisMemoryOptions { + /** + * Redis connection configuration. + * Can be a connection URL string or ioredis options object. + */ + connection: string | RedisOptions; + + /** + * Key prefix for all Redis keys. + * @default "voltagent" + */ + keyPrefix?: string; + + /** + * Whether to enable debug logging. + * @default false + */ + debug?: boolean; +} + +// ============================================================================ +// Adapter +// ============================================================================ + +export class RedisMemoryAdapter implements StorageAdapter { + private client: Redis; + private prefix: string; + private debug: boolean; + + constructor(options: RedisMemoryOptions) { + this.prefix = options.keyPrefix ?? "voltagent"; + this.debug = options.debug ?? false; + + if (typeof options.connection === "string") { + this.client = new Redis(options.connection); + } else { + this.client = new Redis(options.connection); + } + } + + // ── Key helpers ────────────────────────────────────────────────────── + + private key(...parts: string[]): string { + return [this.prefix, ...parts].join(":"); + } + + private log(message: string, data?: unknown): void { + if (this.debug) { + console.log(`[RedisMemory] ${message}`, data ? safeStringify(data) : ""); + } + } + + // ── Conversation operations ────────────────────────────────────────── + + async createConversation(input: CreateConversationInput): Promise { + const existing = await this.client.exists(this.key("conv", input.id)); + if (existing) { + throw new ConversationAlreadyExistsError(input.id); + } + + const now = new Date().toISOString(); + const conversation: Conversation = { + id: input.id, + resourceId: input.resourceId, + userId: input.userId, + title: input.title, + metadata: input.metadata, + createdAt: now, + updatedAt: now, + }; + + const pipeline = this.client.pipeline(); + pipeline.set(this.key("conv", input.id), safeStringify(conversation)); + pipeline.zadd(this.key("convs:resource", input.resourceId), Date.now(), input.id); + pipeline.zadd(this.key("convs:user", input.userId), Date.now(), input.id); + pipeline.zadd(this.key("convs:all"), Date.now(), input.id); + await pipeline.exec(); + + this.log("createConversation", { id: input.id }); + return conversation; + } + + async getConversation(id: string): Promise { + const data = await this.client.get(this.key("conv", id)); + if (!data) return null; + return JSON.parse(data) as Conversation; + } + + async getConversations(resourceId: string): Promise { + const ids = await this.client.zrevrange(this.key("convs:resource", resourceId), 0, -1); + return this.getConversationsByIds(ids); + } + + async getConversationsByUserId( + userId: string, + options?: Omit, + ): Promise { + const ids = await this.client.zrevrange(this.key("convs:user", userId), 0, -1); + const conversations = await this.getConversationsByIds(ids); + return this.applyQueryOptions(conversations, { ...options, userId }); + } + + async queryConversations(options: ConversationQueryOptions): Promise { + let ids: string[]; + + if (options.userId) { + ids = await this.client.zrevrange(this.key("convs:user", options.userId), 0, -1); + } else if (options.resourceId) { + ids = await this.client.zrevrange(this.key("convs:resource", options.resourceId), 0, -1); + } else { + ids = await this.client.zrevrange(this.key("convs:all"), 0, -1); + } + + const conversations = await this.getConversationsByIds(ids); + return this.applyQueryOptions(conversations, options); + } + + async countConversations(options: ConversationQueryOptions): Promise { + const conversations = await this.queryConversations({ + ...options, + limit: undefined, + offset: undefined, + }); + return conversations.length; + } + + async updateConversation( + id: string, + updates: Partial>, + ): Promise { + const existing = await this.getConversation(id); + if (!existing) { + throw new ConversationNotFoundError(id); + } + + const updated: Conversation = { + ...existing, + ...updates, + updatedAt: new Date().toISOString(), + }; + + await this.client.set(this.key("conv", id), safeStringify(updated)); + this.log("updateConversation", { id }); + return updated; + } + + async deleteConversation(id: string): Promise { + const conversation = await this.getConversation(id); + if (!conversation) return; + + const pipeline = this.client.pipeline(); + pipeline.del(this.key("conv", id)); + pipeline.del(this.key("msgs", id)); + pipeline.del(this.key("steps", id)); + pipeline.zrem(this.key("convs:resource", conversation.resourceId), id); + pipeline.zrem(this.key("convs:user", conversation.userId), id); + pipeline.zrem(this.key("convs:all"), id); + await pipeline.exec(); + + this.log("deleteConversation", { id }); + } + + // ── Message operations ─────────────────────────────────────────────── + + async addMessage( + message: UIMessage, + userId: string, + conversationId: string, + _context?: OperationContext, + ): Promise { + const createdAt = (message as UIMessage & { createdAt?: Date }).createdAt ?? new Date(); + const entry = safeStringify({ + ...message, + userId, + conversationId, + createdAt: createdAt.toISOString(), + }); + await this.client.zadd(this.key("msgs", conversationId), createdAt.getTime(), entry); + this.log("addMessage", { id: message.id, conversationId }); + } + + async addMessages( + messages: UIMessage[], + userId: string, + conversationId: string, + context?: OperationContext, + ): Promise { + for (const message of messages) { + await this.addMessage(message, userId, conversationId, context); + } + } + + async getMessages( + _userId: string, + conversationId: string, + options?: GetMessagesOptions, + _context?: OperationContext, + ): Promise[]> { + let entries: string[]; + + if (options?.before || options?.after) { + const min = options.after ? options.after.getTime() : "-inf"; + const max = options.before ? options.before.getTime() : "+inf"; + entries = await this.client.zrangebyscore(this.key("msgs", conversationId), min, max); + } else { + entries = await this.client.zrange(this.key("msgs", conversationId), 0, -1); + } + + let messages = entries.map((entry) => { + const parsed = JSON.parse(entry); + return { + ...parsed, + createdAt: new Date(parsed.createdAt), + } as UIMessage<{ createdAt: Date }>; + }); + + if (options?.roles) { + messages = messages.filter((m) => options.roles?.includes(m.role)); + } + + if (options?.limit) { + messages = messages.slice(-options.limit); + } + + return messages; + } + + async clearMessages( + userId: string, + conversationId?: string, + _context?: OperationContext, + ): Promise { + if (conversationId) { + await this.client.del(this.key("msgs", conversationId)); + } else { + // Clear all messages for the user's conversations + const convIds = await this.client.zrange(this.key("convs:user", userId), 0, -1); + if (convIds.length > 0) { + const pipeline = this.client.pipeline(); + for (const id of convIds) { + pipeline.del(this.key("msgs", id)); + } + await pipeline.exec(); + } + } + + this.log("clearMessages", { userId, conversationId }); + } + + async deleteMessages( + messageIds: string[], + _userId: string, + conversationId: string, + _context?: OperationContext, + ): Promise { + const key = this.key("msgs", conversationId); + const entries = await this.client.zrange(key, 0, -1); + const idsToDelete = new Set(messageIds); + + const pipeline = this.client.pipeline(); + for (const entry of entries) { + const parsed = JSON.parse(entry); + if (idsToDelete.has(parsed.id)) { + pipeline.zrem(key, entry); + } + } + await pipeline.exec(); + + this.log("deleteMessages", { messageIds, conversationId }); + } + + // ── Conversation steps ─────────────────────────────────────────────── + + async saveConversationSteps(steps: ConversationStepRecord[]): Promise { + if (steps.length === 0) return; + + const pipeline = this.client.pipeline(); + for (const step of steps) { + const score = new Date(step.createdAt).getTime(); + pipeline.zadd(this.key("steps", step.conversationId), score, safeStringify(step)); + } + await pipeline.exec(); + + this.log("saveConversationSteps", { count: steps.length }); + } + + async getConversationSteps( + _userId: string, + conversationId: string, + options?: GetConversationStepsOptions, + ): Promise { + const entries = await this.client.zrange(this.key("steps", conversationId), 0, -1); + let steps = entries.map((e) => JSON.parse(e) as ConversationStepRecord); + + if (options?.operationId) { + steps = steps.filter((s) => s.operationId === options.operationId); + } + + if (options?.limit) { + steps = steps.slice(-options.limit); + } + + return steps; + } + + // ── Working memory ─────────────────────────────────────────────────── + + async getWorkingMemory(params: { + conversationId?: string; + userId?: string; + scope: WorkingMemoryScope; + }): Promise { + const scopeKey = + params.scope === "conversation" ? `conv:${params.conversationId}` : `user:${params.userId}`; + return this.client.get(this.key("wm", scopeKey)); + } + + async setWorkingMemory(params: { + conversationId?: string; + userId?: string; + content: string; + scope: WorkingMemoryScope; + }): Promise { + const scopeKey = + params.scope === "conversation" ? `conv:${params.conversationId}` : `user:${params.userId}`; + await this.client.set(this.key("wm", scopeKey), params.content); + } + + async deleteWorkingMemory(params: { + conversationId?: string; + userId?: string; + scope: WorkingMemoryScope; + }): Promise { + const scopeKey = + params.scope === "conversation" ? `conv:${params.conversationId}` : `user:${params.userId}`; + await this.client.del(this.key("wm", scopeKey)); + } + + // ── Workflow state ─────────────────────────────────────────────────── + + async getWorkflowState(executionId: string): Promise { + const data = await this.client.get(this.key("wf", executionId)); + if (!data) return null; + return this.deserializeWorkflowState(JSON.parse(data)); + } + + async queryWorkflowRuns(query: WorkflowRunQuery): Promise { + // Get all workflow execution IDs + let ids: string[]; + if (query.workflowId) { + ids = await this.client.zrevrange(this.key("wf:idx", query.workflowId), 0, -1); + } else { + ids = await this.client.zrevrange(this.key("wf:all"), 0, -1); + } + + const results: WorkflowStateEntry[] = []; + for (const id of ids) { + const state = await this.getWorkflowState(id); + if (!state) continue; + if (query.status && state.status !== query.status) continue; + if (query.userId && state.userId !== query.userId) continue; + if (query.from && state.createdAt < query.from) continue; + if (query.to && state.createdAt > query.to) continue; + results.push(state); + } + + const offset = query.offset ?? 0; + const limit = query.limit ?? results.length; + return results.slice(offset, offset + limit); + } + + async setWorkflowState(executionId: string, state: WorkflowStateEntry): Promise { + const pipeline = this.client.pipeline(); + pipeline.set(this.key("wf", executionId), safeStringify(state)); + pipeline.zadd(this.key("wf:idx", state.workflowId), state.createdAt.getTime(), executionId); + pipeline.zadd(this.key("wf:all"), state.createdAt.getTime(), executionId); + if (state.status === "suspended") { + pipeline.sadd(this.key("wf:suspended", state.workflowId), executionId); + } + await pipeline.exec(); + } + + async updateWorkflowState( + executionId: string, + updates: Partial, + ): Promise { + const existing = await this.getWorkflowState(executionId); + if (!existing) return; + + const updated = { ...existing, ...updates, updatedAt: new Date() }; + + const pipeline = this.client.pipeline(); + pipeline.set(this.key("wf", executionId), safeStringify(updated)); + + // Update suspended index + if (updates.status && updates.status !== "suspended") { + pipeline.srem(this.key("wf:suspended", existing.workflowId), executionId); + } else if (updates.status === "suspended") { + pipeline.sadd(this.key("wf:suspended", existing.workflowId), executionId); + } + + await pipeline.exec(); + } + + async getSuspendedWorkflowStates(workflowId: string): Promise { + const ids = await this.client.smembers(this.key("wf:suspended", workflowId)); + const results: WorkflowStateEntry[] = []; + for (const id of ids) { + const state = await this.getWorkflowState(id); + if (state && state.status === "suspended") { + results.push(state); + } + } + return results; + } + + // ── Cleanup ────────────────────────────────────────────────────────── + + /** + * Disconnect the Redis client. Call this when shutting down. + */ + async disconnect(): Promise { + await this.client.quit(); + } + + // ── Private helpers ────────────────────────────────────────────────── + + private async getConversationsByIds(ids: string[]): Promise { + if (ids.length === 0) return []; + + const pipeline = this.client.pipeline(); + for (const id of ids) { + pipeline.get(this.key("conv", id)); + } + const results = await pipeline.exec(); + if (!results) return []; + + const conversations: Conversation[] = []; + for (const [err, data] of results) { + if (!err && data) { + conversations.push(JSON.parse(data as string) as Conversation); + } + } + return conversations; + } + + private applyQueryOptions( + conversations: Conversation[], + options: ConversationQueryOptions, + ): Conversation[] { + let result = [...conversations]; + + if (options.userId) { + result = result.filter((c) => c.userId === options.userId); + } + if (options.resourceId) { + result = result.filter((c) => c.resourceId === options.resourceId); + } + + const orderBy = options.orderBy ?? "created_at"; + const direction = options.orderDirection ?? "DESC"; + result.sort((a, b) => { + let aVal: string; + let bVal: string; + if (orderBy === "title") { + aVal = a.title; + bVal = b.title; + } else if (orderBy === "updated_at") { + aVal = a.updatedAt; + bVal = b.updatedAt; + } else { + aVal = a.createdAt; + bVal = b.createdAt; + } + const cmp = aVal.localeCompare(bVal); + return direction === "ASC" ? cmp : -cmp; + }); + + const offset = options.offset ?? 0; + if (options.limit) { + result = result.slice(offset, offset + options.limit); + } else if (offset > 0) { + result = result.slice(offset); + } + + return result; + } + + private deserializeWorkflowState(raw: Record): WorkflowStateEntry { + return { + ...raw, + createdAt: new Date(raw.createdAt as string), + updatedAt: new Date(raw.updatedAt as string), + } as WorkflowStateEntry; + } +} diff --git a/packages/redis/tsconfig.json b/packages/redis/tsconfig.json new file mode 100644 index 000000000..e467bc5df --- /dev/null +++ b/packages/redis/tsconfig.json @@ -0,0 +1,30 @@ +{ + "compilerOptions": { + "target": "es2018", + "lib": ["dom", "dom.iterable", "esnext"], + "module": "esnext", + "moduleResolution": "node", + "declaration": true, + "declarationMap": true, + "sourceMap": true, + "outDir": "./dist", + "rootDir": "./", + "strict": true, + "noImplicitAny": true, + "strictNullChecks": true, + "strictFunctionTypes": true, + "strictBindCallApply": true, + "strictPropertyInitialization": true, + "noImplicitThis": true, + "noUnusedLocals": true, + "noUnusedParameters": true, + "noImplicitReturns": true, + "noFallthroughCasesInSwitch": true, + "esModuleInterop": true, + "skipLibCheck": true, + "forceConsistentCasingInFileNames": true, + "types": ["node", "vitest/globals"] + }, + "include": ["src/**/*.ts"], + "exclude": ["node_modules", "dist"] +} diff --git a/packages/redis/tsup.config.ts b/packages/redis/tsup.config.ts new file mode 100644 index 000000000..0819104fd --- /dev/null +++ b/packages/redis/tsup.config.ts @@ -0,0 +1,19 @@ +import { defineConfig } from "tsup"; +import { markAsExternalPlugin } from "../shared/tsup-plugins/mark-as-external"; + +export default defineConfig({ + entry: ["src/index.ts"], + format: ["cjs", "esm"], + splitting: false, + sourcemap: true, + clean: false, + target: "es2022", + outDir: "dist", + minify: false, + dts: true, + esbuildPlugins: [markAsExternalPlugin], + esbuildOptions(options) { + options.keepNames = true; + return options; + }, +}); diff --git a/packages/redis/vitest.config.mts b/packages/redis/vitest.config.mts new file mode 100644 index 000000000..46f1107e6 --- /dev/null +++ b/packages/redis/vitest.config.mts @@ -0,0 +1,16 @@ +import { defineConfig } from "vitest/config"; + +export default defineConfig({ + test: { + include: ["**/*.spec.ts"], + exclude: ["**/node_modules/**"], + environment: "node", + coverage: { + provider: "v8", + reporter: ["text", "json", "html"], + include: ["src/**/*.ts"], + exclude: ["src/**/*.d.ts", "src/**/index.ts"], + }, + globals: true, + }, +});