Change Data Capture 2025 - リアルタイムデータ同期の標準手法

2026.01.12

Change Data Capture(CDC)とは

Change Data Capture(CDC)は、データベースの変更(INSERT、UPDATE、DELETE)をリアルタイムで検出し、他のシステムに伝播させる技術です。2025年、マイクロサービスアーキテクチャの普及とリアルタイムデータパイプラインの需要増加により、CDCは現代のデータアーキテクチャにおける標準的なアプローチとなっています。

従来のバッチ処理によるデータ同期では、数時間から数日の遅延が発生していました。CDCを導入することで、ミリ秒単位でのデータ同期が可能となり、分析、キャッシュ更新、イベント駆動アーキテクチャの基盤として活用されています。

CDCの仕組み

CDCには主に3つのアプローチがあります。それぞれ特性が異なるため、ユースケースに応じた選択が重要です。

1. Log-based CDC(ログベース)

データベースのトランザクションログ(WAL、binlogなど)を直接読み取る方式。最も推奨されるアプローチです。

-- PostgreSQL での WAL レベル設定
ALTER SYSTEM SET wal_level = 'logical';
ALTER SYSTEM SET max_replication_slots = 4;
ALTER SYSTEM SET max_wal_senders = 4;

-- レプリケーションスロットの作成
SELECT pg_create_logical_replication_slot('debezium_slot', 'pgoutput');

メリット:

  • データベースへの負荷が最小限
  • すべての変更を正確にキャプチャ
  • トランザクション順序が保証される
  • パフォーマンスへの影響が少ない

デメリット:

  • データベース固有の設定が必要
  • 管理者権限が必要な場合がある
  • ログの保持期間に依存

2. Trigger-based CDC(トリガーベース)

データベーストリガーを使用して変更を検出し、シャドウテーブルに記録する方式。

