
Message Queue Design Guide - Asynchronous Processing and Event-Driven Architecture

2025.12.02

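Message queues are a core building block for asynchronous communication between components in a distributed system. A well-designed messaging system enables scalability, reliability, and a loosely coupled architecture.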

Basic Concepts of Message Queues

Synchronous vs. Asynchronous Processing

flowchart TB
    subgraph Sync["Synchronous processing"]
        C1["Client"] --> S1["Server"] --> D1["DB"]
        D1 --> S1 --> C1
        Note1["Waits until all processing completes<br/>Response time: 500ms+"]
    end

    subgraph Async["Asynchronous processing (with a message queue)"]
        C2["Client"] --> S2["Server"]
        S2 --> C2
        S2 --> Q["Queue"]
        Q --> W["Consumer<br/>(Worker)"]
        W --> D2["DB"]
        Note2["Immediate response (50ms)"]
    end
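
The asynchronous path in the diagram can be sketched without a real broker. The snippet below is a minimal illustration, assuming a plain in-memory array in place of the queue; `handleRequest` and `drainQueue` are hypothetical names, not part of any library.

```typescript
// Hypothetical in-memory stand-in for a broker queue (illustration only).
type Job = { orderId: string };

const queue: Job[] = [];
const processed: string[] = [];

// "Server": enqueues the job and responds right away without doing the slow work.
function handleRequest(orderId: string): { code: number; body: string } {
  queue.push({ orderId });
  return { code: 202, body: `accepted ${orderId}` };
}

// "Worker": drains the queue later, independently of the request path.
function drainQueue(): number {
  let count = 0;
  let job = queue.shift();
  while (job !== undefined) {
    processed.push(job.orderId); // stands in for the slow DB write
    count++;
    job = queue.shift();
  }
  return count;
}

const response = handleRequest('12345');
console.log(response.code, 'pending jobs:', queue.length);

drainQueue();
console.log('processed:', processed);
```

The client gets its response before the database write happens, which is where the latency difference in the diagram comes from. The trade-off is that the client only learns the request was accepted, not that it succeeded.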

Messaging Patterns

flowchart LR
    subgraph P2P["1. Point-to-Point (queue)"]
        Prod["Producer"] --> Queue["Queue"] --> Cons["Consumer"]
    end

    subgraph PubSub["2. Publish/Subscribe (topic)"]
        Pub["Publisher"] --> Topic["Topic"]
        Topic --> SubA["Subscriber A"]
        Topic --> SubB["Subscriber B"]
        Topic --> SubC["Subscriber C"]
    end

    subgraph ReqRep["3. Request/Reply"]
        Client["Client"] --> ReqQ["Request Queue"] --> Server["Server"]
        Server --> RepQ["Reply Queue"] --> Client
    end

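Point-to-Point: each message is received by exactly one consumer; well suited to load balancing.

Publish/Subscribe: each message is received by multiple subscribers; well suited to event notification.

Request/Reply: an RPC-like pattern carried over asynchronous messaging.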
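
The difference between the first two patterns comes down to how many receivers see each message. The sketch below is a minimal in-memory illustration; `InMemoryQueue` and `InMemoryTopic` are hypothetical classes, and the round-robin dispatch is an assumption standing in for whatever distribution strategy a real broker uses.

```typescript
type Handler = (msg: string) => void;

// Point-to-point: each message goes to exactly one consumer (round-robin here).
class InMemoryQueue {
  private consumers: Handler[] = [];
  private next = 0;

  subscribe(handler: Handler): void {
    this.consumers.push(handler);
  }

  send(msg: string): void {
    if (this.consumers.length === 0) throw new Error('no consumers');
    const handler = this.consumers[this.next % this.consumers.length];
    this.next++;
    handler(msg);
  }
}

// Publish/subscribe: every subscriber receives its own copy of each message.
class InMemoryTopic {
  private subscribers: Handler[] = [];

  subscribe(handler: Handler): void {
    this.subscribers.push(handler);
  }

  publish(msg: string): void {
    for (const handler of this.subscribers) handler(msg);
  }
}

const workerA: string[] = [];
const workerB: string[] = [];
const q = new InMemoryQueue();
q.subscribe((m) => { workerA.push(m); });
q.subscribe((m) => { workerB.push(m); });
['m1', 'm2', 'm3', 'm4'].forEach((m) => q.send(m));

const subA: string[] = [];
const subB: string[] = [];
const topic = new InMemoryTopic();
topic.subscribe((m) => { subA.push(m); });
topic.subscribe((m) => { subB.push(m); });
topic.publish('e1');

console.log({ workerA, workerB, subA, subB });
```

With the queue, the four messages are split between the two workers. With the topic, both subscribers see the same event.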

Major Message Queue Systems

System Comparison

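| Characteristic | RabbitMQ | Apache Kafka | Amazon SQS | Redis Streams |
| --- | --- | --- | --- | --- |
| Protocol | AMQP | Proprietary | HTTP/HTTPS | Redis Protocol |
| Message retention | Deleted after consumption | Retained persistently | Up to 14 days | Configurable |
| Ordering guarantee | Per queue | Per partition | Available with FIFO | Per stream |
| Use case | Task distribution | Event streaming | Cloud-native | Real-time |

Throughput: very high
Latency: very low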

RabbitMQ

// RabbitMQ with amqplib

import amqp from 'amqplib';
import * as crypto from 'node:crypto';  // for crypto.randomUUID() in publish()

// Connection management class
class RabbitMQConnection {
  private connection: amqp.Connection | null = null;
  private channel: amqp.Channel | null = null;

  async connect(url: string): Promise<void> {
    this.connection = await amqp.connect(url);
    this.channel = await this.connection.createChannel();

    // Prefetch setting (number of messages processed concurrently)
    await this.channel.prefetch(10);
  }

  async setupExchangeAndQueue(
    exchange: string,
    queue: string,
    routingKey: string
  ): Promise<void> {
    if (!this.channel) throw new Error('Not connected');

    // Declare the exchange
    await this.channel.assertExchange(exchange, 'topic', {
      durable: true,
    });

    // Declare the queue
    await this.channel.assertQueue(queue, {
      durable: true,
      deadLetterExchange: `${exchange}.dlx`,  // dead-letter exchange (must be declared and bound separately)
      messageTtl: 86400000,  // 24 hours
    });

    // Bind the queue to the exchange
    await this.channel.bindQueue(queue, exchange, routingKey);
  }

  async publish(
    exchange: string,
    routingKey: string,
    message: object
  ): Promise<boolean> {
    if (!this.channel) throw new Error('Not connected');

    return this.channel.publish(
      exchange,
      routingKey,
      Buffer.from(JSON.stringify(message)),
      {
        persistent: true,  // persist the message to disk
        contentType: 'application/json',
        timestamp: Date.now(),
        messageId: crypto.randomUUID(),
      }
    );
  }

  async consume(
    queue: string,
    handler: (msg: amqp.Message) => Promise<void>
  ): Promise<void> {
    if (!this.channel) throw new Error('Not connected');

    await this.channel.consume(queue, async (msg) => {
      if (!msg) return;

      try {
        await handler(msg);
        this.channel!.ack(msg);  // ack on success
      } catch (error) {
        console.error('Message processing failed:', error);

        // Check the retry count
        const retryCount = (msg.properties.headers?.['x-retry-count'] || 0) + 1;

        if (retryCount < 3) {
          // Retry with a delay, then ack the original so it is not also dead-lettered
          await this.publishWithDelay(
            msg.fields.exchange,
            msg.fields.routingKey,
            msg.content,
            retryCount
          );
          this.channel!.ack(msg);
        } else {
          // Retries exhausted: reject without requeue so the broker routes it to the DLX
          this.channel!.nack(msg, false, false);
        }
      }
    });
  }

  private async publishWithDelay(
    exchange: string,
    routingKey: string,
    content: Buffer,
    retryCount: number
  ): Promise<void> {
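    const delay = Math.pow(2, retryCount) * 1000;  // exponential backoff

    // Assumes a `${exchange}.delayed` exchange of type x-delayed-message exists
    // (provided by the rabbitmq-delayed-message-exchange plugin, which reads the x-delay header)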
    this.channel!.publish(
      `${exchange}.delayed`,
      routingKey,
      content,
      {
        headers: {
          'x-delay': delay,
          'x-retry-count': retryCount,
        },
      }
    );
  }

  async close(): Promise<void> {
    await this.channel?.close();
    await this.connection?.close();
  }
}

// Usage example
const rabbit = new RabbitMQConnection();
await rabbit.connect('amqp://localhost');

await rabbit.setupExchangeAndQueue(
  'orders',
  'order-processing',
  'order.created'
);

// Producer
await rabbit.publish('orders', 'order.created', {
  orderId: '12345',
  userId: 'user-1',
  items: [{ productId: 'prod-1', quantity: 2 }],
});

// Consumer
await rabbit.consume('order-processing', async (msg) => {
  const order = JSON.parse(msg.content.toString());
  console.log('Processing order:', order.orderId);
  // Order processing logic
});

Apache Kafka

// Kafka with kafkajs

import { Kafka, Producer, Consumer, EachMessagePayload } from 'kafkajs';
import * as crypto from 'node:crypto';  // for crypto.randomUUID() in sendBatch()

// Kafka connection settings
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
  ssl: true,
  sasl: {
    mechanism: 'scram-sha-256',
    username: process.env.KAFKA_USERNAME!,
    password: process.env.KAFKA_PASSWORD!,
  },
  retry: {
    initialRetryTime: 100,
    retries: 8,
  },
});

// Producer
class KafkaProducer {
  private producer: Producer;

  constructor() {
    this.producer = kafka.producer({
      idempotent: true,  // idempotent producer
      maxInFlightRequests: 1,  // the KafkaJS docs pair idempotent/transactional producers with 1
      transactionalId: 'my-transactional-id',  // for transactions
    });
  }

  async connect(): Promise<void> {
    await this.producer.connect();
  }

  async sendBatch(
    topic: string,
    messages: Array<{ key: string; value: object }>
  ): Promise<void> {
    await this.producer.send({
      topic,
      messages: messages.map(({ key, value }) => ({
        key,
        value: JSON.stringify(value),
        headers: {
          'correlation-id': crypto.randomUUID(),
          timestamp: Date.now().toString(),
        },
      })),
      acks: -1,  // wait until all in-sync replicas have acknowledged the write
      timeout: 30000,
    });
  }

  // Send within a transaction
  async sendWithTransaction(
    messages: Array<{ topic: string; key: string; value: object }>
  ): Promise<void> {
    const transaction = await this.producer.transaction();

    try {
      for (const { topic, key, value } of messages) {
        await transaction.send({
          topic,
          messages: [{ key, value: JSON.stringify(value) }],
        });
      }
      await transaction.commit();
    } catch (error) {
      await transaction.abort();
      throw error;
    }
  }

  async disconnect(): Promise<void> {
    await this.producer.disconnect();
  }
}

// Consumer
class KafkaConsumer {
  private consumer: Consumer;

  constructor(groupId: string) {
    this.consumer = kafka.consumer({
      groupId,
      sessionTimeout: 30000,
      heartbeatInterval: 3000,
      maxBytesPerPartition: 1048576,  // 1MB
    });
  }

  async connect(): Promise<void> {
    await this.consumer.connect();
  }

  async subscribe(topics: string[]): Promise<void> {
    await this.consumer.subscribe({
      topics,
      fromBeginning: false,
    });
  }

  async run(
    handler: (payload: EachMessagePayload) => Promise<void>
  ): Promise<void> {
    await this.consumer.run({
      eachMessage: async (payload) => {
        const { topic, partition, message } = payload;

        console.log({
          topic,
          partition,
          offset: message.offset,
          key: message.key?.toString(),
        });

        try {
          await handler(payload);
        } catch (error) {
          console.error('Message processing failed:', error);
          // Error handling (e.g. send to a DLQ); swallowing the error here lets the offset advance
        }
      },
    });
  }

  async disconnect(): Promise<void> {
    await this.consumer.disconnect();
  }
}

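// Exactly-Once Semantics (EOS) implementation
class KafkaEOSProcessor {
  // The producer must be transactional (transactionalId set); connect both clients beforehand
  constructor(
    private producer: Producer,
    private consumer: Consumer
  ) {}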

  async processWithEOS(
    inputTopic: string,
    outputTopic: string,
    transform: (value: any) => any
  ): Promise<void> {
    await this.consumer.subscribe({ topics: [inputTopic] });

    await this.consumer.run({
      autoCommit: false,  // offsets are committed through transaction.sendOffsets instead
      eachBatch: async ({ batch, resolveOffset }) => {
        const transaction = await this.producer.transaction();

        try {
          for (const message of batch.messages) {
            const value = JSON.parse(message.value!.toString());
            const transformed = transform(value);

            await transaction.send({
              topic: outputTopic,
              messages: [{ value: JSON.stringify(transformed) }],
            });

            resolveOffset(message.offset);
          }

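          await transaction.sendOffsets({
            consumerGroupId: 'my-group',  // must match the consumer's groupId
            topics: [{
              topic: inputTopic,
              partitions: [{
                partition: batch.partition,
                // Commit the offset of the next message to read (last processed + 1)
                offset: (Number(batch.lastOffset()) + 1).toString(),
              }],
            }],
          });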

          await transaction.commit();
        } catch (error) {
          await transaction.abort();
          throw error;
        }
      },
    });
  }
}

Amazon SQS

// AWS SQS with @aws-sdk/client-sqs

import {
  SQSClient,
  SendMessageCommand,
  ReceiveMessageCommand,
  DeleteMessageCommand,
  SendMessageBatchCommand,
  Message,
} from '@aws-sdk/client-sqs';

class SQSQueue {
  private client: SQSClient;
  private queueUrl: string;

  constructor(queueUrl: string, region: string = 'ap-northeast-1') {
    this.client = new SQSClient({ region });
    this.queueUrl = queueUrl;
  }

  async sendMessage(
    body: object,
    options: {
      delaySeconds?: number;
      messageGroupId?: string;  // for FIFO queues
      deduplicationId?: string;  // for FIFO queues
    } = {}
  ): Promise<string> {
    const command = new SendMessageCommand({
      QueueUrl: this.queueUrl,
      MessageBody: JSON.stringify(body),
      DelaySeconds: options.delaySeconds,
      MessageGroupId: options.messageGroupId,
      MessageDeduplicationId: options.deduplicationId,
      MessageAttributes: {
        Timestamp: {
          DataType: 'Number',
          StringValue: Date.now().toString(),
        },
      },
    });

    const response = await this.client.send(command);
    return response.MessageId!;
  }

  async sendBatch(
    messages: Array<{ id: string; body: object; delaySeconds?: number }>
  ): Promise<void> {
    // SQS accepts at most 10 messages per batch request
    const chunks = this.chunkArray(messages, 10);

    for (const chunk of chunks) {
      const command = new SendMessageBatchCommand({
        QueueUrl: this.queueUrl,
        Entries: chunk.map(({ id, body, delaySeconds }) => ({
          Id: id,
          MessageBody: JSON.stringify(body),
          DelaySeconds: delaySeconds,
        })),
      });

      await this.client.send(command);
    }
  }

  async receiveMessages(maxMessages: number = 10): Promise<Message[]> {
    const command = new ReceiveMessageCommand({
      QueueUrl: this.queueUrl,
      MaxNumberOfMessages: Math.min(maxMessages, 10),
      WaitTimeSeconds: 20,  // long polling
      VisibilityTimeout: 30,
      AttributeNames: ['All'],
      MessageAttributeNames: ['All'],
    });

    const response = await this.client.send(command);
    return response.Messages || [];
  }

  async deleteMessage(receiptHandle: string): Promise<void> {
    const command = new DeleteMessageCommand({
      QueueUrl: this.queueUrl,
      ReceiptHandle: receiptHandle,
    });

    await this.client.send(command);
  }

  // Polling loop
  async startPolling(
    handler: (message: Message) => Promise<void>,
    options: { concurrency?: number } = {}
  ): Promise<void> {
    const { concurrency = 5 } = options;

    while (true) {
      const messages = await this.receiveMessages(concurrency);

      if (messages.length === 0) continue;

      await Promise.all(
        messages.map(async (message) => {
          try {
            await handler(message);
            await this.deleteMessage(message.ReceiptHandle!);
          } catch (error) {
            console.error('Message processing failed:', error);
            // The message becomes visible in the queue again once the visibility timeout expires
          }
        })
      );
    }
  }

  private chunkArray<T>(array: T[], size: number): T[][] {
    const chunks: T[][] = [];
    for (let i = 0; i < array.length; i += size) {
      chunks.push(array.slice(i, i + size));
    }
    return chunks;
  }
}

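// Lambda integration (SQS trigger)
// processMessage stands in for the application's own handler, which is not shown here
declare function processMessage(body: unknown): Promise<void>;
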
export const handler = async (event: {
  Records: Array<{
    body: string;
    receiptHandle: string;
    messageId: string;
  }>;
}): Promise<{ batchItemFailures: Array<{ itemIdentifier: string }> }> => {
  const failures: string[] = [];

  await Promise.all(
    event.Records.map(async (record) => {
      try {
        const body = JSON.parse(record.body);
        await processMessage(body);
      } catch (error) {
        console.error(`Failed to process ${record.messageId}:`, error);
        failures.push(record.messageId);
      }
    })
  );

  // Report partial batch failures (requires ReportBatchItemFailures on the event source mapping)
  return {
    batchItemFailures: failures.map((id) => ({ itemIdentifier: id })),
  };
};
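
For FIFO queues, the `deduplicationId` option above has to come from somewhere. One common approach is to derive it from the message content, so that a producer retry of the same payload yields the same ID. The sketch below assumes that approach; `deduplicationIdFor` is a hypothetical helper, not part of the AWS SDK.

```typescript
import { createHash } from 'node:crypto';

// Hypothetical helper: derive a deterministic deduplication ID from the message body.
// A SHA-256 hex digest is 64 characters, within the 128-character limit for MessageDeduplicationId.
function deduplicationIdFor(body: object): string {
  return createHash('sha256').update(JSON.stringify(body)).digest('hex');
}

const firstAttempt = deduplicationIdFor({ orderId: '12345', action: 'created' });
const retryAttempt = deduplicationIdFor({ orderId: '12345', action: 'created' });
const otherOrder = deduplicationIdFor({ orderId: '67890', action: 'created' });

console.log(firstAttempt === retryAttempt); // same payload, same ID
console.log(firstAttempt === otherOrder);   // different payload, different ID
```

`JSON.stringify` is sensitive to property order, so two logically equal objects built with different key orders would hash differently. If that matters, serialize with a stable key order or use an explicit business key such as the order ID.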

Redis Streams

// Redis Streams with ioredis

import Redis from 'ioredis';

class RedisStreamQueue {
  private redis: Redis;
  private consumerGroup: string;
  private consumerName: string;

  constructor(
    redisUrl: string,
    consumerGroup: string,
    consumerName: string
  ) {
    this.redis = new Redis(redisUrl);
    this.consumerGroup = consumerGroup;
    this.consumerName = consumerName;
  }

  // Create the consumer group
  async createConsumerGroup(stream: string): Promise<void> {
    try {
      await this.redis.xgroup(
        'CREATE',
        stream,
        this.consumerGroup,
        '0',
        'MKSTREAM'
      );
    } catch (error: any) {
      if (!error.message.includes('BUSYGROUP')) {
        throw error;
      }
      // Ignore the error if the group already exists
    }
  }

  // Add a message
  async addMessage(
    stream: string,
    data: Record<string, string>,
    maxLen?: number
  ): Promise<string> {
    const args: (string | number)[] = [stream];

    if (maxLen) {
      args.push('MAXLEN', '~', maxLen);  // approximate length cap
    }

    args.push('*');  // auto-generate the ID

    // Flatten the data into alternating field/value arguments
    for (const [key, value] of Object.entries(data)) {
      args.push(key, value);
    }

    return (await this.redis.xadd(...args)) as string;
  }

  // Read messages (consumer group)
  async readMessages(
    stream: string,
    count: number = 10,
    blockMs: number = 5000
  ): Promise<Array<{ id: string; data: Record<string, string> }>> {
    const result = await this.redis.xreadgroup(
      'GROUP',
      this.consumerGroup,
      this.consumerName,
      'COUNT',
      count,
      'BLOCK',
      blockMs,
      'STREAMS',
      stream,
      '>'  // only messages not yet delivered to this group
    );

    if (!result) return [];

    const messages: Array<{ id: string; data: Record<string, string> }> = [];

    for (const [, entries] of result) {
      for (const [id, fields] of entries) {
        const data: Record<string, string> = {};
        for (let i = 0; i < fields.length; i += 2) {
          data[fields[i]] = fields[i + 1];
        }
        messages.push({ id, data });
      }
    }

    return messages;
  }

  // Acknowledge completed processing
  async acknowledgeMessage(stream: string, messageId: string): Promise<void> {
    await this.redis.xack(stream, this.consumerGroup, messageId);
  }

  // Reclaim unacknowledged messages for reprocessing
  async claimPendingMessages(
    stream: string,
    minIdleMs: number = 60000,
    count: number = 10
  ): Promise<Array<{ id: string; data: Record<string, string> }>> {
    // Fetch pending messages
    const pending = await this.redis.xpending(
      stream,
      this.consumerGroup,
      '-',
      '+',
      count
    );

    const messages: Array<{ id: string; data: Record<string, string> }> = [];

    for (const [id, , idleTime] of pending) {
      if (idleTime >= minIdleMs) {
        // Claim the message for this consumer
        const claimed = await this.redis.xclaim(
          stream,
          this.consumerGroup,
          this.consumerName,
          minIdleMs,
          id
        );

        if (claimed.length > 0) {
          const [, fields] = claimed[0];
          const data: Record<string, string> = {};
          for (let i = 0; i < fields.length; i += 2) {
            data[fields[i]] = fields[i + 1];
          }
          messages.push({ id, data });
        }
      }
    }

    return messages;
  }

  // Message processing loop
  async startProcessing(
    stream: string,
    handler: (message: { id: string; data: Record<string, string> }) => Promise<void>
  ): Promise<void> {
    await this.createConsumerGroup(stream);

    while (true) {
      // Process new messages
      const messages = await this.readMessages(stream);

      for (const message of messages) {
        try {
          await handler(message);
          await this.acknowledgeMessage(stream, message.id);
        } catch (error) {
          console.error(`Failed to process message ${message.id}:`, error);
          // The message stays in the pending list
        }
      }

      // Reprocess stale pending messages
      const pendingMessages = await this.claimPendingMessages(stream);
      for (const message of pendingMessages) {
        try {
          await handler(message);
          await this.acknowledgeMessage(stream, message.id);
        } catch (error) {
          console.error(`Failed to reprocess message ${message.id}:`, error);
        }
      }
    }
  }

  async close(): Promise<void> {
    await this.redis.quit();
  }
}
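
Stream entries travel as a flat list of alternating field names and values, which is why the class above flattens data on write and rebuilds an object on read (twice, in `readMessages` and `claimPendingMessages`). That conversion can be factored into a pair of pure functions. The helper names below are hypothetical, and the sketch has no Redis dependency.

```typescript
// Flatten { a: '1', b: '2' } into ['a', '1', 'b', '2'], the argument shape XADD expects.
function recordToFields(data: Record<string, string>): string[] {
  const fields: string[] = [];
  for (const [key, value] of Object.entries(data)) {
    fields.push(key, value);
  }
  return fields;
}

// Rebuild an object from the flat field/value list returned for a stream entry.
function fieldsToRecord(fields: string[]): Record<string, string> {
  const data: Record<string, string> = {};
  for (let i = 0; i + 1 < fields.length; i += 2) {
    data[fields[i]] = fields[i + 1];
  }
  return data;
}

const flat = recordToFields({ orderId: '12345', state: 'created' });
const restored = fieldsToRecord(flat);
console.log(flat, restored);
```

Keeping the conversion in one place makes it straightforward to unit test and avoids the two copies drifting apart.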

Design Patterns and Best Practices

Message Idempotency

// Deduplication by idempotency key

import Redis from 'ioredis';

interface IdempotentMessage {
  idempotencyKey: string;
  payload: unknown;
  timestamp: number;
}

class IdempotentProcessor {
  private processedKeys: Map<string, { result: unknown; expiry: number }> = new Map();
  private ttlMs: number = 24 * 60 * 60 * 1000;  // 24 hours

  async process<T>(
    message: IdempotentMessage,
    handler: (payload: unknown) => Promise<T>
  ): Promise<T> {
    const { idempotencyKey, payload } = message;

    // Check whether this key has already been processed
    const cached = this.processedKeys.get(idempotencyKey);
    if (cached && cached.expiry > Date.now()) {
      console.log(`Returning cached result for ${idempotencyKey}`);
      return cached.result as T;
    }

    // Run the handler
    const result = await handler(payload);

    // Cache the result
    this.processedKeys.set(idempotencyKey, {
      result,
      expiry: Date.now() + this.ttlMs,
    });

    return result;
  }

  // Redis version (for distributed environments)
  async processWithRedis<T>(
    redis: Redis,
    message: IdempotentMessage,
    handler: (payload: unknown) => Promise<T>
  ): Promise<T> {
    const { idempotencyKey, payload } = message;
    const cacheKey = `idempotency:${idempotencyKey}`;

    // Check whether this key has already been processed
    const cached = await redis.get(cacheKey);
    if (cached) {
      return JSON.parse(cached) as T;
    }

    // Mutual exclusion (in-progress flag)
    const lockKey = `lock:${idempotencyKey}`;
    const acquired = await redis.set(lockKey, '1', 'EX', 30, 'NX');

    if (!acquired) {
      // Another worker is processing this key; wait for its result
      await this.waitForResult(redis, cacheKey);
      const result = await redis.get(cacheKey);
      return JSON.parse(result!) as T;
    }

    try {
      const result = await handler(payload);

      // Cache the result
      await redis.setex(cacheKey, this.ttlMs / 1000, JSON.stringify(result));

      return result;
    } finally {
      await redis.del(lockKey);
    }
  }

  private async waitForResult(
    redis: Redis,
    key: string,
    maxWaitMs: number = 30000
  ): Promise<void> {
    const startTime = Date.now();

    while (Date.now() - startTime < maxWaitMs) {
      const result = await redis.get(key);
      if (result) return;
      await new Promise((resolve) => setTimeout(resolve, 100));
    }

    throw new Error('Timeout waiting for result');
  }
}

Dead Letter Queue (DLQ)

flowchart TB
    MQ["Main Queue"] --> Consumer["Consumer"]
    Consumer --> Success{"Processing result"}
    Success -->|Success| Complete["Done"]
    Success -->|Failure| Retry{"Retry count < 3?"}
    Retry -->|YES| Requeue["Requeue<br/>(with delay)"]
    Requeue --> MQ
    Retry -->|NO| DLQ["Dead Letter Queue"]
    DLQ --> Alert["Monitoring and alerts<br/>Manual handling"]

    style DLQ fill:#f99,stroke:#f00
    style Alert fill:#ff9,stroke:#f90
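
// DLQ implementation example

// Minimal assumed shapes for the Message and MessageQueue types used below
// (hypothetical, not taken from a specific library)
interface Message {
  body: unknown;
  headers?: Record<string, number>;
}

interface MessageQueue {
  name: string;
  publish(message: unknown, options?: { delayMs?: number }): Promise<void>;
}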

interface DeadLetterMessage {
  originalMessage: unknown;
  error: string;
  failedAt: string;
  retryCount: number;
  originalQueue: string;
}

class DeadLetterQueueHandler {
  constructor(
    private mainQueue: MessageQueue,
    private dlq: MessageQueue
  ) {}

  async processWithDLQ<T>(
    message: Message,
    handler: (msg: Message) => Promise<T>,
    maxRetries: number = 3
  ): Promise<T | null> {
    const retryCount = this.getRetryCount(message);

    try {
      return await handler(message);
    } catch (error) {
      if (retryCount < maxRetries) {
        // Retry
        await this.requeueWithDelay(message, retryCount + 1);
      } else {
        // Send to the DLQ
        await this.sendToDLQ(message, error as Error, retryCount);
      }
      return null;
    }
  }

  private getRetryCount(message: Message): number {
    return message.headers?.['x-retry-count'] || 0;
  }

  private async requeueWithDelay(
    message: Message,
    retryCount: number
  ): Promise<void> {
    const delay = Math.min(
      1000 * Math.pow(2, retryCount),  // exponential backoff
      60000  // capped at 1 minute
    );

    await this.mainQueue.publish({
      ...message,
      headers: {
        ...message.headers,
        'x-retry-count': retryCount,
        'x-retry-delay': delay,
      },
    }, { delayMs: delay });
  }

  private async sendToDLQ(
    message: Message,
    error: Error,
    retryCount: number
  ): Promise<void> {
    const dlqMessage: DeadLetterMessage = {
      originalMessage: message,
      error: error.message,
      failedAt: new Date().toISOString(),
      retryCount,
      originalQueue: this.mainQueue.name,
    };

    await this.dlq.publish(dlqMessage);

    // Send an alert
    await this.sendAlert(dlqMessage);
  }

  private async sendAlert(message: DeadLetterMessage): Promise<void> {
    console.error('Message sent to DLQ:', {
      error: message.error,
      retryCount: message.retryCount,
      originalQueue: message.originalQueue,
    });
    // Notify Slack, PagerDuty, etc.
  }
}
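
To make the retry timing concrete, the capped exponential backoff used in `requeueWithDelay` can be isolated as a pure function. The sketch below uses the same base (1 second) and cap (1 minute). The jitter variant is an addition that the handler above does not use. It shows one common way to keep many failed messages from retrying at the same moment. Both function names are hypothetical.

```typescript
// Capped exponential backoff: base * 2^retryCount, never more than capMs.
function backoffDelayMs(retryCount: number, baseMs: number = 1000, capMs: number = 60000): number {
  return Math.min(baseMs * Math.pow(2, retryCount), capMs);
}

// Optional "full jitter" variant (an assumption, not in the handler above):
// pick a random delay between 0 and the capped backoff value.
function backoffWithJitterMs(
  retryCount: number,
  random: () => number = Math.random
): number {
  return Math.floor(random() * backoffDelayMs(retryCount));
}

const schedule = [1, 2, 3, 4, 5, 6, 7].map((n) => backoffDelayMs(n));
console.log(schedule);
```

With these defaults the delay doubles from 2 seconds on the first retry and reaches the 60-second cap at the sixth. Passing `random` as a parameter keeps the jitter variant deterministic in tests.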

Ordering Guarantees

// Ordering guarantee via partition key

class OrderedMessageProcessor {
  private processingMap: Map<string, Promise<void>> = new Map();

  async processInOrder<T>(
    partitionKey: string,
    handler: () => Promise<T>
  ): Promise<T> {
    // Wait for the previous task with the same partition key
    const previousPromise = this.processingMap.get(partitionKey) || Promise.resolve();

    const currentPromise = previousPromise.then(handler);
    // Store a tail that never rejects, so one failure does not block later tasks for this key
    const tail = currentPromise.then(() => {}, () => {});
    this.processingMap.set(partitionKey, tail);

    try {
      return await currentPromise;
    } finally {
      // Clean up the map once this task is done and no newer task has been queued
      if (this.processingMap.get(partitionKey) === tail) {
        this.processingMap.delete(partitionKey);
      }
    }
  }
}

// Usage example
const processor = new OrderedMessageProcessor();

// Use the user ID as the partition key
// Messages for the same user are processed in order
await processor.processInOrder(message.userId, async () => {
  await processUserMessage(message);
});
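
On the broker side, a partition key preserves order because every message with the same key lands in the same partition. The sketch below illustrates that mapping with a simple string hash. The hash is an assumption for illustration only and is not the partitioner any particular broker uses. `partitionFor` and `UserEvent` are hypothetical names.

```typescript
type UserEvent = { userId: string; seq: number };

// Illustrative hash only: real brokers ship their own partitioners.
function partitionFor(key: string, partitionCount: number): number {
  let hash = 0;
  for (let i = 0; i < key.length; i++) {
    hash = (hash * 31 + key.charCodeAt(i)) >>> 0; // keep as unsigned 32-bit
  }
  return hash % partitionCount;
}

const events: UserEvent[] = [
  { userId: 'user-1', seq: 1 },
  { userId: 'user-2', seq: 1 },
  { userId: 'user-1', seq: 2 },
  { userId: 'user-2', seq: 2 },
  { userId: 'user-1', seq: 3 },
];

// Append each event to the partition chosen by its key.
const partitions = new Map<number, UserEvent[]>();
for (const event of events) {
  const p = partitionFor(event.userId, 4);
  const list = partitions.get(p) || [];
  list.push(event);
  partitions.set(p, list);
}

const user1Partition = partitions.get(partitionFor('user-1', 4)) || [];
const user1Seqs = user1Partition.filter((e) => e.userId === 'user-1').map((e) => e.seq);
console.log(user1Seqs);
```

Order holds only within a key. Events for different users may sit in different partitions and be consumed in any relative order.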

Monitoring and Observability

// Metrics collection

interface QueueMetrics {
  messagesPublished: number;
  messagesConsumed: number;
  messagesFailed: number;
  processingLatencyMs: number[];
  queueDepth: number;
}

class MetricsCollector {
  private metrics: QueueMetrics = {
    messagesPublished: 0,
    messagesConsumed: 0,
    messagesFailed: 0,
    processingLatencyMs: [],
    queueDepth: 0,
  };

  recordPublish(): void {
    this.metrics.messagesPublished++;
  }

  recordConsume(latencyMs: number): void {
    this.metrics.messagesConsumed++;
    this.metrics.processingLatencyMs.push(latencyMs);
  }

  recordFailure(): void {
    this.metrics.messagesFailed++;
  }

  updateQueueDepth(depth: number): void {
    this.metrics.queueDepth = depth;
  }

  getStats(): {
    throughput: number;
    errorRate: number;
    avgLatency: number;
    p99Latency: number;
  } {
    const total = this.metrics.messagesConsumed + this.metrics.messagesFailed;

    return {
      throughput: this.metrics.messagesConsumed,  // cumulative count; divide by the observation window for a rate
      errorRate: total > 0 ? this.metrics.messagesFailed / total : 0,
      avgLatency: this.calculateAverage(this.metrics.processingLatencyMs),
      p99Latency: this.calculatePercentile(this.metrics.processingLatencyMs, 99),
    };
  }

  private calculateAverage(values: number[]): number {
    if (values.length === 0) return 0;
    return values.reduce((a, b) => a + b, 0) / values.length;
  }

  private calculatePercentile(values: number[], percentile: number): number {
    if (values.length === 0) return 0;
    const sorted = [...values].sort((a, b) => a - b);
    const index = Math.ceil((percentile / 100) * sorted.length) - 1;
    return sorted[index];
  }
}
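
The collector above keeps every latency sample, so its array grows for as long as the process runs. A fixed-size window is one way to bound that. The sketch below is a minimal version under that assumption; `LatencyWindow` is a hypothetical class that reuses the same nearest-rank percentile calculation and keeps only the most recent samples.

```typescript
// Keeps the most recent `capacity` latency samples in a ring buffer.
class LatencyWindow {
  private samples: number[] = [];
  private nextIndex = 0;
  private capacity: number;

  constructor(capacity: number = 1000) {
    this.capacity = capacity;
  }

  record(latencyMs: number): void {
    if (this.samples.length < this.capacity) {
      this.samples.push(latencyMs);
    } else {
      this.samples[this.nextIndex] = latencyMs; // overwrite the oldest sample
    }
    this.nextIndex = (this.nextIndex + 1) % this.capacity;
  }

  size(): number {
    return this.samples.length;
  }

  // Nearest-rank percentile over the samples currently in the window.
  percentile(p: number): number {
    if (this.samples.length === 0) return 0;
    const sorted = [...this.samples].sort((a, b) => a - b);
    const index = Math.ceil((p / 100) * sorted.length) - 1;
    return sorted[Math.max(index, 0)];
  }
}

const latencies = new LatencyWindow(100);
for (let ms = 1; ms <= 200; ms++) {
  latencies.record(ms);
}
console.log(latencies.size(), latencies.percentile(50), latencies.percentile(99));
```

After 200 samples with a capacity of 100, only the values 101 to 200 remain, so the percentiles describe recent behavior rather than the whole process lifetime.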

Summary

Message queues are a cornerstone technology for distributed systems.

Selection Criteria

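| Requirement | Recommended system |
| --- | --- |
| Simple task queue | RabbitMQ, SQS |
| Event streaming | Kafka |
| Real-time processing | Redis Streams |
| Serverless | SQS + Lambda |
| Log aggregation | Kafka |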

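Design Considerations

  1. Ensure idempotency: prevent duplicate processing
  2. Ordering guarantees: only where needed, and per partition
  3. Error handling: a DLQ and a retry strategy
  4. Monitoring: throughput, latency, and error rate

With the right choice of message queue and a sound design, you can build scalable, reliable systems.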
