Apache Kafka入門 - 分散ストリーミング基盤の実践

intermediate | 60分 で読める | 2026.04.10

公式ドキュメント

この記事の要点

Apache KafkaProducer/Consumerモデルとトピック・パーティションを理解する
• Docker Composeで開発環境を構築しkafkajsで実装する
• コンシューマグループとスキーマ管理のベストプラクティスを学ぶ

このチュートリアルで学ぶこと

  • ✓ Kafkaのアーキテクチャと用語
  • ✓ Docker Composeでの開発環境構築
  • ✓ トピック作成と基本CLI
  • ✓ Node.js (kafkajs) からの Producer/Consumer
  • ✓ コンシューマグループとパーティション
  • ✓ スキーマ・エラーハンドリング・運用上の注意

前提条件

  • Docker / Docker Compose の基本
  • Node.js 20 以上
  • 非同期プログラミング (async/await) の基礎
  • キュー / Pub-Sub パターンの概念

基本概念

ポイント: Kafkaは「ログベース」のメッセージング基盤です。メッセージは消費後も保持されるため、再処理やリプレイが可能です。

  • Broker: Kafkaサーバー (クラスタを構成)
  • Topic: メッセージのカテゴリ (論理的なチャネル)
  • Partition: Topicを分割した単位 (並列度とスケールの軸)
  • Offset: パーティション内のメッセージ位置
  • Producer: メッセージを送信するクライアント
  • Consumer: メッセージを受信するクライアント
  • Consumer Group: 同じgroupIdを持つConsumerでパーティションを分担
Producer ──▶ [Topic]
                ├─ Partition 0 ──▶ Consumer A (group X)
                ├─ Partition 1 ──▶ Consumer B (group X)
                └─ Partition 2 ──▶ Consumer C (group X)

Kafkaは「高スループット」「永続化」「順序保証 (パーティション内)」が強みで、 イベント駆動アーキテクチャやログ基盤、ストリーム処理の土台として広く使われます。

プロジェクトのセットアップ

mkdir kafka-demo && cd kafka-demo
mkdir app
touch docker-compose.yml

docker-compose.yml (KRaftモード)

services:
  kafka:
    image: bitnami/kafka:3.7
    container_name: kafka
    ports:
      - "9092:9092"
      - "9094:9094"
    environment:
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - ALLOW_PLAINTEXT_LISTENER=yes
    volumes:
      - kafka_data:/bitnami/kafka

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
    depends_on:
      - kafka

volumes:
  kafka_data:
docker compose up -d
# Kafka UI: http://localhost:8080

Step 1: トピック作成 (CLI)

# コンテナに入って操作
docker exec -it kafka bash

# トピック作成
kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic orders --partitions 3 --replication-factor 1

# 一覧
kafka-topics.sh --bootstrap-server localhost:9092 --list

# 詳細
kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic orders

CLIで簡易Producer/Consumer:

# Producer
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic orders

# Consumer (別ターミナル)
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic orders --from-beginning

Step 2: Node.jsアプリのセットアップ

実践メモ: kafkajsはNode.js用の純粋なJavaScript実装で、ネイティブ依存なしで動作します。

cd app
npm init -y
npm install kafkajs
npm install -D typescript @types/node tsx
npx tsc --init
// app/package.json (scripts)
{
  "scripts": {
    "producer": "tsx src/producer.ts",
    "consumer": "tsx src/consumer.ts"
  }
}

共通クライアント

// app/src/kafka.ts
import { Kafka, logLevel } from "kafkajs";

export const kafka = new Kafka({
  clientId: "demo-app",
  brokers: ["localhost:9094"],
  logLevel: logLevel.INFO,
  retry: { retries: 5, initialRetryTime: 300 },
});

Step 3: Producer の実装

// app/src/producer.ts
import { kafka } from "./kafka";

type OrderEvent = {
  id: string;
  userId: string;
  amount: number;
  createdAt: string;
};

