What is CQRS?
CQRS (Command Query Responsibility Segregation) is an architectural pattern that separates reading data (queries) from writing data (commands), so that each side can use its own model.
Traditional CRUD Architecture
flowchart TB
subgraph CRUD["Traditional CRUD Architecture"]
C1["Client"]
C1 -->|CRUD operations| DM["Single Data Model<br/>(Read + Write)<br/>← Same model for reads and writes"]
DM --> DB1["Database"]
end
CQRS Architecture
flowchart TB
subgraph CQRS["CQRS Architecture"]
C2["Client"]
C3["Client"]
C2 -->|Command| CM["Command Model<br/>(Write)"]
C3 -->|Query| QM["Query Model<br/>(Read)"]
CM --> WS["Write Store<br/>(normalized)"]
QM --> RS["Read Store<br/>(denormalized)"]
WS -->|synchronization| RS
end
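The CQRS diagram can be reduced to a small in-memory sketch. It is not tied to any framework. The names `writeStore`, `readStore`, `syncToReadStore`, `createOrder`, and `getOrderSummary` are hypothetical and chosen only for illustration. Synchronization runs inline here, whereas real systems usually do it asynchronously.

```typescript
// Write side: normalized record, one entry per order (hypothetical shape).
interface OrderRecord {
  id: string;
  customerId: string;
  items: Array<{ productId: string; quantity: number; unitPrice: number }>;
}

// Read side: denormalized row, precomputed for display.
interface OrderSummary {
  id: string;
  customerId: string;
  itemCount: number;
  totalAmount: number;
}

const writeStore = new Map<string, OrderRecord>();
const readStore = new Map<string, OrderSummary>();

// Synchronization: project a written record into the read store.
function syncToReadStore(order: OrderRecord): void {
  readStore.set(order.id, {
    id: order.id,
    customerId: order.customerId,
    itemCount: order.items.reduce((count, item) => count + item.quantity, 0),
    totalAmount: order.items.reduce(
      (sum, item) => sum + item.quantity * item.unitPrice,
      0,
    ),
  });
}

// Command: changes state and returns only the identifier.
function createOrder(order: OrderRecord): string {
  writeStore.set(order.id, order);
  syncToReadStore(order); // inline here; usually asynchronous in practice
  return order.id;
}

// Query: reads the precomputed view and never touches the write store.
function getOrderSummary(id: string): OrderSummary | undefined {
  return readStore.get(id);
}

const createdId = createOrder({
  id: 'order-1',
  customerId: 'customer-1',
  items: [
    { productId: 'P1', quantity: 2, unitPrice: 500 },
    { productId: 'P2', quantity: 1, unitPrice: 1500 },
  ],
});
const summary = getOrderSummary(createdId);
console.log(summary);
```

The query never recomputes totals. That work is paid once on the write path, which is the trade-off that makes sense when reads dominate.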
When to Adopt CQRS
| Use Case | Description |
|---|---|
| Asymmetric read/write load | Reads vastly outnumber writes (social networks, e-commerce sites) |
| Complex domain logic | Complex business rules must be enforced on writes |
| Different optimizations needed | Transactions for writes, caching for reads |
| Multiple read views | The same data is presented in different shapes |
| Audit/history requirements | The full change history must be retained (usually together with Event Sourcing) |
Basic CQRS Implementation
Command Side (Write)
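// commands/types.ts
// Address is assumed to be a domain value object defined elsewhere.

// Tracing and audit data carried by every command.
export interface CommandMetadata {
  userId: string;
  timestamp: Date;
  correlationId: string;
}

export interface Command {
  type: string;
  payload: unknown;
  metadata: CommandMetadata;
}

// Commands are declared as classes, not interfaces, because
// @CommandHandler() needs a runtime value to register the handler against.
export class CreateOrderCommand implements Command {
  readonly type = 'CreateOrder';

  constructor(
    readonly payload: {
      customerId: string;
      items: Array<{
        productId: string;
        quantity: number;
        unitPrice: number;
      }>;
      shippingAddress: Address;
    },
    readonly metadata: CommandMetadata,
  ) {}
}

export class CancelOrderCommand implements Command {
  readonly type = 'CancelOrder';

  constructor(
    readonly payload: {
      orderId: string;
      reason: string;
    },
    readonly metadata: CommandMetadata,
  ) {}
}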
// commands/handlers/order-command-handler.ts
import { Injectable } from '@nestjs/common';
import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';
import { CreateOrderCommand, CancelOrderCommand } from '../types';
import { OrderRepository } from '../../domain/repositories/order-repository';
import { Order } from '../../domain/entities/order';
import { EventPublisher } from '../../infrastructure/event-publisher';
@Injectable()
@CommandHandler(CreateOrderCommand)
export class CreateOrderHandler implements ICommandHandler<CreateOrderCommand> {
constructor(
private readonly orderRepository: OrderRepository,
private readonly eventPublisher: EventPublisher,
) {}
async execute(command: CreateOrderCommand): Promise<string> {
// Run the domain logic
const order = Order.create({
customerId: command.payload.customerId,
items: command.payload.items,
shippingAddress: command.payload.shippingAddress,
});
// Validation
order.validate();
// Persistence
await this.orderRepository.save(order);
// Publish the resulting events
await this.eventPublisher.publish(order.getUncommittedEvents());
return order.id;
}
}
@Injectable()
@CommandHandler(CancelOrderCommand)
export class CancelOrderHandler implements ICommandHandler<CancelOrderCommand> {
constructor(
private readonly orderRepository: OrderRepository,
private readonly eventPublisher: EventPublisher,
) {}
async execute(command: CancelOrderCommand): Promise<void> {
const order = await this.orderRepository.findById(command.payload.orderId);
if (!order) {
throw new OrderNotFoundError(command.payload.orderId);
}
// Domain logic: the aggregate checks whether the order can be cancelled
order.cancel(command.payload.reason);
await this.orderRepository.save(order);
await this.eventPublisher.publish(order.getUncommittedEvents());
}
}
Query Side (Read)
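// queries/types.ts
// OrderStatus is assumed to be a domain type defined elsewhere.
export interface Query {
  type: string;
}

// Queries are declared as classes, not interfaces, because
// @QueryHandler() needs a runtime value to register the handler against.
export class GetOrderByIdQuery implements Query {
  readonly type = 'GetOrderById';

  constructor(readonly orderId: string) {}
}

export class GetOrdersByCustomerQuery implements Query {
  readonly type = 'GetOrdersByCustomer';

  constructor(
    readonly customerId: string,
    readonly pagination: { page: number; limit: number },
    readonly filters?: {
      status?: OrderStatus[];
      dateRange?: { from: Date; to: Date };
    },
  ) {}
}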
// queries/handlers/order-query-handler.ts
import { Injectable } from '@nestjs/common';
import { QueryHandler, IQueryHandler } from '@nestjs/cqrs';
import { GetOrderByIdQuery, GetOrdersByCustomerQuery } from '../types';
import { OrderReadRepository } from '../../infrastructure/read-repositories';
// Read-only DTO
interface OrderDetailDto {
id: string;
customerName: string;
items: Array<{
productName: string;
quantity: number;
totalPrice: number;
}>;
status: string;
totalAmount: number;
createdAt: Date;
}
@Injectable()
@QueryHandler(GetOrderByIdQuery)
export class GetOrderByIdHandler implements IQueryHandler<GetOrderByIdQuery> {
constructor(private readonly readRepository: OrderReadRepository) {}
async execute(query: GetOrderByIdQuery): Promise<OrderDetailDto | null> {
// Fetch directly from the read-optimized store
return this.readRepository.findById(query.orderId);
}
}
@Injectable()
@QueryHandler(GetOrdersByCustomerQuery)
export class GetOrdersByCustomerHandler
implements IQueryHandler<GetOrdersByCustomerQuery>
{
constructor(private readonly readRepository: OrderReadRepository) {}
async execute(query: GetOrdersByCustomerQuery): Promise<{
orders: OrderDetailDto[];
total: number;
hasMore: boolean;
}> {
return this.readRepository.findByCustomer(
query.customerId,
query.pagination,
query.filters,
);
}
}
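The handlers above delegate to an `OrderReadRepository` that the article does not show. As a rough idea of what `findByCustomer` might do, here is an in-memory sketch. The class name `InMemoryOrderReadRepository`, the `OrderRow` shape, the `OrderStatus` values, and the 1-based page numbering are all assumptions made for this example.

```typescript
// Hypothetical status values; the article does not define OrderStatus.
type OrderStatus = 'pending' | 'paid' | 'shipped' | 'cancelled';

interface OrderRow {
  id: string;
  customerId: string;
  status: OrderStatus;
  createdAt: Date;
}

interface OrderPage {
  orders: OrderRow[];
  total: number;
  hasMore: boolean;
}

class InMemoryOrderReadRepository {
  constructor(private readonly rows: OrderRow[]) {}

  findByCustomer(
    customerId: string,
    pagination: { page: number; limit: number },
    filters?: { status?: OrderStatus[]; dateRange?: { from: Date; to: Date } },
  ): OrderPage {
    const matching = this.rows.filter((row) => {
      if (row.customerId !== customerId) return false;
      if (filters?.status && !filters.status.includes(row.status)) return false;
      if (filters?.dateRange) {
        const created = row.createdAt.getTime();
        if (created < filters.dateRange.from.getTime()) return false;
        if (created > filters.dateRange.to.getTime()) return false;
      }
      return true;
    });

    // Pages are assumed to be 1-based.
    const start = (pagination.page - 1) * pagination.limit;
    const orders = matching.slice(start, start + pagination.limit);
    return {
      orders,
      total: matching.length,
      hasMore: start + orders.length < matching.length,
    };
  }
}

const repository = new InMemoryOrderReadRepository([
  { id: 'o1', customerId: 'c1', status: 'pending', createdAt: new Date('2024-01-05') },
  { id: 'o2', customerId: 'c1', status: 'shipped', createdAt: new Date('2024-02-10') },
  { id: 'o3', customerId: 'c1', status: 'shipped', createdAt: new Date('2024-03-15') },
  { id: 'o4', customerId: 'c2', status: 'pending', createdAt: new Date('2024-03-20') },
]);

const firstPage = repository.findByCustomer('c1', { page: 1, limit: 2 });
const secondPage = repository.findByCustomer('c1', { page: 2, limit: 2 });
const shippedOnly = repository.findByCustomer(
  'c1',
  { page: 1, limit: 10 },
  { status: ['shipped'] },
);
```

A real read store would push the filtering and paging down into the database query. The point here is only the shape of the result the query handler returns.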
Event Sourcing
Event Sourcing is a pattern that records every state change as an event and derives the current state by replaying the accumulated events in order.
Traditional state storage:
flowchart LR
subgraph Traditional["Traditional Storage"]
O["Order #123<br/>status: 'shipped' ← Only the latest state is stored<br/>total: 15000"]
end
Event Sourcing:
flowchart TB
subgraph ES["Event Stream for Order #123"]
E1["[1] OrderCreated<br/>items: [...], total: 10000"]
E2["[2] ItemAdded<br/>productId: 'P456', qty: 2"]
E3["[3] OrderConfirmed<br/>confirmedAt: '...'"]
E4["[4] PaymentReceived<br/>amount: 15000"]
E5["[5] OrderShipped<br/>trackingNo: 'TRK123'"]
E1 --> E2 --> E3 --> E4 --> E5
end
E5 --> State["Current state = events [1] through [5] applied in order"]
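Deriving the current state from the stream is a left fold over the events. The sketch below replays the five events from the diagram. The simplified `OrderEvent` union, the `OrderState` shape, and the unit price of 2500 for the added item are assumptions picked so that the totals match the diagram.

```typescript
// Simplified events mirroring the diagram (hypothetical shapes).
type OrderEvent =
  | { type: 'OrderCreated'; total: number }
  | { type: 'ItemAdded'; productId: string; qty: number; unitPrice: number }
  | { type: 'OrderConfirmed' }
  | { type: 'PaymentReceived'; amount: number }
  | { type: 'OrderShipped'; trackingNo: string };

interface OrderState {
  status: 'none' | 'pending' | 'confirmed' | 'paid' | 'shipped';
  total: number;
}

// Pure transition function: previous state + one event = next state.
function applyEvent(state: OrderState, event: OrderEvent): OrderState {
  switch (event.type) {
    case 'OrderCreated':
      return { status: 'pending', total: event.total };
    case 'ItemAdded':
      return { ...state, total: state.total + event.qty * event.unitPrice };
    case 'OrderConfirmed':
      return { ...state, status: 'confirmed' };
    case 'PaymentReceived':
      return { ...state, status: 'paid' };
    case 'OrderShipped':
      return { ...state, status: 'shipped' };
  }
}

const stream: OrderEvent[] = [
  { type: 'OrderCreated', total: 10000 },
  { type: 'ItemAdded', productId: 'P456', qty: 2, unitPrice: 2500 },
  { type: 'OrderConfirmed' },
  { type: 'PaymentReceived', amount: 15000 },
  { type: 'OrderShipped', trackingNo: 'TRK123' },
];

const initialState: OrderState = { status: 'none', total: 0 };
const currentState = stream.reduce<OrderState>(applyEvent, initialState);
console.log(currentState); // { status: 'shipped', total: 15000 }
```

Replaying only a prefix of the stream yields the state at that point in history, which is what makes audit and time-travel queries possible.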
Defining Domain Events
// domain/events/order-events.ts
export interface DomainEvent {
eventId: string;
eventType: string;
aggregateId: string;
aggregateType: string;
version: number;
timestamp: Date;
payload: unknown;
metadata: {
userId?: string;
correlationId?: string;
causationId?: string;
};
}
export interface OrderCreatedEvent extends DomainEvent {
eventType: 'OrderCreated';
payload: {
customerId: string;
items: OrderItem[];
shippingAddress: Address;
totalAmount: number;
};
}
export interface OrderItemAddedEvent extends DomainEvent {
eventType: 'OrderItemAdded';
payload: {
item: OrderItem;
newTotal: number;
};
}
export interface OrderShippedEvent extends DomainEvent {
eventType: 'OrderShipped';
payload: {
shippedAt: Date;
trackingNumber: string;
carrier: string;
};
}
export interface OrderCancelledEvent extends DomainEvent {
eventType: 'OrderCancelled';
payload: {
reason: string;
cancelledAt: Date;
refundAmount: number;
};
}
Aggregate Compatible with Event Sourcing
// domain/aggregates/order.ts
import { AggregateRoot } from '../base/aggregate-root';
import {
OrderCreatedEvent,
OrderItemAddedEvent,
OrderShippedEvent,
OrderCancelledEvent,
} from '../events/order-events';
export class Order extends AggregateRoot {
private _customerId: string;
private _items: OrderItem[] = [];
private _status: OrderStatus = 'draft';
private _totalAmount: number = 0;
private _shippingAddress: Address;
// Restore state from events
static fromEvents(events: DomainEvent[]): Order {
const order = new Order();
events.forEach((event) => order.apply(event, false));
return order;
}
// Command: create order
static create(params: CreateOrderParams): Order {
const order = new Order();
const event: OrderCreatedEvent = {
eventId: generateId(),
eventType: 'OrderCreated',
aggregateId: generateId(),
aggregateType: 'Order',
version: 1,
timestamp: new Date(),
payload: {
customerId: params.customerId,
items: params.items,
shippingAddress: params.shippingAddress,
totalAmount: calculateTotal(params.items),
},
metadata: {},
};
order.apply(event, true);
return order;
}
// Command: add item
addItem(item: OrderItem): void {
if (this._status !== 'draft' && this._status !== 'pending') {
throw new InvalidOperationError('Cannot add items to this order');
}
const event: OrderItemAddedEvent = {
eventId: generateId(),
eventType: 'OrderItemAdded',
aggregateId: this.id,
aggregateType: 'Order',
version: this.version + 1,
timestamp: new Date(),
payload: {
item,
newTotal: this._totalAmount + item.quantity * item.unitPrice,
},
metadata: {},
};
this.apply(event, true);
}
// Command: ship
ship(trackingNumber: string, carrier: string): void {
if (this._status !== 'paid') {
throw new InvalidOperationError('Order must be paid before shipping');
}
const event: OrderShippedEvent = {
eventId: generateId(),
eventType: 'OrderShipped',
aggregateId: this.id,
aggregateType: 'Order',
version: this.version + 1,
timestamp: new Date(),
payload: {
shippedAt: new Date(),
trackingNumber,
carrier,
},
metadata: {},
};
this.apply(event, true);
}
// Event handler: state update
protected when(event: DomainEvent): void {
switch (event.eventType) {
case 'OrderCreated': {
const e = event as OrderCreatedEvent;
this._id = e.aggregateId;
this._customerId = e.payload.customerId;
this._items = e.payload.items;
this._shippingAddress = e.payload.shippingAddress;
this._totalAmount = e.payload.totalAmount;
this._status = 'pending';
break;
}
case 'OrderItemAdded': {
const e = event as OrderItemAddedEvent;
this._items.push(e.payload.item);
this._totalAmount = e.payload.newTotal;
break;
}
case 'OrderShipped': {
this._status = 'shipped';
break;
}
case 'OrderCancelled': {
this._status = 'cancelled';
break;
}
}
}
}
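The `Order` aggregate extends an `AggregateRoot` base class that the article imports but does not show. One plausible shape for it, inferred from how `apply`, `when`, `version`, and `getUncommittedEvents` are used above, is sketched below. The trimmed-down `DomainEvent`, the `markEventsAsCommitted` method, and the `Counter` aggregate are hypothetical and exist only to exercise the base class.

```typescript
// Trimmed-down event shape for this sketch.
interface DomainEvent {
  eventType: string;
  aggregateId: string;
  version: number;
}

abstract class AggregateRoot {
  protected _id = '';
  private _version = 0;
  private readonly uncommitted: DomainEvent[] = [];

  get id(): string {
    return this._id;
  }

  get version(): number {
    return this._version;
  }

  // Subclasses mutate their state here; no validation, no side effects.
  protected abstract when(event: DomainEvent): void;

  // isNew is true for events raised by a command and false when replaying.
  protected apply(event: DomainEvent, isNew: boolean): void {
    this.when(event);
    this._version = event.version;
    if (isNew) this.uncommitted.push(event);
  }

  getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommitted];
  }

  // Hypothetical: called after the events have been stored.
  markEventsAsCommitted(): void {
    this.uncommitted.length = 0;
  }
}

// Minimal aggregate used only to demonstrate the base class.
class Counter extends AggregateRoot {
  private _count = 0;

  get count(): number {
    return this._count;
  }

  static fromEvents(events: DomainEvent[]): Counter {
    const counter = new Counter();
    events.forEach((event) => counter.apply(event, false));
    return counter;
  }

  increment(): void {
    this.apply(
      { eventType: 'Incremented', aggregateId: 'counter-1', version: this.version + 1 },
      true,
    );
  }

  protected when(event: DomainEvent): void {
    if (event.eventType === 'Incremented') {
      this._id = event.aggregateId;
      this._count += 1;
    }
  }
}

const counter = new Counter();
counter.increment();
counter.increment();
const pending = counter.getUncommittedEvents();
const replayed = Counter.fromEvents(pending);
```

Keeping all state changes inside `when` is what guarantees that replaying stored events reproduces the same state that the original commands produced.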
Event Store Implementation
// infrastructure/event-store.ts
import { Injectable } from '@nestjs/common';
import { Pool } from 'pg';
import { DomainEvent } from '../domain/events/order-events';
@Injectable()
export class EventStore {
constructor(private readonly pool: Pool) {}
async appendEvents(
streamId: string,
events: DomainEvent[],
expectedVersion: number,
): Promise<void> {
const client = await this.pool.connect();
try {
await client.query('BEGIN');
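// Optimistic locking: compare the stream's current version with the expected one.
// This check alone does not stop two concurrent transactions from both passing it;
// a UNIQUE (stream_id, version) constraint on the events table is assumed as the backstop.
const versionCheck = await client.query(
`SELECT MAX(version) as current_version
FROM events
WHERE stream_id = $1`,
[streamId],
);
// Number() normalizes the value because pg returns BIGINT columns as strings.
const currentVersion = Number(versionCheck.rows[0]?.current_version ?? 0);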
if (currentVersion !== expectedVersion) {
throw new ConcurrencyError(
`Expected version ${expectedVersion}, but found ${currentVersion}`,
);
}
// Append the events
for (const event of events) {
await client.query(
`INSERT INTO events (
event_id, stream_id, event_type, version,
payload, metadata, timestamp
) VALUES ($1, $2, $3, $4, $5, $6, $7)`,
[
event.eventId,
streamId,
event.eventType,
event.version,
JSON.stringify(event.payload),
JSON.stringify(event.metadata),
event.timestamp,
],
);
}
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
async getEvents(
streamId: string,
fromVersion: number = 0,
): Promise<DomainEvent[]> {
const result = await this.pool.query(
`SELECT * FROM events
WHERE stream_id = $1 AND version > $2
ORDER BY version ASC`,
[streamId, fromVersion],
);
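// mapToEvent (not shown) is assumed to convert a database row into a DomainEvent.
// The arrow function keeps `this` bound when the method is used as a callback.
return result.rows.map((row) => this.mapToEvent(row));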
}
}
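The optimistic concurrency check in `appendEvents` is easier to see without the database around it. The sketch below is an in-memory stand-in, not the PostgreSQL implementation above. `InMemoryEventStore`, `StoredEvent`, and this `ConcurrencyError` class are hypothetical names for the example.

```typescript
interface StoredEvent {
  eventType: string;
  version: number;
  payload: unknown;
}

class ConcurrencyError extends Error {
  constructor(expected: number, actual: number) {
    super(`Expected version ${expected}, but found ${actual}`);
    this.name = 'ConcurrencyError';
  }
}

class InMemoryEventStore {
  private readonly streams = new Map<string, StoredEvent[]>();

  appendEvents(
    streamId: string,
    events: StoredEvent[],
    expectedVersion: number,
  ): void {
    const stream = this.streams.get(streamId) ?? [];
    // An empty stream is at version 0, matching the SQL version's default.
    const currentVersion = stream.reduce(
      (max, event) => Math.max(max, event.version),
      0,
    );
    if (currentVersion !== expectedVersion) {
      throw new ConcurrencyError(expectedVersion, currentVersion);
    }
    this.streams.set(streamId, [...stream, ...events]);
  }

  getEvents(streamId: string, fromVersion = 0): StoredEvent[] {
    return (this.streams.get(streamId) ?? []).filter(
      (event) => event.version > fromVersion,
    );
  }
}

const store = new InMemoryEventStore();
store.appendEvents(
  'order-123',
  [{ eventType: 'OrderCreated', version: 1, payload: {} }],
  0,
);

// Two writers both loaded the stream at version 1. The first one wins.
store.appendEvents(
  'order-123',
  [{ eventType: 'OrderItemAdded', version: 2, payload: {} }],
  1,
);

// The second writer still expects version 1 and is rejected.
let conflict: string | null = null;
try {
  store.appendEvents(
    'order-123',
    [{ eventType: 'OrderCancelled', version: 2, payload: {} }],
    1,
  );
} catch (error) {
  conflict = error instanceof Error ? error.message : String(error);
}
```

The rejected writer would typically reload the aggregate from the stream, re-run the command against the fresh state, and try again.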
Read Model Synchronization (Projection)
flowchart TB
subgraph EventStore["Event Store"]
E1["Event 1"]
E2["Event 2"]
E3["Event 3"]
E4["..."]
end
P["Projector<br/>(Event handler)"]
subgraph ReadDBs["Read Databases"]
RD1["Read DB 1<br/>(Order list)"]
RD2["Read DB 2<br/>(Sales aggregate)"]
end
EventStore --> P
P --> RD1
P --> RD2
Characteristics:
- Asynchronous processing (eventual consistency)
- Can produce multiple read models
- Rebuildable (by replaying events)
// projections/order-list-projection.ts
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
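import { OrderReadRepository } from '../infrastructure/read-repositories';
import { OrderCreatedEvent, OrderShippedEvent, OrderCancelledEvent } from '../domain/events/order-events';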
@Injectable()
export class OrderListProjection {
constructor(private readonly readRepository: OrderReadRepository) {}
@OnEvent('OrderCreated')
async handleOrderCreated(event: OrderCreatedEvent): Promise<void> {
await this.readRepository.insert({
id: event.aggregateId,
customerId: event.payload.customerId,
customerName: await this.getCustomerName(event.payload.customerId),
status: 'pending',
totalAmount: event.payload.totalAmount,
itemCount: event.payload.items.length,
createdAt: event.timestamp,
updatedAt: event.timestamp,
});
}
@OnEvent('OrderShipped')
async handleOrderShipped(event: OrderShippedEvent): Promise<void> {
await this.readRepository.update(event.aggregateId, {
status: 'shipped',
shippedAt: event.payload.shippedAt,
trackingNumber: event.payload.trackingNumber,
updatedAt: event.timestamp,
});
}
@OnEvent('OrderCancelled')
async handleOrderCancelled(event: OrderCancelledEvent): Promise<void> {
await this.readRepository.update(event.aggregateId, {
status: 'cancelled',
cancelledAt: event.payload.cancelledAt,
cancellationReason: event.payload.reason,
updatedAt: event.timestamp,
});
}
}
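Because a projection is derived entirely from events, it can be thrown away and rebuilt by replaying the stream. The sketch below shows that idea with an in-memory read model. The `project` and `rebuild` functions, the `OrderListRow` shape, and the `lastVersion` idempotency guard are assumptions for this example and not part of the projection class above.

```typescript
interface OrderEvent {
  eventType: 'OrderCreated' | 'OrderShipped' | 'OrderCancelled';
  aggregateId: string;
  version: number;
}

interface OrderListRow {
  id: string;
  status: string;
  lastVersion: number; // highest event version applied to this row
}

// Applies one event to the read model.
function project(rows: Map<string, OrderListRow>, event: OrderEvent): void {
  const existing = rows.get(event.aggregateId);
  // Idempotency guard: skip events that were already applied to this row.
  if (existing && existing.lastVersion >= event.version) return;

  switch (event.eventType) {
    case 'OrderCreated':
      rows.set(event.aggregateId, {
        id: event.aggregateId,
        status: 'pending',
        lastVersion: event.version,
      });
      break;
    case 'OrderShipped':
    case 'OrderCancelled':
      if (existing) {
        existing.status =
          event.eventType === 'OrderShipped' ? 'shipped' : 'cancelled';
        existing.lastVersion = event.version;
      }
      break;
  }
}

// Rebuild: start from an empty read model and replay the full history.
function rebuild(events: OrderEvent[]): Map<string, OrderListRow> {
  const rows = new Map<string, OrderListRow>();
  for (const event of events) project(rows, event);
  return rows;
}

const history: OrderEvent[] = [
  { eventType: 'OrderCreated', aggregateId: 'o1', version: 1 },
  { eventType: 'OrderCreated', aggregateId: 'o2', version: 1 },
  { eventType: 'OrderShipped', aggregateId: 'o1', version: 2 },
  { eventType: 'OrderCancelled', aggregateId: 'o2', version: 2 },
];

const rebuilt = rebuild(history);

// Delivering the same events a second time leaves the rows unchanged.
for (const event of history) project(rebuilt, event);
```

The version guard matters because asynchronous delivery is often at-least-once, so a projector should tolerate seeing the same event twice.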
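Saga Pattern
A saga coordinates a business transaction that spans multiple aggregates as a sequence of local steps, with compensating actions that undo earlier steps when a later one fails.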
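// sagas/order-saga.ts
import { Injectable } from '@nestjs/common';
import { Saga, ICommand } from '@nestjs/cqrs';
import { Observable } from 'rxjs';
import { filter, map } from 'rxjs/operators';
import { DomainEvent, OrderCreatedEvent } from '../domain/events/order-events';
// Assumed to be defined elsewhere: InventoryReservedEvent and PaymentFailedEvent
// (following the DomainEvent shape) and the three command classes used below.

@Injectable()
export class OrderSaga {
  // The events in this article are interfaces, so each stream is narrowed by
  // eventType; ofType() only matches instances of event classes.
  @Saga()
  orderCreated = (events$: Observable<DomainEvent>): Observable<ICommand> => {
    return events$.pipe(
      filter((event): event is OrderCreatedEvent => event.eventType === 'OrderCreated'),
      map((event) => {
        // Issue the inventory reservation command
        return new ReserveInventoryCommand({
          orderId: event.aggregateId,
          items: event.payload.items,
        });
      }),
    );
  };

  @Saga()
  inventoryReserved = (events$: Observable<DomainEvent>): Observable<ICommand> => {
    return events$.pipe(
      filter((event): event is InventoryReservedEvent => event.eventType === 'InventoryReserved'),
      map((event) => {
        // Issue the payment processing command
        return new ProcessPaymentCommand({
          orderId: event.payload.orderId,
          amount: event.payload.totalAmount,
        });
      }),
    );
  };

  @Saga()
  paymentFailed = (events$: Observable<DomainEvent>): Observable<ICommand> => {
    return events$.pipe(
      filter((event): event is PaymentFailedEvent => event.eventType === 'PaymentFailed'),
      map((event) => {
        // Compensating transaction: release the reserved inventory
        return new ReleaseInventoryCommand({
          orderId: event.payload.orderId,
        });
      }),
    );
  };
}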
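Stripped of the framework, the saga above is a mapping from each incoming event to the next command, including the compensating one. The sketch below expresses that mapping as a pure function so the flow can be checked without NestJS or RxJS. The `SagaEvent` and `SagaCommand` unions and the `nextCommand` function are hypothetical simplifications.

```typescript
type SagaEvent =
  | { type: 'OrderCreated'; orderId: string; totalAmount: number }
  | { type: 'InventoryReserved'; orderId: string; totalAmount: number }
  | { type: 'PaymentFailed'; orderId: string };

type SagaCommand =
  | { type: 'ReserveInventory'; orderId: string }
  | { type: 'ProcessPayment'; orderId: string; amount: number }
  | { type: 'ReleaseInventory'; orderId: string };

// Decides which command follows each event.
function nextCommand(event: SagaEvent): SagaCommand {
  switch (event.type) {
    case 'OrderCreated':
      return { type: 'ReserveInventory', orderId: event.orderId };
    case 'InventoryReserved':
      return {
        type: 'ProcessPayment',
        orderId: event.orderId,
        amount: event.totalAmount,
      };
    case 'PaymentFailed':
      // Compensating action: undo the earlier inventory reservation.
      return { type: 'ReleaseInventory', orderId: event.orderId };
  }
}

// A checkout in which the payment step fails.
const failedCheckout: SagaEvent[] = [
  { type: 'OrderCreated', orderId: 'order-1', totalAmount: 15000 },
  { type: 'InventoryReserved', orderId: 'order-1', totalAmount: 15000 },
  { type: 'PaymentFailed', orderId: 'order-1' },
];

const issued = failedCheckout.map((event) => nextCommand(event).type);
console.log(issued); // [ 'ReserveInventory', 'ProcessPayment', 'ReleaseInventory' ]
```

There is no rollback across aggregates. The failure path ends with an explicit command that reverses the reservation.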
Implementation Considerations
| Challenge | Mitigation |
|---|---|
| Eventual consistency | Clear feedback in the UI, optimistic updates |
| Event schema evolution | Event versioning, upcasting |
| Projection rebuilds | Snapshots, parallel processing |
| Debugging complexity | Correlation IDs, detailed logs, event visualization tools |
| Upfront adoption cost | Incremental adoption, starting with the critical domains |
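Of the mitigations in the table, upcasting is the least self-explanatory: old events stay in the store untouched and are converted to the current schema when they are read. The sketch below assumes a hypothetical `schemaVersion` field and a hypothetical earlier version of `OrderShipped` that had no `carrier`. Neither appears in the event definitions earlier in the article.

```typescript
// Hypothetical old schema: no carrier was recorded.
interface OrderShippedV1 {
  eventType: 'OrderShipped';
  schemaVersion: 1;
  payload: { trackingNumber: string };
}

// Current schema.
interface OrderShippedV2 {
  eventType: 'OrderShipped';
  schemaVersion: 2;
  payload: { trackingNumber: string; carrier: string };
}

type StoredOrderShipped = OrderShippedV1 | OrderShippedV2;

// Upcaster: converts any stored version to the current one at read time.
function upcastOrderShipped(event: StoredOrderShipped): OrderShippedV2 {
  if (event.schemaVersion === 2) return event;
  return {
    eventType: 'OrderShipped',
    schemaVersion: 2,
    // The default for the missing field is a business decision.
    payload: { ...event.payload, carrier: 'unknown' },
  };
}

const stored: StoredOrderShipped[] = [
  { eventType: 'OrderShipped', schemaVersion: 1, payload: { trackingNumber: 'TRK123' } },
  {
    eventType: 'OrderShipped',
    schemaVersion: 2,
    payload: { trackingNumber: 'TRK456', carrier: 'ACME' },
  },
];

const upcasted = stored.map((event) => upcastOrderShipped(event));
```

Aggregates and projections then only ever handle the latest schema, and the conversion logic lives in one place.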