# Queue
Background jobs are one of the first workloads teams stand up a separate service for. Redis, RabbitMQ, SQS: each one adds new infrastructure before you've shipped anything. Postgres has been capable of running reliable job queues for years; most teams just don't know it.
At-least-once background job processing using PostgreSQL's `SKIP LOCKED`. Includes retries, exponential backoff, priority ordering, and a dead letter queue.
## Install

```bash
npm install @pgshift/queue
```

```ts
import { createClient } from '@pgshift/queue'

const db = createClient({ url: process.env.DATABASE_URL })
```

## db.queue(name).setup()
Creates the queue table and indexes. Idempotent, safe to call on every startup.
```ts
await db.queue('emails').setup()
```
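Because the call is idempotent, one pattern is to run it unconditionally during application boot. A minimal sketch; the queue names here are illustrative:

```ts
// setup() is safe to call on every startup, so wire it into app boot.
for (const name of ['emails', 'webhooks']) {
  await db.queue(name).setup()
}
```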
## db.queue(name).push(payload, options?)

Inserts a job into the queue. Returns the job ID.
```ts
const jobId = await db.queue('emails').push(
  { to: 'user@example.com', subject: 'Welcome' },
  {
    priority: 1,
    retries: 3,
    delay: 5000,
  },
)
```

| Option | Type | Default | Description |
|---|---|---|---|
| `priority` | `number` | `0` | Higher number = processed first |
| `retries` | `number` | `3` | Max retry attempts before dead letter |
| `delay` | `number` | `0` | Milliseconds before the job becomes visible |
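For instance, combining `priority` and `delay` (both from the options table above) schedules a job that jumps the line once it becomes visible. A sketch; the payload fields are illustrative:

```ts
// Push a reminder that stays invisible to workers for five minutes,
// then is picked up ahead of default-priority jobs.
await db.queue('emails').push(
  { to: 'user@example.com', subject: 'Reminder' },
  {
    priority: 5,          // higher number = processed first
    delay: 5 * 60 * 1000, // milliseconds before the job becomes visible
  },
)
```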
## db.queue(name).process(handler)
Starts a polling worker. The handler runs for each job.
```ts
await db.queue('emails').process(async (job) => {
  await sendEmail(job.payload)
})
```

The job object contains:

```ts
interface QueueJob<T> {
  id: string
  payload: T
  status: 'processing'
  attempts: number
  maxRetries: number
  priority: number
  runAt: Date
  createdAt: Date
}
```

If the handler throws, the job is retried with exponential backoff. After exhausting all retries, the job moves to `failed` status.
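As a sketch of how the job metadata can be used, a handler might flag the final attempt before a job is dead-lettered. `sendEmail` is an assumed application function, and whether `attempts` counts the in-flight try is an assumption; `attempts` and `maxRetries` themselves come from the interface above:

```ts
await db.queue('emails').process(async (job) => {
  try {
    await sendEmail(job.payload) // assumed application function
  } catch (err) {
    // Assumes `attempts` counts completed tries; on the last allowed
    // attempt, warn that the job is about to hit the dead letter queue.
    if (job.attempts + 1 >= job.maxRetries) {
      console.warn(`job ${job.id} is on its final attempt`, err)
    }
    throw err // rethrow so the library records the failure and schedules the retry
  }
})
```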
## db.queue(name).cancel(jobId)

Cancels a pending job. Has no effect on jobs that are already processing.
```ts
await db.queue('emails').cancel(jobId)
```

## db.queue(name).stats()
Returns counts per status for the queue.
```ts
const stats = await db.queue('emails').stats()
// { pending: 10, processing: 2, done: 847, failed: 1 }
```
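One way to put this to work is a small monitoring loop that surfaces dead-lettered jobs. A sketch; the one-minute cadence is an assumption:

```ts
// Hedged sketch: check the queue once a minute and log failures.
setInterval(async () => {
  const stats = await db.queue('emails').stats()
  if (stats.failed > 0) {
    console.error(`emails queue has ${stats.failed} failed job(s)`)
  }
}, 60_000)
```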
## Retry behavior

Failed jobs are retried with exponential backoff:
| Attempt | Backoff |
|---|---|
| 1 | 2 seconds |
| 2 | 4 seconds |
| 3 | 8 seconds |
| N | min(2^N seconds, 30 seconds) |
After all retry attempts are exhausted, the job moves to `failed` status and is preserved in the queue table for inspection.
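The schedule in the table is a capped exponential. A sketch of the computation it implies (the library's internals may differ):

```ts
// Backoff for attempt N, per the table above: min(2^N, 30) seconds.
function backoffMs(attempt: number): number {
  return Math.min(2 ** attempt, 30) * 1000
}

backoffMs(1) // 2_000
backoffMs(2) // 4_000
backoffMs(5) // 30_000 (2^5 = 32 seconds, capped at 30)
```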
## Dead letter queue

Failed jobs are not deleted. Query them directly for debugging:
```ts
import { Pool } from 'pg'

const pool = new Pool({ connectionString: process.env.DATABASE_URL })

const { rows } = await pool.query(`
  SELECT id, payload, attempts, error, failed_at
  FROM _pgshift_queue_emails
  WHERE status = 'failed'
  ORDER BY failed_at DESC
`)
```
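No requeue method is documented, so one hedged approach is to read the failed rows with the query above and send each payload back through the normal `push()` API:

```ts
// Hedged sketch: re-enqueue dead-lettered jobs by re-pushing their payloads.
// Assumes the payload column round-trips as JSON, which the docs above
// do not confirm. The original failed rows stay in place for inspection.
for (const row of rows) {
  await db.queue('emails').push(row.payload)
}
```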
## Graceful shutdown

Call `db.destroy()` on process exit. The worker waits for all in-flight jobs to complete before shutting down.
```ts
process.on('SIGTERM', async () => {
  await db.destroy()
  process.exit(0)
})
```