Introduction to CQRS and Event Sourcing - Designing Scalable Architecture

Advanced | 2025.12.02

What Is CQRS

CQRS (Command Query Responsibility Segregation) is an architectural pattern that separates the model used to write data (Commands) from the model used to read it (Queries).

Traditional CRUD Architecture

flowchart TB
    subgraph CRUD["Traditional CRUD Architecture"]
        C1["Client"]
        C1 -->|CRUD operations| DM["Single Data Model<br/>(Read + Write)<br/>← Same model for reads and writes"]
        DM --> DB1["Database"]
    end

CQRS Architecture

flowchart TB
    subgraph CQRS["CQRS Architecture"]
        C2["Client"]
        C3["Client"]
        C2 -->|Command| CM["Command Model<br/>(Write)"]
        C3 -->|Query| QM["Query Model<br/>(Read)"]
        CM --> WS["Write Store<br/>(Normalized)"]
        QM --> RS["Read Store<br/>(Denormalized)"]
        WS -->|Synchronization| RS
    end

When to Adopt CQRS

| Use case | Description |
| --- | --- |
| Asymmetric read/write load | Reads far outnumber writes (social networks, e-commerce) |
| Complex domain logic | Complex business rules must be enforced on writes |
| Different optimizations needed | Transactions for writes, caching for reads |
| Multiple read views | The same data is displayed in different formats |
| Audit/history requirements | A history of every change must be kept |

Basic CQRS Implementation

Command Side (Write)

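// commands/types.ts
// Commands are classes rather than interfaces because @CommandHandler()
// needs a runtime constructor to register each handler against.
import { Address } from '../domain/value-objects/address'; // assumed path

export interface CommandMetadata {
  userId: string;
  timestamp: Date;
  correlationId: string;
}

export abstract class Command<TPayload = unknown> {
  abstract readonly type: string;

  constructor(
    public readonly payload: TPayload,
    public readonly metadata: CommandMetadata,
  ) {}
}

export class CreateOrderCommand extends Command<{
  customerId: string;
  items: Array<{
    productId: string;
    quantity: number;
    unitPrice: number;
  }>;
  shippingAddress: Address;
}> {
  readonly type = 'CreateOrder';
}

export class CancelOrderCommand extends Command<{
  orderId: string;
  reason: string;
}> {
  readonly type = 'CancelOrder';
}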

// commands/handlers/order-command-handler.ts
import { Injectable } from '@nestjs/common';
import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';
import { CreateOrderCommand, CancelOrderCommand } from '../types';
import { OrderRepository } from '../../domain/repositories/order-repository';
import { Order } from '../../domain/entities/order';
import { EventPublisher } from '../../infrastructure/event-publisher';
// Assumed path; the original does not show where OrderNotFoundError is defined.
import { OrderNotFoundError } from '../../domain/errors';

@Injectable()
@CommandHandler(CreateOrderCommand)
export class CreateOrderHandler implements ICommandHandler<CreateOrderCommand> {
  constructor(
    private readonly orderRepository: OrderRepository,
    private readonly eventPublisher: EventPublisher,
  ) {}

  async execute(command: CreateOrderCommand): Promise<string> {
    // Run the domain logic
    const order = Order.create({
      customerId: command.payload.customerId,
      items: command.payload.items,
      shippingAddress: command.payload.shippingAddress,
    });

    // Validate
    order.validate();

    // Persist
    await this.orderRepository.save(order);

    // Publish events
    await this.eventPublisher.publish(order.getUncommittedEvents());

    return order.id;
  }
}

@Injectable()
@CommandHandler(CancelOrderCommand)
export class CancelOrderHandler implements ICommandHandler<CancelOrderCommand> {
  constructor(
    private readonly orderRepository: OrderRepository,
    private readonly eventPublisher: EventPublisher,
  ) {}

  async execute(command: CancelOrderCommand): Promise<void> {
    const order = await this.orderRepository.findById(command.payload.orderId);

    if (!order) {
      throw new OrderNotFoundError(command.payload.orderId);
    }

    // Domain logic: the aggregate decides whether the order can be cancelled
    order.cancel(command.payload.reason);

    await this.orderRepository.save(order);
    await this.eventPublisher.publish(order.getUncommittedEvents());
  }
}

Query Side (Read)

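// queries/types.ts
// Queries are classes rather than interfaces because @QueryHandler()
// needs a runtime constructor to register each handler against.
import { OrderStatus } from '../domain/entities/order'; // assumed path

export interface Query {
  type: string;
}

export class GetOrderByIdQuery implements Query {
  readonly type = 'GetOrderById';

  constructor(public readonly orderId: string) {}
}

export class GetOrdersByCustomerQuery implements Query {
  readonly type = 'GetOrdersByCustomer';

  constructor(
    public readonly customerId: string,
    public readonly pagination: {
      page: number;
      limit: number;
    },
    public readonly filters?: {
      status?: OrderStatus[];
      dateRange?: { from: Date; to: Date };
    },
  ) {}
}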

// queries/handlers/order-query-handler.ts
import { Injectable } from '@nestjs/common';
import { QueryHandler, IQueryHandler } from '@nestjs/cqrs';
import { GetOrderByIdQuery, GetOrdersByCustomerQuery } from '../types';
import { OrderReadRepository } from '../../infrastructure/read-repositories';

// Read-only DTO
interface OrderDetailDto {
  id: string;
  customerName: string;
  items: Array<{
    productName: string;
    quantity: number;
    totalPrice: number;
  }>;
  status: string;
  totalAmount: number;
  createdAt: Date;
}

@Injectable()
@QueryHandler(GetOrderByIdQuery)
export class GetOrderByIdHandler implements IQueryHandler<GetOrderByIdQuery> {
  constructor(private readonly readRepository: OrderReadRepository) {}

  async execute(query: GetOrderByIdQuery): Promise<OrderDetailDto | null> {
    // Read directly from the read-optimized store
    return this.readRepository.findById(query.orderId);
  }
}

@Injectable()
@QueryHandler(GetOrdersByCustomerQuery)
export class GetOrdersByCustomerHandler
  implements IQueryHandler<GetOrdersByCustomerQuery>
{
  constructor(private readonly readRepository: OrderReadRepository) {}

  async execute(query: GetOrdersByCustomerQuery): Promise<{
    orders: OrderDetailDto[];
    total: number;
    hasMore: boolean;
  }> {
    return this.readRepository.findByCustomer(
      query.customerId,
      query.pagination,
      query.filters,
    );
  }
}
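
The handlers above delegate to `OrderReadRepository.findByCustomer`, which the article does not show. The following is a minimal in-memory sketch of what such a lookup might do, assuming 1-based page numbers, newest-first ordering, and a simplified row shape. `OrderRow`, `OrderFilters`, and the `OrderStatus` values are hypothetical names introduced for illustration; a real read repository would be asynchronous and backed by a database.

```typescript
// Hypothetical in-memory stand-in for OrderReadRepository.findByCustomer.
type OrderStatus = 'pending' | 'paid' | 'shipped' | 'cancelled'; // assumed values

interface OrderRow {
  id: string;
  customerId: string;
  status: OrderStatus;
  createdAt: Date;
}

interface OrderFilters {
  status?: OrderStatus[];
  dateRange?: { from: Date; to: Date };
}

function findByCustomer(
  rows: OrderRow[],
  customerId: string,
  pagination: { page: number; limit: number }, // page is 1-based (assumption)
  filters?: OrderFilters,
): { orders: OrderRow[]; total: number; hasMore: boolean } {
  const statuses = filters?.status;
  const range = filters?.dateRange;

  const matches = rows
    .filter((row) => row.customerId === customerId)
    .filter((row) => !statuses || statuses.some((s) => s === row.status))
    .filter((row) => {
      if (!range) return true;
      const t = row.createdAt.getTime();
      return t >= range.from.getTime() && t <= range.to.getTime();
    })
    // Newest first, so page 1 shows the most recent orders
    .sort((a, b) => b.createdAt.getTime() - a.createdAt.getTime());

  const start = (pagination.page - 1) * pagination.limit;
  const orders = matches.slice(start, start + pagination.limit);

  return {
    orders,
    total: matches.length, // count before pagination is applied
    hasMore: start + orders.length < matches.length,
  };
}
```

Because the read side never touches the aggregate, this kind of lookup can be tuned (indexes, caching, denormalized columns) without affecting the write model.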

Event Sourcing

Event Sourcing is a pattern that records every state change as an "event" and derives the current state by replaying the accumulated events in order.

Traditional State Storage:

flowchart LR
    subgraph Traditional["Traditional State Storage"]
        O["Order #123<br/>status: 'shipped' ← Only the latest state is stored<br/>total: 15000"]
    end

Event Sourcing:

flowchart TB
    subgraph ES["Event Stream para Order #123"]
        E1["[1] OrderCreated<br/>items: [...], total: 10000"]
        E2["[2] ItemAdded<br/>productId: 'P456', qty: 2"]
        E3["[3] OrderConfirmed<br/>confirmedAt: '...'"]
        E4["[4] PaymentReceived<br/>amount: 15000"]
        E5["[5] OrderShipped<br/>trackingNo: 'TRK123'"]
        E1 --> E2 --> E3 --> E4 --> E5
    end
    E5 --> State["Current state = apply events [1] through [5] in order"]
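
Deriving state from the stream amounts to a left fold over the events. The sketch below replays the five events from the diagram. The event shapes are simplified, and the `unitPrice` of 2500 on `ItemAdded` is an assumption added so that the total moves from 10000 to 15000 as in the diagram; `OrderState`, `evolve`, and the status names are illustrative, not part of the original.

```typescript
// Simplified event shapes modeled on the diagram (unitPrice is an assumption).
type OrderEvent =
  | { type: 'OrderCreated'; total: number }
  | { type: 'ItemAdded'; productId: string; qty: number; unitPrice: number }
  | { type: 'OrderConfirmed' }
  | { type: 'PaymentReceived'; amount: number }
  | { type: 'OrderShipped'; trackingNo: string };

interface OrderState {
  status: string;
  total: number;
  paid: number;
  trackingNo?: string;
}

const initialState: OrderState = { status: 'none', total: 0, paid: 0 };

// Pure function: previous state + one event -> next state.
function evolve(state: OrderState, event: OrderEvent): OrderState {
  switch (event.type) {
    case 'OrderCreated':
      return { ...state, status: 'pending', total: event.total };
    case 'ItemAdded':
      return { ...state, total: state.total + event.qty * event.unitPrice };
    case 'OrderConfirmed':
      return { ...state, status: 'confirmed' };
    case 'PaymentReceived':
      return { ...state, status: 'paid', paid: state.paid + event.amount };
    case 'OrderShipped':
      return { ...state, status: 'shipped', trackingNo: event.trackingNo };
  }
}

const stream: OrderEvent[] = [
  { type: 'OrderCreated', total: 10000 },
  { type: 'ItemAdded', productId: 'P456', qty: 2, unitPrice: 2500 },
  { type: 'OrderConfirmed' },
  { type: 'PaymentReceived', amount: 15000 },
  { type: 'OrderShipped', trackingNo: 'TRK123' },
];

// Current state = events [1] through [5] applied in order.
const currentState = stream.reduce(evolve, initialState);
```

Replaying only the first three events would yield the state as it was at confirmation time, which is what makes audit and time-travel queries possible.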

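Defining Domain Events

// domain/events/order-events.ts
import { Address } from '../value-objects/address'; // assumed path
import { OrderItem } from '../value-objects/order-item'; // assumed path
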
export interface DomainEvent {
  eventId: string;
  eventType: string;
  aggregateId: string;
  aggregateType: string;
  version: number;
  timestamp: Date;
  payload: unknown;
  metadata: {
    userId?: string;
    correlationId?: string;
    causationId?: string;
  };
}

export interface OrderCreatedEvent extends DomainEvent {
  eventType: 'OrderCreated';
  payload: {
    customerId: string;
    items: OrderItem[];
    shippingAddress: Address;
    totalAmount: number;
  };
}

export interface OrderItemAddedEvent extends DomainEvent {
  eventType: 'OrderItemAdded';
  payload: {
    item: OrderItem;
    newTotal: number;
  };
}

export interface OrderShippedEvent extends DomainEvent {
  eventType: 'OrderShipped';
  payload: {
    shippedAt: Date;
    trackingNumber: string;
    carrier: string;
  };
}

export interface OrderCancelledEvent extends DomainEvent {
  eventType: 'OrderCancelled';
  payload: {
    reason: string;
    cancelledAt: Date;
    refundAmount: number;
  };
}

Aggregate with Event Sourcing Support

// domain/aggregates/order.ts
import { AggregateRoot } from '../base/aggregate-root';
import {
  DomainEvent,
  OrderCreatedEvent,
  OrderItemAddedEvent,
  OrderShippedEvent,
  OrderCancelledEvent,
} from '../events/order-events';
// OrderItem, Address, OrderStatus, CreateOrderParams, InvalidOperationError,
// generateId and calculateTotal are assumed to come from the project's own
// domain modules, which the original does not show.

export class Order extends AggregateRoot {
  // `!` marks fields that are assigned when OrderCreated is applied
  private _customerId!: string;
  private _items: OrderItem[] = [];
  private _status: OrderStatus = 'draft';
  private _totalAmount: number = 0;
  private _shippingAddress!: Address;

  // Rebuild state by replaying past events (not recorded as new changes)
  static fromEvents(events: DomainEvent[]): Order {
    const order = new Order();
    events.forEach((event) => order.apply(event, false));
    return order;
  }

  // Command: create an order
  static create(params: CreateOrderParams): Order {
    const order = new Order();
    const event: OrderCreatedEvent = {
      eventId: generateId(),
      eventType: 'OrderCreated',
      aggregateId: generateId(),
      aggregateType: 'Order',
      version: 1,
      timestamp: new Date(),
      payload: {
        customerId: params.customerId,
        items: params.items,
        shippingAddress: params.shippingAddress,
        totalAmount: calculateTotal(params.items),
      },
      metadata: {},
    };

    order.apply(event, true);
    return order;
  }

  // Command: add an item
  addItem(item: OrderItem): void {
    if (this._status !== 'draft' && this._status !== 'pending') {
      throw new InvalidOperationError('Cannot add items to this order');
    }

    const event: OrderItemAddedEvent = {
      eventId: generateId(),
      eventType: 'OrderItemAdded',
      aggregateId: this.id,
      aggregateType: 'Order',
      version: this.version + 1,
      timestamp: new Date(),
      payload: {
        item,
        newTotal: this._totalAmount + item.quantity * item.unitPrice,
      },
      metadata: {},
    };

    this.apply(event, true);
  }

  // Command: ship the order
  ship(trackingNumber: string, carrier: string): void {
    if (this._status !== 'paid') {
      throw new InvalidOperationError('Order must be paid before shipping');
    }

    const event: OrderShippedEvent = {
      eventId: generateId(),
      eventType: 'OrderShipped',
      aggregateId: this.id,
      aggregateType: 'Order',
      version: this.version + 1,
      timestamp: new Date(),
      payload: {
        shippedAt: new Date(),
        trackingNumber,
        carrier,
      },
      metadata: {},
    };

    this.apply(event, true);
  }

  // Event handler: update state
  protected when(event: DomainEvent): void {
    switch (event.eventType) {
      case 'OrderCreated': {
        const e = event as OrderCreatedEvent;
        this._id = e.aggregateId;
        this._customerId = e.payload.customerId;
        this._items = e.payload.items;
        this._shippingAddress = e.payload.shippingAddress;
        this._totalAmount = e.payload.totalAmount;
        this._status = 'pending';
        break;
      }
      case 'OrderItemAdded': {
        const e = event as OrderItemAddedEvent;
        this._items.push(e.payload.item);
        this._totalAmount = e.payload.newTotal;
        break;
      }
      case 'OrderShipped': {
        this._status = 'shipped';
        break;
      }
      case 'OrderCancelled': {
        this._status = 'cancelled';
        break;
      }
    }
  }
}
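
The `Order` aggregate relies on an `AggregateRoot` base class that the article imports but does not show. The sketch below is one plausible shape for it, inferred from how `Order` uses `apply(event, isNew)`, `when`, `id`, `version`, and `getUncommittedEvents()`. It is an assumption, not the author's implementation; `markEventsAsCommitted` and the `Counter` aggregate are hypothetical additions for the demo.

```typescript
// Minimal event shape for this sketch (the article's DomainEvent has more fields).
interface DomainEvent {
  eventType: string;
  aggregateId: string;
  version: number;
}

abstract class AggregateRoot {
  protected _id = '';
  private _version = 0;
  private uncommitted: DomainEvent[] = [];

  get id(): string {
    return this._id;
  }

  get version(): number {
    return this._version;
  }

  // Subclasses mutate their state here and nowhere else.
  protected abstract when(event: DomainEvent): void;

  // isNew = true: the event was just produced by a command and must be persisted.
  // isNew = false: the event is being replayed from the store.
  protected apply(event: DomainEvent, isNew: boolean): void {
    this.when(event);
    this._version = event.version;
    if (isNew) {
      this.uncommitted.push(event);
    }
  }

  getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommitted];
  }

  // Hypothetical helper: call after the events were saved successfully.
  markEventsAsCommitted(): void {
    this.uncommitted = [];
  }
}

// Tiny aggregate used only to exercise the base class.
class Counter extends AggregateRoot {
  count = 0;

  static fromEvents(events: DomainEvent[]): Counter {
    const counter = new Counter();
    events.forEach((event) => counter.apply(event, false));
    return counter;
  }

  increment(): void {
    this.apply(
      {
        eventType: 'Incremented',
        aggregateId: this.id || 'counter-1',
        version: this.version + 1,
      },
      true,
    );
  }

  protected when(event: DomainEvent): void {
    this._id = event.aggregateId;
    if (event.eventType === 'Incremented') {
      this.count += 1;
    }
  }
}
```

Routing both new and replayed events through the same `when` method is what guarantees that a rehydrated aggregate ends up in exactly the state the original reached.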

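Event Store Implementation

// infrastructure/event-store.ts
import { Injectable } from '@nestjs/common';
import { Pool } from 'pg';
import { DomainEvent } from '../domain/events/order-events';

// Thrown when the stream has moved past the version the caller loaded
export class ConcurrencyError extends Error {}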

@Injectable()
export class EventStore {
  constructor(private readonly pool: Pool) {}

  async appendEvents(
    streamId: string,
    events: DomainEvent[],
    expectedVersion: number,
  ): Promise<void> {
    const client = await this.pool.connect();

    try {
      await client.query('BEGIN');

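      // Optimistic locking: compare the stream's latest version with the one
      // the caller loaded. Two concurrent transactions can both pass this
      // check, so a UNIQUE (stream_id, version) constraint is still needed
      // as the final guard.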
      const versionCheck = await client.query(
        `SELECT MAX(version) as current_version
         FROM events
         WHERE stream_id = $1`,
        [streamId],
      );

      const currentVersion = versionCheck.rows[0]?.current_version ?? 0;

      if (currentVersion !== expectedVersion) {
        throw new ConcurrencyError(
          `Expected version ${expectedVersion}, but found ${currentVersion}`,
        );
      }

      // Append the events
      for (const event of events) {
        await client.query(
          `INSERT INTO events (
            event_id, stream_id, event_type, version,
            payload, metadata, timestamp
          ) VALUES ($1, $2, $3, $4, $5, $6, $7)`,
          [
            event.eventId,
            streamId,
            event.eventType,
            event.version,
            JSON.stringify(event.payload),
            JSON.stringify(event.metadata),
            event.timestamp,
          ],
        );
      }

      await client.query('COMMIT');
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }

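  async getEvents(
    streamId: string,
    fromVersion: number = 0,
  ): Promise<DomainEvent[]> {
    const result = await this.pool.query(
      `SELECT * FROM events
       WHERE stream_id = $1 AND version > $2
       ORDER BY version ASC`,
      [streamId, fromVersion],
    );

    return result.rows.map((row) => this.mapToEvent(row));
  }

  // Map a row back to a DomainEvent. Assumptions: payload and metadata are
  // JSON/JSONB columns (pg parses those into objects), and the stream ID is
  // the aggregate ID. appendEvents does not write aggregate_type, so it is
  // only read here if such a column exists.
  private mapToEvent(row: any): DomainEvent {
    return {
      eventId: row.event_id,
      eventType: row.event_type,
      aggregateId: row.stream_id,
      aggregateType: row.aggregate_type ?? 'unknown',
      version: row.version,
      timestamp: row.timestamp,
      payload: row.payload,
      metadata: row.metadata ?? {},
    };
  }
}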
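
To see the optimistic concurrency check in isolation, here is a small in-memory sketch that follows the same `expectedVersion` contract as `appendEvents`. `InMemoryEventStore` and `StoredEvent` are hypothetical names, and the class is a teaching aid under simplified assumptions, not a replacement for the PostgreSQL implementation.

```typescript
class ConcurrencyError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'ConcurrencyError';
  }
}

interface StoredEvent {
  eventType: string;
  version: number;
  payload: unknown;
}

class InMemoryEventStore {
  private readonly streams = new Map<string, StoredEvent[]>();

  append(streamId: string, events: StoredEvent[], expectedVersion: number): void {
    const stream = this.streams.get(streamId) ?? [];
    // An empty stream is at version 0, mirroring the `?? 0` in the SQL version.
    const currentVersion = stream.reduce((max, e) => Math.max(max, e.version), 0);

    if (currentVersion !== expectedVersion) {
      throw new ConcurrencyError(
        `Expected version ${expectedVersion}, but found ${currentVersion}`,
      );
    }

    this.streams.set(streamId, [...stream, ...events]);
  }

  read(streamId: string, fromVersion = 0): StoredEvent[] {
    return (this.streams.get(streamId) ?? []).filter((e) => e.version > fromVersion);
  }
}

// Two writers both loaded the stream at version 0; only the first append wins.
const demoStore = new InMemoryEventStore();
demoStore.append('order-1', [{ eventType: 'OrderCreated', version: 1, payload: {} }], 0);

let secondWriterError = '';
try {
  demoStore.append('order-1', [{ eventType: 'OrderItemAdded', version: 1, payload: {} }], 0);
} catch (err) {
  secondWriterError = (err as Error).name;
}
```

The losing writer typically reloads the aggregate, re-runs the command against the new state, and retries the append with the updated version.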

Keeping the Read Model in Sync (Projection)

flowchart TB
    subgraph EventStore["Event Store"]
        E1["Event 1"]
        E2["Event 2"]
        E3["Event 3"]
        E4["..."]
    end

    P["Projector<br/>(Event Handler)"]

    subgraph ReadDBs["Read Databases"]
        RD1["Read DB 1<br/>(Order List)"]
        RD2["Read DB 2<br/>(Sales Aggregation)"]
    end

    EventStore --> P
    P --> RD1
    P --> RD2

Characteristics:

  • Asynchronous processing (eventual consistency)
  • Multiple read models can be generated from the same events
  • Rebuildable (by replaying events)

// projections/order-list-projection.ts
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { OrderReadRepository } from '../infrastructure/read-repositories';
import {
  OrderCreatedEvent,
  OrderShippedEvent,
  OrderCancelledEvent,
} from '../domain/events/order-events';

@Injectable()
export class OrderListProjection {
  constructor(private readonly readRepository: OrderReadRepository) {}

  @OnEvent('OrderCreated')
  async handleOrderCreated(event: OrderCreatedEvent): Promise<void> {
    await this.readRepository.insert({
      id: event.aggregateId,
      customerId: event.payload.customerId,
      // getCustomerName is assumed to be a lookup helper on this class (not shown)
      customerName: await this.getCustomerName(event.payload.customerId),
      status: 'pending',
      totalAmount: event.payload.totalAmount,
      itemCount: event.payload.items.length,
      createdAt: event.timestamp,
      updatedAt: event.timestamp,
    });
  }

  @OnEvent('OrderShipped')
  async handleOrderShipped(event: OrderShippedEvent): Promise<void> {
    await this.readRepository.update(event.aggregateId, {
      status: 'shipped',
      shippedAt: event.payload.shippedAt,
      trackingNumber: event.payload.trackingNumber,
      updatedAt: event.timestamp,
    });
  }

  @OnEvent('OrderCancelled')
  async handleOrderCancelled(event: OrderCancelledEvent): Promise<void> {
    await this.readRepository.update(event.aggregateId, {
      status: 'cancelled',
      cancelledAt: event.payload.cancelledAt,
      cancellationReason: event.payload.reason,
      updatedAt: event.timestamp,
    });
  }
}
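
Because a projection is derived entirely from events, it can be dropped and rebuilt at any time by replaying the stream from the beginning. The sketch below shows that idea with an in-memory map standing in for the read database. The flattened event shapes and the names `ProjectedEvent`, `OrderListRow`, `project`, and `rebuild` are assumptions made for illustration.

```typescript
// Flattened versions of the article's events (assumption for brevity).
type ProjectedEvent =
  | { eventType: 'OrderCreated'; aggregateId: string; totalAmount: number }
  | { eventType: 'OrderShipped'; aggregateId: string; trackingNumber: string }
  | { eventType: 'OrderCancelled'; aggregateId: string; reason: string };

interface OrderListRow {
  id: string;
  status: string;
  totalAmount: number;
  trackingNumber?: string;
  cancellationReason?: string;
}

// Apply one event to the read model, mirroring the three handlers above.
function project(rows: Map<string, OrderListRow>, event: ProjectedEvent): void {
  switch (event.eventType) {
    case 'OrderCreated':
      rows.set(event.aggregateId, {
        id: event.aggregateId,
        status: 'pending',
        totalAmount: event.totalAmount,
      });
      break;
    case 'OrderShipped': {
      const row = rows.get(event.aggregateId);
      if (row) {
        row.status = 'shipped';
        row.trackingNumber = event.trackingNumber;
      }
      break;
    }
    case 'OrderCancelled': {
      const row = rows.get(event.aggregateId);
      if (row) {
        row.status = 'cancelled';
        row.cancellationReason = event.reason;
      }
      break;
    }
  }
}

// Rebuild: start from an empty read model and replay every event in order.
function rebuild(events: ProjectedEvent[]): Map<string, OrderListRow> {
  const rows = new Map<string, OrderListRow>();
  for (const event of events) {
    project(rows, event);
  }
  return rows;
}
```

The same `rebuild` approach also works for introducing a brand-new read model later: write the new projector, replay the existing events through it, and switch queries over once it has caught up.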

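Saga Pattern

A pattern for managing transactions that span multiple aggregates. Each step reacts to an event by issuing the next command, and a failure triggers a compensating command instead of a database rollback.

// sagas/order-saga.ts
import { Injectable } from '@nestjs/common';
import { Saga, ICommand, ofType } from '@nestjs/cqrs';
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';
// ofType() matches on class constructors, so the events used below
// (OrderCreatedEvent, InventoryReservedEvent, PaymentFailedEvent) and the
// commands are assumed to be classes imported from the application's own modules.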

@Injectable()
export class OrderSaga {
  @Saga()
  orderCreated = (events$: Observable<any>): Observable<ICommand> => {
    return events$.pipe(
      ofType(OrderCreatedEvent),
      map((event) => {
        // Issue the inventory reservation command
        return new ReserveInventoryCommand({
          orderId: event.aggregateId,
          items: event.payload.items,
        });
      }),
    );
  };

  @Saga()
  inventoryReserved = (events$: Observable<any>): Observable<ICommand> => {
    return events$.pipe(
      ofType(InventoryReservedEvent),
      map((event) => {
        // Issue the payment processing command
        return new ProcessPaymentCommand({
          orderId: event.payload.orderId,
          amount: event.payload.totalAmount,
        });
      }),
    );
  };

  @Saga()
  paymentFailed = (events$: Observable<any>): Observable<ICommand> => {
    return events$.pipe(
      ofType(PaymentFailedEvent),
      map((event) => {
        // Compensating transaction: release the reserved inventory
        return new ReleaseInventoryCommand({
          orderId: event.payload.orderId,
        });
      }),
    );
  };
}
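
Stripped of RxJS and NestJS, the saga above is a mapping from "event that happened" to "command to issue next". The framework-free sketch below expresses that mapping as a pure function, which can make the flow easier to reason about and test. The flattened shapes and the names `SagaEvent`, `SagaCommand`, and `nextCommand` are hypothetical.

```typescript
// Simplified events and commands mirroring the saga above (assumed shapes).
type SagaEvent =
  | { type: 'OrderCreated'; orderId: string; items: string[] }
  | { type: 'InventoryReserved'; orderId: string; totalAmount: number }
  | { type: 'PaymentFailed'; orderId: string };

type SagaCommand =
  | { type: 'ReserveInventory'; orderId: string; items: string[] }
  | { type: 'ProcessPayment'; orderId: string; amount: number }
  | { type: 'ReleaseInventory'; orderId: string };

// Decide the next step of the order process from the event that just occurred.
function nextCommand(event: SagaEvent): SagaCommand {
  switch (event.type) {
    case 'OrderCreated':
      return { type: 'ReserveInventory', orderId: event.orderId, items: event.items };
    case 'InventoryReserved':
      return { type: 'ProcessPayment', orderId: event.orderId, amount: event.totalAmount };
    case 'PaymentFailed':
      // Compensation: undo the earlier reservation rather than rolling back.
      return { type: 'ReleaseInventory', orderId: event.orderId };
  }
}
```

Keeping the decision logic pure leaves the framework code responsible only for wiring: subscribing to events and dispatching whatever command the function returns.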

Implementation Considerations

| Challenge | Solution |
| --- | --- |
| Eventual consistency | Appropriate UI feedback, optimistic updates |
| Event schema evolution | Event versioning, upcasting |
| Rebuilding projections | Snapshots, parallel processing |
| Debugging complexity | Correlation IDs, detailed logs, event visualization tools |
| Upfront implementation cost | Gradual adoption, starting with the most important domains |
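
As an illustration of upcasting, the sketch below upgrades old stored events to the current schema at read time, so aggregates and projections only ever see the latest shape. The scenario is hypothetical: it assumes an earlier version of `OrderShipped` was stored without a `carrier` field. The `schemaVersion` field, the `upcasters` registry, and the `'unknown'` default are assumptions for the example, not part of the article's event store.

```typescript
interface VersionedEvent {
  eventType: string;
  schemaVersion: number;
  payload: Record<string, unknown>;
}

type Upcaster = (payload: Record<string, unknown>) => Record<string, unknown>;

// Registry keyed by "<eventType>:<fromVersion>". Each entry lifts a payload
// by exactly one schema version.
const upcasters = new Map<string, Upcaster>([
  // Hypothetical: v1 of OrderShipped had no carrier, so v2 gets a default.
  ['OrderShipped:1', (payload) => ({ ...payload, carrier: 'unknown' })],
]);

// Apply upcasters repeatedly until no further step is registered.
function upcast(event: VersionedEvent): VersionedEvent {
  let current = event;
  for (;;) {
    const step = upcasters.get(`${current.eventType}:${current.schemaVersion}`);
    if (!step) {
      return current;
    }
    current = {
      ...current,
      schemaVersion: current.schemaVersion + 1,
      payload: step(current.payload),
    };
  }
}

const legacyShipped: VersionedEvent = {
  eventType: 'OrderShipped',
  schemaVersion: 1,
  payload: { trackingNumber: 'TRK123' },
};

const upgradedShipped = upcast(legacyShipped);
```

Since stored events are never rewritten, the original v1 record stays intact in the store and the upgrade happens only in memory on each read.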