Skip to content

Bloqueo Distribuido (Locker)

El Locker permite adquirir locks distribuidos para garantizar que solo una instancia de tu aplicación ejecute una sección crítica a la vez. Usa PostgreSQL advisory locks, por lo que funciona automáticamente si ya tienes PostgreSQL configurado.

Conceptos clave

  • Locker / PgLocker: servicio singleton para crear locks.
  • withKey(key): selecciona un lock por clave (string o número).
  • run(fn): adquiere el lock, ejecuta la función y lo libera. Si el lock está ocupado, espera hasta que se libere.
  • tryRun(fn): intenta adquirir el lock sin esperar. Si está ocupado, retorna undefined inmediatamente.

Uso básico

Lock bloqueante (run)

Espera a que el lock se libere antes de ejecutar:

import { PgLocker, injectable } from '@wabot-dev/framework'
@injectable()
export class OrderService {
constructor(private locker: PgLocker) {}
async processOrder(orderId: string) {
const result = await this.locker.withKey(`order-${orderId}`).run(async () => {
// Solo una instancia ejecuta este código a la vez para este orderId
const order = await this.findOrder(orderId)
order.markAsProcessed()
await this.saveOrder(order)
return order
})
return result
}
}

Lock no bloqueante (tryRun)

Retorna undefined si el lock está ocupado, sin esperar:

@injectable()
export class SchedulerService {
constructor(private locker: PgLocker) {}
async runSchedulerLoop() {
const result = await this.locker.withKey('scheduler-loop').tryRun(async () => {
// Solo se ejecuta si ninguna otra instancia tiene el lock
await this.processQueue()
return 'completed'
})
if (result === undefined) {
console.log('Otra instancia ya está ejecutando el scheduler')
}
}
}

Casos de uso

Prevenir procesamiento duplicado

Cuando múltiples instancias de tu app pueden recibir el mismo evento:

@injectable()
export class PaymentWebhookHandler {
constructor(private locker: PgLocker) {}
async handlePaymentConfirmation(paymentId: string) {
await this.locker.withKey(`payment-${paymentId}`).run(async () => {
const payment = await this.findPayment(paymentId)
// Sin el lock, dos webhooks simultáneos podrían procesar el mismo pago
if (payment.isProcessed) return
await this.creditAccount(payment)
payment.markAsProcessed()
await this.savePayment(payment)
})
}
}

Acceso exclusivo a un recurso

Cuando solo un proceso debe modificar un recurso a la vez:

@injectable()
export class InventoryService {
constructor(private locker: PgLocker) {}
async reserveStock(productId: string, quantity: number) {
return await this.locker.withKey(`stock-${productId}`).run(async () => {
const stock = await this.getStock(productId)
if (stock.available < quantity) {
throw new Error('Stock insuficiente')
}
stock.available -= quantity
stock.reserved += quantity
await this.saveStock(stock)
return { reserved: quantity, remaining: stock.available }
})
}
}

Tareas de background únicas

Garantizar que solo una instancia ejecute una tarea periódica:

@injectable()
export class CleanupService {
constructor(private locker: PgLocker) {}
async runCleanup() {
const result = await this.locker.withKey('daily-cleanup').tryRun(async () => {
await this.deleteExpiredSessions()
await this.archiveOldLogs()
return 'cleanup completed'
})
if (result === undefined) {
// Otra instancia ya está haciendo la limpieza — no hacer nada
}
}
}

Ejemplo completo

Sistema de procesamiento de pedidos con protección contra duplicados:

import { PgLocker, singleton, Logger } from '@wabot-dev/framework'
@singleton()
export class OrderProcessor {
private logger = new Logger('wabot:order-processor')
constructor(
private locker: PgLocker,
private orderRepository: OrderRepository,
private paymentService: PaymentService,
private inventoryService: InventoryService,
) {}
async processOrder(orderId: string) {
this.logger.info('Intentando procesar pedido', { orderId })
return await this.locker.withKey(`order-${orderId}`).run(async () => {
const order = await this.orderRepository.findById(orderId)
if (!order) {
this.logger.warn('Pedido no encontrado', { orderId })
throw new Error('Pedido no encontrado')
}
if (order.isProcessed) {
this.logger.debug('Pedido ya fue procesado', { orderId })
return { status: 'already_processed' }
}
// Reservar stock
this.logger.debug('Reservando stock', { orderId })
for (const item of order.items) {
await this.inventoryService.reserveStock(item.productId, item.quantity)
}
// Cobrar
this.logger.debug('Procesando pago', { orderId, total: order.total })
const payment = await this.paymentService.charge(order.total, 'USD')
if (!payment.success) {
this.logger.error('Pago fallido', new Error(payment.error), { orderId })
// Liberar stock reservado
for (const item of order.items) {
await this.inventoryService.releaseStock(item.productId, item.quantity)
}
return { status: 'payment_failed' }
}
// Confirmar pedido
order.markAsProcessed(payment.transactionId)
await this.orderRepository.update(order)
this.logger.info('Pedido procesado exitosamente', {
orderId,
transactionId: payment.transactionId,
})
return { status: 'success', transactionId: payment.transactionId }
})
}
}