architecture

CQRS・イベントソーシング入門 - スケーラブルなアーキテクチャ設計

2025.12.02

CQRSとは

CQRS(Command Query Responsibility Segregation)は、データの読み取り(Query)と書き込み(Command)を分離するアーキテクチャパターンです。

従来のCRUDアーキテクチャ

flowchart TB
    subgraph CRUD["従来のCRUDアーキテクチャ"]
        C1["Client"]
        C1 -->|CRUD操作| DM["単一のデータモデル<br/>(Read + Write)<br/>← 読み書き同一モデル"]
        DM --> DB1["Database"]
    end

CQRSアーキテクチャ

flowchart TB
    subgraph CQRS["CQRSアーキテクチャ"]
        C2["Client"]
        C3["Client"]
        C2 -->|Command| CM["Command Model<br/>(Write)"]
        C3 -->|Query| QM["Query Model<br/>(Read)"]
        CM --> WS["Write Store<br/>(正規化)"]
        QM --> RS["Read Store<br/>(非正規化)"]
        WS -->|同期| RS
    end

CQRSを採用すべきケース

ユースケース説明
読み書きの負荷が非対称読み取りが圧倒的に多い(SNS、ECサイト)
複雑なドメインロジック書き込み時に複雑なビジネスルールを適用
異なる最適化が必要書き込みはトランザクション、読み取りはキャッシュ
複数の読み取りビュー同じデータを異なる形式で表示
監査・履歴要件すべての変更履歴を保持する必要がある

基本的なCQRS実装

Command側(書き込み)

// commands/types.ts
export interface Command {
  type: string;
  payload: unknown;
  metadata: {
    userId: string;
    timestamp: Date;
    correlationId: string;
  };
}

export interface CreateOrderCommand extends Command {
  type: 'CreateOrder';
  payload: {
    customerId: string;
    items: Array<{
      productId: string;
      quantity: number;
      unitPrice: number;
    }>;
    shippingAddress: Address;
  };
}

export interface CancelOrderCommand extends Command {
  type: 'CancelOrder';
  payload: {
    orderId: string;
    reason: string;
  };
}
// commands/handlers/order-command-handler.ts
import { Injectable } from '@nestjs/common';
import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';
import { CreateOrderCommand, CancelOrderCommand } from '../types';
import { OrderRepository } from '../../domain/repositories/order-repository';
import { Order } from '../../domain/entities/order';
import { EventPublisher } from '../../infrastructure/event-publisher';

@Injectable()
@CommandHandler(CreateOrderCommand)
export class CreateOrderHandler implements ICommandHandler<CreateOrderCommand> {
  constructor(
    private readonly orderRepository: OrderRepository,
    private readonly eventPublisher: EventPublisher,
  ) {}

  async execute(command: CreateOrderCommand): Promise<string> {
    // ドメインロジックの実行
    const order = Order.create({
      customerId: command.payload.customerId,
      items: command.payload.items,
      shippingAddress: command.payload.shippingAddress,
    });

    // バリデーション
    order.validate();

    // 永続化
    await this.orderRepository.save(order);

    // イベント発行
    await this.eventPublisher.publish(order.getUncommittedEvents());

    return order.id;
  }
}

@Injectable()
@CommandHandler(CancelOrderCommand)
export class CancelOrderHandler implements ICommandHandler<CancelOrderCommand> {
  constructor(
    private readonly orderRepository: OrderRepository,
    private readonly eventPublisher: EventPublisher,
  ) {}

  async execute(command: CancelOrderCommand): Promise<void> {
    const order = await this.orderRepository.findById(command.payload.orderId);

    if (!order) {
      throw new OrderNotFoundError(command.payload.orderId);
    }

    // ドメインロジック:キャンセル可能かチェック
    order.cancel(command.payload.reason);

    await this.orderRepository.save(order);
    await this.eventPublisher.publish(order.getUncommittedEvents());
  }
}

Query側(読み取り)

// queries/types.ts
export interface Query {
  type: string;
}

export interface GetOrderByIdQuery extends Query {
  type: 'GetOrderById';
  orderId: string;
}

