Introduction to CQRS and Event Sourcing - Designing a Scalable Architecture

Advanced | 2025.12.02

What Is CQRS?

CQRS (Command Query Responsibility Segregation) is an architectural pattern that separates operations that write data (commands) from operations that read data (queries), so each side can use a model suited to its job.

Traditional CRUD Architecture

flowchart TB
    subgraph CRUD["Traditional CRUD Architecture"]
        C1["Client"]
        C1 -->|CRUD operations| DM["Single Data Model<br/>(Read + Write)<br/>← Same model for reads and writes"]
        DM --> DB1["Database"]
    end

CQRS Architecture

flowchart TB
    subgraph CQRS["CQRS Architecture"]
        C2["Client"]
        C3["Client"]
        C2 -->|Command| CM["Command Model<br/>(Write)"]
        C3 -->|Query| QM["Query Model<br/>(Read)"]
        CM --> WS["Write Store<br/>(normalized)"]
        QM --> RS["Read Store<br/>(denormalized)"]
        WS -->|synchronization| RS
    end
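To make the diagram concrete, here is a minimal in-memory sketch of the same split. All names (`writeStore`, `readStore`, `handleCreateOrder`, `handleGetOrderSummary`, `syncReadModel`) are hypothetical, and the synchronization step runs inline for brevity; in a real system it would usually be asynchronous.

```typescript
// Hypothetical in-memory sketch: one model for writes, another for reads.
type OrderRow = { id: string; customerId: string; itemIds: string[] }; // normalized write shape
type OrderSummary = { id: string; customerId: string; itemCount: number }; // denormalized read shape

const writeStore = new Map<string, OrderRow>();
const readStore = new Map<string, OrderSummary>();

// Synchronization step (the "Write Store --> Read Store" arrow in the diagram).
function syncReadModel(row: OrderRow): void {
  readStore.set(row.id, {
    id: row.id,
    customerId: row.customerId,
    itemCount: row.itemIds.length,
  });
}

// Command side: validates, changes state, and returns only an identifier.
function handleCreateOrder(cmd: {
  orderId: string;
  customerId: string;
  itemIds: string[];
}): string {
  if (cmd.itemIds.length === 0) {
    throw new Error('An order needs at least one item');
  }
  const row: OrderRow = {
    id: cmd.orderId,
    customerId: cmd.customerId,
    itemIds: [...cmd.itemIds],
  };
  writeStore.set(row.id, row);
  syncReadModel(row);
  return row.id;
}

// Query side: reads from the read store only and never changes state.
function handleGetOrderSummary(orderId: string): OrderSummary | null {
  return readStore.get(orderId) ?? null;
}
```

The command handler returns nothing but the id, and the query handler never touches the write store. That boundary is what lets each side be optimized or scaled independently.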

When to Adopt CQRS

| Use case | Description |
| --- | --- |
| Asymmetric read/write load | Reads vastly outnumber writes (social networks, e-commerce sites) |
| Complex domain logic | Complex business rules must be enforced on writes |
| Different optimizations needed | Transactions for writes, caching for reads |
| Multiple read views | The same data is presented in several formats |
| Audit/history requirements | The full change history must be retained |

Basic CQRS Implementation

Command Side (Write)

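// commands/types.ts
// Assumption: Address lives in the domain layer; adjust the path to your project.
import { Address } from '../domain/value-objects/address';

export interface CommandMetadata {
  userId: string;
  timestamp: Date;
  correlationId: string;
}

export interface Command {
  type: string;
  payload: unknown;
  metadata: CommandMetadata;
}

// Commands are classes, not interfaces: @CommandHandler() needs a runtime
// value, and TypeScript interfaces are erased at compile time.
export class CreateOrderCommand implements Command {
  readonly type = 'CreateOrder';

  constructor(
    readonly payload: {
      customerId: string;
      items: Array<{
        productId: string;
        quantity: number;
        unitPrice: number;
      }>;
      shippingAddress: Address;
    },
    readonly metadata: CommandMetadata,
  ) {}
}

export class CancelOrderCommand implements Command {
  readonly type = 'CancelOrder';

  constructor(
    readonly payload: {
      orderId: string;
      reason: string;
    },
    readonly metadata: CommandMetadata,
  ) {}
}

// commands/handlers/order-command-handler.ts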
import { Injectable } from '@nestjs/common';
import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';
import { CreateOrderCommand, CancelOrderCommand } from '../types';
import { OrderRepository } from '../../domain/repositories/order-repository';
import { Order } from '../../domain/entities/order';
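import { EventPublisher } from '../../infrastructure/event-publisher';
// Assumption: OrderNotFoundError is defined in the domain layer; adjust the path.
import { OrderNotFoundError } from '../../domain/errors';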

@Injectable()
@CommandHandler(CreateOrderCommand)
export class CreateOrderHandler implements ICommandHandler<CreateOrderCommand> {
  constructor(
    private readonly orderRepository: OrderRepository,
    private readonly eventPublisher: EventPublisher,
  ) {}

  async execute(command: CreateOrderCommand): Promise<string> {
    // Run the domain logic
    const order = Order.create({
      customerId: command.payload.customerId,
      items: command.payload.items,
      shippingAddress: command.payload.shippingAddress,
    });

    // Validate
    order.validate();

    // Persist
    await this.orderRepository.save(order);

    // Publish the events raised by the aggregate
    await this.eventPublisher.publish(order.getUncommittedEvents());

    return order.id;
  }
}

@Injectable()
@CommandHandler(CancelOrderCommand)
export class CancelOrderHandler implements ICommandHandler<CancelOrderCommand> {
  constructor(
    private readonly orderRepository: OrderRepository,
    private readonly eventPublisher: EventPublisher,
  ) {}

  async execute(command: CancelOrderCommand): Promise<void> {
    const order = await this.orderRepository.findById(command.payload.orderId);

    if (!order) {
      throw new OrderNotFoundError(command.payload.orderId);
    }

    // Domain logic: the aggregate checks whether the order can be cancelled
    order.cancel(command.payload.reason);

    await this.orderRepository.save(order);
    await this.eventPublisher.publish(order.getUncommittedEvents());
  }
}

Query Side (Read)

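// queries/types.ts
// Assumption: OrderStatus is exported from the domain layer; adjust the path.
import { OrderStatus } from '../domain/entities/order';

export interface Query {
  type: string;
}

// Queries are classes, not interfaces: @QueryHandler() needs a runtime
// value, and TypeScript interfaces are erased at compile time.
export class GetOrderByIdQuery implements Query {
  readonly type = 'GetOrderById';

  constructor(readonly orderId: string) {}
}

export class GetOrdersByCustomerQuery implements Query {
  readonly type = 'GetOrdersByCustomer';

  constructor(
    readonly customerId: string,
    readonly pagination: {
      page: number;
      limit: number;
    },
    readonly filters?: {
      status?: OrderStatus[];
      dateRange?: { from: Date; to: Date };
    },
  ) {}
}

// queries/handlers/order-query-handler.ts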
import { Injectable } from '@nestjs/common';
import { QueryHandler, IQueryHandler } from '@nestjs/cqrs';
import { GetOrderByIdQuery, GetOrdersByCustomerQuery } from '../types';
import { OrderReadRepository } from '../../infrastructure/read-repositories';

// Read-only DTO
interface OrderDetailDto {
  id: string;
  customerName: string;
  items: Array<{
    productName: string;
    quantity: number;
    totalPrice: number;
  }>;
  status: string;
  totalAmount: number;
  createdAt: Date;
}

@Injectable()
@QueryHandler(GetOrderByIdQuery)
export class GetOrderByIdHandler implements IQueryHandler<GetOrderByIdQuery> {
  constructor(private readonly readRepository: OrderReadRepository) {}

  async execute(query: GetOrderByIdQuery): Promise<OrderDetailDto | null> {
    // Fetch directly from the read-optimized store
    return this.readRepository.findById(query.orderId);
  }
}

@Injectable()
@QueryHandler(GetOrdersByCustomerQuery)
export class GetOrdersByCustomerHandler
  implements IQueryHandler<GetOrdersByCustomerQuery>
{
  constructor(private readonly readRepository: OrderReadRepository) {}

  async execute(query: GetOrdersByCustomerQuery): Promise<{
    orders: OrderDetailDto[];
    total: number;
    hasMore: boolean;
  }> {
    return this.readRepository.findByCustomer(
      query.customerId,
      query.pagination,
      query.filters,
    );
  }
}
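The handlers above delegate to an `OrderReadRepository` that is not shown. As a rough illustration of what `findByCustomer` might do, here is an in-memory sketch. The class name `InMemoryOrderReadRepository`, the `OrderSummary` shape, and the 1-based page numbering are all assumptions, and the method is synchronous here only to keep the example short.

```typescript
// Hypothetical in-memory read repository; a real one would query a read database.
type OrderStatus = 'pending' | 'paid' | 'shipped' | 'cancelled';

interface OrderSummary {
  id: string;
  customerId: string;
  status: OrderStatus;
  createdAt: Date;
}

interface OrderPage {
  orders: OrderSummary[];
  total: number;
  hasMore: boolean;
}

class InMemoryOrderReadRepository {
  constructor(private readonly rows: OrderSummary[]) {}

  findByCustomer(
    customerId: string,
    pagination: { page: number; limit: number }, // assumption: page is 1-based
    filters?: { status?: OrderStatus[]; dateRange?: { from: Date; to: Date } },
  ): OrderPage {
    const statuses = filters?.status;
    const range = filters?.dateRange;

    // Apply the customer, status, and date filters before paginating.
    const matching = this.rows.filter((row) => {
      if (row.customerId !== customerId) return false;
      if (statuses && statuses.indexOf(row.status) === -1) return false;
      if (range) {
        const t = row.createdAt.getTime();
        if (t < range.from.getTime() || t > range.to.getTime()) return false;
      }
      return true;
    });

    const start = (pagination.page - 1) * pagination.limit;
    const orders = matching.slice(start, start + pagination.limit);

    return {
      orders,
      total: matching.length,
      hasMore: start + orders.length < matching.length,
    };
  }
}
```

Because the read side has no domain rules to enforce, this layer stays a thin filter-and-slice over data that the projections have already shaped.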

Event Sourcing

Event Sourcing is a pattern that records every state change as an "event" and derives the current state by replaying the accumulated events in order.

Traditional state storage:

flowchart LR
    subgraph Traditional["Traditional Storage"]
        O["Order #123<br/>status: 'shipped' ← Only the latest state is stored<br/>total: 15000"]
    end

Event Sourcing:

flowchart TB
    subgraph ES["Event Stream para Order #123"]
        E1["[1] OrderCreated<br/>items: [...], total: 10000"]
        E2["[2] ItemAdded<br/>productId: 'P456', qty: 2"]
        E3["[3] OrderConfirmed<br/>confirmedAt: '...'"]
        E4["[4] PaymentReceived<br/>amount: 15000"]
        E5["[5] OrderShipped<br/>trackingNo: 'TRK123'"]
        E1 --> E2 --> E3 --> E4 --> E5
    end
    E5 --> State["Current state = apply events [1] to [5] in order"]
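The idea of "current state = events applied in order" is a left fold over the stream. The sketch below replays the five events from the diagram. The event field names, the `evolve` and `replay` functions, and the `newTotal` value carried by `ItemAdded` are assumptions made for illustration.

```typescript
// Hypothetical event shapes modelled on the diagram above.
type OrderEvent =
  | { type: 'OrderCreated'; total: number }
  | { type: 'ItemAdded'; productId: string; qty: number; newTotal: number }
  | { type: 'OrderConfirmed' }
  | { type: 'PaymentReceived'; amount: number }
  | { type: 'OrderShipped'; trackingNo: string };

type OrderState = { status: string; total: number; trackingNo?: string };

const initialState: OrderState = { status: 'none', total: 0 };

// Pure transition function: (state, event) -> next state.
function evolve(state: OrderState, event: OrderEvent): OrderState {
  switch (event.type) {
    case 'OrderCreated':
      return { ...state, status: 'pending', total: event.total };
    case 'ItemAdded':
      return { ...state, total: event.newTotal };
    case 'OrderConfirmed':
      return { ...state, status: 'confirmed' };
    case 'PaymentReceived':
      return { ...state, status: 'paid' };
    case 'OrderShipped':
      return { ...state, status: 'shipped', trackingNo: event.trackingNo };
  }
}

// Current state is the fold of every event over the initial state.
function replay(events: OrderEvent[]): OrderState {
  return events.reduce(evolve, initialState);
}

const stream: OrderEvent[] = [
  { type: 'OrderCreated', total: 10000 },
  { type: 'ItemAdded', productId: 'P456', qty: 2, newTotal: 15000 },
  { type: 'OrderConfirmed' },
  { type: 'PaymentReceived', amount: 15000 },
  { type: 'OrderShipped', trackingNo: 'TRK123' },
];

const currentState = replay(stream);
```

Replaying only a prefix of the stream, for example `replay(stream.slice(0, 3))`, yields the order as it stood at that point. This is what makes history and audit queries straightforward with this pattern.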

Domain Event Definitions

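// domain/events/order-events.ts
// Assumption: OrderItem and Address are domain value objects; adjust the path.
import { OrderItem, Address } from '../value-objects';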
export interface DomainEvent {
  eventId: string;
  eventType: string;
  aggregateId: string;
  aggregateType: string;
  version: number;
  timestamp: Date;
  payload: unknown;
  metadata: {
    userId?: string;
    correlationId?: string;
    causationId?: string;
  };
}

export interface OrderCreatedEvent extends DomainEvent {
  eventType: 'OrderCreated';
  payload: {
    customerId: string;
    items: OrderItem[];
    shippingAddress: Address;
    totalAmount: number;
  };
}

export interface OrderItemAddedEvent extends DomainEvent {
  eventType: 'OrderItemAdded';
  payload: {
    item: OrderItem;
    newTotal: number;
  };
}

export interface OrderShippedEvent extends DomainEvent {
  eventType: 'OrderShipped';
  payload: {
    shippedAt: Date;
    trackingNumber: string;
    carrier: string;
  };
}

export interface OrderCancelledEvent extends DomainEvent {
  eventType: 'OrderCancelled';
  payload: {
    reason: string;
    cancelledAt: Date;
    refundAmount: number;
  };
}

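Event-Sourced Aggregate

// domain/aggregates/order.ts
// Helpers and types not shown here (generateId, calculateTotal, OrderItem,
// OrderStatus, Address, CreateOrderParams, InvalidOperationError) are assumed
// to be defined elsewhere in the domain layer.
import { AggregateRoot } from '../base/aggregate-root';
import {
  DomainEvent,
  OrderCreatedEvent,
  OrderItemAddedEvent,
  OrderShippedEvent,
  OrderCancelledEvent,
} from '../events/order-events';

export class Order extends AggregateRoot {
  // Definite assignment (!): these fields are set when OrderCreated is applied
  private _customerId!: string;
  private _items: OrderItem[] = [];
  private _status: OrderStatus = 'draft';
  private _totalAmount: number = 0;
  private _shippingAddress!: Address;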

  // Restore state by replaying events
  static fromEvents(events: DomainEvent[]): Order {
    const order = new Order();
    events.forEach((event) => order.apply(event, false));
    return order;
  }

  // Command: create an order
  static create(params: CreateOrderParams): Order {
    const order = new Order();
    const event: OrderCreatedEvent = {
      eventId: generateId(),
      eventType: 'OrderCreated',
      aggregateId: generateId(),
      aggregateType: 'Order',
      version: 1,
      timestamp: new Date(),
      payload: {
        customerId: params.customerId,
        items: params.items,
        shippingAddress: params.shippingAddress,
        totalAmount: calculateTotal(params.items),
      },
      metadata: {},
    };

    order.apply(event, true);
    return order;
  }

  // Command: add an item
  addItem(item: OrderItem): void {
    if (this._status !== 'draft' && this._status !== 'pending') {
      throw new InvalidOperationError('Cannot add items to this order');
    }

    const event: OrderItemAddedEvent = {
      eventId: generateId(),
      eventType: 'OrderItemAdded',
      aggregateId: this.id,
      aggregateType: 'Order',
      version: this.version + 1,
      timestamp: new Date(),
      payload: {
        item,
        newTotal: this._totalAmount + item.quantity * item.unitPrice,
      },
      metadata: {},
    };

    this.apply(event, true);
  }

  // Command: ship the order
  ship(trackingNumber: string, carrier: string): void {
    if (this._status !== 'paid') {
      throw new InvalidOperationError('Order must be paid before shipping');
    }

    const event: OrderShippedEvent = {
      eventId: generateId(),
      eventType: 'OrderShipped',
      aggregateId: this.id,
      aggregateType: 'Order',
      version: this.version + 1,
      timestamp: new Date(),
      payload: {
        shippedAt: new Date(),
        trackingNumber,
        carrier,
      },
      metadata: {},
    };

    this.apply(event, true);
  }

  // Event handler: applies the state change for each event
  protected when(event: DomainEvent): void {
    switch (event.eventType) {
      case 'OrderCreated': {
        const e = event as OrderCreatedEvent;
        this._id = e.aggregateId;
        this._customerId = e.payload.customerId;
        // Copy the array so later pushes do not mutate the event payload
        this._items = [...e.payload.items];
        this._shippingAddress = e.payload.shippingAddress;
        this._totalAmount = e.payload.totalAmount;
        this._status = 'pending';
        break;
      }
      case 'OrderItemAdded': {
        const e = event as OrderItemAddedEvent;
        this._items.push(e.payload.item);
        this._totalAmount = e.payload.newTotal;
        break;
      }
      case 'OrderShipped': {
        this._status = 'shipped';
        break;
      }
      case 'OrderCancelled': {
        this._status = 'cancelled';
        break;
      }
    }
  }
}
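The aggregate above extends an `AggregateRoot` base class that the article does not show. A minimal sketch of what it might look like follows, inferred only from how `Order` uses it (`apply(event, isNew)`, `when`, `id`, `version`, `getUncommittedEvents`). The `markEventsAsCommitted` method and the `Counter` aggregate are hypothetical additions for illustration.

```typescript
// Reduced event shape for this sketch; the article's DomainEvent has more fields.
interface DomainEvent {
  eventId: string;
  eventType: string;
  aggregateId: string;
  version: number;
}

abstract class AggregateRoot {
  protected _id = '';
  private _version = 0;
  private readonly uncommitted: DomainEvent[] = [];

  get id(): string {
    return this._id;
  }

  get version(): number {
    return this._version;
  }

  // Applies an event to the state; new events are also queued for persistence.
  protected apply(event: DomainEvent, isNew: boolean): void {
    this.when(event);
    this._version = event.version;
    if (isNew) {
      this.uncommitted.push(event);
    }
  }

  getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommitted];
  }

  // Hypothetical: called by the repository after the events are stored.
  markEventsAsCommitted(): void {
    this.uncommitted.length = 0;
  }

  // Subclasses implement the actual state transitions.
  protected abstract when(event: DomainEvent): void;
}

// Tiny hypothetical aggregate to exercise the base class.
class Counter extends AggregateRoot {
  private _count = 0;

  get count(): number {
    return this._count;
  }

  static fromEvents(events: DomainEvent[]): Counter {
    const counter = new Counter();
    events.forEach((event) => counter.apply(event, false));
    return counter;
  }

  increment(): void {
    this.apply(
      {
        eventId: `e${this.version + 1}`,
        eventType: 'Incremented',
        aggregateId: this.id || 'counter-1',
        version: this.version + 1,
      },
      true,
    );
  }

  protected when(event: DomainEvent): void {
    if (event.eventType === 'Incremented') {
      this._id = event.aggregateId;
      this._count += 1;
    }
  }
}
```

The `isNew` flag is the key design choice. Replayed events change state without being queued again, while events raised by commands are both applied and collected for the event store.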

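Event Store Implementation

// infrastructure/event-store.ts
import { Injectable } from '@nestjs/common';
import { Pool } from 'pg';
import { DomainEvent } from '../domain/events/order-events';
// Assumption: ConcurrencyError is defined alongside the store; adjust the path.
import { ConcurrencyError } from './errors';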

@Injectable()
export class EventStore {
  constructor(private readonly pool: Pool) {}

  async appendEvents(
    streamId: string,
    events: DomainEvent[],
    expectedVersion: number,
  ): Promise<void> {
    const client = await this.pool.connect();

    try {
      await client.query('BEGIN');

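      // Optimistic locking: check the stream's current version.
      // This check alone can race between concurrent transactions, so the table
      // should also have a UNIQUE (stream_id, version) constraint.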
      const versionCheck = await client.query(
        `SELECT MAX(version) as current_version
         FROM events
         WHERE stream_id = $1`,
        [streamId],
      );

      const currentVersion = versionCheck.rows[0]?.current_version ?? 0;

      if (currentVersion !== expectedVersion) {
        throw new ConcurrencyError(
          `Expected version ${expectedVersion}, but found ${currentVersion}`,
        );
      }

      // Append the events
      for (const event of events) {
        await client.query(
          `INSERT INTO events (
            event_id, stream_id, event_type, version,
            payload, metadata, timestamp
          ) VALUES ($1, $2, $3, $4, $5, $6, $7)`,
          [
            event.eventId,
            streamId,
            event.eventType,
            event.version,
            JSON.stringify(event.payload),
            JSON.stringify(event.metadata),
            event.timestamp,
          ],
        );
      }

      await client.query('COMMIT');
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }

  async getEvents(
    streamId: string,
    fromVersion: number = 0,
  ): Promise<DomainEvent[]> {
    const result = await this.pool.query(
      `SELECT * FROM events
       WHERE stream_id = $1 AND version > $2
       ORDER BY version ASC`,
      [streamId, fromVersion],
    );

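    // mapToEvent (not shown) converts a database row into a DomainEvent.
    // The arrow function keeps `this` bound when the method is called.
    return result.rows.map((row) => this.mapToEvent(row));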
  }
}
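The `expectedVersion` check is easier to see without the SQL around it. The following in-memory sketch shows the same optimistic-concurrency rule. `InMemoryEventStore`, `StoredEvent`, and the local `ConcurrencyError` class are hypothetical stand-ins, not the article's PostgreSQL implementation.

```typescript
// Hypothetical in-memory event store illustrating the expectedVersion check.
interface StoredEvent {
  streamId: string;
  version: number;
  eventType: string;
  payload: unknown;
}

class ConcurrencyError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'ConcurrencyError';
  }
}

class InMemoryEventStore {
  private readonly streams = new Map<string, StoredEvent[]>();

  // Appends events only if the stream is still at the version the caller loaded.
  append(
    streamId: string,
    events: Array<{ eventType: string; payload: unknown }>,
    expectedVersion: number,
  ): number {
    const stream = this.streams.get(streamId) ?? [];
    const currentVersion = stream.length;

    if (currentVersion !== expectedVersion) {
      throw new ConcurrencyError(
        `Expected version ${expectedVersion}, but found ${currentVersion}`,
      );
    }

    const appended = events.map((event, index) => ({
      ...event,
      streamId,
      version: currentVersion + index + 1,
    }));
    this.streams.set(streamId, [...stream, ...appended]);
    return currentVersion + appended.length;
  }

  // Returns events with version greater than fromVersion, in order.
  read(streamId: string, fromVersion = 0): StoredEvent[] {
    return (this.streams.get(streamId) ?? []).filter(
      (event) => event.version > fromVersion,
    );
  }
}
```

If two writers both load a stream at version 1, the first `append(..., 1)` succeeds and the second throws `ConcurrencyError`. The losing writer is then expected to reload the stream and retry its command.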

Read Model Synchronization (Projections)

flowchart TB
    subgraph EventStore["Event Store"]
        E1["Event 1"]
        E2["Event 2"]
        E3["Event 3"]
        E4["..."]
    end

    P["Projector<br/>(Event handler)"]

    subgraph ReadDBs["Read Databases"]
        RD1["Read DB 1<br/>(Order list)"]
        RD2["Read DB 2<br/>(Sales aggregate)"]
    end

    EventStore --> P
    P --> RD1
    P --> RD2

Characteristics:

  • Asynchronous processing (eventual consistency)
  • A single event stream can feed multiple read models
  • Rebuildable (by replaying events)

// projections/order-list-projection.ts
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import {
  OrderCreatedEvent,
  OrderShippedEvent,
  OrderCancelledEvent,
} from '../domain/events/order-events';
import { OrderReadRepository } from '../infrastructure/read-repositories';

@Injectable()
export class OrderListProjection {
  constructor(private readonly readRepository: OrderReadRepository) {}

  @OnEvent('OrderCreated')
  async handleOrderCreated(event: OrderCreatedEvent): Promise<void> {
    await this.readRepository.insert({
      id: event.aggregateId,
      customerId: event.payload.customerId,
      // getCustomerName is a helper not shown here (e.g. a lookup against a customer read model)
      customerName: await this.getCustomerName(event.payload.customerId),
      status: 'pending',
      totalAmount: event.payload.totalAmount,
      itemCount: event.payload.items.length,
      createdAt: event.timestamp,
      updatedAt: event.timestamp,
    });
  }

  @OnEvent('OrderShipped')
  async handleOrderShipped(event: OrderShippedEvent): Promise<void> {
    await this.readRepository.update(event.aggregateId, {
      status: 'shipped',
      shippedAt: event.payload.shippedAt,
      trackingNumber: event.payload.trackingNumber,
      updatedAt: event.timestamp,
    });
  }

  @OnEvent('OrderCancelled')
  async handleOrderCancelled(event: OrderCancelledEvent): Promise<void> {
    await this.readRepository.update(event.aggregateId, {
      status: 'cancelled',
      cancelledAt: event.payload.cancelledAt,
      cancellationReason: event.payload.reason,
      updatedAt: event.timestamp,
    });
  }
}
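Because a projection is derived entirely from events, it can be thrown away and rebuilt by replaying the stream, and the same stream can feed more than one read model. The sketch below illustrates both points in memory. The simplified event shapes and the names `rebuildOrderList` and `projectSalesTotal` are assumptions, not part of the article's code.

```typescript
// Simplified, hypothetical event shapes for the projection sketch.
type OrderEvent =
  | { eventType: 'OrderCreated'; aggregateId: string; totalAmount: number }
  | { eventType: 'OrderShipped'; aggregateId: string; trackingNumber: string }
  | { eventType: 'OrderCancelled'; aggregateId: string; reason: string };

interface OrderListRow {
  id: string;
  status: string;
  totalAmount: number;
  trackingNumber?: string;
}

// Read model 1: the order list, rebuilt from scratch by replaying every event.
function rebuildOrderList(events: OrderEvent[]): Map<string, OrderListRow> {
  const rows = new Map<string, OrderListRow>();
  for (const event of events) {
    switch (event.eventType) {
      case 'OrderCreated':
        rows.set(event.aggregateId, {
          id: event.aggregateId,
          status: 'pending',
          totalAmount: event.totalAmount,
        });
        break;
      case 'OrderShipped': {
        const row = rows.get(event.aggregateId);
        if (row) {
          row.status = 'shipped';
          row.trackingNumber = event.trackingNumber;
        }
        break;
      }
      case 'OrderCancelled': {
        const row = rows.get(event.aggregateId);
        if (row) row.status = 'cancelled';
        break;
      }
    }
  }
  return rows;
}

// Read model 2: total value of orders that were not cancelled, from the same events.
function projectSalesTotal(events: OrderEvent[]): number {
  const amounts = new Map<string, number>();
  for (const event of events) {
    if (event.eventType === 'OrderCreated') {
      amounts.set(event.aggregateId, event.totalAmount);
    } else if (event.eventType === 'OrderCancelled') {
      amounts.delete(event.aggregateId);
    }
  }
  let total = 0;
  amounts.forEach((amount) => {
    total += amount;
  });
  return total;
}
```

Adding a new read model later means writing another function like `projectSalesTotal` and replaying the existing events through it. The write side does not need to change.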

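Saga Pattern

The Saga pattern manages a business transaction that spans multiple aggregates by splitting it into a sequence of steps, each triggered by an event, with compensating commands that undo earlier steps when a later one fails.

// sagas/order-saga.ts
// Note: ofType() matches on event classes, so OrderCreatedEvent,
// InventoryReservedEvent, and PaymentFailedEvent must be classes here (not
// interfaces); they and the command classes are assumed to be defined elsewhere.
import { Injectable } from '@nestjs/common';
import { Saga, ICommand, ofType } from '@nestjs/cqrs';
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';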

@Injectable()
export class OrderSaga {
  @Saga()
  orderCreated = (events$: Observable<any>): Observable<ICommand> => {
    return events$.pipe(
      ofType(OrderCreatedEvent),
      map((event) => {
        // Issue the inventory reservation command
        return new ReserveInventoryCommand({
          orderId: event.aggregateId,
          items: event.payload.items,
        });
      }),
    );
  };

  @Saga()
  inventoryReserved = (events$: Observable<any>): Observable<ICommand> => {
    return events$.pipe(
      ofType(InventoryReservedEvent),
      map((event) => {
        // Issue the payment processing command
        return new ProcessPaymentCommand({
          orderId: event.payload.orderId,
          amount: event.payload.totalAmount,
        });
      }),
    );
  };

  @Saga()
  paymentFailed = (events$: Observable<any>): Observable<ICommand> => {
    return events$.pipe(
      ofType(PaymentFailedEvent),
      map((event) => {
        // Compensating transaction: release the inventory
        return new ReleaseInventoryCommand({
          orderId: event.payload.orderId,
        });
      }),
    );
  };
}
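Stripped of RxJS and the NestJS decorators, the saga above is a mapping from events to the next command. The framework-free sketch below expresses the same three reactions as a pure function, which makes the flow easy to unit test. The `SagaEvent` and `SagaCommand` shapes and the `react` function are hypothetical simplifications.

```typescript
// Hypothetical plain-data versions of the events and commands used by the saga.
type SagaEvent =
  | { type: 'OrderCreated'; orderId: string; items: string[] }
  | { type: 'InventoryReserved'; orderId: string; totalAmount: number }
  | { type: 'PaymentFailed'; orderId: string };

type SagaCommand =
  | { type: 'ReserveInventory'; orderId: string; items: string[] }
  | { type: 'ProcessPayment'; orderId: string; amount: number }
  | { type: 'ReleaseInventory'; orderId: string };

// Each event triggers exactly one follow-up command.
function react(event: SagaEvent): SagaCommand {
  switch (event.type) {
    case 'OrderCreated':
      // Step 1: reserve stock for the new order.
      return { type: 'ReserveInventory', orderId: event.orderId, items: event.items };
    case 'InventoryReserved':
      // Step 2: stock is held, so charge the customer.
      return { type: 'ProcessPayment', orderId: event.orderId, amount: event.totalAmount };
    case 'PaymentFailed':
      // Compensation: undo step 1 because step 2 failed.
      return { type: 'ReleaseInventory', orderId: event.orderId };
  }
}
```

There is no distributed transaction here. Each step commits on its own, and consistency across aggregates is restored by the compensating command when something fails.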

Implementation Considerations

| Challenge | Solution |
| --- | --- |
| Eventual consistency | Appropriate UI feedback, optimistic updates |
| Event schema evolution | Event versioning, upcasting |
| Projection rebuilds | Snapshots, parallel processing |
| Debugging complexity | Correlation IDs, detailed logs, event visualization tools |
| Initial adoption cost | Gradual adoption, starting with critical domains |
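Of the solutions in the table, upcasting is probably the least self-explanatory. Old events are never rewritten in the store; instead they are converted to the current schema as they are read. The sketch below shows one possible shape for this. The `schemaVersion` field, the `upcasters` registry, and the v1-to-v2 change (adding a `carrier` field with an `'unknown'` default) are all assumptions for illustration.

```typescript
// Hypothetical stored-event shape with an explicit schema version.
interface StoredEvent {
  eventType: string;
  schemaVersion: number;
  payload: Record<string, unknown>;
}

type Upcaster = (event: StoredEvent) => StoredEvent;

// Registry keyed by "eventType:fromVersion". Assumed example: version 1 of
// OrderShipped had no carrier, and version 2 requires one.
const upcasters: Record<string, Upcaster> = {
  'OrderShipped:1': (event) => ({
    ...event,
    schemaVersion: 2,
    payload: { ...event.payload, carrier: 'unknown' },
  }),
};

// Applies upcasters step by step until no newer schema version is registered.
function upcast(event: StoredEvent): StoredEvent {
  let current = event;
  for (;;) {
    const step = upcasters[`${current.eventType}:${current.schemaVersion}`];
    if (!step) {
      return current;
    }
    current = step(current);
  }
}
```

Aggregates and projections then only ever handle the latest schema, while the stored history stays immutable.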