-- PostgreSQL でのトリガーベース CDC 実装例
CREATE TABLE orders_audit (
    audit_id SERIAL PRIMARY KEY,
    order_id INTEGER,
    operation VARCHAR(10),
    old_data JSONB,
    new_data JSONB,
    changed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE OR REPLACE FUNCTION capture_order_changes()
RETURNS TRIGGER AS $$
BEGIN
    IF TG_OP = 'INSERT' THEN
        INSERT INTO orders_audit (order_id, operation, new_data)
        VALUES (NEW.id, 'INSERT', row_to_json(NEW)::jsonb);
        RETURN NEW;
    ELSIF TG_OP = 'UPDATE' THEN
        INSERT INTO orders_audit (order_id, operation, old_data, new_data)
        VALUES (NEW.id, 'UPDATE', row_to_json(OLD)::jsonb, row_to_json(NEW)::jsonb);
        RETURN NEW;
    ELSIF TG_OP = 'DELETE' THEN
        INSERT INTO orders_audit (order_id, operation, old_data)
        VALUES (OLD.id, 'DELETE', row_to_json(OLD)::jsonb);
        RETURN OLD;
    END IF;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER orders_cdc_trigger
AFTER INSERT OR UPDATE OR DELETE ON orders
FOR EACH ROW EXECUTE FUNCTION capture_order_changes();

メリット:

  • 特別な権限不要
  • カスタムロジックを追加可能
  • どのデータベースでも実装可能

デメリット:

  • 書き込みパフォーマンスに影響
  • シャドウテーブルの管理が必要
  • トランザクション分離の複雑さ

3. Query-based CDC(クエリベース)

定期的にテーブルをクエリし、タイムスタンプやバージョン番号で変更を検出する方式。

import asyncio
from datetime import datetime, timedelta
import asyncpg

class QueryBasedCDC:
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.last_sync_time = datetime.utcnow() - timedelta(hours=1)

    async def capture_changes(self, table: str, timestamp_column: str):
        conn = await asyncpg.connect(self.connection_string)
        try:
            query = f"""
                SELECT * FROM {table}
                WHERE {timestamp_column} > $1
                ORDER BY {timestamp_column}
            """
            rows = await conn.fetch(query, self.last_sync_time)
            self.last_sync_time = datetime.utcnow()
            return rows
        finally:
            await conn.close()

    async def poll_continuously(self, table: str, timestamp_column: str, interval: int = 10):
        while True:
            changes = await self.capture_changes(table, timestamp_column)
            if changes:
                await self.process_changes(changes)
            await asyncio.sleep(interval)

    async def process_changes(self, changes):
        for row in changes:
            print(f"Change detected: {dict(row)}")

メリット:

  • 実装が簡単
  • データベース設定変更不要
  • レガシーシステムでも使用可能

デメリット:

  • DELETE を検出できない
  • ポーリング間隔に依存
  • 高頻度更新には不向き

主要CDCツール比較

Debezium

Red Hat が開発するオープンソースの CDC プラットフォーム。2025年時点で最も広く採用されています。

# Debezium PostgreSQL Connector 設定
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: postgres-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  tasksMax: 1
  config:
    database.hostname: postgres
    database.port: 5432
    database.user: debezium
    database.password: ${secrets:postgres-credentials:password}
    database.dbname: inventory
    database.server.name: dbserver1
    plugin.name: pgoutput
    slot.name: debezium_slot
    publication.name: dbz_publication
    table.include.list: public.orders,public.customers

    # スナップショット設定
    snapshot.mode: initial

    # トランスフォーメーション
    transforms: unwrap
    transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
    transforms.unwrap.drop.tombstones: false
    transforms.unwrap.delete.handling.mode: rewrite

Debezium の出力フォーマット:

{
  "schema": {...},
  "payload": {
    "before": {
      "id": 1,
      "customer_name": "田中太郎",
      "amount": 10000
    },
    "after": {
      "id": 1,
      "customer_name": "田中太郎",
      "amount": 15000
    },
    "source": {
      "version": "2.5.0.Final",
      "connector": "postgresql",
      "name": "dbserver1",
      "ts_ms": 1704067200000,
      "db": "inventory",
      "schema": "public",
      "table": "orders",
      "txId": 12345,
      "lsn": 98765432
    },
    "op": "u",
    "ts_ms": 1704067200100
  }
}

特徴:

  • PostgreSQL、MySQL、MongoDB、SQL Server、Oracle 対応
  • Kafka Connect ベースのアーキテクチャ
  • 正確な once semantics をサポート
  • スキーマレジストリとの統合

Airbyte

オープンソースのデータ統合プラットフォームで、CDC 機能を内蔵しています。

# Airbyte CDC ソース設定(PostgreSQL)
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
connectionConfiguration:
  host: localhost
  port: 5432
  database: mydb
  username: airbyte_user
  password: ${AIRBYTE_PASSWORD}
  schemas:
    - public
  replication_method:
    method: CDC
    plugin: pgoutput
    replication_slot: airbyte_slot
    publication: airbyte_publication
    initial_waiting_seconds: 300
# Airbyte Python SDK による CDC ストリーム処理
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
from airbyte_cdk.sources.streams import Stream

class CDCStream(Stream):
    primary_key = "id"
    cursor_field = "_ab_cdc_lsn"

    def get_json_schema(self):
        return {
            "type": "object",
            "properties": {
                "id": {"type": "integer"},
                "data": {"type": "object"},
                "_ab_cdc_lsn": {"type": "integer"},
                "_ab_cdc_updated_at": {"type": "string"},
                "_ab_cdc_deleted_at": {"type": ["string", "null"]}
            }
        }

    def read_records(self, sync_mode, cursor_field=None, stream_slice=None, stream_state=None):
        # CDC レコードの読み取り処理
        last_lsn = stream_state.get("_ab_cdc_lsn", 0) if stream_state else 0

        for record in self.fetch_cdc_records(last_lsn):
            yield record

特徴:

  • 300+ のコネクタ
  • WebUI による設定管理
  • Kubernetes 対応
  • 増分同期とフル同期の両方に対応

Fivetran

エンタープライズ向けのマネージド CDC サービス。

-- Fivetran 用のデータベース設定(PostgreSQL)
CREATE USER fivetran WITH PASSWORD 'secure_password';
GRANT USAGE ON SCHEMA public TO fivetran;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO fivetran;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO fivetran;

-- CDC 用の追加権限
GRANT rds_replication TO fivetran;  -- AWS RDS の場合

特徴:

  • フルマネージドサービス
  • 自動スキーマ変更検出
  • SLA 保証(99.9%)
  • 豊富なデスティネーション対応

ツール選択ガイド

要件DebeziumAirbyteFivetran
コスト無料(OSS)無料/有料有料
運用負荷
カスタマイズ性
エンタープライズサポートRed HatCloud版あり
レイテンシ最小
対応DB数

Kafka との統合

CDC と Apache Kafka の組み合わせは、2025年のリアルタイムデータアーキテクチャの標準となっています。

Debezium + Kafka 構成

// Kafka Streams による CDC イベント処理
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Consumed;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

public class CDCStreamProcessor {

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // CDC イベントストリームを購読
        KStream<String, OrderEvent> orderChanges = builder.stream(
            "dbserver1.public.orders",
            Consumed.with(Serdes.String(), new SpecificAvroSerde<>())
        );

        // INSERT イベントをフィルタリング
        KStream<String, OrderEvent> newOrders = orderChanges
            .filter((key, value) -> "c".equals(value.getOp()));

        // UPDATE イベントで金額変更を検出
        KStream<String, OrderEvent> priceChanges = orderChanges
            .filter((key, value) -> "u".equals(value.getOp()))
            .filter((key, value) -> {
                var before = value.getBefore();
                var after = value.getAfter();
                return !before.getAmount().equals(after.getAmount());
            });

        // 通知システムに転送
        newOrders.to("notifications.new-orders");
        priceChanges.to("notifications.price-changes");

        // ストリーム処理を開始
        KafkaStreams streams = new KafkaStreams(builder.build(), getConfig());
        streams.start();
    }
}