async function main() {
  const producer = kafka.producer({
    allowAutoTopicCreation: false,
    idempotent: true,
  });
  await producer.connect();

  for (let i = 0; i < 10; i++) {
    const event: OrderEvent = {
      id: crypto.randomUUID(),
      userId: `user-${i % 3}`,
      amount: Math.floor(Math.random() * 10000),
      createdAt: new Date().toISOString(),
    };

    await producer.send({
      topic: "orders",
      messages: [
        {
          // userId を key にすると同じユーザーの順序が保たれる
          key: event.userId,
          value: JSON.stringify(event),
          headers: { "content-type": "application/json" },
        },
      ],
    });

    console.log("sent", event);
  }

  await producer.disconnect();
}

main().catch((e) => {
  console.error(e);
  process.exit(1);
});

idempotent: true によりリトライ時の重複書き込みを防ぎます (同一プロデューサ内)。

Step 4: Consumer の実装

// app/src/consumer.ts
import { kafka } from "./kafka";

async function main() {
  const consumer = kafka.consumer({ groupId: "orders-worker" });
  await consumer.connect();
  await consumer.subscribe({ topic: "orders", fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const key = message.key?.toString();
      const value = message.value?.toString();
      if (!value) return;

      try {
        const event = JSON.parse(value);
        console.log(`[p${partition}] ${key}`, event);
        // 実ビジネスロジック: DB保存, メール送信, etc.
      } catch (err) {
        console.error("parse error", err);
        // DLQ (Dead Letter Queue) への退避などを検討
      }
    },
  });

  const shutdown = async () => {
    await consumer.disconnect();
    process.exit(0);
  };
  process.on("SIGINT", shutdown);
  process.on("SIGTERM", shutdown);
}

main().catch((e) => {
  console.error(e);
  process.exit(1);
});

別ターミナルでproducerとconsumerを起動:

npm run consumer
# 別窓
npm run producer

Step 5: コンシューマグループとスケール

同じ groupId のConsumerを複数起動すると、パーティション数まで並列に分担されます。

Topic orders (3 partitions)

groupId: orders-worker
  Consumer A → P0
  Consumer B → P1
  Consumer C → P2

groupId: analytics       (別グループは全件独立受信)
  Consumer X → P0, P1, P2

オフセットはグループ単位で管理されるため、複数の異なる用途 (業務処理と分析基盤) で 同じトピックを並行消費できます。

Step 6: エラーハンドリングとDLQ

// app/src/consumer-dlq.ts
import { kafka } from "./kafka";

async function main() {
  const consumer = kafka.consumer({ groupId: "orders-worker" });
  const producer = kafka.producer();
  await consumer.connect();
  await producer.connect();
  await consumer.subscribe({ topic: "orders", fromBeginning: false });

  await consumer.run({
    eachMessage: async ({ message }) => {
      try {
        const event = JSON.parse(message.value!.toString());
        if (typeof event.amount !== "number") {
          throw new Error("invalid amount");
        }
        // 処理
      } catch (err) {
        await producer.send({
          topic: "orders.dlq",
          messages: [
            {
              key: message.key,
              value: message.value,
              headers: {
                error: (err as Error).message,
                "original-topic": "orders",
              },
            },
          ],
        });
      }
    },
  });
}

main().catch(console.error);

注意: acks: 0は高速ですがメッセージ消失のリスクがあります。重要なデータにはacks: -1(全レプリカ確認)を使いましょう。

よくあるエラーと対処

  1. “Connection refused” (ローカルから接続できない)

    • advertised.listeners が localhost:9094 で公開されているか確認
    • ホストからは 9094、コンテナ間は 9092 を使用
  2. Consumerが同じメッセージを繰り返し受信

    • 例外で処理中断 → オフセットコミットされない
    • try/catchで処理を完結させる、DLQに逃がす
  3. Rebalance が頻発

    • sessionTimeout, heartbeatInterval を調整
    • 1メッセージあたりの処理時間が長すぎる場合は maxBytes/並列度を見直す
  4. 順序が保たれない

    • 同じ key (例: userId) を使ってproduce
    • パーティションを跨いだ全体順序は保証されない
  5. メッセージが消える

    • acks=all, replication.factor >= 3 を本番で徹底
    • min.insync.replicas の設定も併用

