メッセージキューは、分散システムにおけるコンポーネント間の非同期通信を実現する重要な基盤技術です。適切に設計されたメッセージングシステムは、スケーラビリティ、信頼性、疎結合なアーキテクチャを可能にします。
メッセージキューの基本概念
同期処理 vs 非同期処理
flowchart TB
subgraph Sync["同期処理"]
C1["Client"] --> S1["Server"] --> D1["DB"]
D1 --> S1 --> C1
Note1["全処理完了まで待機<br/>レスポンス時間: 500ms+"]
end
subgraph Async["非同期処理(メッセージキュー使用)"]
C2["Client"] --> S2["Server"]
S2 --> C2
S2 --> Q["Queue"]
Q --> W["Consumer<br/>(Worker)"]
W --> D2["DB"]
Note2["即座にレスポンス (50ms)"]
end
メッセージングパターン
flowchart LR
subgraph P2P["1. Point-to-Point (キュー)"]
Prod["Producer"] --> Queue["Queue"] --> Cons["Consumer"]
end
subgraph PubSub["2. Publish/Subscribe (トピック)"]
Pub["Publisher"] --> Topic["Topic"]
Topic --> SubA["Subscriber A"]
Topic --> SubB["Subscriber B"]
Topic --> SubC["Subscriber C"]
end
subgraph ReqRep["3. Request/Reply"]
Client["Client"] --> ReqQ["Request Queue"] --> Server["Server"]
Server --> RepQ["Reply Queue"] --> Client
end
Point-to-Point: 1つのメッセージは1つのConsumerのみが受信、負荷分散に適している
Publish/Subscribe: 1つのメッセージを複数のSubscriberが受信、イベント通知に適している
Request/Reply: 非同期のRPC的なパターン
主要なメッセージキューシステム
システム比較
| 特性 | RabbitMQ | Apache Kafka | Amazon SQS | Redis Streams |
|---|---|---|---|---|
| プロトコル | AMQP | 独自 | HTTP/HTTPS | Redis Protocol |
| メッセージ保持 | 消費後削除 | 永続保持 | 最大14日 | 設定可能 |
| 順序保証 | キュー単位 | パーティション単位 | FIFO可 | ストリーム単位 |
| スループット | 中 | 非常に高 | 高 | 高 |
| レイテンシ | 低 | 中 | 中 | 非常に低 |
| 用途 | タスク分散 | イベントストリーミング | クラウドネイティブ | リアルタイム |
RabbitMQ
// RabbitMQ with amqplib
import amqp from 'amqplib';
// 接続管理クラス
class RabbitMQConnection {
private connection: amqp.Connection | null = null;
private channel: amqp.Channel | null = null;
async connect(url: string): Promise<void> {
this.connection = await amqp.connect(url);
this.channel = await this.connection.createChannel();
// プリフェッチ設定(同時処理数)
await this.channel.prefetch(10);
}
async setupExchangeAndQueue(
exchange: string,
queue: string,
routingKey: string
): Promise<void> {
if (!this.channel) throw new Error('Not connected');
// Exchangeの宣言
await this.channel.assertExchange(exchange, 'topic', {
durable: true,
});
// キューの宣言
await this.channel.assertQueue(queue, {
durable: true,
deadLetterExchange: `${exchange}.dlx`, // デッドレターキュー
messageTtl: 86400000, // 24時間
});
// バインディング
await this.channel.bindQueue(queue, exchange, routingKey);
}
async publish(
exchange: string,
routingKey: string,
message: object
): Promise<boolean> {
if (!this.channel) throw new Error('Not connected');
return this.channel.publish(
exchange,
routingKey,
Buffer.from(JSON.stringify(message)),
{
persistent: true, // 永続化
contentType: 'application/json',
timestamp: Date.now(),
messageId: crypto.randomUUID(),
}
);
}
async consume(
queue: string,
handler: (msg: amqp.Message) => Promise<void>
): Promise<void> {
if (!this.channel) throw new Error('Not connected');
await this.channel.consume(queue, async (msg) => {
if (!msg) return;
try {
await handler(msg);
this.channel!.ack(msg); // 成功時にack
} catch (error) {
console.error('Message processing failed:', error);
// リトライ回数をチェック
const retryCount = (msg.properties.headers?.['x-retry-count'] || 0) + 1;
if (retryCount < 3) {
// リトライ(遅延付き)
await this.publishWithDelay(
msg.fields.exchange,
msg.fields.routingKey,
msg.content,
retryCount
);
}
this.channel!.nack(msg, false, false); // DLQへ
}
});
}
private async publishWithDelay(
exchange: string,
routingKey: string,
content: Buffer,
retryCount: number
): Promise<void> {
const delay = Math.pow(2, retryCount) * 1000; // 指数バックオフ
this.channel!.publish(
`${exchange}.delayed`,
routingKey,
content,
{
headers: {
'x-delay': delay,
'x-retry-count': retryCount,
},
}
);
}
async close(): Promise<void> {
await this.channel?.close();
await this.connection?.close();
}
}
// 使用例
const rabbit = new RabbitMQConnection();
await rabbit.connect('amqp://localhost');
await rabbit.setupExchangeAndQueue(
'orders',
'order-processing',
'order.created'
);
// Producer
await rabbit.publish('orders', 'order.created', {
orderId: '12345',
userId: 'user-1',
items: [{ productId: 'prod-1', quantity: 2 }],
});
// Consumer
await rabbit.consume('order-processing', async (msg) => {
const order = JSON.parse(msg.content.toString());
console.log('Processing order:', order.orderId);
// 注文処理ロジック
});
Apache Kafka
// Kafka with kafkajs
import { Kafka, Producer, Consumer, EachMessagePayload } from 'kafkajs';
// Kafka接続設定
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
ssl: true,
sasl: {
mechanism: 'scram-sha-256',
username: process.env.KAFKA_USERNAME!,
password: process.env.KAFKA_PASSWORD!,
},
retry: {
initialRetryTime: 100,
retries: 8,
},
});
// Producer
class KafkaProducer {
private producer: Producer;
constructor() {
this.producer = kafka.producer({
idempotent: true, // 冪等性保証
maxInFlightRequests: 5,
transactionalId: 'my-transactional-id', // トランザクション用
});
}
async connect(): Promise<void> {
await this.producer.connect();
}
async sendBatch(
topic: string,
messages: Array<{ key: string; value: object }>
): Promise<void> {
await this.producer.send({
topic,
messages: messages.map(({ key, value }) => ({
key,
value: JSON.stringify(value),
headers: {
'correlation-id': crypto.randomUUID(),
timestamp: Date.now().toString(),
},
})),
acks: -1, // 全レプリカに書き込み完了を待つ
timeout: 30000,
});
}
// トランザクション付き送信
async sendWithTransaction(
messages: Array<{ topic: string; key: string; value: object }>
): Promise<void> {
const transaction = await this.producer.transaction();
try {
for (const { topic, key, value } of messages) {
await transaction.send({
topic,
messages: [{ key, value: JSON.stringify(value) }],
});
}
await transaction.commit();
} catch (error) {
await transaction.abort();
throw error;
}
}
async disconnect(): Promise<void> {
await this.producer.disconnect();
}
}
// Consumer
class KafkaConsumer {
private consumer: Consumer;
constructor(groupId: string) {
this.consumer = kafka.consumer({
groupId,
sessionTimeout: 30000,
heartbeatInterval: 3000,
maxBytesPerPartition: 1048576, // 1MB
});
}
async connect(): Promise<void> {
await this.consumer.connect();
}
async subscribe(topics: string[]): Promise<void> {
await this.consumer.subscribe({
topics,
fromBeginning: false,
});
}
async run(
handler: (payload: EachMessagePayload) => Promise<void>
): Promise<void> {
await this.consumer.run({
eachMessage: async (payload) => {
const { topic, partition, message } = payload;
console.log({
topic,
partition,
offset: message.offset,
key: message.key?.toString(),
});
try {
await handler(payload);
} catch (error) {
console.error('Message processing failed:', error);
// エラーハンドリング(DLQへの送信など)
}
},
});
}
async disconnect(): Promise<void> {
await this.consumer.disconnect();
}
}
// Exactly-Once Semantics (EOS) の実装
class KafkaEOSProcessor {
private producer: Producer;
private consumer: Consumer;
async processWithEOS(
inputTopic: string,
outputTopic: string,
transform: (value: any) => any
): Promise<void> {
await this.consumer.subscribe({ topics: [inputTopic] });
await this.consumer.run({
eachBatch: async ({ batch, resolveOffset, commitOffsetsIfNecessary }) => {
const transaction = await this.producer.transaction();
try {
for (const message of batch.messages) {
const value = JSON.parse(message.value!.toString());
const transformed = transform(value);
await transaction.send({
topic: outputTopic,
messages: [{ value: JSON.stringify(transformed) }],
});
resolveOffset(message.offset);
}
await transaction.sendOffsets({
consumerGroupId: 'my-group',
topics: [{
topic: inputTopic,
partitions: [{
partition: batch.partition,
offset: batch.lastOffset(),
}],
}],
});
await transaction.commit();
} catch (error) {
await transaction.abort();
throw error;
}
},
});
}
}
Amazon SQS
// AWS SQS with @aws-sdk/client-sqs
import {
SQSClient,
SendMessageCommand,
ReceiveMessageCommand,
DeleteMessageCommand,
SendMessageBatchCommand,
Message,
} from '@aws-sdk/client-sqs';
class SQSQueue {
private client: SQSClient;
private queueUrl: string;
constructor(queueUrl: string, region: string = 'ap-northeast-1') {
this.client = new SQSClient({ region });
this.queueUrl = queueUrl;
}
async sendMessage(
body: object,
options: {
delaySeconds?: number;
messageGroupId?: string; // FIFO用
deduplicationId?: string; // FIFO用
} = {}
): Promise<string> {
const command = new SendMessageCommand({
QueueUrl: this.queueUrl,
MessageBody: JSON.stringify(body),
DelaySeconds: options.delaySeconds,
MessageGroupId: options.messageGroupId,
MessageDeduplicationId: options.deduplicationId,
MessageAttributes: {
Timestamp: {
DataType: 'Number',
StringValue: Date.now().toString(),
},
},
});
const response = await this.client.send(command);
return response.MessageId!;
}
async sendBatch(
messages: Array<{ id: string; body: object; delaySeconds?: number }>
): Promise<void> {
// SQSは1回のバッチで最大10メッセージ
const chunks = this.chunkArray(messages, 10);
for (const chunk of chunks) {
const command = new SendMessageBatchCommand({
QueueUrl: this.queueUrl,
Entries: chunk.map(({ id, body, delaySeconds }) => ({
Id: id,
MessageBody: JSON.stringify(body),
DelaySeconds: delaySeconds,
})),
});
await this.client.send(command);
}
}
async receiveMessages(maxMessages: number = 10): Promise<Message[]> {
const command = new ReceiveMessageCommand({
QueueUrl: this.queueUrl,
MaxNumberOfMessages: Math.min(maxMessages, 10),
WaitTimeSeconds: 20, // ロングポーリング
VisibilityTimeout: 30,
AttributeNames: ['All'],
MessageAttributeNames: ['All'],
});
const response = await this.client.send(command);
return response.Messages || [];
}
async deleteMessage(receiptHandle: string): Promise<void> {
const command = new DeleteMessageCommand({
QueueUrl: this.queueUrl,
ReceiptHandle: receiptHandle,
});
await this.client.send(command);
}
// ポーリングループ
async startPolling(
handler: (message: Message) => Promise<void>,
options: { concurrency?: number } = {}
): Promise<void> {
const { concurrency = 5 } = options;
while (true) {
const messages = await this.receiveMessages(concurrency);
if (messages.length === 0) continue;
await Promise.all(
messages.map(async (message) => {
try {
await handler(message);
await this.deleteMessage(message.ReceiptHandle!);
} catch (error) {
console.error('Message processing failed:', error);
// メッセージは自動的にキューに戻る(Visibility Timeout後)
}
})
);
}
}
private chunkArray<T>(array: T[], size: number): T[][] {
const chunks: T[][] = [];
for (let i = 0; i < array.length; i += size) {
chunks.push(array.slice(i, i + size));
}
return chunks;
}
}
// Lambda統合(SQSトリガー)
export const handler = async (event: {
Records: Array<{
body: string;
receiptHandle: string;
messageId: string;
}>;
}): Promise<{ batchItemFailures: Array<{ itemIdentifier: string }> }> => {
const failures: string[] = [];
await Promise.all(
event.Records.map(async (record) => {
try {
const body = JSON.parse(record.body);
await processMessage(body);
} catch (error) {
console.error(`Failed to process ${record.messageId}:`, error);
failures.push(record.messageId);
}
})
);
// 部分的なバッチ失敗をレポート
return {
batchItemFailures: failures.map((id) => ({ itemIdentifier: id })),
};
};
Redis Streams
// Redis Streams with ioredis
import Redis from 'ioredis';
class RedisStreamQueue {
private redis: Redis;
private consumerGroup: string;
private consumerName: string;
constructor(
redisUrl: string,
consumerGroup: string,
consumerName: string
) {
this.redis = new Redis(redisUrl);
this.consumerGroup = consumerGroup;
this.consumerName = consumerName;
}
// Consumer Groupの作成
async createConsumerGroup(stream: string): Promise<void> {
try {
await this.redis.xgroup(
'CREATE',
stream,
this.consumerGroup,
'0',
'MKSTREAM'
);
} catch (error: any) {
if (!error.message.includes('BUSYGROUP')) {
throw error;
}
// グループが既に存在する場合は無視
}
}
// メッセージの追加
async addMessage(
stream: string,
data: Record<string, string>,
maxLen?: number
): Promise<string> {
const args: (string | number)[] = [stream];
if (maxLen) {
args.push('MAXLEN', '~', maxLen); // 近似的な長さ制限
}
args.push('*'); // 自動ID生成
// データをフラット化
for (const [key, value] of Object.entries(data)) {
args.push(key, value);
}
return (await this.redis.xadd(...args)) as string;
}
// メッセージの読み取り(Consumer Group)
async readMessages(
stream: string,
count: number = 10,
blockMs: number = 5000
): Promise<Array<{ id: string; data: Record<string, string> }>> {
const result = await this.redis.xreadgroup(
'GROUP',
this.consumerGroup,
this.consumerName,
'COUNT',
count,
'BLOCK',
blockMs,
'STREAMS',
stream,
'>' // 未配信メッセージのみ
);
if (!result) return [];
const messages: Array<{ id: string; data: Record<string, string> }> = [];
for (const [, entries] of result) {
for (const [id, fields] of entries) {
const data: Record<string, string> = {};
for (let i = 0; i < fields.length; i += 2) {
data[fields[i]] = fields[i + 1];
}
messages.push({ id, data });
}
}
return messages;
}
// 処理完了を確認
async acknowledgeMessage(stream: string, messageId: string): Promise<void> {
await this.redis.xack(stream, this.consumerGroup, messageId);
}
// 未確認メッセージの再処理
async claimPendingMessages(
stream: string,
minIdleMs: number = 60000,
count: number = 10
): Promise<Array<{ id: string; data: Record<string, string> }>> {
// ペンディングメッセージを取得
const pending = await this.redis.xpending(
stream,
this.consumerGroup,
'-',
'+',
count
);
const messages: Array<{ id: string; data: Record<string, string> }> = [];
for (const [id, , idleTime] of pending) {
if (idleTime >= minIdleMs) {
// メッセージを奪取
const claimed = await this.redis.xclaim(
stream,
this.consumerGroup,
this.consumerName,
minIdleMs,
id
);
if (claimed.length > 0) {
const [, fields] = claimed[0];
const data: Record<string, string> = {};
for (let i = 0; i < fields.length; i += 2) {
data[fields[i]] = fields[i + 1];
}
messages.push({ id, data });
}
}
}
return messages;
}
// メッセージ処理ループ
async startProcessing(
stream: string,
handler: (message: { id: string; data: Record<string, string> }) => Promise<void>
): Promise<void> {
await this.createConsumerGroup(stream);
while (true) {
// 新規メッセージを処理
const messages = await this.readMessages(stream);
for (const message of messages) {
try {
await handler(message);
await this.acknowledgeMessage(stream, message.id);
} catch (error) {
console.error(`Failed to process message ${message.id}:`, error);
// メッセージはpendingのまま残る
}
}
// 古いpendingメッセージを再処理
const pendingMessages = await this.claimPendingMessages(stream);
for (const message of pendingMessages) {
try {
await handler(message);
await this.acknowledgeMessage(stream, message.id);
} catch (error) {
console.error(`Failed to reprocess message ${message.id}:`, error);
}
}
}
}
async close(): Promise<void> {
await this.redis.quit();
}
}
設計パターンとベストプラクティス
メッセージの冪等性
// 冪等性キーによる重複排除
interface IdempotentMessage {
idempotencyKey: string;
payload: unknown;
timestamp: number;
}
class IdempotentProcessor {
private processedKeys: Map<string, { result: unknown; expiry: number }> = new Map();
private ttlMs: number = 24 * 60 * 60 * 1000; // 24時間
async process<T>(
message: IdempotentMessage,
handler: (payload: unknown) => Promise<T>
): Promise<T> {
const { idempotencyKey, payload } = message;
// 既に処理済みかチェック
const cached = this.processedKeys.get(idempotencyKey);
if (cached && cached.expiry > Date.now()) {
console.log(`Returning cached result for ${idempotencyKey}`);
return cached.result as T;
}
// 処理実行
const result = await handler(payload);
// 結果をキャッシュ
this.processedKeys.set(idempotencyKey, {
result,
expiry: Date.now() + this.ttlMs,
});
return result;
}
// Redis版(分散環境向け)
async processWithRedis<T>(
redis: Redis,
message: IdempotentMessage,
handler: (payload: unknown) => Promise<T>
): Promise<T> {
const { idempotencyKey, payload } = message;
const cacheKey = `idempotency:${idempotencyKey}`;
// 既に処理済みかチェック
const cached = await redis.get(cacheKey);
if (cached) {
return JSON.parse(cached) as T;
}
// 排他制御(処理中フラグ)
const lockKey = `lock:${idempotencyKey}`;
const acquired = await redis.set(lockKey, '1', 'EX', 30, 'NX');
if (!acquired) {
// 他のワーカーが処理中、結果を待機
await this.waitForResult(redis, cacheKey);
const result = await redis.get(cacheKey);
return JSON.parse(result!) as T;
}
try {
const result = await handler(payload);
// 結果をキャッシュ
await redis.setex(cacheKey, this.ttlMs / 1000, JSON.stringify(result));
return result;
} finally {
await redis.del(lockKey);
}
}
private async waitForResult(
redis: Redis,
key: string,
maxWaitMs: number = 30000
): Promise<void> {
const startTime = Date.now();
while (Date.now() - startTime < maxWaitMs) {
const result = await redis.get(key);
if (result) return;
await new Promise((resolve) => setTimeout(resolve, 100));
}
throw new Error('Timeout waiting for result');
}
}
デッドレターキュー(DLQ)
flowchart TB
MQ["Main Queue"] --> Consumer["Consumer"]
Consumer --> Success{"処理結果"}
Success -->|成功| Complete["完了"]
Success -->|失敗| Retry{"リトライ回数 < 3?"}
Retry -->|YES| Requeue["再キューイング<br/>(遅延付き)"]
Requeue --> MQ
Retry -->|NO| DLQ["Dead Letter Queue"]
DLQ --> Alert["監視・アラート<br/>手動対応"]
style DLQ fill:#f99,stroke:#f00
style Alert fill:#ff9,stroke:#f90
// DLQ実装例
interface DeadLetterMessage {
originalMessage: unknown;
error: string;
failedAt: string;
retryCount: number;
originalQueue: string;
}
class DeadLetterQueueHandler {
constructor(
private mainQueue: MessageQueue,
private dlq: MessageQueue
) {}
async processWithDLQ<T>(
message: Message,
handler: (msg: Message) => Promise<T>,
maxRetries: number = 3
): Promise<T | null> {
const retryCount = this.getRetryCount(message);
try {
return await handler(message);
} catch (error) {
if (retryCount < maxRetries) {
// リトライ
await this.requeueWithDelay(message, retryCount + 1);
} else {
// DLQへ送信
await this.sendToDLQ(message, error as Error, retryCount);
}
return null;
}
}
private getRetryCount(message: Message): number {
return message.headers?.['x-retry-count'] || 0;
}
private async requeueWithDelay(
message: Message,
retryCount: number
): Promise<void> {
const delay = Math.min(
1000 * Math.pow(2, retryCount), // 指数バックオフ
60000 // 最大1分
);
await this.mainQueue.publish({
...message,
headers: {
...message.headers,
'x-retry-count': retryCount,
'x-retry-delay': delay,
},
}, { delayMs: delay });
}
private async sendToDLQ(
message: Message,
error: Error,
retryCount: number
): Promise<void> {
const dlqMessage: DeadLetterMessage = {
originalMessage: message,
error: error.message,
failedAt: new Date().toISOString(),
retryCount,
originalQueue: this.mainQueue.name,
};
await this.dlq.publish(dlqMessage);
// アラート送信
await this.sendAlert(dlqMessage);
}
private async sendAlert(message: DeadLetterMessage): Promise<void> {
console.error('Message sent to DLQ:', {
error: message.error,
retryCount: message.retryCount,
originalQueue: message.originalQueue,
});
// Slack/PagerDuty等への通知
}
}
順序保証
// パーティションキーによる順序保証
class OrderedMessageProcessor {
private processingMap: Map<string, Promise<void>> = new Map();
async processInOrder<T>(
partitionKey: string,
handler: () => Promise<T>
): Promise<T> {
// 同じパーティションキーの前の処理を待つ
const previousPromise = this.processingMap.get(partitionKey) || Promise.resolve();
const currentPromise = previousPromise.then(handler);
this.processingMap.set(partitionKey, currentPromise.then(() => {}));
try {
return await currentPromise;
} finally {
// 処理完了後、マップをクリーンアップ
if (this.processingMap.get(partitionKey) === currentPromise) {
this.processingMap.delete(partitionKey);
}
}
}
}
// 使用例
const processor = new OrderedMessageProcessor();
// ユーザーIDをパーティションキーとして使用
// 同じユーザーのメッセージは順序通りに処理される
await processor.processInOrder(message.userId, async () => {
await processUserMessage(message);
});
監視とオブザーバビリティ
// メトリクス収集
interface QueueMetrics {
messagesPublished: number;
messagesConsumed: number;
messagesFailed: number;
processingLatencyMs: number[];
queueDepth: number;
}
class MetricsCollector {
private metrics: QueueMetrics = {
messagesPublished: 0,
messagesConsumed: 0,
messagesFailed: 0,
processingLatencyMs: [],
queueDepth: 0,
};
recordPublish(): void {
this.metrics.messagesPublished++;
}
recordConsume(latencyMs: number): void {
this.metrics.messagesConsumed++;
this.metrics.processingLatencyMs.push(latencyMs);
}
recordFailure(): void {
this.metrics.messagesFailed++;
}
updateQueueDepth(depth: number): void {
this.metrics.queueDepth = depth;
}
getStats(): {
throughput: number;
errorRate: number;
avgLatency: number;
p99Latency: number;
} {
const total = this.metrics.messagesConsumed + this.metrics.messagesFailed;
return {
throughput: this.metrics.messagesConsumed,
errorRate: total > 0 ? this.metrics.messagesFailed / total : 0,
avgLatency: this.calculateAverage(this.metrics.processingLatencyMs),
p99Latency: this.calculatePercentile(this.metrics.processingLatencyMs, 99),
};
}
private calculateAverage(values: number[]): number {
if (values.length === 0) return 0;
return values.reduce((a, b) => a + b, 0) / values.length;
}
private calculatePercentile(values: number[], percentile: number): number {
if (values.length === 0) return 0;
const sorted = [...values].sort((a, b) => a - b);
const index = Math.ceil((percentile / 100) * sorted.length) - 1;
return sorted[index];
}
}
まとめ
メッセージキューは分散システムの要となる技術です。
選定基準
| 要件 | 推奨システム |
|---|---|
| シンプルなタスクキュー | RabbitMQ, SQS |
| イベントストリーミング | Kafka |
| リアルタイム処理 | Redis Streams |
| サーバーレス | SQS + Lambda |
| ログ集約 | Kafka |
設計時の注意点
- 冪等性の確保: 重複処理を防ぐ
- 順序保証: 必要な場合のみ、パーティション単位で
- エラーハンドリング: DLQとリトライ戦略
- 監視: スループット、レイテンシ、エラー率
適切なメッセージキューの選択と設計により、スケーラブルで信頼性の高いシステムを構築できます。
参考リンク
- RabbitMQ Documentation
- Apache Kafka Documentation
- Amazon SQS Developer Guide
- Redis Streams Introduction