export interface GetOrdersByCustomerQuery extends Query {
  type: 'GetOrdersByCustomer';
  customerId: string;
  pagination: {
    page: number;
    limit: number;
  };
  filters?: {
    status?: OrderStatus[];
    dateRange?: { from: Date; to: Date };
  };
}
// queries/handlers/order-query-handler.ts
import { Injectable } from '@nestjs/common';
import { QueryHandler, IQueryHandler } from '@nestjs/cqrs';
import { GetOrderByIdQuery, GetOrdersByCustomerQuery } from '../types';
import { OrderReadRepository } from '../../infrastructure/read-repositories';

// 読み取り専用DTO
interface OrderDetailDto {
  id: string;
  customerName: string;
  items: Array<{
    productName: string;
    quantity: number;
    totalPrice: number;
  }>;
  status: string;
  totalAmount: number;
  createdAt: Date;
}

@Injectable()
@QueryHandler(GetOrderByIdQuery)
export class GetOrderByIdHandler implements IQueryHandler<GetOrderByIdQuery> {
  constructor(private readonly readRepository: OrderReadRepository) {}

  async execute(query: GetOrderByIdQuery): Promise<OrderDetailDto | null> {
    // 読み取り最適化されたストアから直接取得
    return this.readRepository.findById(query.orderId);
  }
}

@Injectable()
@QueryHandler(GetOrdersByCustomerQuery)
export class GetOrdersByCustomerHandler
  implements IQueryHandler<GetOrdersByCustomerQuery>
{
  constructor(private readonly readRepository: OrderReadRepository) {}

  async execute(query: GetOrdersByCustomerQuery): Promise<{
    orders: OrderDetailDto[];
    total: number;
    hasMore: boolean;
  }> {
    return this.readRepository.findByCustomer(
      query.customerId,
      query.pagination,
      query.filters,
    );
  }
}

イベントソーシング

イベントソーシングは、状態の変更を「イベント」として記録し、現在の状態をイベントの積み重ねから導出するパターンです。

従来の状態保存:

flowchart LR
    subgraph Traditional["従来の状態保存"]
        O["Order #123<br/>status: 'shipped' ← 最新の状態のみ保存<br/>total: 15000"]
    end

イベントソーシング:

flowchart TB
    subgraph ES["Event Stream for Order #123"]
        E1["[1] OrderCreated<br/>items: [...], total: 10000"]
        E2["[2] ItemAdded<br/>productId: 'P456', qty: 2"]
        E3["[3] OrderConfirmed<br/>confirmedAt: '...'"]
        E4["[4] PaymentReceived<br/>amount: 15000"]
        E5["[5] OrderShipped<br/>trackingNo: 'TRK123'"]
        E1 --> E2 --> E3 --> E4 --> E5
    end
    E5 --> State["現在の状態 = イベント[1]〜[5]を順番に適用"]

ドメインイベントの定義

// domain/events/order-events.ts
export interface DomainEvent {
  eventId: string;
  eventType: string;
  aggregateId: string;
  aggregateType: string;
  version: number;
  timestamp: Date;
  payload: unknown;
  metadata: {
    userId?: string;
    correlationId?: string;
    causationId?: string;
  };
}

export interface OrderCreatedEvent extends DomainEvent {
  eventType: 'OrderCreated';
  payload: {
    customerId: string;
    items: OrderItem[];
    shippingAddress: Address;
    totalAmount: number;
  };
}

export interface OrderItemAddedEvent extends DomainEvent {
  eventType: 'OrderItemAdded';
  payload: {
    item: OrderItem;
    newTotal: number;
  };
}

export interface OrderShippedEvent extends DomainEvent {
  eventType: 'OrderShipped';
  payload: {
    shippedAt: Date;
    trackingNumber: string;
    carrier: string;
  };
}

export interface OrderCancelledEvent extends DomainEvent {
  eventType: 'OrderCancelled';
  payload: {
    reason: string;
    cancelledAt: Date;
    refundAmount: number;
  };
}

イベントソーシング対応の集約

// domain/aggregates/order.ts
import { AggregateRoot } from '../base/aggregate-root';
import {
  OrderCreatedEvent,
  OrderItemAddedEvent,
  OrderShippedEvent,
  OrderCancelledEvent,
} from '../events/order-events';

export class Order extends AggregateRoot {
  private _customerId: string;
  private _items: OrderItem[] = [];
  private _status: OrderStatus = 'draft';
  private _totalAmount: number = 0;
  private _shippingAddress: Address;