Kafka Connect 設定

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.dbname": "inventory",
    "topic.prefix": "dbserver1",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory",

    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}

実装パターン

パターン1: Outbox パターン

マイクロサービス間のイベント伝播に最適なパターンです。

-- Outbox テーブルの作成
CREATE TABLE outbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type VARCHAR(255) NOT NULL,
    aggregate_id VARCHAR(255) NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    payload JSONB NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- インデックス作成
CREATE INDEX idx_outbox_created_at ON outbox(created_at);
// Spring Boot での Outbox パターン実装
@Service
@Transactional
public class OrderService {

    @Autowired
    private OrderRepository orderRepository;

    @Autowired
    private OutboxRepository outboxRepository;

    public Order createOrder(CreateOrderRequest request) {
        // 1. 注文を作成
        Order order = new Order(request);
        order = orderRepository.save(order);

        // 2. Outbox にイベントを記録(同一トランザクション)
        OutboxEvent event = OutboxEvent.builder()
            .aggregateType("Order")
            .aggregateId(order.getId().toString())
            .eventType("OrderCreated")
            .payload(toJson(new OrderCreatedEvent(order)))
            .build();

        outboxRepository.save(event);

        return order;
    }
}
# Debezium Outbox Event Router 設定
transforms: outbox
transforms.outbox.type: io.debezium.transforms.outbox.EventRouter
transforms.outbox.route.by.field: aggregate_type
transforms.outbox.table.field.event.type: event_type
transforms.outbox.table.field.event.payload: payload
transforms.outbox.route.topic.replacement: events.${routedByValue}

パターン2: CQRS + Event Sourcing

# Python での CQRS 読み取りモデル更新
import json
from kafka import KafkaConsumer
from elasticsearch import Elasticsearch

class CDCConsumer:
    def __init__(self):
        self.consumer = KafkaConsumer(
            'dbserver1.public.orders',
            bootstrap_servers=['localhost:9092'],
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            group_id='order-search-updater',
            auto_offset_reset='earliest'
        )
        self.es = Elasticsearch(['http://localhost:9200'])

    def process_events(self):
        for message in self.consumer:
            event = message.value
            payload = event.get('payload', {})
            operation = payload.get('op')

            if operation == 'c':  # CREATE
                self.handle_insert(payload['after'])
            elif operation == 'u':  # UPDATE
                self.handle_update(payload['after'])
            elif operation == 'd':  # DELETE
                self.handle_delete(payload['before'])

    def handle_insert(self, data):
        self.es.index(
            index='orders',
            id=data['id'],
            document={
                'customer_name': data['customer_name'],
                'amount': data['amount'],
                'status': data['status'],
                'created_at': data['created_at']
            }
        )

    def handle_update(self, data):
        self.es.update(
            index='orders',
            id=data['id'],
            doc={
                'customer_name': data['customer_name'],
                'amount': data['amount'],
                'status': data['status']
            }
        )

    def handle_delete(self, data):
        self.es.delete(index='orders', id=data['id'])

