Workflow

Most teams manage multi-step processes with sequential await chains. That works until one step fails mid-way, the process restarts, the server crashes, or two workers try to process the same job simultaneously. Then you need retries, idempotency, state persistence, and compensation logic — all bolted on after the fact.

@pgshift/workflow gives you durable workflow orchestration backed by PostgreSQL. Define your steps and their dependencies as a DAG. The engine handles execution order, parallelism, retries, and saga-pattern compensation when things go wrong.

npm install @pgshift/workflow

import { createClient } from '@pgshift/workflow'

const db = createClient({ url: process.env.DATABASE_URL })

DAG: a directed acyclic graph in which each step declares its dependencies. Steps with no dependencies start immediately and run in parallel. Steps with dependencies wait until all of their dependencies have completed.

Compensation: each step can declare a compensate handler. If a step fails permanently after exhausting its retries, PgShift runs the compensation handlers of all completed steps in reverse order, leaving the system in a consistent state.

At-least-once execution: workers claim steps using PostgreSQL's SKIP LOCKED row locking, so two workers do not pick up the same step at the same time. A step may still run more than once if a worker crashes mid-execution, so make your handlers idempotent.
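
Because a step can run more than once, a handler with side effects needs a way to recognize a repeat. One common approach is to derive an idempotency key from the run ID and step name and record the result under that key. The sketch below is illustrative only: `StepContext`, `chargeOnce`, and the in-memory `ledger` are hypothetical names that are not part of @pgshift/workflow, and a real handler would keep the key in a durable store or pass it to the payment provider.

```typescript
// Hypothetical context shape, limited to the fields this sketch needs.
interface StepContext {
  runId: string
  step: string
  attempt: number
}

// Stand-in for a durable store. A real handler would use something like a
// database table with a unique constraint on the key.
const ledger = new Map<string, { chargeId: string }>()
let chargesMade = 0

async function chargeOnce(ctx: StepContext): Promise<{ chargeId: string }> {
  // The same run and step always produce the same key, whatever the attempt.
  const key = `${ctx.runId}:${ctx.step}`
  const existing = ledger.get(key)
  if (existing) return existing // an earlier attempt already did the work

  chargesMade += 1 // the side effect happens at most once per key
  const result = { chargeId: `ch-${chargesMade}` }
  ledger.set(key, result)
  return result
}
```

A second attempt for the same run and step returns the stored result instead of charging again.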

define() registers the workflow definition. It is idempotent, so it is safe to call on every startup.

await db.workflow('order-fulfillment').define({
  steps: {
    validate_stock: { handler: 'validateStock', retries: 3 },
    validate_fraud: { handler: 'validateFraud', retries: 3 },
    charge_card: { handler: 'chargeCard', retries: 1, compensate: 'refundCard' },
    emit_invoice: { handler: 'emitInvoice', retries: 3, compensate: 'voidInvoice' },
    send_email: { handler: 'sendEmail', retries: 5 },
    update_analytics: { handler: 'updateAnalytics', retries: 5 },
  },
  dag: {
    validate_stock: [], // starts immediately
    validate_fraud: [], // starts immediately (parallel)
    charge_card: ['validate_stock', 'validate_fraud'], // waits for both
    emit_invoice: ['charge_card'],
    send_email: ['emit_invoice'], // parallel with update_analytics
    update_analytics: ['emit_invoice'], // parallel with send_email
  },
})

Step config:

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| handler | string | required | Name of the handler function registered via .handlers() |
| retries | number | 3 | Max retry attempts before the step fails permanently |
| compensate | string | none | Name of the compensation handler to run on workflow failure |

DAG config: each key is a step name, each value is a list of step names it depends on. Steps with an empty list start immediately.
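
To make the dependency rule concrete, here is a minimal sketch of how the set of runnable steps could be derived from a dag map and the set of completed steps. It is not the engine's actual scheduler: `Dag` and `readySteps` are hypothetical names, and the sketch ignores steps that are already running.

```typescript
type Dag = Record<string, string[]>

// Returns the steps that have not completed yet and whose dependencies
// have all completed. Steps with an empty dependency list qualify at once.
function readySteps(dag: Dag, completed: Set<string>): string[] {
  return Object.entries(dag)
    .filter(([step, deps]) => !completed.has(step) && deps.every((dep) => completed.has(dep)))
    .map(([step]) => step)
}

const dag: Dag = {
  validate_stock: [],
  validate_fraud: [],
  charge_card: ['validate_stock', 'validate_fraud'],
  emit_invoice: ['charge_card'],
  send_email: ['emit_invoice'],
  update_analytics: ['emit_invoice'],
}

// At the start of a run, only the two validation steps are ready.
const initiallyReady = readySteps(dag, new Set())
```

With nothing completed, `initiallyReady` holds `validate_stock` and `validate_fraud`. Once both have completed, `charge_card` becomes the only ready step.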

handlers() registers the handler functions for a workflow. It must be called before .work().

await db.workflow('order-fulfillment').handlers({
  validateStock: async (ctx) => {
    const { items } = ctx.input as { items: string[] }
    // ... reserve stock for `items` ...
    // The returned value is available to later steps via ctx.previousSteps
    return { reservationId: 'res-123' }
  },
  chargeCard: async (ctx) => {
    const { reservationId } = ctx.previousSteps['validate_stock'] as { reservationId: string }
    const { amount } = ctx.input as { amount: number }
    // ... charge `amount` for the reservation identified by `reservationId` ...
    return { chargeId: 'ch-456', amount }
  },
  refundCard: async (ctx) => {
    // Compensation: undo chargeCard
    const { chargeId } = ctx.previousSteps['charge_card'] as { chargeId: string }
    await refund(chargeId) // refund() stands in for your payment provider call
  },
  // ... other handlers
})

The ctx object:

| Field | Type | Description |
| --- | --- | --- |
| ctx.runId | string | Current run ID |
| ctx.step | string | Current step name |
| ctx.input | object | Payload passed to run() |
| ctx.attempt | number | Current attempt number (1-based) |
| ctx.previousSteps | object | Output of completed steps, keyed by step name |
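
Since ctx.previousSteps is keyed by step name, a typo in a key or a missing dependency in the dag would surface as an undefined value deep inside a handler. The sketch below shows one way to fail early with a clear message. `StepContext` and `outputOf` are hypothetical names modeled on the fields listed above, not types exported by the library, and the cast to `T` is not checked at runtime.

```typescript
// Hypothetical context shape based on the fields in the table above.
interface StepContext {
  runId: string
  step: string
  input: Record<string, unknown>
  attempt: number
  previousSteps: Record<string, unknown>
}

// Returns the output of an earlier step, or throws if that step has no
// recorded output (for example, because it is not a dependency).
function outputOf<T>(ctx: StepContext, stepName: string): T {
  if (!(stepName in ctx.previousSteps)) {
    throw new Error(`step "${ctx.step}" expected output from "${stepName}"`)
  }
  return ctx.previousSteps[stepName] as T
}

// Example context as a charge_card handler might see it.
const exampleCtx: StepContext = {
  runId: 'run-1',
  step: 'charge_card',
  input: { amount: 299.99 },
  attempt: 1,
  previousSteps: { validate_stock: { reservationId: 'res-123' } },
}

const { reservationId } = outputOf<{ reservationId: string }>(exampleCtx, 'validate_stock')
```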

work() starts the polling worker, which polls the database for active runs and advances them.

await db.workflow('order-fulfillment').work()

Call this once on startup, after define() and handlers().

run() creates a new workflow run and returns the run ID.

const runId = await db.workflow('order-fulfillment').run({
  orderId: 'order-123',
  amount: 299.99,
  items: ['ITEM_A', 'ITEM_B'],
})

status() returns the current status of a run and all of its steps.

const status = await db.workflow('order-fulfillment').status(runId)

interface WorkflowRunStatus {
  runId: string
  workflow: string
  status: 'running' | 'completed' | 'failed' | 'compensating' | 'compensated'
  input: Record<string, unknown>
  startedAt: Date
  finishedAt?: Date
  steps: Record<string, {
    status: string
    attempts: number
    output?: Record<string, unknown>
    error?: string
    startedAt?: Date
    completedAt?: Date
  }>
}
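
A caller that needs to wait for a run to finish can poll the status until it stops changing. The following is a sketch under stated assumptions: `waitForRun`, `RunState`, and `sleep` are hypothetical helpers rather than part of the library, and treating `completed`, `failed`, and `compensated` as terminal states is an assumption based on the status union above.

```typescript
type RunState = 'running' | 'completed' | 'failed' | 'compensating' | 'compensated'

// Statuses treated as terminal in this sketch (an assumption, not documented behavior).
const TERMINAL: ReadonlySet<RunState> = new Set<RunState>(['completed', 'failed', 'compensated'])

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

// Calls `getStatus` repeatedly until the run reaches a terminal state,
// or throws once the poll budget is used up.
async function waitForRun(
  getStatus: () => Promise<{ status: RunState }>,
  intervalMs = 1000,
  maxPolls = 60,
): Promise<RunState> {
  for (let i = 0; i < maxPolls; i++) {
    const { status } = await getStatus()
    if (TERMINAL.has(status)) return status
    await sleep(intervalMs)
  }
  throw new Error(`run still in progress after ${maxPolls} polls`)
}
```

With the client from this page, the status source would be `() => db.workflow('order-fulfillment').status(runId)`.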

A successful run of the order-fulfillment workflow progresses like this:

Run starts
├─ validate_stock (pending → running → completed) ─┐
│                                                  ├─ charge_card (pending → running → completed)
└─ validate_fraud (pending → running → completed) ─┘       │
                                                     emit_invoice
                                       ┌───────────────────┴───────────────────┐
                                  send_email                           update_analytics
                                       │                                       │
                                   completed                               completed

run: completed

When a step exhausts its retries, the run enters compensating status. PgShift runs the compensation handlers of completed steps in reverse execution order.

charge_card   completed ✓ (has compensate: refundCard)
emit_invoice  failed ✗ (exhausted 3 retries)

Compensation order:
1. voidInvoice ← emit_invoice's compensate (not run: emit_invoice failed, so there is nothing to undo)
2. refundCard ← charge_card's compensate

Only steps that are completed and have a compensate handler defined are compensated. Steps without compensate are skipped in the compensation chain.
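
The selection rule can be sketched as a small function: take the steps in the order they completed, reverse them, and keep only those that declare a compensate handler. This is an illustration of the rule as described here, not the engine's code. `StepConfig` and `compensationPlan` are hypothetical names.

```typescript
interface StepConfig {
  handler: string
  compensate?: string
}

// Given the steps in the order they completed, returns the compensation
// handlers to call: most recent first, skipping steps without one.
function compensationPlan(
  steps: Record<string, StepConfig>,
  completedInOrder: string[],
): string[] {
  return [...completedInOrder]
    .reverse()
    .map((name) => steps[name]?.compensate)
    .filter((handler): handler is string => handler !== undefined)
}

const steps: Record<string, StepConfig> = {
  validate_stock: { handler: 'validateStock' },
  validate_fraud: { handler: 'validateFraud' },
  charge_card: { handler: 'chargeCard', compensate: 'refundCard' },
  emit_invoice: { handler: 'emitInvoice', compensate: 'voidInvoice' },
}

// emit_invoice failed, so it is absent from the completed list.
const plan = compensationPlan(steps, ['validate_stock', 'validate_fraud', 'charge_card'])
```

Here `plan` contains only `refundCard`: the validation steps have no compensate handler, and emit_invoice never completed.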

Failed steps are retried with exponential backoff:

| Attempt | Backoff |
| --- | --- |
| 1 | 1 second |
| 2 | 2 seconds |
| 3 | 4 seconds |
| N | min(2^(N-1) seconds, 30 seconds) |

After all retry attempts are exhausted, the step moves to failed and compensation begins.
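
The schedule can be written as a one-line function. This sketch assumes the doubling pattern shown for the first three attempts (1, 2, and 4 seconds) continues until it reaches the 30-second cap. `backoffSeconds` and `MAX_BACKOFF_SECONDS` are hypothetical names.

```typescript
const MAX_BACKOFF_SECONDS = 30

// Delay before the next retry, given the 1-based attempt that just failed.
// Doubles each time, starting at 1 second, and never exceeds the cap.
function backoffSeconds(attempt: number): number {
  return Math.min(2 ** (attempt - 1), MAX_BACKOFF_SECONDS)
}

// Delays for attempts 1 through 6.
const schedule = [1, 2, 3, 4, 5, 6].map(backoffSeconds)
```

Under that assumption the cap takes effect at the sixth attempt, where the uncapped value would be 32 seconds.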

import { createClient } from '@pgshift/workflow'

const db = createClient({ url: process.env.DATABASE_URL })

await db.workflow('order-fulfillment').define({
  steps: {
    validate_stock: { handler: 'validateStock', retries: 3 },
    validate_fraud: { handler: 'validateFraud', retries: 3 },
    charge_card: { handler: 'chargeCard', retries: 1, compensate: 'refundCard' },
    emit_invoice: { handler: 'emitInvoice', retries: 3, compensate: 'voidInvoice' },
    send_email: { handler: 'sendEmail', retries: 5 },
    update_analytics: { handler: 'updateAnalytics', retries: 5 },
  },
  dag: {
    validate_stock: [],
    validate_fraud: [],
    charge_card: ['validate_stock', 'validate_fraud'],
    emit_invoice: ['charge_card'],
    send_email: ['emit_invoice'],
    update_analytics: ['emit_invoice'],
  },
})

await db.workflow('order-fulfillment').handlers({
  validateStock: async (ctx) => { /* check inventory */ return { available: true } },
  validateFraud: async (ctx) => { /* run fraud check */ return { approved: true } },
  chargeCard: async (ctx) => { /* charge payment */ return { chargeId: 'ch_123' } },
  refundCard: async (ctx) => { /* refund payment */ },
  emitInvoice: async (ctx) => { /* create invoice */ return { invoiceId: 'inv_456' } },
  voidInvoice: async (ctx) => { /* void invoice */ },
  sendEmail: async (ctx) => { /* send confirmation */ },
  updateAnalytics: async (ctx) => { /* record event */ },
})

await db.workflow('order-fulfillment').work()

const runId = await db.workflow('order-fulfillment').run({
  orderId: 'order-123',
  amount: 299.99,
})

process.on('SIGTERM', () => db.destroy())

Database tables:

| Table | Purpose |
| --- | --- |
| _pgshift_workflow_definitions | Stores workflow step config and DAG |
| _pgshift_workflow_runs | One row per workflow execution |
| _pgshift_workflow_steps | One row per step per run; the unit of work dispatched via SKIP LOCKED |