CQRSとは
CQRS(Command Query Responsibility Segregation)は、データの読み取り(Query)と書き込み(Command)を分離するアーキテクチャパターンです。
従来のCRUDアーキテクチャ
flowchart TB
subgraph CRUD["従来のCRUDアーキテクチャ"]
C1["Client"]
C1 -->|CRUD操作| DM["単一のデータモデル<br/>(Read + Write)<br/>← 読み書き同一モデル"]
DM --> DB1["Database"]
end
CQRSアーキテクチャ
flowchart TB
subgraph CQRS["CQRSアーキテクチャ"]
C2["Client"]
C3["Client"]
C2 -->|Command| CM["Command Model<br/>(Write)"]
C3 -->|Query| QM["Query Model<br/>(Read)"]
CM --> WS["Write Store<br/>(正規化)"]
QM --> RS["Read Store<br/>(非正規化)"]
WS -->|同期| RS
end
CQRSを採用すべきケース
| ユースケース | 説明 |
|---|---|
| 読み書きの負荷が非対称 | 読み取りが圧倒的に多い(SNS、ECサイト) |
| 複雑なドメインロジック | 書き込み時に複雑なビジネスルールを適用 |
| 異なる最適化が必要 | 書き込みはトランザクション、読み取りはキャッシュ |
| 複数の読み取りビュー | 同じデータを異なる形式で表示 |
| 監査・履歴要件 | すべての変更履歴を保持する必要がある |
基本的なCQRS実装
Command側(書き込み)
// commands/types.ts
export interface Command {
type: string;
payload: unknown;
metadata: {
userId: string;
timestamp: Date;
correlationId: string;
};
}
export interface CreateOrderCommand extends Command {
type: 'CreateOrder';
payload: {
customerId: string;
items: Array<{
productId: string;
quantity: number;
unitPrice: number;
}>;
shippingAddress: Address;
};
}
export interface CancelOrderCommand extends Command {
type: 'CancelOrder';
payload: {
orderId: string;
reason: string;
};
}
// commands/handlers/order-command-handler.ts
import { Injectable } from '@nestjs/common';
import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';
import { CreateOrderCommand, CancelOrderCommand } from '../types';
import { OrderRepository } from '../../domain/repositories/order-repository';
import { Order } from '../../domain/entities/order';
import { EventPublisher } from '../../infrastructure/event-publisher';
@Injectable()
@CommandHandler(CreateOrderCommand)
export class CreateOrderHandler implements ICommandHandler<CreateOrderCommand> {
constructor(
private readonly orderRepository: OrderRepository,
private readonly eventPublisher: EventPublisher,
) {}
async execute(command: CreateOrderCommand): Promise<string> {
// ドメインロジックの実行
const order = Order.create({
customerId: command.payload.customerId,
items: command.payload.items,
shippingAddress: command.payload.shippingAddress,
});
// バリデーション
order.validate();
// 永続化
await this.orderRepository.save(order);
// イベント発行
await this.eventPublisher.publish(order.getUncommittedEvents());
return order.id;
}
}
@Injectable()
@CommandHandler(CancelOrderCommand)
export class CancelOrderHandler implements ICommandHandler<CancelOrderCommand> {
constructor(
private readonly orderRepository: OrderRepository,
private readonly eventPublisher: EventPublisher,
) {}
async execute(command: CancelOrderCommand): Promise<void> {
const order = await this.orderRepository.findById(command.payload.orderId);
if (!order) {
throw new OrderNotFoundError(command.payload.orderId);
}
// ドメインロジック:キャンセル可能かチェック
order.cancel(command.payload.reason);
await this.orderRepository.save(order);
await this.eventPublisher.publish(order.getUncommittedEvents());
}
}
Query側(読み取り)
// queries/types.ts
export interface Query {
type: string;
}
export interface GetOrderByIdQuery extends Query {
type: 'GetOrderById';
orderId: string;
}
export interface GetOrdersByCustomerQuery extends Query {
type: 'GetOrdersByCustomer';
customerId: string;
pagination: {
page: number;
limit: number;
};
filters?: {
status?: OrderStatus[];
dateRange?: { from: Date; to: Date };
};
}
// queries/handlers/order-query-handler.ts
import { Injectable } from '@nestjs/common';
import { QueryHandler, IQueryHandler } from '@nestjs/cqrs';
import { GetOrderByIdQuery, GetOrdersByCustomerQuery } from '../types';
import { OrderReadRepository } from '../../infrastructure/read-repositories';
// 読み取り専用DTO
interface OrderDetailDto {
id: string;
customerName: string;
items: Array<{
productName: string;
quantity: number;
totalPrice: number;
}>;
status: string;
totalAmount: number;
createdAt: Date;
}
@Injectable()
@QueryHandler(GetOrderByIdQuery)
export class GetOrderByIdHandler implements IQueryHandler<GetOrderByIdQuery> {
constructor(private readonly readRepository: OrderReadRepository) {}
async execute(query: GetOrderByIdQuery): Promise<OrderDetailDto | null> {
// 読み取り最適化されたストアから直接取得
return this.readRepository.findById(query.orderId);
}
}
@Injectable()
@QueryHandler(GetOrdersByCustomerQuery)
export class GetOrdersByCustomerHandler
implements IQueryHandler<GetOrdersByCustomerQuery>
{
constructor(private readonly readRepository: OrderReadRepository) {}
async execute(query: GetOrdersByCustomerQuery): Promise<{
orders: OrderDetailDto[];
total: number;
hasMore: boolean;
}> {
return this.readRepository.findByCustomer(
query.customerId,
query.pagination,
query.filters,
);
}
}
イベントソーシング
イベントソーシングは、状態の変更を「イベント」として記録し、現在の状態をイベントの積み重ねから導出するパターンです。
従来の状態保存:
flowchart LR
subgraph Traditional["従来の状態保存"]
O["Order #123<br/>status: 'shipped' ← 最新の状態のみ保存<br/>total: 15000"]
end
イベントソーシング:
flowchart TB
subgraph ES["Event Stream for Order #123"]
E1["[1] OrderCreated<br/>items: [...], total: 10000"]
E2["[2] ItemAdded<br/>productId: 'P456', qty: 2"]
E3["[3] OrderConfirmed<br/>confirmedAt: '...'"]
E4["[4] PaymentReceived<br/>amount: 15000"]
E5["[5] OrderShipped<br/>trackingNo: 'TRK123'"]
E1 --> E2 --> E3 --> E4 --> E5
end
E5 --> State["現在の状態 = イベント[1]〜[5]を順番に適用"]
ドメインイベントの定義
// domain/events/order-events.ts
export interface DomainEvent {
eventId: string;
eventType: string;
aggregateId: string;
aggregateType: string;
version: number;
timestamp: Date;
payload: unknown;
metadata: {
userId?: string;
correlationId?: string;
causationId?: string;
};
}
export interface OrderCreatedEvent extends DomainEvent {
eventType: 'OrderCreated';
payload: {
customerId: string;
items: OrderItem[];
shippingAddress: Address;
totalAmount: number;
};
}
export interface OrderItemAddedEvent extends DomainEvent {
eventType: 'OrderItemAdded';
payload: {
item: OrderItem;
newTotal: number;
};
}
export interface OrderShippedEvent extends DomainEvent {
eventType: 'OrderShipped';
payload: {
shippedAt: Date;
trackingNumber: string;
carrier: string;
};
}
export interface OrderCancelledEvent extends DomainEvent {
eventType: 'OrderCancelled';
payload: {
reason: string;
cancelledAt: Date;
refundAmount: number;
};
}
イベントソーシング対応の集約
// domain/aggregates/order.ts
import { AggregateRoot } from '../base/aggregate-root';
import {
OrderCreatedEvent,
OrderItemAddedEvent,
OrderShippedEvent,
OrderCancelledEvent,
} from '../events/order-events';
export class Order extends AggregateRoot {
private _customerId: string;
private _items: OrderItem[] = [];
private _status: OrderStatus = 'draft';
private _totalAmount: number = 0;
private _shippingAddress: Address;
// イベントから状態を復元
static fromEvents(events: DomainEvent[]): Order {
const order = new Order();
events.forEach((event) => order.apply(event, false));
return order;
}
// コマンド:注文作成
static create(params: CreateOrderParams): Order {
const order = new Order();
const event: OrderCreatedEvent = {
eventId: generateId(),
eventType: 'OrderCreated',
aggregateId: generateId(),
aggregateType: 'Order',
version: 1,
timestamp: new Date(),
payload: {
customerId: params.customerId,
items: params.items,
shippingAddress: params.shippingAddress,
totalAmount: calculateTotal(params.items),
},
metadata: {},
};
order.apply(event, true);
return order;
}
// コマンド:商品追加
addItem(item: OrderItem): void {
if (this._status !== 'draft' && this._status !== 'pending') {
throw new InvalidOperationError('Cannot add items to this order');
}
const event: OrderItemAddedEvent = {
eventId: generateId(),
eventType: 'OrderItemAdded',
aggregateId: this.id,
aggregateType: 'Order',
version: this.version + 1,
timestamp: new Date(),
payload: {
item,
newTotal: this._totalAmount + item.quantity * item.unitPrice,
},
metadata: {},
};
this.apply(event, true);
}
// コマンド:出荷
ship(trackingNumber: string, carrier: string): void {
if (this._status !== 'paid') {
throw new InvalidOperationError('Order must be paid before shipping');
}
const event: OrderShippedEvent = {
eventId: generateId(),
eventType: 'OrderShipped',
aggregateId: this.id,
aggregateType: 'Order',
version: this.version + 1,
timestamp: new Date(),
payload: {
shippedAt: new Date(),
trackingNumber,
carrier,
},
metadata: {},
};
this.apply(event, true);
}
// イベントハンドラ:状態の更新
protected when(event: DomainEvent): void {
switch (event.eventType) {
case 'OrderCreated': {
const e = event as OrderCreatedEvent;
this._id = e.aggregateId;
this._customerId = e.payload.customerId;
this._items = e.payload.items;
this._shippingAddress = e.payload.shippingAddress;
this._totalAmount = e.payload.totalAmount;
this._status = 'pending';
break;
}
case 'OrderItemAdded': {
const e = event as OrderItemAddedEvent;
this._items.push(e.payload.item);
this._totalAmount = e.payload.newTotal;
break;
}
case 'OrderShipped': {
this._status = 'shipped';
break;
}
case 'OrderCancelled': {
this._status = 'cancelled';
break;
}
}
}
}
イベントストアの実装
// infrastructure/event-store.ts
import { Injectable } from '@nestjs/common';
import { Pool } from 'pg';
@Injectable()
export class EventStore {
constructor(private readonly pool: Pool) {}
async appendEvents(
streamId: string,
events: DomainEvent[],
expectedVersion: number,
): Promise<void> {
const client = await this.pool.connect();
try {
await client.query('BEGIN');
// 楽観的ロック:バージョンチェック
const versionCheck = await client.query(
`SELECT MAX(version) as current_version
FROM events
WHERE stream_id = $1`,
[streamId],
);
const currentVersion = versionCheck.rows[0]?.current_version ?? 0;
if (currentVersion !== expectedVersion) {
throw new ConcurrencyError(
`Expected version ${expectedVersion}, but found ${currentVersion}`,
);
}
// イベントの追加
for (const event of events) {
await client.query(
`INSERT INTO events (
event_id, stream_id, event_type, version,
payload, metadata, timestamp
) VALUES ($1, $2, $3, $4, $5, $6, $7)`,
[
event.eventId,
streamId,
event.eventType,
event.version,
JSON.stringify(event.payload),
JSON.stringify(event.metadata),
event.timestamp,
],
);
}
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
async getEvents(
streamId: string,
fromVersion: number = 0,
): Promise<DomainEvent[]> {
const result = await this.pool.query(
`SELECT * FROM events
WHERE stream_id = $1 AND version > $2
ORDER BY version ASC`,
[streamId, fromVersion],
);
return result.rows.map(this.mapToEvent);
}
}
読み取りモデルの同期(プロジェクション)
flowchart TB
subgraph EventStore["Event Store"]
E1["Event 1"]
E2["Event 2"]
E3["Event 3"]
E4["..."]
end
P["Projector<br/>(イベントハンドラ)"]
subgraph ReadDBs["Read Databases"]
RD1["Read DB 1<br/>(注文一覧)"]
RD2["Read DB 2<br/>(売上集計)"]
end
EventStore --> P
P --> RD1
P --> RD2
特徴:
- 非同期処理(結果整合性)
- 複数の読み取りモデルを生成可能
- 再構築可能(イベントを再生)
// projections/order-list-projection.ts
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { OrderReadRepository } from '../infrastructure/read-repositories';
@Injectable()
export class OrderListProjection {
constructor(private readonly readRepository: OrderReadRepository) {}
@OnEvent('OrderCreated')
async handleOrderCreated(event: OrderCreatedEvent): Promise<void> {
await this.readRepository.insert({
id: event.aggregateId,
customerId: event.payload.customerId,
customerName: await this.getCustomerName(event.payload.customerId),
status: 'pending',
totalAmount: event.payload.totalAmount,
itemCount: event.payload.items.length,
createdAt: event.timestamp,
updatedAt: event.timestamp,
});
}
@OnEvent('OrderShipped')
async handleOrderShipped(event: OrderShippedEvent): Promise<void> {
await this.readRepository.update(event.aggregateId, {
status: 'shipped',
shippedAt: event.payload.shippedAt,
trackingNumber: event.payload.trackingNumber,
updatedAt: event.timestamp,
});
}
@OnEvent('OrderCancelled')
async handleOrderCancelled(event: OrderCancelledEvent): Promise<void> {
await this.readRepository.update(event.aggregateId, {
status: 'cancelled',
cancelledAt: event.payload.cancelledAt,
cancellationReason: event.payload.reason,
updatedAt: event.timestamp,
});
}
}
Sagaパターン
複数の集約にまたがるトランザクションを管理するパターンです。
// sagas/order-saga.ts
import { Injectable } from '@nestjs/common';
import { Saga, ICommand, ofType } from '@nestjs/cqrs';
import { Observable } from 'rxjs';
import { map, filter } from 'rxjs/operators';
@Injectable()
export class OrderSaga {
@Saga()
orderCreated = (events$: Observable<any>): Observable<ICommand> => {
return events$.pipe(
ofType(OrderCreatedEvent),
map((event) => {
// 在庫予約コマンドを発行
return new ReserveInventoryCommand({
orderId: event.aggregateId,
items: event.payload.items,
});
}),
);
};
@Saga()
inventoryReserved = (events$: Observable<any>): Observable<ICommand> => {
return events$.pipe(
ofType(InventoryReservedEvent),
map((event) => {
// 決済処理コマンドを発行
return new ProcessPaymentCommand({
orderId: event.payload.orderId,
amount: event.payload.totalAmount,
});
}),
);
};
@Saga()
paymentFailed = (events$: Observable<any>): Observable<ICommand> => {
return events$.pipe(
ofType(PaymentFailedEvent),
map((event) => {
// 補償トランザクション:在庫を解放
return new ReleaseInventoryCommand({
orderId: event.payload.orderId,
});
}),
);
};
}
実装時の注意点
| 課題 | 対策 |
|---|---|
| 結果整合性 | UIでの適切なフィードバック、楽観的更新 |
| イベントのスキーマ進化 | イベントのバージョニング、アップキャスト |
| プロジェクションの再構築 | スナップショット、並列処理 |
| デバッグの複雑さ | 相関ID、詳細なログ、イベント可視化ツール |
| 初期導入コスト | 段階的導入、重要なドメインから開始 |