Cron

Recurring tasks are everywhere. Send weekly digests. Clean up expired records. Generate monthly reports. Most teams pay for an external scheduler or wire up their own cron infrastructure. If you already have Postgres and a queue, you have everything you need.

@pgshift/cron uses the pg_cron extension to schedule recurring jobs. When a job fires, PgShift inserts a payload into a @pgshift/queue table so your existing worker processes it. No extra process. No new service. The same Postgres you already run.

@pgshift/cron works together with @pgshift/queue. You need both installed and configured.
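
To make the mechanism concrete, here is a minimal sketch of the kind of SQL statement a scheduler like this could hand to pg_cron. `cron.schedule(job_name, schedule, command)` is pg_cron's own function. The `pgshift_queue_<name>` table, its `payload` column, and the helper names are assumptions for illustration, not PgShift's actual schema or internals.

```typescript
// Hypothetical sketch: table and column names are assumptions, not PgShift's real schema.

// Quote a value as a SQL string literal by doubling single quotes.
function quoteLiteral(value: string): string {
  return `'${value.replace(/'/g, "''")}'`
}

// Quote a name as a SQL identifier by doubling double quotes.
function quoteIdent(name: string): string {
  return `"${name.replace(/"/g, '""')}"`
}

// Build the statement that registers a named pg_cron job whose command
// inserts a JSON payload into an (assumed) per-queue table.
function buildScheduleSql(
  jobName: string,
  cronExpr: string,
  queue: string,
  payload: Record<string, unknown>,
): string {
  const table = quoteIdent(`pgshift_queue_${queue}`) // assumed naming scheme
  const json = quoteLiteral(JSON.stringify(payload))
  const command = `INSERT INTO ${table} (payload) VALUES (${json}::jsonb)`
  return `SELECT cron.schedule(${quoteLiteral(jobName)}, ${quoteLiteral(cronExpr)}, ${quoteLiteral(command)})`
}

const sql = buildScheduleSql('cleanup', '0 0 * * *', 'tasks', { type: 'cleanup-sessions' })
console.log(sql)
```

The command is itself a string literal inside the outer statement, which is why the payload's quotes end up doubled twice. Whatever the real table layout is, the worker only ever sees an ordinary queue row.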

npm install @pgshift/cron @pgshift/queue

  1. Create the cron client

    import { createClient } from '@pgshift/cron'

    const cron = createClient({
      url: process.env.DATABASE_URL,
      queue: 'tasks', // default queue for all cron jobs
    })

  2. Enable pg_cron

    await cron.cron.setup()

    This runs CREATE EXTENSION IF NOT EXISTS pg_cron, which requires superuser or rds_superuser privileges. Because of IF NOT EXISTS, the call is safe to repeat on every startup.

  3. Create the queue

    import { createClient as createQueueClient } from '@pgshift/queue'

    const queue = createQueueClient({ url: process.env.DATABASE_URL })
    await queue.queue('tasks').setup()

  4. Schedule your jobs and start the worker

    import { schedule } from '@pgshift/cron'

    await cron.cron('cleanup').schedule(schedule.daily({ hour: 0 }), {
      payload: { type: 'cleanup-sessions' },
    })

    await queue.queue('tasks').process(async (job) => {
      const { type } = job.payload as { type: string }
      if (type === 'cleanup-sessions') await cleanupSessions()
    })
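
As more job types share one queue, the `if` chain in the worker grows. One way to keep it manageable is a lookup table of handlers keyed by `type`. The sketch below is self-contained and does not call PgShift. The `Job` shape, the `handlers` map, and the `resolveHandler` and `dispatch` functions are hypothetical names; only `payload.type` comes from the worker example.

```typescript
// Hypothetical job shape: only `payload.type` is taken from the worker example.
interface Job {
  payload: unknown
}

type Handler = (payload: unknown) => Promise<void>

// Records what ran, standing in for real side effects.
const log: string[] = []

// Map each payload type to the function that handles it.
const handlers = new Map<string, Handler>([
  ['cleanup-sessions', async () => { log.push('sessions cleaned') }],
  ['weekly-digest', async () => { log.push('digest sent') }],
])

// Find the handler for a job. Throws on unknown or missing types so a
// misrouted job surfaces as a failure instead of being dropped silently.
function resolveHandler(job: Job): Handler {
  const type = (job.payload as { type?: unknown } | null)?.type
  const handler = typeof type === 'string' ? handlers.get(type) : undefined
  if (!handler) throw new Error(`No handler for job type: ${String(type)}`)
  return handler
}

// Run the matching handler with the job's payload.
async function dispatch(job: Job): Promise<void> {
  await resolveHandler(job)(job.payload)
}
```

Inside the worker, the callback body then reduces to `await dispatch(job)`, and adding a job type means adding one entry to the map.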

Ensures the pg_cron extension is installed. Idempotent, safe to call on every startup.

await cron.cron.setup()

Creates or replaces a cron job. When the job fires, a payload is inserted into the target queue.

await cron.cron('weekly-digest').schedule(schedule.weekly({ day: 'monday', hour: 8 }), {
  payload: { type: 'weekly-digest' },
})

Use the queue option to send a specific job to a different queue:

await cron.cron('monthly-report').schedule(schedule.monthly({ day: 1, hour: 9 }), {
  queue: 'reports',
  payload: { type: 'monthly-report' },
})

Raw cron strings are also accepted:

await cron.cron('every-five-minutes').schedule('*/5 * * * *', {
  payload: { type: 'poll' },
})

| Option | Type | Description |
| --- | --- | --- |
| payload | object | Arbitrary JSON passed to the queue job |
| queue | string | Queue to push into. Defaults to the client-level queue |
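
Raw strings bypass the `schedule` helper, so a typo only surfaces once pg_cron rejects or misreads the expression. A small pre-check can catch the most common mistakes. This is a simplified sketch, not pg_cron's parser: it assumes the standard five-field format with numeric values, `*`, lists, ranges, and `/step`, and it does not cover names such as `mon` or any pg_cron-specific syntax. `isValidCron` is a hypothetical helper.

```typescript
// Allowed numeric range for each of the five cron fields, in order:
// minute, hour, day of month, month, day of week.
const FIELD_RANGES: ReadonlyArray<readonly [number, number]> = [
  [0, 59],
  [0, 23],
  [1, 31],
  [1, 12],
  [0, 7], // 0 and 7 both mean Sunday
]

// Validate one comma-separated item such as "5", "1-5", "*", "*/10" or "0-30/5".
function isValidItem(item: string, min: number, max: number): boolean {
  const [range, step, ...extra] = item.split('/')
  if (extra.length > 0) return false
  if (step !== undefined && !(/^\d+$/.test(step) && Number(step) > 0)) return false
  if (range === '*') return true
  const bounds = range.split('-')
  if (bounds.length > 2 || !bounds.every((b) => /^\d+$/.test(b))) return false
  const [lo, hi = lo] = bounds.map(Number)
  return lo >= min && hi <= max && lo <= hi
}

// Check that an expression has five fields and every item is in range.
function isValidCron(expr: string): boolean {
  const fields = expr.trim().split(/\s+/)
  if (fields.length !== FIELD_RANGES.length) return false
  return fields.every((field, i) => {
    const [min, max] = FIELD_RANGES[i]
    return field.split(',').every((item) => isValidItem(item, min, max))
  })
}

console.log(isValidCron('*/5 * * * *')) // true
console.log(isValidCron('*/5 * * *')) // false, only four fields
```

Running a check like this before calling `schedule` with a raw string turns a silent misconfiguration into an immediate error at startup.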

Removes a scheduled job by name.

await cron.cron('weekly-digest').unschedule()

Returns all PgShift-managed cron jobs.

const jobs = await cron.cron.list()

interface CronJobInfo {
  name: string
  schedule: string
  active: boolean
  jobId: number
}
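
Because `list()` returns every managed job, it can be used to reconcile the database against the jobs your code currently defines, for example after a job has been renamed. The sketch below only computes which names are stale. `findStaleJobs` and the sample data are hypothetical; the `CronJobInfo` shape is copied from the interface above.

```typescript
interface CronJobInfo {
  name: string
  schedule: string
  active: boolean
  jobId: number
}

// Return the names of jobs that exist in the database but are no longer
// declared in code, so the caller can unschedule them.
function findStaleJobs(existing: CronJobInfo[], desired: Iterable<string>): string[] {
  const keep = new Set(desired)
  return existing.filter((job) => !keep.has(job.name)).map((job) => job.name)
}

// Sample data standing in for the result of `await cron.cron.list()`.
const existing: CronJobInfo[] = [
  { name: 'cleanup', schedule: '0 0 * * *', active: true, jobId: 1 },
  { name: 'old-report', schedule: '0 9 1 * *', active: true, jobId: 2 },
]

console.log(findStaleJobs(existing, ['cleanup', 'weekly-digest'])) // [ 'old-report' ]
```

Each returned name could then be passed to `cron.cron(name).unschedule()`.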

The schedule helper builds cron expressions from readable options.

import { schedule } from '@pgshift/cron'
schedule.every({ minutes: 5 }) // "*/5 * * * *"
schedule.every({ hours: 2 }) // "0 */2 * * *"
schedule.hourly({ minute: 30 }) // "30 * * * *"
schedule.daily({ hour: 8 }) // "0 8 * * *"
schedule.daily({ hour: 8, minute: 30 }) // "30 8 * * *"
schedule.weekly({ day: 'monday', hour: 9 }) // "0 9 * * 1"
schedule.monthly({ day: 1, hour: 0 }) // "0 0 1 * *"
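
schedule.every

| Option | Type | Description |
| --- | --- | --- |
| minutes | number | Run every N minutes |
| hours | number | Run every N hours |

schedule.hourly

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| minute | number | 0 | Minute of the hour to run |

schedule.daily

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| hour | number | 0 | Hour of the day in UTC |
| minute | number | 0 | Minute of the hour |

schedule.weekly

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| day | DayName | required | Day of the week ('monday', 'tuesday', etc.) |
| hour | number | 0 | Hour of the day in UTC |
| minute | number | 0 | Minute of the hour |

schedule.monthly

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| day | number | required | Day of the month (1 to 31) |
| hour | number | 0 | Hour of the day in UTC |
| minute | number | 0 | Minute of the hour |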
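
To show how option objects map onto cron fields, here is a minimal re-implementation that reproduces the expressions listed for the `schedule` helper. It is a sketch and not the source of `@pgshift/cron`: the name `sketch`, the validation rules, and the day-name table are assumptions.

```typescript
const DAYS = ['sunday', 'monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday'] as const
type DayName = (typeof DAYS)[number]

// Reject values outside a field's range before they reach the cron string.
function check(value: number, min: number, max: number, label: string): number {
  if (!Number.isInteger(value) || value < min || value > max) {
    throw new RangeError(`${label} must be an integer from ${min} to ${max}`)
  }
  return value
}

const sketch = {
  // Uses `minutes` if given, otherwise `hours` (an assumption of this sketch).
  every(opts: { minutes?: number; hours?: number }): string {
    if (opts.minutes !== undefined) return `*/${check(opts.minutes, 1, 59, 'minutes')} * * * *`
    if (opts.hours !== undefined) return `0 */${check(opts.hours, 1, 23, 'hours')} * * *`
    throw new TypeError('every() needs minutes or hours')
  },
  hourly({ minute = 0 }: { minute?: number } = {}): string {
    return `${check(minute, 0, 59, 'minute')} * * * *`
  },
  daily({ hour = 0, minute = 0 }: { hour?: number; minute?: number } = {}): string {
    return `${check(minute, 0, 59, 'minute')} ${check(hour, 0, 23, 'hour')} * * *`
  },
  weekly({ day, hour = 0, minute = 0 }: { day: DayName; hour?: number; minute?: number }): string {
    // Cron numbers weekdays from 0 (Sunday) to 6 (Saturday).
    return `${check(minute, 0, 59, 'minute')} ${check(hour, 0, 23, 'hour')} * * ${DAYS.indexOf(day)}`
  },
  monthly({ day, hour = 0, minute = 0 }: { day: number; hour?: number; minute?: number }): string {
    return `${check(minute, 0, 59, 'minute')} ${check(hour, 0, 23, 'hour')} ${check(day, 1, 31, 'day')} * *`
  },
}

console.log(sketch.weekly({ day: 'monday', hour: 9 })) // 0 9 * * 1
```

Note that a monthly `day` of 29 to 31 makes the job skip shorter months, since cron fires only on dates that exist.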

import { createClient, schedule } from '@pgshift/cron'
import { createClient as createQueueClient } from '@pgshift/queue'

const DATABASE_URL = process.env.DATABASE_URL

const cron = createClient({ url: DATABASE_URL, queue: 'tasks' })
const queue = createQueueClient({ url: DATABASE_URL })

// Setup
await cron.cron.setup()
await queue.queue('tasks').setup()
await queue.queue('reports').setup() // target of the 'monthly-report' job below

// Schedule recurring jobs
await cron.cron('cleanup').schedule(schedule.daily({ hour: 0 }), {
  payload: { type: 'cleanup-sessions' },
})

await cron.cron('weekly-digest').schedule(schedule.weekly({ day: 'monday', hour: 8 }), {
  payload: { type: 'weekly-digest' },
})

await cron.cron('monthly-report').schedule(schedule.monthly({ day: 1, hour: 9 }), {
  queue: 'reports',
  payload: { type: 'monthly-report' },
})

// Worker processes jobs from the 'tasks' queue when they fire.
// Jobs routed to 'reports' need their own worker on that queue.
await queue.queue('tasks').process(async (job) => {
  const { type } = job.payload as { type: string }
  switch (type) {
    case 'cleanup-sessions':
      await cleanupExpiredSessions()
      break
    case 'weekly-digest':
      await sendWeeklyDigest()
      break
  }
})

// Graceful shutdown
process.on('SIGTERM', async () => {
  await cron.destroy()
  await queue.destroy()
  process.exit(0)
})

pg_cron evaluates schedules in UTC by default, so every hour value passed to the schedule helper is a UTC hour. Convert your intended local times before scheduling.
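
The conversion can be sketched as follows. The helper takes a local wall-clock time and a fixed UTC offset and returns the UTC hour and minute, plus a day shift for times that cross midnight. `toUtc` is a hypothetical helper, and the fixed offset is a simplifying assumption: zones with daylight saving time change their offset during the year, so a job pinned to a local time needs rescheduling when the clocks change.

```typescript
interface UtcTime {
  hour: number
  minute: number
  dayShift: -1 | 0 | 1 // -1: previous day in UTC, 1: next day in UTC
}

// Convert a local wall-clock time to UTC.
// `offsetMinutes` is the zone's offset from UTC, e.g. -300 for UTC-5, 120 for UTC+2.
function toUtc(local: { hour: number; minute?: number }, offsetMinutes: number): UtcTime {
  const total = local.hour * 60 + (local.minute ?? 0) - offsetMinutes
  const dayShift = Math.floor(total / 1440) as -1 | 0 | 1
  const wrapped = ((total % 1440) + 1440) % 1440 // minutes since 00:00 UTC
  return { hour: Math.floor(wrapped / 60), minute: wrapped % 60, dayShift }
}

// 09:00 in a UTC-5 zone is 14:00 UTC on the same day.
console.log(toUtc({ hour: 9 }, -300)) // { hour: 14, minute: 0, dayShift: 0 }
```

For a weekly or monthly job, a non-zero `dayShift` means the `day` option has to move as well.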