  // イベントから状態を復元
  static fromEvents(events: DomainEvent[]): Order {
    const order = new Order();
    events.forEach((event) => order.apply(event, false));
    return order;
  }

  // コマンド:注文作成
  static create(params: CreateOrderParams): Order {
    const order = new Order();
    const event: OrderCreatedEvent = {
      eventId: generateId(),
      eventType: 'OrderCreated',
      aggregateId: generateId(),
      aggregateType: 'Order',
      version: 1,
      timestamp: new Date(),
      payload: {
        customerId: params.customerId,
        items: params.items,
        shippingAddress: params.shippingAddress,
        totalAmount: calculateTotal(params.items),
      },
      metadata: {},
    };

    order.apply(event, true);
    return order;
  }

  // コマンド:商品追加
  addItem(item: OrderItem): void {
    if (this._status !== 'draft' && this._status !== 'pending') {
      throw new InvalidOperationError('Cannot add items to this order');
    }

    const event: OrderItemAddedEvent = {
      eventId: generateId(),
      eventType: 'OrderItemAdded',
      aggregateId: this.id,
      aggregateType: 'Order',
      version: this.version + 1,
      timestamp: new Date(),
      payload: {
        item,
        newTotal: this._totalAmount + item.quantity * item.unitPrice,
      },
      metadata: {},
    };

    this.apply(event, true);
  }

  // コマンド:出荷
  ship(trackingNumber: string, carrier: string): void {
    if (this._status !== 'paid') {
      throw new InvalidOperationError('Order must be paid before shipping');
    }

    const event: OrderShippedEvent = {
      eventId: generateId(),
      eventType: 'OrderShipped',
      aggregateId: this.id,
      aggregateType: 'Order',
      version: this.version + 1,
      timestamp: new Date(),
      payload: {
        shippedAt: new Date(),
        trackingNumber,
        carrier,
      },
      metadata: {},
    };

    this.apply(event, true);
  }

  // イベントハンドラ:状態の更新
  protected when(event: DomainEvent): void {
    switch (event.eventType) {
      case 'OrderCreated': {
        const e = event as OrderCreatedEvent;
        this._id = e.aggregateId;
        this._customerId = e.payload.customerId;
        this._items = e.payload.items;
        this._shippingAddress = e.payload.shippingAddress;
        this._totalAmount = e.payload.totalAmount;
        this._status = 'pending';
        break;
      }
      case 'OrderItemAdded': {
        const e = event as OrderItemAddedEvent;
        this._items.push(e.payload.item);
        this._totalAmount = e.payload.newTotal;
        break;
      }
      case 'OrderShipped': {
        this._status = 'shipped';
        break;
      }
      case 'OrderCancelled': {
        this._status = 'cancelled';
        break;
      }
    }
  }
}

イベントストアの実装

// infrastructure/event-store.ts
import { Injectable } from '@nestjs/common';
import { Pool } from 'pg';

@Injectable()
export class EventStore {
  constructor(private readonly pool: Pool) {}

  async appendEvents(
    streamId: string,
    events: DomainEvent[],
    expectedVersion: number,
  ): Promise<void> {
    const client = await this.pool.connect();

    try {
      await client.query('BEGIN');

      // 楽観的ロック:バージョンチェック
      const versionCheck = await client.query(
        `SELECT MAX(version) as current_version
         FROM events
         WHERE stream_id = $1`,
        [streamId],
      );

      const currentVersion = versionCheck.rows[0]?.current_version ?? 0;

      if (currentVersion !== expectedVersion) {
        throw new ConcurrencyError(
          `Expected version ${expectedVersion}, but found ${currentVersion}`,
        );
      }

      // イベントの追加
      for (const event of events) {
        await client.query(
          `INSERT INTO events (
            event_id, stream_id, event_type, version,
            payload, metadata, timestamp
          ) VALUES ($1, $2, $3, $4, $5, $6, $7)`,
          [
            event.eventId,
            streamId,
            event.eventType,
            event.version,
            JSON.stringify(event.payload),
            JSON.stringify(event.metadata),
            event.timestamp,
          ],
        );
      }

      await client.query('COMMIT');
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }

  async getEvents(
    streamId: string,
    fromVersion: number = 0,
  ): Promise<DomainEvent[]> {
    const result = await this.pool.query(
      `SELECT * FROM events
       WHERE stream_id = $1 AND version > $2
       ORDER BY version ASC`,
      [streamId, fromVersion],
    );

    return result.rows.map(this.mapToEvent);
  }
}

読み取りモデルの同期(プロジェクション)

flowchart TB
    subgraph EventStore["Event Store"]
        E1["Event 1"]
        E2["Event 2"]
        E3["Event 3"]
        E4["..."]
    end

