Queue

Background jobs are one of the first workloads for which teams reach for a separate service. Redis, RabbitMQ, and SQS all mean new infrastructure before you've shipped anything. Postgres has been capable of running reliable job queues for years; most teams just don't know it.

At-least-once background job processing using PostgreSQL’s SKIP LOCKED. Includes retries, exponential backoff, priority ordering, and a dead letter queue.
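The core mechanism fits in one statement: an UPDATE claims the next runnable row, and FOR UPDATE SKIP LOCKED makes concurrent workers pass over rows another worker has already locked. A minimal sketch of the pattern, not necessarily the library's exact query (column names follow the failed-jobs query shown later in this page):

UPDATE _pgshift_queue_emails
SET status = 'processing', attempts = attempts + 1
WHERE id = (
  SELECT id
  FROM _pgshift_queue_emails
  WHERE status = 'pending' AND run_at <= now()
  ORDER BY priority DESC, created_at
  LIMIT 1
  FOR UPDATE SKIP LOCKED
)
RETURNING id, payload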

npm install @pgshift/queue

Then create a client pointed at your database:

import { createClient } from '@pgshift/queue'
const db = createClient({ url: process.env.DATABASE_URL })

Creates the queue table and indexes. Idempotent, safe to call on every startup.

await db.queue('emails').setup()
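The exact DDL is owned by setup(), but a table shaped roughly like this would be consistent with the job object and the failed-jobs query later in this page. Every detail below is inferred, not the library's actual schema:

CREATE TABLE IF NOT EXISTS _pgshift_queue_emails (
  id          uuid PRIMARY KEY DEFAULT gen_random_uuid(),
  payload     jsonb NOT NULL,
  status      text NOT NULL DEFAULT 'pending',
  attempts    int NOT NULL DEFAULT 0,
  max_retries int NOT NULL DEFAULT 3,
  priority    int NOT NULL DEFAULT 0,
  error       text,
  run_at      timestamptz NOT NULL DEFAULT now(),
  failed_at   timestamptz,
  created_at  timestamptz NOT NULL DEFAULT now()
)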

Inserts a job into the queue. Returns the job ID.

const jobId = await db.queue('emails').push(
  { to: 'user@example.com', subject: 'Welcome' },
  {
    priority: 1,
    retries: 3,
    delay: 5000,
  },
)
| Option   | Type   | Default | Description                              |
| -------- | ------ | ------- | ---------------------------------------- |
| priority | number | 0       | Higher number = processed first          |
| retries  | number | 3       | Max retry attempts before dead letter    |
| delay    | number | 0       | Milliseconds before job becomes visible  |
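For example, delay schedules a job for later; this pushes an email that stays invisible to workers for one hour:

// Job becomes visible one hour from now
await db.queue('emails').push(
  { to: 'user@example.com', subject: 'Reminder' },
  { delay: 60 * 60 * 1000 },
)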

Starts a polling worker. The handler runs for each job.

await db.queue('emails').process(async (job) => {
  await sendEmail(job.payload)
})
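Because delivery is at-least-once, a worker crash between finishing the handler and marking the job done can cause a redelivery. A common safeguard is an idempotent handler; in this hypothetical sketch, hasAlreadySent and markSent stand in for your own dedupe store:

await db.queue('emails').process(async (job) => {
  // job.id is stable across redeliveries, so it works as a dedupe key
  if (await hasAlreadySent(job.id)) return
  await sendEmail(job.payload)
  await markSent(job.id)
})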

The job object contains:

interface QueueJob<T> {
  id: string
  payload: T
  status: 'processing'
  attempts: number
  maxRetries: number
  priority: number
  runAt: Date
  createdAt: Date
}

If the handler throws, the job is retried with exponential backoff. After exhausting all retries, the job moves to failed status.

Cancels a pending job. Has no effect on jobs that are already processing.

await db.queue('emails').cancel(jobId)

Returns counts per status for the queue.

const stats = await db.queue('emails').stats()
// { pending: 10, processing: 2, done: 847, failed: 1 }
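One use for this is a cheap health check; the thresholds here are arbitrary:

const { pending, failed } = await db.queue('emails').stats()
if (pending > 1000 || failed > 0) {
  console.warn(`emails queue: ${pending} pending, ${failed} failed`)
}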

Failed jobs are retried with exponential backoff:

| Attempt | Backoff                      |
| ------- | ---------------------------- |
| 1       | 2 seconds                    |
| 2       | 4 seconds                    |
| 3       | 8 seconds                    |
| N       | min(2^N seconds, 30 seconds) |
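In code, the schedule above amounts to the following; a sketch, assuming the delay is applied by bumping the job's next run time:

// Backoff for a job that has failed `attempts` times (1-based)
const backoffMs = Math.min(2 ** attempts, 30) * 1000
const nextRunAt = new Date(Date.now() + backoffMs)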

After all retry attempts are exhausted, the job moves to failed status and is preserved in the queue table for inspection.

Failed jobs are not deleted. Query them directly for debugging:

import { Pool } from 'pg'

const pool = new Pool({ connectionString: process.env.DATABASE_URL })

const { rows } = await pool.query(`
  SELECT id, payload, attempts, error, failed_at
  FROM _pgshift_queue_emails
  WHERE status = 'failed'
  ORDER BY failed_at DESC
`)
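Re-running a failed job by hand means resetting its row. This bypasses the library's API, so treat it as a sketch: the column names follow the query above, and run_at is assumed from the job object's runAt field:

// Put a failed job back in the pending queue with a fresh retry budget
await pool.query(
  `
    UPDATE _pgshift_queue_emails
    SET status = 'pending', attempts = 0, run_at = now()
    WHERE id = $1
  `,
  [jobId],
)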

Call db.destroy() on process exit. The worker waits for all in-flight jobs to complete before shutting down.

process.on('SIGTERM', async () => {
  await db.destroy()
  process.exit(0)
})