パターン3: キャッシュ無効化

// TypeScript での Redis キャッシュ無効化
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
import { createClient, RedisClientType } from 'redis';

interface CDCPayload {
  op: 'c' | 'u' | 'd' | 'r';
  before: Record<string, unknown> | null;
  after: Record<string, unknown> | null;
  source: {
    table: string;
    db: string;
  };
}

class CacheInvalidator {
  private consumer: Consumer;
  private redis: RedisClientType;

  async initialize(): Promise<void> {
    const kafka = new Kafka({
      clientId: 'cache-invalidator',
      brokers: ['localhost:9092']
    });

    this.consumer = kafka.consumer({ groupId: 'cache-invalidation-group' });
    this.redis = createClient({ url: 'redis://localhost:6379' });

    await this.consumer.connect();
    await this.redis.connect();
    await this.consumer.subscribe({ topic: /dbserver1\.public\..*/ });
  }

  async run(): Promise<void> {
    await this.consumer.run({
      eachMessage: async ({ topic, message }: EachMessagePayload) => {
        const payload: CDCPayload = JSON.parse(message.value!.toString());
        await this.invalidateCache(payload);
      }
    });
  }

  private async invalidateCache(payload: CDCPayload): Promise<void> {
    const table = payload.source.table;
    const id = payload.after?.id || payload.before?.id;

    if (!id) return;

    // 個別キーの削除
    const cacheKey = `${table}:${id}`;
    await this.redis.del(cacheKey);

    // 関連するリストキャッシュも削除
    const listPattern = `${table}:list:*`;
    const keys = await this.redis.keys(listPattern);
    if (keys.length > 0) {
      await this.redis.del(keys);
    }

    console.log(`Cache invalidated: ${cacheKey}`);
  }
}

ユースケース

1. リアルタイム分析ダッシュボード

# Apache Flink での CDC ストリーム処理
from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)

# CDC ソーステーブルの定義
t_env.execute_sql("""
    CREATE TABLE orders_cdc (
        id INT,
        customer_id INT,
        amount DECIMAL(10, 2),
        status STRING,
        created_at TIMESTAMP(3),
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'dbserver1.public.orders',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'debezium-json'
    )
""")

# リアルタイム集計
t_env.execute_sql("""
    CREATE TABLE hourly_sales (
        window_start TIMESTAMP(3),
        window_end TIMESTAMP(3),
        total_amount DECIMAL(12, 2),
        order_count BIGINT,
        PRIMARY KEY (window_start, window_end) NOT ENFORCED
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://localhost:5432/analytics',
        'table-name' = 'hourly_sales'
    )
""")

t_env.execute_sql("""
    INSERT INTO hourly_sales
    SELECT
        TUMBLE_START(created_at, INTERVAL '1' HOUR) as window_start,
        TUMBLE_END(created_at, INTERVAL '1' HOUR) as window_end,
        SUM(amount) as total_amount,
        COUNT(*) as order_count
    FROM orders_cdc
    GROUP BY TUMBLE(created_at, INTERVAL '1' HOUR)
""")

2. マイクロサービス間データ同期

// Go での CDC コンシューマー
package main

import (
    "context"
    "encoding/json"
    "log"

    "github.com/segmentio/kafka-go"
)

type CDCEvent struct {
    Payload struct {
        Op     string          `json:"op"`
        Before json.RawMessage `json:"before"`
        After  json.RawMessage `json:"after"`
        Source struct {
            Table string `json:"table"`
        } `json:"source"`
    } `json:"payload"`
}

