Bloqueo Distribuido (Locker)
El Locker permite adquirir locks distribuidos para garantizar que solo una instancia de tu aplicación ejecute una sección crítica a la vez. Usa PostgreSQL advisory locks, por lo que funciona automáticamente si ya tienes PostgreSQL configurado.
Conceptos clave
Locker/PgLocker: servicio singleton para crear locks.withKey(key): selecciona un lock por clave (string o número).run(fn): adquiere el lock, ejecuta la función y lo libera. Si el lock está ocupado, espera hasta que se libere.tryRun(fn): intenta adquirir el lock sin esperar. Si está ocupado, retornaundefinedinmediatamente.
Uso básico
Lock bloqueante (run)
Espera a que el lock se libere antes de ejecutar:
import { PgLocker, injectable } from '@wabot-dev/framework'
@injectable()export class OrderService { constructor(private locker: PgLocker) {}
async processOrder(orderId: string) { const result = await this.locker.withKey(`order-${orderId}`).run(async () => { // Solo una instancia ejecuta este código a la vez para este orderId const order = await this.findOrder(orderId) order.markAsProcessed() await this.saveOrder(order) return order })
return result }}Lock no bloqueante (tryRun)
Retorna undefined si el lock está ocupado, sin esperar:
@injectable()export class SchedulerService { constructor(private locker: PgLocker) {}
async runSchedulerLoop() { const result = await this.locker.withKey('scheduler-loop').tryRun(async () => { // Solo se ejecuta si ninguna otra instancia tiene el lock await this.processQueue() return 'completed' })
if (result === undefined) { console.log('Otra instancia ya está ejecutando el scheduler') } }}Casos de uso
Prevenir procesamiento duplicado
Cuando múltiples instancias de tu app pueden recibir el mismo evento:
@injectable()export class PaymentWebhookHandler { constructor(private locker: PgLocker) {}
async handlePaymentConfirmation(paymentId: string) { await this.locker.withKey(`payment-${paymentId}`).run(async () => { const payment = await this.findPayment(paymentId)
// Sin el lock, dos webhooks simultáneos podrían procesar el mismo pago if (payment.isProcessed) return
await this.creditAccount(payment) payment.markAsProcessed() await this.savePayment(payment) }) }}Acceso exclusivo a un recurso
Cuando solo un proceso debe modificar un recurso a la vez:
@injectable()export class InventoryService { constructor(private locker: PgLocker) {}
async reserveStock(productId: string, quantity: number) { return await this.locker.withKey(`stock-${productId}`).run(async () => { const stock = await this.getStock(productId)
if (stock.available < quantity) { throw new Error('Stock insuficiente') }
stock.available -= quantity stock.reserved += quantity await this.saveStock(stock)
return { reserved: quantity, remaining: stock.available } }) }}Tareas de background únicas
Garantizar que solo una instancia ejecute una tarea periódica:
@injectable()export class CleanupService { constructor(private locker: PgLocker) {}
async runCleanup() { const result = await this.locker.withKey('daily-cleanup').tryRun(async () => { await this.deleteExpiredSessions() await this.archiveOldLogs() return 'cleanup completed' })
if (result === undefined) { // Otra instancia ya está haciendo la limpieza — no hacer nada } }}Ejemplo completo
Sistema de procesamiento de pedidos con protección contra duplicados:
import { PgLocker, singleton, Logger } from '@wabot-dev/framework'
@singleton()export class OrderProcessor { private logger = new Logger('wabot:order-processor')
constructor( private locker: PgLocker, private orderRepository: OrderRepository, private paymentService: PaymentService, private inventoryService: InventoryService, ) {}
async processOrder(orderId: string) { this.logger.info('Intentando procesar pedido', { orderId })
return await this.locker.withKey(`order-${orderId}`).run(async () => { const order = await this.orderRepository.findById(orderId)
if (!order) { this.logger.warn('Pedido no encontrado', { orderId }) throw new Error('Pedido no encontrado') }
if (order.isProcessed) { this.logger.debug('Pedido ya fue procesado', { orderId }) return { status: 'already_processed' } }
// Reservar stock this.logger.debug('Reservando stock', { orderId }) for (const item of order.items) { await this.inventoryService.reserveStock(item.productId, item.quantity) }
// Cobrar this.logger.debug('Procesando pago', { orderId, total: order.total }) const payment = await this.paymentService.charge(order.total, 'USD')
if (!payment.success) { this.logger.error('Pago fallido', new Error(payment.error), { orderId }) // Liberar stock reservado for (const item of order.items) { await this.inventoryService.releaseStock(item.productId, item.quantity) } return { status: 'payment_failed' } }
// Confirmar pedido order.markAsProcessed(payment.transactionId) await this.orderRepository.update(order)
this.logger.info('Pedido procesado exitosamente', { orderId, transactionId: payment.transactionId, })
return { status: 'success', transactionId: payment.transactionId } }) }}