Most teams reach for Redis, Kafka, and Elasticsearch before they actually need them. The result is fragile distributed infrastructure that costs more to operate than the product earns.
PostgreSQL already handles full-text search, job queues, pub/sub messaging, query result caching, and vector search natively. PgShift gives you a clean, consistent API on top of those capabilities, and tells you exactly when it is time to move on.
Search
Full-text search with typo tolerance via tsvector and pg_trgm. No Elasticsearch cluster required.
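Under the hood this is standard Postgres. A minimal sketch of the idea, assuming an illustrative products table (not the exact SQL PgShift generates):

```sql
-- tsvector handles stemmed full-text matching; pg_trgm's similarity()
-- catches typos that an exact tsquery would miss.
CREATE EXTENSION IF NOT EXISTS pg_trgm;

SELECT id, name
FROM products
WHERE to_tsvector('english', name || ' ' || description)
      @@ plainto_tsquery('english', 'air max')
   OR similarity(name, 'air maxx') > 0.3
ORDER BY similarity(name, 'air maxx') DESC;
```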
Cache
Pre-compute expensive queries via materialized views. Instant reads, non-blocking refresh.
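The pattern underneath is a materialized view plus a concurrent refresh. A sketch, with an illustrative orders/products schema:

```sql
-- Reads hit the precomputed view; REFRESH ... CONCURRENTLY rebuilds it
-- without blocking those reads (it requires a unique index on the view).
CREATE MATERIALIZED VIEW top_products AS
SELECT p.id, p.name, SUM(o.amount) AS revenue
FROM orders o
JOIN products p ON p.id = o.product_id
GROUP BY p.id, p.name
ORDER BY revenue DESC
LIMIT 10;

CREATE UNIQUE INDEX ON top_products (id);

REFRESH MATERIALIZED VIEW CONCURRENTLY top_products;
```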
Queue
At-least-once job processing via SKIP LOCKED. Retries, dead letter queue, and priority built in.
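The dequeue underneath is the classic SKIP LOCKED pattern. A sketch, assuming an illustrative jobs table:

```sql
-- Each worker claims one queued job. FOR UPDATE SKIP LOCKED skips rows
-- another worker has already locked instead of blocking on them, so
-- concurrent workers never pick up the same job.
UPDATE jobs
SET status = 'running', started_at = now()
WHERE id = (
  SELECT id
  FROM jobs
  WHERE status = 'queued'
  ORDER BY priority, created_at
  FOR UPDATE SKIP LOCKED
  LIMIT 1
)
RETURNING *;
```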
More modules available
PgShift also includes realtime messaging, pub/sub, and other Postgres-powered infrastructure primitives. Explore the documentation to discover all available modules and migration strategies.
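The realtime and pub/sub modules build on Postgres's own LISTEN/NOTIFY; the channel name and payload here are illustrative:

```sql
-- Subscriber session: start receiving notifications on a channel.
LISTEN order_events;

-- Publisher session: deliver a payload to every listener, no broker required.
NOTIFY order_events, '{"orderId": "order-123", "status": "approved"}';
```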
npm install @pgshift/search

```ts
import { createClient } from '@pgshift/search'

const db = createClient({ url: process.env.DATABASE_URL })

// Define the index: weighted fields plus trigram-based fuzzy matching.
await db.search('products').index({
  fields: ['name', 'description'],
  weights: { name: 'A', description: 'B' },
  fuzzy: true,
})

await db.search('products').upsert('1', {
  name: 'Nike Air Max 90',
  description: 'Classic sneaker.',
  category: 'shoes',
})

// Typo-tolerant query: 'air maxx' still finds 'Air Max 90'.
const results = await db.search('products').query('air maxx', {
  fuzzy: true,
  filters: { category: 'shoes' },
})
```
npm install @pgshift/state

```ts
import { createClient, normalizers } from '@pgshift/state'

const db = createClient({ url: process.env.DATABASE_URL })

await db.state('loans')
  // State machine: which transitions are legal, enforced in the database.
  .define({
    field: 'status',
    states: ['pending', 'approved', 'rejected'],
    transitions: { pending: ['approved', 'rejected'], approved: [], rejected: [] },
    initial: 'pending',
  })
  // Normalize values on write (SQL expression applied to the column).
  .normalize({ amount: 'ABS({value})' })
  // Audit trail for the tracked columns.
  .audit({ track: ['status', 'amount'] })
  // Large loans need two sign-offs before the 'approved' transition commits.
  .consensus({
    transition: 'approved',
    require: 2,
    roles: ['finance', 'manager'],
    when: 'NEW.amount > 10000000',
  })
```
npm install @pgshift/vector

```ts
import { createClient } from '@pgshift/vector'

const db = createClient({ url: process.env.DATABASE_URL })

await db.vector('documents').index({
  dimensions: 1536,
  metric: 'cosine',
})

await db.vector('documents').upsert('1', {
  embedding: await embed('Getting started with PgShift'),
  data: { title: 'Getting started', userId: '123' },
})

// Hybrid search — vector similarity + relational filter in one query
const results = await db.vector('documents').query({
  embedding: await embed('how to install pgshift'),
  topK: 5,
  filters: { userId: '123' },
  minScore: 0.7,
})
```
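With the pgvector extension, the hybrid query above corresponds to plain SQL along these lines (schema illustrative; minScore becomes a similarity threshold):

```sql
SELECT id, data,
       1 - (embedding <=> $1) AS score  -- cosine similarity from cosine distance
FROM documents
WHERE data->>'userId' = '123'          -- filters: { userId: '123' }
  AND 1 - (embedding <=> $1) >= 0.7    -- minScore: 0.7
ORDER BY embedding <=> $1              -- <=> is pgvector's cosine distance
LIMIT 5;                               -- topK: 5
```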
npm install @pgshift/workflow

```ts
import { createClient } from '@pgshift/workflow'

const db = createClient({ url: process.env.DATABASE_URL })

await db.workflow('order-fulfillment').define({
  steps: {
    validate_stock: { handler: 'validateStock', retries: 3 },
    validate_fraud: { handler: 'validateFraud', retries: 3 },
    charge_card: { handler: 'chargeCard', retries: 1, compensate: 'refundCard' },
    emit_invoice: { handler: 'emitInvoice', retries: 3, compensate: 'voidInvoice' },
    send_email: { handler: 'sendEmail', retries: 5 },
    update_analytics: { handler: 'updateAnalytics', retries: 5 },
  },
  // DAG: each step lists its dependencies; independent steps run in parallel.
  // The compensate handlers undo completed steps if a later step fails.
  dag: {
    validate_stock: [],
    validate_fraud: [],
    charge_card: ['validate_stock', 'validate_fraud'],
    emit_invoice: ['charge_card'],
    send_email: ['emit_invoice'],
    update_analytics: ['emit_invoice'],
  },
})

await db.workflow('order-fulfillment').handlers({
  validateStock: async (ctx) => { /* validate */ },
  validateFraud: async (ctx) => { /* check fraud */ },
  chargeCard: async (ctx) => { /* charge */ },
  refundCard: async (ctx) => { /* compensation */ },
  emitInvoice: async (ctx) => { /* invoice */ },
  voidInvoice: async (ctx) => { /* compensation */ },
  sendEmail: async (ctx) => { /* notify */ },
  updateAnalytics: async (ctx) => { /* record */ },
})

// Start a worker, then enqueue a run.
await db.workflow('order-fulfillment').work()

const runId = await db.workflow('order-fulfillment').run({
  orderId: 'order-123',
  amount: 299.99,
})
```
npm install @pgshift/queue

```ts
import { createClient } from '@pgshift/queue'

const db = createClient({ url: process.env.DATABASE_URL })

await db.queue('emails').setup()

await db.queue('emails').push(
  { to: 'user@example.com', subject: 'Welcome' },
  { priority: 1, retries: 3 },
)

await db.queue('emails').process(async (job) => {
  await sendEmail(job.payload)
})
```
npm install @pgshift/cache

```ts
import { createClient } from '@pgshift/cache'

const db = createClient({ url: process.env.DATABASE_URL })

await db.cache('top_products').register({
  query: `
    SELECT id AS _pgshift_id, name, SUM(amount) AS revenue
    FROM orders
    JOIN products USING (product_id)
    GROUP BY id, name
    ORDER BY revenue DESC
    LIMIT 10
  `,
  refreshEvery: 60,
})

const rows = await db.cache('top_products').get()
```
npm install @pgshift/cron @pgshift/queue

```ts
import { createClient as createCronClient, schedule } from '@pgshift/cron'
import { createClient as createQueueClient } from '@pgshift/queue'

const cron = createCronClient({ url: process.env.DATABASE_URL, queue: 'tasks' })
const queue = createQueueClient({ url: process.env.DATABASE_URL })

await cron.cron.setup()
await queue.queue('tasks').setup()

// Every Monday at 08:00, push a job onto the 'tasks' queue.
await cron.cron('weekly-digest').schedule(
  schedule.weekly({ day: 'monday', hour: 8 }),
  { payload: { type: 'weekly-digest' } },
)

await queue.queue('tasks').process(async (job) => {
  const { type } = job.payload as { type: string }
  if (type === 'weekly-digest') await sendWeeklyDigest()
})
```

PgShift tracks the latency of every operation. When a module consistently hits the limits of what Postgres can handle efficiently, it emits a migration hint telling you exactly when it is time to move to Elasticsearch, Redis, Pinecone, or Kafka.
```ts
const db = createClient({
  url: process.env.DATABASE_URL,
  onMigrationHint(hint) {
    console.warn(`Consider migrating ${hint.module} to ${hint.suggestedAdapter}.`)
    console.warn(hint.reason)
  },
})
```

When the time comes, the migration is infrastructure work, not application code. Your db.search().query(), db.cache().get(), db.queue().push(), and db.vector().query() calls stay exactly the same.