func main() {
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"localhost:9092"},
        Topic:   "dbserver1.public.users",
        GroupID: "user-sync-service",
    })
    defer reader.Close()

    for {
        msg, err := reader.ReadMessage(context.Background())
        if err != nil {
            log.Printf("Error reading message: %v", err)
            continue
        }

        var event CDCEvent
        if err := json.Unmarshal(msg.Value, &event); err != nil {
            log.Printf("Error unmarshaling: %v", err)
            continue
        }

        switch event.Payload.Op {
        case "c", "r":
            syncUserToLocalDB(event.Payload.After)
        case "u":
            updateUserInLocalDB(event.Payload.After)
        case "d":
            deleteUserFromLocalDB(event.Payload.Before)
        }
    }
}

3. 監査ログ・コンプライアンス

金融機関や医療機関では、すべてのデータ変更を監査証跡として保存する必要があります。

-- TimescaleDB での監査ログ保存
CREATE TABLE audit_log (
    event_id UUID DEFAULT gen_random_uuid(),
    event_time TIMESTAMPTZ NOT NULL,
    table_name TEXT NOT NULL,
    operation TEXT NOT NULL,
    record_id TEXT NOT NULL,
    old_values JSONB,
    new_values JSONB,
    source_metadata JSONB
);

SELECT create_hypertable('audit_log', 'event_time');

-- 90日後に自動削除
SELECT add_retention_policy('audit_log', INTERVAL '90 days');

2025年のCDC動向

1. サーバーレスCDC

AWS DMS、Google Cloud Datastream などのマネージドサービスが進化し、インフラ管理なしでCDCを利用できるようになっています。

2. AI/MLパイプラインとの統合

CDC で取得したリアルタイムデータを機械学習モデルの特徴量として活用するパターンが増加しています。

# CDC + Feature Store 統合例
from feast import FeatureStore

store = FeatureStore(repo_path="./feature_repo")

# CDC イベントから特徴量を更新
def update_features_from_cdc(cdc_event):
    user_id = cdc_event['after']['user_id']

    # オンライン特徴量ストアを更新
    store.push(
        push_source_name="user_activity_stream",
        df=pd.DataFrame([{
            'user_id': user_id,
            'last_purchase_amount': cdc_event['after']['amount'],
            'last_purchase_time': cdc_event['after']['created_at']
        }])
    )

3. マルチクラウド・ハイブリッドクラウド対応

異なるクラウドプロバイダー間、またはオンプレミスとクラウド間でのCDCデータ連携が標準化されています。

4. スキーマエボリューション対応

Avro や Protobuf を使用したスキーマレジストリとの統合により、スキーマ変更を安全に伝播できるようになっています。

ベストプラクティス

1. 適切なスナップショットモード選択

# 初回のみフルスナップショット(推奨)
snapshot.mode=initial

# 常に最新からキャプチャ開始
snapshot.mode=never

# スナップショットのみ(CDC なし)
snapshot.mode=initial_only

2. モニタリングの実装

# Prometheus メトリクス設定
debezium:
  metrics:
    enabled: true
    jmx:
      enabled: true
    prometheus:
      enabled: true
      port: 9090

3. エラーハンドリング

# リトライ設定
errors.max.retries=10
errors.retry.delay.max.ms=60000

# デッドレターキュー
errors.deadletterqueue.topic.name=dlq.connector-errors
errors.deadletterqueue.context.headers.enable=true

まとめ

Change Data Capture は、2025年のデータアーキテクチャにおいて不可欠な技術となっています。Log-based CDC が最も推奨されるアプローチであり、Debezium + Kafka の組み合わせが業界標準として確立されています。

実装にあたっては、Outbox パターンやCQRSなどの設計パターンを活用し、適切なモニタリングとエラーハンドリングを組み込むことが重要です。マネージドサービスの活用により、運用負荷を軽減しながらリアルタイムデータパイプラインを構築することが可能です。

参考: Debezium Documentation, Confluent CDC Guide

この技術を体系的に学びたいですか?

未来学では東証プライム上場企業のITエンジニアが24時間サポート。月額24,800円から、退会金0円のオンラインIT塾です。

LINEで無料相談する
← 一覧に戻る