    P["Projector<br/>(イベントハンドラ)"]

    subgraph ReadDBs["Read Databases"]
        RD1["Read DB 1<br/>(注文一覧)"]
        RD2["Read DB 2<br/>(売上集計)"]
    end

    EventStore --> P
    P --> RD1
    P --> RD2

特徴:

  • 非同期処理(結果整合性)
  • 複数の読み取りモデルを生成可能
  • 再構築可能(イベントを再生)
// projections/order-list-projection.ts
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { OrderReadRepository } from '../infrastructure/read-repositories';

@Injectable()
export class OrderListProjection {
  constructor(private readonly readRepository: OrderReadRepository) {}

  @OnEvent('OrderCreated')
  async handleOrderCreated(event: OrderCreatedEvent): Promise<void> {
    await this.readRepository.insert({
      id: event.aggregateId,
      customerId: event.payload.customerId,
      customerName: await this.getCustomerName(event.payload.customerId),
      status: 'pending',
      totalAmount: event.payload.totalAmount,
      itemCount: event.payload.items.length,
      createdAt: event.timestamp,
      updatedAt: event.timestamp,
    });
  }

  @OnEvent('OrderShipped')
  async handleOrderShipped(event: OrderShippedEvent): Promise<void> {
    await this.readRepository.update(event.aggregateId, {
      status: 'shipped',
      shippedAt: event.payload.shippedAt,
      trackingNumber: event.payload.trackingNumber,
      updatedAt: event.timestamp,
    });
  }

  @OnEvent('OrderCancelled')
  async handleOrderCancelled(event: OrderCancelledEvent): Promise<void> {
    await this.readRepository.update(event.aggregateId, {
      status: 'cancelled',
      cancelledAt: event.payload.cancelledAt,
      cancellationReason: event.payload.reason,
      updatedAt: event.timestamp,
    });
  }
}

Sagaパターン

複数の集約にまたがるトランザクションを管理するパターンです。

// sagas/order-saga.ts
import { Injectable } from '@nestjs/common';
import { Saga, ICommand, ofType } from '@nestjs/cqrs';
import { Observable } from 'rxjs';
import { map, filter } from 'rxjs/operators';

@Injectable()
export class OrderSaga {
  @Saga()
  orderCreated = (events$: Observable<any>): Observable<ICommand> => {
    return events$.pipe(
      ofType(OrderCreatedEvent),
      map((event) => {
        // 在庫予約コマンドを発行
        return new ReserveInventoryCommand({
          orderId: event.aggregateId,
          items: event.payload.items,
        });
      }),
    );
  };

  @Saga()
  inventoryReserved = (events$: Observable<any>): Observable<ICommand> => {
    return events$.pipe(
      ofType(InventoryReservedEvent),
      map((event) => {
        // 決済処理コマンドを発行
        return new ProcessPaymentCommand({
          orderId: event.payload.orderId,
          amount: event.payload.totalAmount,
        });
      }),
    );
  };

  @Saga()
  paymentFailed = (events$: Observable<any>): Observable<ICommand> => {
    return events$.pipe(
      ofType(PaymentFailedEvent),
      map((event) => {
        // 補償トランザクション:在庫を解放
        return new ReleaseInventoryCommand({
          orderId: event.payload.orderId,
        });
      }),
    );
  };
}

実装時の注意点

課題対策
結果整合性UIでの適切なフィードバック、楽観的更新
イベントのスキーマ進化イベントのバージョニング、アップキャスト
プロジェクションの再構築スナップショット、並列処理
デバッグの複雑さ相関ID、詳細なログ、イベント可視化ツール
初期導入コスト段階的導入、重要なドメインから開始

参考リンク

← 一覧に戻る