Change Data Capture(CDC)とは
Change Data Capture(CDC)は、データベースの変更(INSERT、UPDATE、DELETE)をリアルタイムで検出し、他のシステムに伝播させる技術です。2025年、マイクロサービスアーキテクチャの普及とリアルタイムデータパイプラインの需要増加により、CDCは現代のデータアーキテクチャにおける標準的なアプローチとなっています。
従来のバッチ処理によるデータ同期では、数時間から数日の遅延が発生していました。CDCを導入することで、ミリ秒単位でのデータ同期が可能となり、分析、キャッシュ更新、イベント駆動アーキテクチャの基盤として活用されています。
CDCの仕組み
CDCには主に3つのアプローチがあります。それぞれ特性が異なるため、ユースケースに応じた選択が重要です。
1. Log-based CDC(ログベース)
データベースのトランザクションログ(WAL、binlogなど)を直接読み取る方式。最も推奨されるアプローチです。
-- PostgreSQL での WAL レベル設定
ALTER SYSTEM SET wal_level = 'logical';
ALTER SYSTEM SET max_replication_slots = 4;
ALTER SYSTEM SET max_wal_senders = 4;
-- レプリケーションスロットの作成
SELECT pg_create_logical_replication_slot('debezium_slot', 'pgoutput');
メリット:
- データベースへの負荷が最小限
- すべての変更を正確にキャプチャ
- トランザクション順序が保証される
- パフォーマンスへの影響が少ない
デメリット:
- データベース固有の設定が必要
- 管理者権限が必要な場合がある
- ログの保持期間に依存
2. Trigger-based CDC(トリガーベース)
データベーストリガーを使用して変更を検出し、シャドウテーブルに記録する方式。
-- PostgreSQL でのトリガーベース CDC 実装例
CREATE TABLE orders_audit (
audit_id SERIAL PRIMARY KEY,
order_id INTEGER,
operation VARCHAR(10),
old_data JSONB,
new_data JSONB,
changed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE OR REPLACE FUNCTION capture_order_changes()
RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'INSERT' THEN
INSERT INTO orders_audit (order_id, operation, new_data)
VALUES (NEW.id, 'INSERT', row_to_json(NEW)::jsonb);
RETURN NEW;
ELSIF TG_OP = 'UPDATE' THEN
INSERT INTO orders_audit (order_id, operation, old_data, new_data)
VALUES (NEW.id, 'UPDATE', row_to_json(OLD)::jsonb, row_to_json(NEW)::jsonb);
RETURN NEW;
ELSIF TG_OP = 'DELETE' THEN
INSERT INTO orders_audit (order_id, operation, old_data)
VALUES (OLD.id, 'DELETE', row_to_json(OLD)::jsonb);
RETURN OLD;
END IF;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER orders_cdc_trigger
AFTER INSERT OR UPDATE OR DELETE ON orders
FOR EACH ROW EXECUTE FUNCTION capture_order_changes();
メリット:
- 特別な権限不要
- カスタムロジックを追加可能
- どのデータベースでも実装可能
デメリット:
- 書き込みパフォーマンスに影響
- シャドウテーブルの管理が必要
- トランザクション分離の複雑さ
3. Query-based CDC(クエリベース)
定期的にテーブルをクエリし、タイムスタンプやバージョン番号で変更を検出する方式。
import asyncio
from datetime import datetime, timedelta
import asyncpg
class QueryBasedCDC:
def __init__(self, connection_string: str):
self.connection_string = connection_string
self.last_sync_time = datetime.utcnow() - timedelta(hours=1)
async def capture_changes(self, table: str, timestamp_column: str):
conn = await asyncpg.connect(self.connection_string)
try:
query = f"""
SELECT * FROM {table}
WHERE {timestamp_column} > $1
ORDER BY {timestamp_column}
"""
rows = await conn.fetch(query, self.last_sync_time)
self.last_sync_time = datetime.utcnow()
return rows
finally:
await conn.close()
async def poll_continuously(self, table: str, timestamp_column: str, interval: int = 10):
while True:
changes = await self.capture_changes(table, timestamp_column)
if changes:
await self.process_changes(changes)
await asyncio.sleep(interval)
async def process_changes(self, changes):
for row in changes:
print(f"Change detected: {dict(row)}")
メリット:
- 実装が簡単
- データベース設定変更不要
- レガシーシステムでも使用可能
デメリット:
- DELETE を検出できない
- ポーリング間隔に依存
- 高頻度更新には不向き
主要CDCツール比較
Debezium
Red Hat が開発するオープンソースの CDC プラットフォーム。2025年時点で最も広く採用されています。
# Debezium PostgreSQL Connector 設定
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: postgres-connector
labels:
strimzi.io/cluster: my-connect-cluster
spec:
class: io.debezium.connector.postgresql.PostgresConnector
tasksMax: 1
config:
database.hostname: postgres
database.port: 5432
database.user: debezium
database.password: ${secrets:postgres-credentials:password}
database.dbname: inventory
database.server.name: dbserver1
plugin.name: pgoutput
slot.name: debezium_slot
publication.name: dbz_publication
table.include.list: public.orders,public.customers
# スナップショット設定
snapshot.mode: initial
# トランスフォーメーション
transforms: unwrap
transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones: false
transforms.unwrap.delete.handling.mode: rewrite
Debezium の出力フォーマット:
{
"schema": {...},
"payload": {
"before": {
"id": 1,
"customer_name": "田中太郎",
"amount": 10000
},
"after": {
"id": 1,
"customer_name": "田中太郎",
"amount": 15000
},
"source": {
"version": "2.5.0.Final",
"connector": "postgresql",
"name": "dbserver1",
"ts_ms": 1704067200000,
"db": "inventory",
"schema": "public",
"table": "orders",
"txId": 12345,
"lsn": 98765432
},
"op": "u",
"ts_ms": 1704067200100
}
}
特徴:
- PostgreSQL、MySQL、MongoDB、SQL Server、Oracle 対応
- Kafka Connect ベースのアーキテクチャ
- 正確な once semantics をサポート
- スキーマレジストリとの統合
Airbyte
オープンソースのデータ統合プラットフォームで、CDC 機能を内蔵しています。
# Airbyte CDC ソース設定(PostgreSQL)
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
connectionConfiguration:
host: localhost
port: 5432
database: mydb
username: airbyte_user
password: ${AIRBYTE_PASSWORD}
schemas:
- public
replication_method:
method: CDC
plugin: pgoutput
replication_slot: airbyte_slot
publication: airbyte_publication
initial_waiting_seconds: 300
# Airbyte Python SDK による CDC ストリーム処理
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
from airbyte_cdk.sources.streams import Stream
class CDCStream(Stream):
primary_key = "id"
cursor_field = "_ab_cdc_lsn"
def get_json_schema(self):
return {
"type": "object",
"properties": {
"id": {"type": "integer"},
"data": {"type": "object"},
"_ab_cdc_lsn": {"type": "integer"},
"_ab_cdc_updated_at": {"type": "string"},
"_ab_cdc_deleted_at": {"type": ["string", "null"]}
}
}
def read_records(self, sync_mode, cursor_field=None, stream_slice=None, stream_state=None):
# CDC レコードの読み取り処理
last_lsn = stream_state.get("_ab_cdc_lsn", 0) if stream_state else 0
for record in self.fetch_cdc_records(last_lsn):
yield record
特徴:
- 300+ のコネクタ
- WebUI による設定管理
- Kubernetes 対応
- 増分同期とフル同期の両方に対応
Fivetran
エンタープライズ向けのマネージド CDC サービス。
-- Fivetran 用のデータベース設定(PostgreSQL)
CREATE USER fivetran WITH PASSWORD 'secure_password';
GRANT USAGE ON SCHEMA public TO fivetran;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO fivetran;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO fivetran;
-- CDC 用の追加権限
GRANT rds_replication TO fivetran; -- AWS RDS の場合
特徴:
- フルマネージドサービス
- 自動スキーマ変更検出
- SLA 保証(99.9%)
- 豊富なデスティネーション対応
ツール選択ガイド
| 要件 | Debezium | Airbyte | Fivetran |
|---|---|---|---|
| コスト | 無料(OSS) | 無料/有料 | 有料 |
| 運用負荷 | 高 | 中 | 低 |
| カスタマイズ性 | 高 | 中 | 低 |
| エンタープライズサポート | Red Hat | Cloud版 | あり |
| レイテンシ | 最小 | 低 | 中 |
| 対応DB数 | 中 | 多 | 多 |
Kafka との統合
CDC と Apache Kafka の組み合わせは、2025年のリアルタイムデータアーキテクチャの標準となっています。
Debezium + Kafka 構成
// Kafka Streams による CDC イベント処理
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Consumed;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
public class CDCStreamProcessor {
public static void main(String[] args) {
StreamsBuilder builder = new StreamsBuilder();
// CDC イベントストリームを購読
KStream<String, OrderEvent> orderChanges = builder.stream(
"dbserver1.public.orders",
Consumed.with(Serdes.String(), new SpecificAvroSerde<>())
);
// INSERT イベントをフィルタリング
KStream<String, OrderEvent> newOrders = orderChanges
.filter((key, value) -> "c".equals(value.getOp()));
// UPDATE イベントで金額変更を検出
KStream<String, OrderEvent> priceChanges = orderChanges
.filter((key, value) -> "u".equals(value.getOp()))
.filter((key, value) -> {
var before = value.getBefore();
var after = value.getAfter();
return !before.getAmount().equals(after.getAmount());
});
// 通知システムに転送
newOrders.to("notifications.new-orders");
priceChanges.to("notifications.price-changes");
// ストリーム処理を開始
KafkaStreams streams = new KafkaStreams(builder.build(), getConfig());
streams.start();
}
}
Kafka Connect 設定
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium",
"database.password": "dbz",
"database.dbname": "inventory",
"topic.prefix": "dbserver1",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes.inventory",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
実装パターン
パターン1: Outbox パターン
マイクロサービス間のイベント伝播に最適なパターンです。
-- Outbox テーブルの作成
CREATE TABLE outbox (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(255) NOT NULL,
aggregate_id VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- インデックス作成
CREATE INDEX idx_outbox_created_at ON outbox(created_at);
// Spring Boot での Outbox パターン実装
@Service
@Transactional
public class OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private OutboxRepository outboxRepository;
public Order createOrder(CreateOrderRequest request) {
// 1. 注文を作成
Order order = new Order(request);
order = orderRepository.save(order);
// 2. Outbox にイベントを記録(同一トランザクション)
OutboxEvent event = OutboxEvent.builder()
.aggregateType("Order")
.aggregateId(order.getId().toString())
.eventType("OrderCreated")
.payload(toJson(new OrderCreatedEvent(order)))
.build();
outboxRepository.save(event);
return order;
}
}
# Debezium Outbox Event Router 設定
transforms: outbox
transforms.outbox.type: io.debezium.transforms.outbox.EventRouter
transforms.outbox.route.by.field: aggregate_type
transforms.outbox.table.field.event.type: event_type
transforms.outbox.table.field.event.payload: payload
transforms.outbox.route.topic.replacement: events.${routedByValue}
パターン2: CQRS + Event Sourcing
# Python での CQRS 読み取りモデル更新
import json
from kafka import KafkaConsumer
from elasticsearch import Elasticsearch
class CDCConsumer:
def __init__(self):
self.consumer = KafkaConsumer(
'dbserver1.public.orders',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
group_id='order-search-updater',
auto_offset_reset='earliest'
)
self.es = Elasticsearch(['http://localhost:9200'])
def process_events(self):
for message in self.consumer:
event = message.value
payload = event.get('payload', {})
operation = payload.get('op')
if operation == 'c': # CREATE
self.handle_insert(payload['after'])
elif operation == 'u': # UPDATE
self.handle_update(payload['after'])
elif operation == 'd': # DELETE
self.handle_delete(payload['before'])
def handle_insert(self, data):
self.es.index(
index='orders',
id=data['id'],
document={
'customer_name': data['customer_name'],
'amount': data['amount'],
'status': data['status'],
'created_at': data['created_at']
}
)
def handle_update(self, data):
self.es.update(
index='orders',
id=data['id'],
doc={
'customer_name': data['customer_name'],
'amount': data['amount'],
'status': data['status']
}
)
def handle_delete(self, data):
self.es.delete(index='orders', id=data['id'])
パターン3: キャッシュ無効化
// TypeScript での Redis キャッシュ無効化
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
import { createClient, RedisClientType } from 'redis';
interface CDCPayload {
op: 'c' | 'u' | 'd' | 'r';
before: Record<string, unknown> | null;
after: Record<string, unknown> | null;
source: {
table: string;
db: string;
};
}
class CacheInvalidator {
private consumer: Consumer;
private redis: RedisClientType;
async initialize(): Promise<void> {
const kafka = new Kafka({
clientId: 'cache-invalidator',
brokers: ['localhost:9092']
});
this.consumer = kafka.consumer({ groupId: 'cache-invalidation-group' });
this.redis = createClient({ url: 'redis://localhost:6379' });
await this.consumer.connect();
await this.redis.connect();
await this.consumer.subscribe({ topic: /dbserver1\.public\..*/ });
}
async run(): Promise<void> {
await this.consumer.run({
eachMessage: async ({ topic, message }: EachMessagePayload) => {
const payload: CDCPayload = JSON.parse(message.value!.toString());
await this.invalidateCache(payload);
}
});
}
private async invalidateCache(payload: CDCPayload): Promise<void> {
const table = payload.source.table;
const id = payload.after?.id || payload.before?.id;
if (!id) return;
// 個別キーの削除
const cacheKey = `${table}:${id}`;
await this.redis.del(cacheKey);
// 関連するリストキャッシュも削除
const listPattern = `${table}:list:*`;
const keys = await this.redis.keys(listPattern);
if (keys.length > 0) {
await this.redis.del(keys);
}
console.log(`Cache invalidated: ${cacheKey}`);
}
}
ユースケース
1. リアルタイム分析ダッシュボード
# Apache Flink での CDC ストリーム処理
from pyflink.table import EnvironmentSettings, TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
# CDC ソーステーブルの定義
t_env.execute_sql("""
CREATE TABLE orders_cdc (
id INT,
customer_id INT,
amount DECIMAL(10, 2),
status STRING,
created_at TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'dbserver1.public.orders',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'debezium-json'
)
""")
# リアルタイム集計
t_env.execute_sql("""
CREATE TABLE hourly_sales (
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
total_amount DECIMAL(12, 2),
order_count BIGINT,
PRIMARY KEY (window_start, window_end) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://localhost:5432/analytics',
'table-name' = 'hourly_sales'
)
""")
t_env.execute_sql("""
INSERT INTO hourly_sales
SELECT
TUMBLE_START(created_at, INTERVAL '1' HOUR) as window_start,
TUMBLE_END(created_at, INTERVAL '1' HOUR) as window_end,
SUM(amount) as total_amount,
COUNT(*) as order_count
FROM orders_cdc
GROUP BY TUMBLE(created_at, INTERVAL '1' HOUR)
""")
2. マイクロサービス間データ同期
// Go での CDC コンシューマー
package main
import (
"context"
"encoding/json"
"log"
"github.com/segmentio/kafka-go"
)
type CDCEvent struct {
Payload struct {
Op string `json:"op"`
Before json.RawMessage `json:"before"`
After json.RawMessage `json:"after"`
Source struct {
Table string `json:"table"`
} `json:"source"`
} `json:"payload"`
}
func main() {
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "dbserver1.public.users",
GroupID: "user-sync-service",
})
defer reader.Close()
for {
msg, err := reader.ReadMessage(context.Background())
if err != nil {
log.Printf("Error reading message: %v", err)
continue
}
var event CDCEvent
if err := json.Unmarshal(msg.Value, &event); err != nil {
log.Printf("Error unmarshaling: %v", err)
continue
}
switch event.Payload.Op {
case "c", "r":
syncUserToLocalDB(event.Payload.After)
case "u":
updateUserInLocalDB(event.Payload.After)
case "d":
deleteUserFromLocalDB(event.Payload.Before)
}
}
}
3. 監査ログ・コンプライアンス
金融機関や医療機関では、すべてのデータ変更を監査証跡として保存する必要があります。
-- TimescaleDB での監査ログ保存
CREATE TABLE audit_log (
event_id UUID DEFAULT gen_random_uuid(),
event_time TIMESTAMPTZ NOT NULL,
table_name TEXT NOT NULL,
operation TEXT NOT NULL,
record_id TEXT NOT NULL,
old_values JSONB,
new_values JSONB,
source_metadata JSONB
);
SELECT create_hypertable('audit_log', 'event_time');
-- 90日後に自動削除
SELECT add_retention_policy('audit_log', INTERVAL '90 days');
2025年のCDC動向
1. サーバーレスCDC
AWS DMS、Google Cloud Datastream などのマネージドサービスが進化し、インフラ管理なしでCDCを利用できるようになっています。
2. AI/MLパイプラインとの統合
CDC で取得したリアルタイムデータを機械学習モデルの特徴量として活用するパターンが増加しています。
# CDC + Feature Store 統合例
from feast import FeatureStore
store = FeatureStore(repo_path="./feature_repo")
# CDC イベントから特徴量を更新
def update_features_from_cdc(cdc_event):
user_id = cdc_event['after']['user_id']
# オンライン特徴量ストアを更新
store.push(
push_source_name="user_activity_stream",
df=pd.DataFrame([{
'user_id': user_id,
'last_purchase_amount': cdc_event['after']['amount'],
'last_purchase_time': cdc_event['after']['created_at']
}])
)
3. マルチクラウド・ハイブリッドクラウド対応
異なるクラウドプロバイダー間、またはオンプレミスとクラウド間でのCDCデータ連携が標準化されています。
4. スキーマエボリューション対応
Avro や Protobuf を使用したスキーマレジストリとの統合により、スキーマ変更を安全に伝播できるようになっています。
ベストプラクティス
1. 適切なスナップショットモード選択
# 初回のみフルスナップショット(推奨)
snapshot.mode=initial
# 常に最新からキャプチャ開始
snapshot.mode=never
# スナップショットのみ(CDC なし)
snapshot.mode=initial_only
2. モニタリングの実装
# Prometheus メトリクス設定
debezium:
metrics:
enabled: true
jmx:
enabled: true
prometheus:
enabled: true
port: 9090
3. エラーハンドリング
# リトライ設定
errors.max.retries=10
errors.retry.delay.max.ms=60000
# デッドレターキュー
errors.deadletterqueue.topic.name=dlq.connector-errors
errors.deadletterqueue.context.headers.enable=true
まとめ
Change Data Capture は、2025年のデータアーキテクチャにおいて不可欠な技術となっています。Log-based CDC が最も推奨されるアプローチであり、Debezium + Kafka の組み合わせが業界標準として確立されています。
実装にあたっては、Outbox パターンやCQRSなどの設計パターンを活用し、適切なモニタリングとエラーハンドリングを組み込むことが重要です。マネージドサービスの活用により、運用負荷を軽減しながらリアルタイムデータパイプラインを構築することが可能です。
← 一覧に戻る