Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ const SEND_OWNER_REMINDER_DIGEST_EMAIL_JOB_DEFINITION_ID = 'send.owner.reminder.

const SEND_OWNER_REMINDER_DIGEST_EMAIL_JOB_DEFINITION_SCHEMA = z.object({
teamId: z.number(),
// Array of envelope summaries pre-computed by the sweep handler.
// Handler renders these directly without re-querying, keeping the digest job lightweight.
userId: z.number(),
envelopeIds: z.array(z.string()),
});

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
/* eslint-disable @typescript-eslint/require-await -- stub; implementer must remove this and add real async/await */
import { DocumentStatus, SigningStatus } from '@prisma/client';
import { DateTime } from 'luxon';

Expand All @@ -9,75 +8,115 @@ import type { JobRunIO } from '../../client/_internal/job';
import type { TSendRemindersSweepJobDefinition } from './send-reminders-sweep';

export const run = async ({ io }: { payload: TSendRemindersSweepJobDefinition; io: JobRunIO }) => {
// TODO(Person 1): Implement sweep logic.
//
// Steps:
// 1. Query pending envelopes with reminderEnabled=true and a set reminderIntervalDays.
// Filter to envelopes where the document is still PENDING.
//
// const envelopes = await prisma.envelope.findMany({
// where: {
// status: DocumentStatus.PENDING,
// documentMeta: {
// reminderEnabled: true,
// reminderIntervalDays: { not: null },
// },
// },
// include: {
// documentMeta: true,
// recipients: {
// where: {
// signingStatus: { notIn: [SigningStatus.SIGNED, SigningStatus.REJECTED] },
// documentDeletedAt: null,
// },
// },
// reminderLogs: {
// orderBy: { createdAt: 'desc' },
// },
// },
// take: 1000,
// });
//
// 2. For each envelope and each unsigned recipient, determine if a reminder is due.
// A reminder is due when:
// (now - lastReminderSentAt) >= reminderIntervalDays
// where lastReminderSentAt is the most recent DocumentReminderLog.createdAt for
// that recipient, or the envelope sentAt (createdAt) if no log exists yet.
//
// Use DateTime from 'luxon' for date arithmetic.
//
// 3. Collect recipients due for a reminder. Cap at 1000 total across all envelopes.
//
// 4. Fan out per-recipient email jobs:
// await Promise.allSettled(
// dueRecipients.map((r) =>
// jobs.triggerJob({
// name: 'send.recipient.reminder.email',
// payload: { recipientId: r.id, envelopeId: r.envelopeId },
// }),
// ),
// );
//
// 5. Group due envelopes by teamId. For each team with at least one due envelope,
// trigger the digest job:
// await Promise.allSettled(
// Object.entries(byTeam).map(([teamId, envIds]) =>
// jobs.triggerJob({
// name: 'send.owner.reminder.digest.email',
// payload: { teamId: Number(teamId), envelopeIds: envIds },
// }),
// ),
// );

// Reference implementations:
// expire-recipients-sweep.handler.ts — sweep query pattern + Promise.allSettled fan-out
// process-recipient-expired.handler.ts — io.runTask idempotency pattern

void DocumentStatus;
void SigningStatus;
void DateTime;
void prisma;
void jobs;

io.logger.info('send-reminders-sweep: not yet implemented');
const now = DateTime.now();

const envelopes = await prisma.envelope.findMany({
where: {
status: DocumentStatus.PENDING,
documentMeta: {
reminderEnabled: true,
reminderIntervalDays: { not: null },
},
},
Comment on lines +14 to +20
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Exclude soft-deleted envelopes in the sweep query.

The current where clause omits deletedAt: null, so reminders may be sent for deleted envelopes.

🔧 Suggested query patch
   const envelopes = await prisma.envelope.findMany({
     where: {
       status: DocumentStatus.PENDING,
+      deletedAt: null,
       documentMeta: {
         reminderEnabled: true,
         reminderIntervalDays: { not: null },
       },
     },
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/lib/jobs/definitions/internal/send-reminders-sweep.handler.ts`
around lines 14 - 20, The sweep query's where clause must exclude soft-deleted
envelopes by adding a deletedAt: null condition; update the query (the where
object used in the send-reminders sweep handler) alongside existing predicates
(DocumentStatus.PENDING, documentMeta.reminderEnabled,
documentMeta.reminderIntervalDays) so that only records with deletedAt === null
are selected and reminders aren't sent for deleted envelopes.

include: {
documentMeta: true,
recipients: {
where: {
signingStatus: { notIn: [SigningStatus.SIGNED, SigningStatus.REJECTED] },
documentDeletedAt: null,
},
},
reminderLogs: {
orderBy: { createdAt: 'desc' as const },
},
},
take: 1000,
});

if (envelopes.length === 0) {
io.logger.info('No envelopes with reminders enabled');
return;
}

const dueRecipients: Array<{ id: number; envelopeId: string }> = [];
const dueEnvelopesByOwner = new Map<
string,
{ teamId: number; userId: number; envelopeIds: Set<string> }
>();

for (const envelope of envelopes) {
const intervalDays = envelope.documentMeta?.reminderIntervalDays;

if (!intervalDays) {
continue;
}

for (const recipient of envelope.recipients) {
const lastLog = envelope.reminderLogs.find((log) => log.recipientId === recipient.id);
const lastSentAt = lastLog
? DateTime.fromJSDate(lastLog.createdAt)
: DateTime.fromJSDate(envelope.createdAt);

const daysSinceLast = now.diff(lastSentAt, 'days').days;

if (daysSinceLast >= intervalDays) {
dueRecipients.push({ id: recipient.id, envelopeId: envelope.id });

const ownerKey = `${envelope.teamId}:${envelope.userId}`;
const ownerBucket = dueEnvelopesByOwner.get(ownerKey) ?? {
teamId: envelope.teamId,
userId: envelope.userId,
envelopeIds: new Set<string>(),
};
ownerBucket.envelopeIds.add(envelope.id);
dueEnvelopesByOwner.set(ownerKey, ownerBucket);
}

// Cap matches the take: 1000 query limit — remainder picked up in next cron run.
if (dueRecipients.length >= 1000) {
break;
}
}

if (dueRecipients.length >= 1000) {
break;
}
}

if (dueRecipients.length === 0) {
io.logger.info('No recipients due for reminders');
return;
}

io.logger.info(`Found ${dueRecipients.length} recipients due for reminders`);

const recipientResults = await Promise.allSettled(
dueRecipients.map(async (r) => {
await jobs.triggerJob({
name: 'send.recipient.reminder.email',
payload: { recipientId: r.id, envelopeId: r.envelopeId },
});
}),
);

for (const result of recipientResults) {
if (result.status === 'rejected') {
io.logger.error('Failed to trigger recipient reminder', { reason: result.reason });
}
}

const digestResults = await Promise.allSettled(
Array.from(dueEnvelopesByOwner.values()).map(async ({ teamId, userId, envelopeIds }) => {
await jobs.triggerJob({
name: 'send.owner.reminder.digest.email',
payload: { teamId, userId, envelopeIds: Array.from(envelopeIds) },
});
}),
);

for (const result of digestResults) {
if (result.status === 'rejected') {
io.logger.error('Failed to trigger owner digest', { reason: result.reason });
}
}
};
Comment on lines 10 to 122
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Critical: All handler steps must be wrapped in io.runTask() for idempotency.

The handler performs multiple steps (database query, fan-out triggers) without io.runTask() wrappers. If the job crashes mid-execution and retries, it will re-query and re-trigger all reminder emails, causing duplicate notifications.

🔒 Proposed fix to wrap steps in io.runTask()
 export const run = async ({ io }: { payload: TSendRemindersSweepJobDefinition; io: JobRunIO }) => {
   const now = DateTime.now();
 
-  const envelopes = await prisma.envelope.findMany({
+  const envelopes = await io.runTask('fetch-pending-envelopes', async () => {
+    return prisma.envelope.findMany({
       where: {
         status: DocumentStatus.PENDING,
         documentMeta: {
           reminderEnabled: true,
           reminderIntervalDays: { not: null },
         },
       },
       include: {
         documentMeta: true,
         recipients: {
           where: {
             signingStatus: { notIn: [SigningStatus.SIGNED, SigningStatus.REJECTED] },
             documentDeletedAt: null,
           },
         },
         reminderLogs: {
           orderBy: { createdAt: 'desc' as const },
         },
       },
       take: 1000,
     });
+  });

   // ... processing logic remains the same ...

-  const recipientResults = await Promise.allSettled(
+  const recipientResults = await io.runTask('fan-out-recipient-reminders', async () => {
+    return Promise.allSettled(
       dueRecipients.map(async (r) => {
         await jobs.triggerJob({
           name: 'send.recipient.reminder.email',
           payload: { recipientId: r.id, envelopeId: r.envelopeId },
         });
       }),
     );
+  });

   // ... failure logging ...

-  const digestResults = await Promise.allSettled(
+  const digestResults = await io.runTask('fan-out-owner-digests', async () => {
+    return Promise.allSettled(
       Array.from(dueEnvelopesByTeam.entries()).map(async ([teamId, envelopeIds]) => {
         await jobs.triggerJob({
           name: 'send.owner.reminder.digest.email',
           payload: { teamId, envelopeIds: Array.from(envelopeIds) },
         });
       }),
     );
+  });

Based on learnings: "All job handler steps must be wrapped in io.runTask('unique-key', fn) for idempotency".

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
export const run = async ({ io }: { payload: TSendRemindersSweepJobDefinition; io: JobRunIO }) => {
// TODO(Person 1): Implement sweep logic.
//
// Steps:
// 1. Query pending envelopes with reminderEnabled=true and a set reminderIntervalDays.
// Filter to envelopes where the document is still PENDING.
//
// const envelopes = await prisma.envelope.findMany({
// where: {
// status: DocumentStatus.PENDING,
// documentMeta: {
// reminderEnabled: true,
// reminderIntervalDays: { not: null },
// },
// },
// include: {
// documentMeta: true,
// recipients: {
// where: {
// signingStatus: { notIn: [SigningStatus.SIGNED, SigningStatus.REJECTED] },
// documentDeletedAt: null,
// },
// },
// reminderLogs: {
// orderBy: { createdAt: 'desc' },
// },
// },
// take: 1000,
// });
//
// 2. For each envelope and each unsigned recipient, determine if a reminder is due.
// A reminder is due when:
// (now - lastReminderSentAt) >= reminderIntervalDays
// where lastReminderSentAt is the most recent DocumentReminderLog.createdAt for
// that recipient, or the envelope sentAt (createdAt) if no log exists yet.
//
// Use DateTime from 'luxon' for date arithmetic.
//
// 3. Collect recipients due for a reminder. Cap at 1000 total across all envelopes.
//
// 4. Fan out per-recipient email jobs:
// await Promise.allSettled(
// dueRecipients.map((r) =>
// jobs.triggerJob({
// name: 'send.recipient.reminder.email',
// payload: { recipientId: r.id, envelopeId: r.envelopeId },
// }),
// ),
// );
//
// 5. Group due envelopes by teamId. For each team with at least one due envelope,
// trigger the digest job:
// await Promise.allSettled(
// Object.entries(byTeam).map(([teamId, envIds]) =>
// jobs.triggerJob({
// name: 'send.owner.reminder.digest.email',
// payload: { teamId: Number(teamId), envelopeIds: envIds },
// }),
// ),
// );
// Reference implementations:
// expire-recipients-sweep.handler.ts — sweep query pattern + Promise.allSettled fan-out
// process-recipient-expired.handler.ts — io.runTask idempotency pattern
void DocumentStatus;
void SigningStatus;
void DateTime;
void prisma;
void jobs;
io.logger.info('send-reminders-sweep: not yet implemented');
const now = DateTime.now();
const envelopes = await prisma.envelope.findMany({
where: {
status: DocumentStatus.PENDING,
documentMeta: {
reminderEnabled: true,
reminderIntervalDays: { not: null },
},
},
include: {
documentMeta: true,
recipients: {
where: {
signingStatus: { notIn: [SigningStatus.SIGNED, SigningStatus.REJECTED] },
documentDeletedAt: null,
},
},
reminderLogs: {
orderBy: { createdAt: 'desc' as const },
},
},
take: 1000,
});
if (envelopes.length === 0) {
io.logger.info('No envelopes with reminders enabled');
return;
}
const dueRecipients: Array<{ id: number; envelopeId: string }> = [];
const dueEnvelopesByTeam = new Map<number, Set<string>>();
for (const envelope of envelopes) {
const intervalDays = envelope.documentMeta?.reminderIntervalDays;
if (!intervalDays) {
continue;
}
for (const recipient of envelope.recipients) {
const lastLog = envelope.reminderLogs.find((log) => log.recipientId === recipient.id);
const lastSentAt = lastLog
? DateTime.fromJSDate(lastLog.createdAt)
: DateTime.fromJSDate(envelope.createdAt);
const daysSinceLast = now.diff(lastSentAt, 'days').days;
if (daysSinceLast >= intervalDays) {
dueRecipients.push({ id: recipient.id, envelopeId: envelope.id });
const teamEnvelopes = dueEnvelopesByTeam.get(envelope.teamId) ?? new Set<string>();
teamEnvelopes.add(envelope.id);
dueEnvelopesByTeam.set(envelope.teamId, teamEnvelopes);
}
// Cap matches the take: 1000 query limit — remainder picked up in next cron run.
if (dueRecipients.length >= 1000) {
break;
}
}
if (dueRecipients.length >= 1000) {
break;
}
}
if (dueRecipients.length === 0) {
io.logger.info('No recipients due for reminders');
return;
}
io.logger.info(`Found ${dueRecipients.length} recipients due for reminders`);
const recipientResults = await Promise.allSettled(
dueRecipients.map(async (r) => {
await jobs.triggerJob({
name: 'send.recipient.reminder.email',
payload: { recipientId: r.id, envelopeId: r.envelopeId },
});
}),
);
for (const result of recipientResults) {
if (result.status === 'rejected') {
io.logger.error('Failed to trigger recipient reminder', { reason: result.reason });
}
}
const digestResults = await Promise.allSettled(
Array.from(dueEnvelopesByTeam.entries()).map(async ([teamId, envelopeIds]) => {
await jobs.triggerJob({
name: 'send.owner.reminder.digest.email',
payload: { teamId, envelopeIds: Array.from(envelopeIds) },
});
}),
);
for (const result of digestResults) {
if (result.status === 'rejected') {
io.logger.error('Failed to trigger owner digest', { reason: result.reason });
}
}
};
export const run = async ({ io }: { payload: TSendRemindersSweepJobDefinition; io: JobRunIO }) => {
const now = DateTime.now();
const envelopes = await io.runTask('fetch-pending-envelopes', async () => {
return prisma.envelope.findMany({
where: {
status: DocumentStatus.PENDING,
documentMeta: {
reminderEnabled: true,
reminderIntervalDays: { not: null },
},
},
include: {
documentMeta: true,
recipients: {
where: {
signingStatus: { notIn: [SigningStatus.SIGNED, SigningStatus.REJECTED] },
documentDeletedAt: null,
},
},
reminderLogs: {
orderBy: { createdAt: 'desc' as const },
},
},
take: 1000,
});
});
if (envelopes.length === 0) {
io.logger.info('No envelopes with reminders enabled');
return;
}
const dueRecipients: Array<{ id: number; envelopeId: string }> = [];
const dueEnvelopesByTeam = new Map<number, Set<string>>();
for (const envelope of envelopes) {
const intervalDays = envelope.documentMeta?.reminderIntervalDays;
if (!intervalDays) {
continue;
}
for (const recipient of envelope.recipients) {
const lastLog = envelope.reminderLogs.find((log) => log.recipientId === recipient.id);
const lastSentAt = lastLog
? DateTime.fromJSDate(lastLog.createdAt)
: DateTime.fromJSDate(envelope.createdAt);
const daysSinceLast = now.diff(lastSentAt, 'days').days;
if (daysSinceLast >= intervalDays) {
dueRecipients.push({ id: recipient.id, envelopeId: envelope.id });
const teamEnvelopes = dueEnvelopesByTeam.get(envelope.teamId) ?? new Set<string>();
teamEnvelopes.add(envelope.id);
dueEnvelopesByTeam.set(envelope.teamId, teamEnvelopes);
}
// Cap matches the take: 1000 query limit — remainder picked up in next cron run.
if (dueRecipients.length >= 1000) {
break;
}
}
if (dueRecipients.length >= 1000) {
break;
}
}
if (dueRecipients.length === 0) {
io.logger.info('No recipients due for reminders');
return;
}
io.logger.info(`Found ${dueRecipients.length} recipients due for reminders`);
const recipientResults = await io.runTask('fan-out-recipient-reminders', async () => {
return Promise.allSettled(
dueRecipients.map(async (r) => {
await jobs.triggerJob({
name: 'send.recipient.reminder.email',
payload: { recipientId: r.id, envelopeId: r.envelopeId },
});
}),
);
});
for (const result of recipientResults) {
if (result.status === 'rejected') {
io.logger.error('Failed to trigger recipient reminder', { reason: result.reason });
}
}
const digestResults = await io.runTask('fan-out-owner-digests', async () => {
return Promise.allSettled(
Array.from(dueEnvelopesByTeam.entries()).map(async ([teamId, envelopeIds]) => {
await jobs.triggerJob({
name: 'send.owner.reminder.digest.email',
payload: { teamId, envelopeIds: Array.from(envelopeIds) },
});
}),
);
});
for (const result of digestResults) {
if (result.status === 'rejected') {
io.logger.error('Failed to trigger owner digest', { reason: result.reason });
}
}
};
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/lib/jobs/definitions/internal/send-reminders-sweep.handler.ts`
around lines 10 - 114, The handler function run currently performs the DB query
(prisma.envelope.findMany), computation of dueRecipients/dueEnvelopesByTeam, and
fan-out triggers (jobs.triggerJob) without idempotency guards; wrap each logical
step in io.runTask('unique-key', async () => { ... }) using distinct keys (e.g.,
'fetch-envelopes', 'compute-due-recipients:<jobIdOrTimestamp>',
'trigger-recipient-reminders:<jobIdOrTimestamp>',
'trigger-owner-digests:<jobIdOrTimestamp>') so the prisma.envelope.findMany
call, the loop that builds dueRecipients/dueEnvelopesByTeam, and the
Promise.allSettled trigger blocks are each executed inside io.runTask to ensure
retries are idempotent and avoid duplicate emails.

Loading
Loading