この記事の要点
• Apache KafkaのProducer/Consumerモデルとトピック・パーティションを理解する
• Docker Composeで開発環境を構築しkafkajsで実装する
• コンシューマグループとスキーマ管理のベストプラクティスを学ぶ
このチュートリアルで学ぶこと
- ✓ Kafkaのアーキテクチャと用語
- ✓ Docker Composeでの開発環境構築
- ✓ トピック作成と基本CLI
- ✓ Node.js (kafkajs) からの Producer/Consumer
- ✓ コンシューマグループとパーティション
- ✓ スキーマ・エラーハンドリング・運用上の注意
前提条件
- Docker / Docker Compose の基本
- Node.js 20 以上
- 非同期プログラミング (async/await) の基礎
- キュー / Pub-Sub パターンの概念
基本概念
ポイント: Kafkaは「ログベース」のメッセージング基盤です。メッセージは消費後も保持されるため、再処理やリプレイが可能です。
- Broker: Kafkaサーバー (クラスタを構成)
- Topic: メッセージのカテゴリ (論理的なチャネル)
- Partition: Topicを分割した単位 (並列度とスケールの軸)
- Offset: パーティション内のメッセージ位置
- Producer: メッセージを送信するクライアント
- Consumer: メッセージを受信するクライアント
- Consumer Group: 同じgroupIdを持つConsumerでパーティションを分担
Producer ──▶ [Topic]
├─ Partition 0 ──▶ Consumer A (group X)
├─ Partition 1 ──▶ Consumer B (group X)
└─ Partition 2 ──▶ Consumer C (group X)
Kafkaは「高スループット」「永続化」「順序保証 (パーティション内)」が強みで、 イベント駆動アーキテクチャやログ基盤、ストリーム処理の土台として広く使われます。
プロジェクトのセットアップ
mkdir kafka-demo && cd kafka-demo
mkdir app
touch docker-compose.yml
docker-compose.yml (KRaftモード)
services:
kafka:
image: bitnami/kafka:3.7
container_name: kafka
ports:
- "9092:9092"
- "9094:9094"
environment:
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
- ALLOW_PLAINTEXT_LISTENER=yes
volumes:
- kafka_data:/bitnami/kafka
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
ports:
- "8080:8080"
environment:
- KAFKA_CLUSTERS_0_NAME=local
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
depends_on:
- kafka
volumes:
kafka_data:
docker compose up -d
# Kafka UI: http://localhost:8080
Step 1: トピック作成 (CLI)
# コンテナに入って操作
docker exec -it kafka bash
# トピック作成
kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic orders --partitions 3 --replication-factor 1
# 一覧
kafka-topics.sh --bootstrap-server localhost:9092 --list
# 詳細
kafka-topics.sh --bootstrap-server localhost:9092 \
--describe --topic orders
CLIで簡易Producer/Consumer:
# Producer
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic orders
# Consumer (別ターミナル)
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic orders --from-beginning
Step 2: Node.jsアプリのセットアップ
実践メモ: kafkajsはNode.js用の純粋なJavaScript実装で、ネイティブ依存なしで動作します。
cd app
npm init -y
npm install kafkajs
npm install -D typescript @types/node tsx
npx tsc --init
// app/package.json (scripts)
{
"scripts": {
"producer": "tsx src/producer.ts",
"consumer": "tsx src/consumer.ts"
}
}
共通クライアント
// app/src/kafka.ts
import { Kafka, logLevel } from "kafkajs";
export const kafka = new Kafka({
clientId: "demo-app",
brokers: ["localhost:9094"],
logLevel: logLevel.INFO,
retry: { retries: 5, initialRetryTime: 300 },
});
Step 3: Producer の実装
// app/src/producer.ts
import { kafka } from "./kafka";
type OrderEvent = {
id: string;
userId: string;
amount: number;
createdAt: string;
};
async function main() {
const producer = kafka.producer({
allowAutoTopicCreation: false,
idempotent: true,
});
await producer.connect();
for (let i = 0; i < 10; i++) {
const event: OrderEvent = {
id: crypto.randomUUID(),
userId: `user-${i % 3}`,
amount: Math.floor(Math.random() * 10000),
createdAt: new Date().toISOString(),
};
await producer.send({
topic: "orders",
messages: [
{
// userId を key にすると同じユーザーの順序が保たれる
key: event.userId,
value: JSON.stringify(event),
headers: { "content-type": "application/json" },
},
],
});
console.log("sent", event);
}
await producer.disconnect();
}
main().catch((e) => {
console.error(e);
process.exit(1);
});
idempotent: true によりリトライ時の重複書き込みを防ぎます (同一プロデューサ内)。
Step 4: Consumer の実装
// app/src/consumer.ts
import { kafka } from "./kafka";
async function main() {
const consumer = kafka.consumer({ groupId: "orders-worker" });
await consumer.connect();
await consumer.subscribe({ topic: "orders", fromBeginning: true });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const key = message.key?.toString();
const value = message.value?.toString();
if (!value) return;
try {
const event = JSON.parse(value);
console.log(`[p${partition}] ${key}`, event);
// 実ビジネスロジック: DB保存, メール送信, etc.
} catch (err) {
console.error("parse error", err);
// DLQ (Dead Letter Queue) への退避などを検討
}
},
});
const shutdown = async () => {
await consumer.disconnect();
process.exit(0);
};
process.on("SIGINT", shutdown);
process.on("SIGTERM", shutdown);
}
main().catch((e) => {
console.error(e);
process.exit(1);
});
別ターミナルでproducerとconsumerを起動:
npm run consumer
# 別窓
npm run producer
Step 5: コンシューマグループとスケール
同じ groupId のConsumerを複数起動すると、パーティション数まで並列に分担されます。
Topic orders (3 partitions)
groupId: orders-worker
Consumer A → P0
Consumer B → P1
Consumer C → P2
groupId: analytics (別グループは全件独立受信)
Consumer X → P0, P1, P2
オフセットはグループ単位で管理されるため、複数の異なる用途 (業務処理と分析基盤) で 同じトピックを並行消費できます。
Step 6: エラーハンドリングとDLQ
// app/src/consumer-dlq.ts
import { kafka } from "./kafka";
async function main() {
const consumer = kafka.consumer({ groupId: "orders-worker" });
const producer = kafka.producer();
await consumer.connect();
await producer.connect();
await consumer.subscribe({ topic: "orders", fromBeginning: false });
await consumer.run({
eachMessage: async ({ message }) => {
try {
const event = JSON.parse(message.value!.toString());
if (typeof event.amount !== "number") {
throw new Error("invalid amount");
}
// 処理
} catch (err) {
await producer.send({
topic: "orders.dlq",
messages: [
{
key: message.key,
value: message.value,
headers: {
error: (err as Error).message,
"original-topic": "orders",
},
},
],
});
}
},
});
}
main().catch(console.error);
注意: acks: 0は高速ですがメッセージ消失のリスクがあります。重要なデータにはacks: -1(全レプリカ確認)を使いましょう。
よくあるエラーと対処
-
“Connection refused” (ローカルから接続できない)
- advertised.listeners が localhost:9094 で公開されているか確認
- ホストからは 9094、コンテナ間は 9092 を使用
-
Consumerが同じメッセージを繰り返し受信
- 例外で処理中断 → オフセットコミットされない
- try/catchで処理を完結させる、DLQに逃がす
-
Rebalance が頻発
- sessionTimeout, heartbeatInterval を調整
- 1メッセージあたりの処理時間が長すぎる場合は maxBytes/並列度を見直す
-
順序が保たれない
- 同じ key (例: userId) を使ってproduce
- パーティションを跨いだ全体順序は保証されない
-
メッセージが消える
- acks=all, replication.factor >= 3 を本番で徹底
- min.insync.replicas の設定も併用
ベストプラクティス
- トピック名は ドメイン.エンティティ.イベント 形式で統一 (例: orders.created)
- キー設計で順序保証とパーティション分散を両立
- スキーマレジストリ (Avro/Protobuf) の検討
- 本番は replication.factor = 3 以上
- 監視: consumer lag, under-replicated partitions, request latency
- 冪等な消費者設計 (同じメッセージを複数回処理しても結果が同じ)
次のステップ
- Schema Registry + Avro/Protobuf
- Kafka Streams / ksqlDB でストリーム処理
- Kafka Connect によるDB連携 (CDC)
- MirrorMaker 2 でクラスタ間レプリケーション
- 監視: Prometheus JMX Exporter / Cruise Control
まとめ
Kafkaは高スループット・永続化・順序保証・スケールアウトを兼ね備えた分散ストリーミング基盤です。 パーティション、コンシューマグループ、オフセットという基本概念を押さえ、 冪等性・キー設計・DLQなどの実践的な要点を意識することで、堅牢なイベント駆動システムを構築できます。
FAQ
Q. KRaftモードとZooKeeperモードの違いは? A. Kafka 3.x 以降は KRaft (Kafka Raft) が推奨されており、ZooKeeperを不要にします。 新規構築はKRaftモードで始めるのがベストです。
Q. RabbitMQ との違いは? A. RabbitMQは伝統的なメッセージブローカーでルーティングが柔軟、Kafkaはログベースの高スループット・ 永続化に強みがあります。ストリーム処理やイベントソーシングにはKafkaが向きます。
Q. Exactly Once は実現できる?
A. Kafka側ではトランザクションAPI (enable.idempotence=true + transactional.id) で実現可能です。
ただしConsumer側の処理も冪等に設計する必要があります。
Q. トピックの設計は? A. 「1トピック1イベント種別」を基本に、ドメイン駆動設計の境界に合わせて分割します。 パーティション数は後から減らせないため、将来のスループットを見越して設定します。
チートシート
# Apache Kafka 3.7 / kafkajs 2.x
# https://kafka.apache.org/documentation/
# CLI
kafka-topics.sh # トピック作成/一覧/削除
kafka-configs.sh # 設定変更 (保持期間等)
kafka-consumer-groups.sh # lag確認, offsetリセット
kafka-console-producer.sh / kafka-console-consumer.sh # 動作確認
# 主要設定
# Producer側:
# acks=all # 永続化保証
# retries=Integer.MAX_VALUE # リトライ
# enable.idempotence=true # 冪等化
# max.in.flight.requests.per.connection=5
# compression.type=lz4 # 圧縮
#
# Consumer側:
# auto.offset.reset=earliest|latest
# enable.auto.commit=false # 明示コミット推奨
# max.poll.records=500
参考リソース
- Apache Kafka公式ドキュメント
- KafkaJS公式
- Confluent Kafka Tutorials
- Bitnami Kafka Docker Image
- Kafka KRaft Mode解説
補足: トピック命名と運用
大規模になるとトピック名の一貫性が重要になります。以下は実運用でよく使われるパターンです。
<domain>.<entity>.<event> 例: orders.order.created
<env>.<domain>.<entity>.<event> 例: prod.billing.invoice.issued
保持期間とクリーンアップポリシー
# 7日間保持
kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type topics --entity-name orders \
--alter --add-config retention.ms=604800000
# 圧縮 (compact) ポリシー (キーごとに最新値のみ保持)
kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type topics --entity-name user.profile \
--alter --add-config cleanup.policy=compact
compact ポリシーは、ユーザプロファイルのような「最新状態」を保持したいトピックで使います。
一方、イベントログには delete ポリシー + 期間指定が基本です。
Consumer Lag の監視
Consumerがどれだけメッセージ処理に遅れているかは運用上の最重要メトリクスです。
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group orders-worker
出力の LAG 列が継続的に増えていれば、消費側がボトルネックです。
パーティション数を増やす、Consumerインスタンスを増やす、処理の並列度を上げるといった対策を検討します。