ベストプラクティス

  • トピック名は ドメイン.エンティティ.イベント 形式で統一 (例: orders.created)
  • キー設計で順序保証とパーティション分散を両立
  • スキーマレジストリ (Avro/Protobuf) の検討
  • 本番は replication.factor = 3 以上
  • 監視: consumer lag, under-replicated partitions, request latency
  • 冪等な消費者設計 (同じメッセージを複数回処理しても結果が同じ)

次のステップ

  • Schema Registry + Avro/Protobuf
  • Kafka Streams / ksqlDB でストリーム処理
  • Kafka Connect によるDB連携 (CDC)
  • MirrorMaker 2 でクラスタ間レプリケーション
  • 監視: Prometheus JMX Exporter / Cruise Control

まとめ

Kafkaは高スループット・永続化・順序保証・スケールアウトを兼ね備えた分散ストリーミング基盤です。 パーティション、コンシューマグループ、オフセットという基本概念を押さえ、 冪等性・キー設計・DLQなどの実践的な要点を意識することで、堅牢なイベント駆動システムを構築できます。

FAQ

Q. KRaftモードとZooKeeperモードの違いは? A. Kafka 3.x 以降は KRaft (Kafka Raft) が推奨されており、ZooKeeperを不要にします。 新規構築はKRaftモードで始めるのがベストです。

Q. RabbitMQ との違いは? A. RabbitMQは伝統的なメッセージブローカーでルーティングが柔軟、Kafkaはログベースの高スループット・ 永続化に強みがあります。ストリーム処理やイベントソーシングにはKafkaが向きます。

Q. Exactly Once は実現できる? A. Kafka側ではトランザクションAPI (enable.idempotence=true + transactional.id) で実現可能です。 ただしConsumer側の処理も冪等に設計する必要があります。

Q. トピックの設計は? A. 「1トピック1イベント種別」を基本に、ドメイン駆動設計の境界に合わせて分割します。 パーティション数は後から減らせないため、将来のスループットを見越して設定します。

チートシート

# Apache Kafka 3.7 / kafkajs 2.x
# https://kafka.apache.org/documentation/

# CLI
kafka-topics.sh      # トピック作成/一覧/削除
kafka-configs.sh     # 設定変更 (保持期間等)
kafka-consumer-groups.sh # lag確認, offsetリセット
kafka-console-producer.sh / kafka-console-consumer.sh # 動作確認

# 主要設定
# Producer側:
#   acks=all                     # 永続化保証
#   retries=Integer.MAX_VALUE    # リトライ
#   enable.idempotence=true      # 冪等化
#   max.in.flight.requests.per.connection=5
#   compression.type=lz4         # 圧縮
#
# Consumer側:
#   auto.offset.reset=earliest|latest
#   enable.auto.commit=false     # 明示コミット推奨
#   max.poll.records=500

参考リソース

補足: トピック命名と運用

大規模になるとトピック名の一貫性が重要になります。以下は実運用でよく使われるパターンです。

<domain>.<entity>.<event>            例: orders.order.created
<env>.<domain>.<entity>.<event>      例: prod.billing.invoice.issued

保持期間とクリーンアップポリシー

# 7日間保持
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name orders \
  --alter --add-config retention.ms=604800000

# 圧縮 (compact) ポリシー (キーごとに最新値のみ保持)
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name user.profile \
  --alter --add-config cleanup.policy=compact

compact ポリシーは、ユーザプロファイルのような「最新状態」を保持したいトピックで使います。 一方、イベントログには delete ポリシー + 期間指定が基本です。

Consumer Lag の監視

Consumerがどれだけメッセージ処理に遅れているかは運用上の最重要メトリクスです。

kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group orders-worker

出力の LAG 列が継続的に増えていれば、消費側がボトルネックです。 パーティション数を増やす、Consumerインスタンスを増やす、処理の並列度を上げるといった対策を検討します。

この技術を体系的に学びたいですか?

未来学では東証プライム上場企業のITエンジニアが24時間サポート。月額24,800円から、退会金0円のオンラインIT塾です。

メールで無料相談する
← 一覧に戻る