Jobs

Pidgey Jobs

Everything you need to know about defining, enqueueing, and managing jobs with Pidgey.

Defining Jobs

Use defineJob to create a type-safe job:

jobs/send-email.ts
import { pidgey } from '@/lib/pidgey';
 
export const sendEmail = pidgey.defineJob({
  name: 'send-email',
  handler: async (data: { to: string; subject: string }) => {
    await emailService.send(data);
    return { sent: true };
  },
  config: {
    retries: 3,
    timeout: 30000,
  },
});

Config options:

OptionDefaultDescription
retries3Max retry attempts
timeout300000Timeout in ms (5 min)
queuejob nameQueue name

Enqueueing Jobs

Basic enqueue

await sendEmail.enqueue({
  to: 'leslie.knope@pawnee.gov',
  subject: 'Welcome!',
});

Delayed jobs

Schedule a job to run later:

// Run in 1 hour
await sendEmail.enqueue(data, { delay: 3600000 });
 
// Or use a helper
const ONE_HOUR = 60 * 60 * 1000;
await sendEmail.enqueue(data, { delay: ONE_HOUR });

Override config at enqueue time

await sendEmail.enqueue(data, {
  maxAttempts: 5, // Override retries
  timeout: 60000, // Override timeout
});

Idempotency

To prevent duplicate jobs (common with webhooks and retries), you can pass an idempotencyKey when enqueuing:

await processOrder.enqueue({ orderId: 123 }, { idempotencyKey: `order-${orderId}` });

If a job with the same key already exists, Pidgey will return the existing job instead of creating a new one.

Idempotency keys are scoped per queue. The same key in different queues creates separate jobs.

Retries & Error Handling

Jobs automatically retry on failure with exponential backoff:

export const unreliableJob = pidgey.defineJob({
  name: 'call-api',
  handler: async (data) => {
    const response = await fetch(data.url);
    if (!response.ok) throw new Error('API failed'); // Triggers retry
    return response.json();
  },
  config: { retries: 5 },
});

Backoff schedule: 1s → 2s → 4s → 8s → 16s

Manual retry

// Retry a specific failed job
await pidgey.retryJob('job_abc123');
 
// Retry all failed jobs
await pidgey.retryAllFailed();
 
// Retry failed jobs in a specific queue
await pidgey.retryAllFailed('emails');

Dead Letter Queue

Jobs that exhaust all retries move to the DLQ for debugging:

// List failed jobs
const failed = await pidgey.listJobs({ status: 'failed' });
 
// Inspect errors
for (const job of failed) {
  console.log(job.id, job.error, job.attempts);
}

CLI access

# List failed jobs
npx pidgey jobs list --status failed
 
# Retry all failed jobs
npx pidgey jobs retry --failed
 
# Delete failed jobs
npx pidgey jobs delete --status failed

Multiple Queues

Separate jobs for different priorities:

export const urgentEmail = pidgey.defineJob({
  name: 'urgent-email',
  handler: async (data) => {
    /* ... */
  },
  config: { queue: 'urgent' },
});
 
export const bulkEmail = pidgey.defineJob({
  name: 'bulk-email',
  handler: async (data) => {
    /* ... */
  },
  config: { queue: 'bulk' },
});

Run workers for specific queues:

# Process urgent queue with high concurrency
npx pidgey worker start --queue urgent --concurrency 20
 
# Process bulk queue with low concurrency
npx pidgey worker start --queue bulk --concurrency 2

Job Status

Track the lifecycle of a job:

const job = await sendEmail.enqueue(data);
const status = await pidgey.getJob(job.id);
 
switch (status?.status) {
  case 'pending': // Waiting to run
  case 'active': // Currently running
  case 'completed': // Finished successfully
  case 'failed': // Exhausted retries
  case 'cancelled': // Manually cancelled
}

Cancelling Jobs

Cancel a pending job before it runs:

await pidgey.cancelJob('job_abc123');
⚠️

The Redis adapter does not support cancelling jobs. Use deleteJob instead.

Next Steps


Made with ❤️ in Chicago