データレイクハウス 2025 - データウェアハウスとデータレイクの統合

2026.01.12

データレイクハウスとは

データレイクハウス(Data Lakehouse)は、データレイクの柔軟性データウェアハウスの信頼性を統合した次世代データアーキテクチャです。2025年、レイクハウスは企業のデータ戦略において中核的な選択肢となりました。

従来のアプローチ:
  データレイク(生データ保存)→ ETL → データウェアハウス(分析用)

レイクハウスアプローチ:
  データレイクハウス(生データ + 分析 + ML を一元管理)

なぜレイクハウスが注目されているのか

データレイクの課題

data_lake_challenges:
  reliability:
    - ACID トランザクションの欠如
    - データ整合性の問題
    - 同時書き込みの競合

  performance:
    - 小さなファイルの問題
    - クエリ最適化の困難さ
    - インデックスの不在

  governance:
    - スキーマ管理の複雑さ
    - データ品質の保証困難
    - バージョン管理の欠如

データウェアハウスの課題

data_warehouse_challenges:
  flexibility:
    - 非構造化データの扱いが困難
    - スキーマ変更のコスト
    - ストレージコストが高い

  scalability:
    - ML/AIワークロードに不向き
    - リアルタイム処理の制限
    - ベンダーロックイン

レイクハウスの解決策

レイクハウスが提供する価値:
・ACIDトランザクション(データレイクに信頼性を追加)
・スキーマ進化(柔軟なスキーマ変更)
・タイムトラベル(過去データへのアクセス)
・統一されたバッチ/ストリーミング処理
・オープンフォーマット(ベンダーロックイン回避)
・ML/AI統合(同一プラットフォームで分析とML)

オープンテーブルフォーマット比較

Delta Lake

Databricksが開発した最も成熟したオープンテーブルフォーマットです。

# Delta Lakeの基本操作
from delta import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DeltaLakeDemo") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# データフレームをDelta形式で保存
df = spark.read.parquet("s3://bucket/raw/sales/")
df.write.format("delta").mode("overwrite").save("s3://bucket/lakehouse/sales/")

# ACIDトランザクションでの更新
delta_table = DeltaTable.forPath(spark, "s3://bucket/lakehouse/sales/")

delta_table.update(
    condition="region = 'APAC'",
    set={"discount": "discount * 1.1"}
)

# MERGE操作(Upsert)
delta_table.alias("target").merge(
    updates_df.alias("source"),
    "target.id = source.id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()

# タイムトラベル(過去バージョンへのアクセス)
df_v1 = spark.read.format("delta") \
    .option("versionAsOf", 1) \
    .load("s3://bucket/lakehouse/sales/")

# タイムスタンプでのアクセス
df_yesterday = spark.read.format("delta") \
    .option("timestampAsOf", "2025-01-11") \
    .load("s3://bucket/lakehouse/sales/")

Apache Iceberg

Netflixが開発し、マルチエンジン対応に優れたフォーマットです。

# Apache Icebergの基本操作
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("IcebergDemo") \
    .config("spark.sql.catalog.iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.iceberg_catalog.type", "hadoop") \
    .config("spark.sql.catalog.iceberg_catalog.warehouse", "s3://bucket/iceberg/") \
    .getOrCreate()

# テーブル作成
spark.sql("""
    CREATE TABLE iceberg_catalog.db.sales (
        id BIGINT,
        product STRING,
        amount DECIMAL(10,2),
        region STRING,
        sale_date DATE
    )
    USING iceberg
    PARTITIONED BY (region, days(sale_date))
""")

# Hidden Partitioning(パーティション変換)
# Icebergの特徴:パーティションカラムを明示的に指定不要
spark.sql("""
    INSERT INTO iceberg_catalog.db.sales
    SELECT * FROM raw_sales
""")

# スキーマ進化
spark.sql("""
    ALTER TABLE iceberg_catalog.db.sales
    ADD COLUMN customer_id BIGINT AFTER product
""")

# パーティション進化(ダウンタイムなし)
spark.sql("""
    ALTER TABLE iceberg_catalog.db.sales
    ADD PARTITION FIELD months(sale_date)
""")

# Branching(Git風のバージョン管理)
spark.sql("""
    ALTER TABLE iceberg_catalog.db.sales
    CREATE BRANCH dev_branch
""")

Apache Hudi

Uberが開発し、ストリーミング処理に強みを持つフォーマットです。

# Apache Hudiの基本操作
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("HudiDemo") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
    .getOrCreate()

# テーブルタイプ:Copy-On-Write
hudi_options_cow = {
    'hoodie.table.name': 'sales_cow',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.partitionpath.field': 'region',
    'hoodie.datasource.write.precombine.field': 'updated_at',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.table.type': 'COPY_ON_WRITE'
}

df.write.format("hudi") \
    .options(**hudi_options_cow) \
    .mode("append") \
    .save("s3://bucket/hudi/sales_cow/")

# テーブルタイプ:Merge-On-Read(高速書き込み)
hudi_options_mor = {
    'hoodie.table.name': 'sales_mor',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.partitionpath.field': 'region',
    'hoodie.datasource.write.precombine.field': 'updated_at',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.table.type': 'MERGE_ON_READ'
}

# インクリメンタル読み込み(CDC対応)
incremental_df = spark.read.format("hudi") \
    .option("hoodie.datasource.query.type", "incremental") \
    .option("hoodie.datasource.read.begin.instanttime", "20250111000000") \
    .load("s3://bucket/hudi/sales_mor/")

フォーマット比較表

┌─────────────────┬──────────────┬──────────────┬──────────────┐
│      機能        │  Delta Lake  │   Iceberg    │    Hudi      │
├─────────────────┼──────────────┼──────────────┼──────────────┤
│ ACID             │      ✓       │      ✓       │      ✓       │
│ タイムトラベル    │      ✓       │      ✓       │      ✓       │
│ スキーマ進化      │      ✓       │      ✓       │      ✓       │
│ パーティション進化 │    △        │      ✓       │      △       │
│ Hidden Partition │    △        │      ✓       │      ✗       │
│ ストリーミング    │      ✓       │      △       │      ✓       │
│ CDC対応          │      ✓       │      ✓       │      ✓       │
│ Branching       │    2025新機能 │      ✓       │      ✗       │
│ マルチエンジン    │      △       │      ✓       │      △       │
│ コミュニティ      │     大       │     大       │     中       │
│ 主要採用企業      │  Databricks  │  Netflix等   │    Uber      │
└─────────────────┴──────────────┴──────────────┴──────────────┘

Databricks vs Snowflake

Databricks

Sparkベースの統合データプラットフォームです。

# Databricksでのレイクハウス構築
# Unity Catalogによる統合ガバナンス

# カタログ作成
spark.sql("""
    CREATE CATALOG IF NOT EXISTS enterprise_catalog
    MANAGED LOCATION 's3://bucket/unity-catalog/'
""")

# スキーマ作成
spark.sql("""
    CREATE SCHEMA IF NOT EXISTS enterprise_catalog.sales_domain
    COMMENT 'Sales domain data products'
""")

# マネージドテーブル作成
spark.sql("""
    CREATE TABLE enterprise_catalog.sales_domain.transactions (
        transaction_id STRING,
        customer_id STRING,
        product_id STRING,
        amount DECIMAL(10,2),
        transaction_date TIMESTAMP
    )
    USING DELTA
    PARTITIONED BY (DATE(transaction_date))
    TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact' = 'true'
    )
""")

# Photonエンジンでの高速クエリ
# Databricks SQLウェアハウスで自動的に有効化

# Delta Live Tables(宣言的パイプライン)
# pipeline.py
import dlt
from pyspark.sql.functions import *

@dlt.table(
    comment="Raw sales data",
    table_properties={"quality": "bronze"}
)
def sales_bronze():
    return spark.readStream.format("cloudFiles") \
        .option("cloudFiles.format", "json") \
        .load("s3://bucket/raw/sales/")

@dlt.table(
    comment="Cleansed sales data",
    table_properties={"quality": "silver"}
)
@dlt.expect_or_drop("valid_amount", "amount > 0")
@dlt.expect_or_drop("valid_date", "sale_date IS NOT NULL")
def sales_silver():
    return dlt.read_stream("sales_bronze") \
        .select(
            col("id").alias("transaction_id"),
            col("amount").cast("decimal(10,2)"),
            to_date(col("sale_date")).alias("sale_date"),
            col("region")
        )

@dlt.table(
    comment="Aggregated sales metrics",
    table_properties={"quality": "gold"}
)
def sales_gold():
    return dlt.read("sales_silver") \
        .groupBy("region", "sale_date") \
        .agg(
            sum("amount").alias("total_sales"),
            count("*").alias("transaction_count")
        )

Snowflake

クラウドネイティブなデータプラットフォームです。

-- Snowflakeでのレイクハウス構築
-- Iceberg Tablesサポート(2025年GA)

-- 外部ボリューム設定
CREATE OR REPLACE EXTERNAL VOLUME iceberg_volume
    STORAGE_LOCATIONS = (
        (
            NAME = 's3_storage'
            STORAGE_PROVIDER = 'S3'
            STORAGE_BASE_URL = 's3://bucket/iceberg/'
            STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::123456789:role/snowflake-access'
        )
    );

-- Icebergテーブル作成
CREATE OR REPLACE ICEBERG TABLE sales_iceberg (
    transaction_id STRING,
    customer_id STRING,
    amount NUMBER(10,2),
    transaction_date TIMESTAMP
)
    CATALOG = 'SNOWFLAKE'
    EXTERNAL_VOLUME = 'iceberg_volume'
    BASE_LOCATION = 'sales/';

-- 外部Icebergカタログとの連携
CREATE OR REPLACE ICEBERG TABLE external_sales
    CATALOG = 'GLUE'
    CATALOG_NAMESPACE = 'lakehouse_db'
    CATALOG_TABLE_NAME = 'sales'
    EXTERNAL_VOLUME = 'iceberg_volume';

-- Dynamic Tables(自動更新マテリアライズドビュー)
CREATE OR REPLACE DYNAMIC TABLE sales_summary
    TARGET_LAG = '1 hour'
    WAREHOUSE = analytics_wh
AS
SELECT
    DATE_TRUNC('day', transaction_date) as sale_date,
    region,
    SUM(amount) as total_sales,
    COUNT(*) as transaction_count
FROM sales_iceberg
GROUP BY 1, 2;

-- Snowparkでの処理
-- Python UDF
CREATE OR REPLACE FUNCTION calculate_ltv(
    purchase_history ARRAY
)
RETURNS FLOAT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.10'
PACKAGES = ('pandas', 'numpy')
HANDLER = 'ltv_calculator'
AS $$
import pandas as pd
import numpy as np

def ltv_calculator(purchase_history):
    if not purchase_history:
        return 0.0
    df = pd.DataFrame(purchase_history)
    # 簡易LTV計算
    avg_purchase = df['amount'].mean()
    frequency = len(df) / 12  # 年間頻度
    lifespan = 3  # 顧客寿命(年)
    return float(avg_purchase * frequency * lifespan)
$$;

-- Cortex AI(組み込みML)
SELECT
    customer_id,
    SNOWFLAKE.CORTEX.SENTIMENT(review_text) as sentiment,
    SNOWFLAKE.CORTEX.SUMMARIZE(review_text) as summary
FROM customer_reviews;

プラットフォーム比較

comparison:
  databricks:
    strengths:
      - Spark/ML ワークロードに最適
      - Delta Lake ネイティブ
      - ノートブック中心の開発体験
      - MLflow 統合
      - フォトンエンジンによる高速化
    considerations:
      - SQL専門家には学習曲線
      - マネージドサービスコスト

  snowflake:
    strengths:
      - SQL中心の使いやすさ
      - ゼロメンテナンス
      - セキュリティ機能の充実
      - データシェアリング
      - Cortex AI統合
    considerations:
      - ストレージ/コンピュートのコスト
      - ML/AI ワークロードの制限

実装パターン

メダリオンアーキテクチャ

┌─────────────────────────────────────────────────────────────┐
│              メダリオンアーキテクチャ                         │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐              │
│  │  Bronze   │ →  │  Silver  │ →  │   Gold   │              │
│  │ (Raw)     │    │(Cleansed)│    │(Business)│              │
│  └──────────┘    └──────────┘    └──────────┘              │
│                                                             │
│  ・生データ取込     ・データ品質       ・集計・KPI            │
│  ・スキーマ推論     ・正規化          ・ビジネスロジック       │
│  ・監査証跡        ・重複排除         ・消費者向けビュー       │
│                                                             │
└─────────────────────────────────────────────────────────────┘
# メダリオンアーキテクチャ実装例
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from delta.tables import DeltaTable

class MedallionPipeline:
    def __init__(self, spark: SparkSession):
        self.spark = spark
        self.bronze_path = "s3://lakehouse/bronze/"
        self.silver_path = "s3://lakehouse/silver/"
        self.gold_path = "s3://lakehouse/gold/"

    def ingest_to_bronze(self, source_path: str, table_name: str):
        """
        Bronze層:生データの取り込み
        """
        df = self.spark.read.format("json") \
            .option("inferSchema", "true") \
            .load(source_path)

        # メタデータ追加
        df_with_metadata = df \
            .withColumn("_ingestion_timestamp", current_timestamp()) \
            .withColumn("_source_file", input_file_name())

        # Bronze テーブルへ書き込み
        df_with_metadata.write.format("delta") \
            .mode("append") \
            .option("mergeSchema", "true") \
            .save(f"{self.bronze_path}{table_name}")

        return df_with_metadata

    def transform_to_silver(self, table_name: str, transformations: dict):
        """
        Silver層:データクレンジングと正規化
        """
        bronze_df = self.spark.read.format("delta") \
            .load(f"{self.bronze_path}{table_name}")

        # 変換適用
        silver_df = bronze_df

        # NULL値処理
        if "null_handling" in transformations:
            for col_name, default_val in transformations["null_handling"].items():
                silver_df = silver_df.fillna({col_name: default_val})

        # 型変換
        if "type_casting" in transformations:
            for col_name, new_type in transformations["type_casting"].items():
                silver_df = silver_df.withColumn(
                    col_name,
                    col(col_name).cast(new_type)
                )

        # 重複排除
        if transformations.get("deduplicate"):
            key_cols = transformations["deduplicate"]["keys"]
            order_col = transformations["deduplicate"]["order_by"]
            window = Window.partitionBy(key_cols).orderBy(desc(order_col))
            silver_df = silver_df.withColumn("_rank", row_number().over(window)) \
                .filter(col("_rank") == 1) \
                .drop("_rank")

        # Silver テーブルへマージ
        silver_df.write.format("delta") \
            .mode("overwrite") \
            .option("overwriteSchema", "true") \
            .save(f"{self.silver_path}{table_name}")

        return silver_df

    def aggregate_to_gold(self, config: dict):
        """
        Gold層:ビジネスメトリクスの集計
        """
        # 複数Silverテーブルの結合
        base_df = None
        for table_name in config["source_tables"]:
            df = self.spark.read.format("delta") \
                .load(f"{self.silver_path}{table_name}")
            if base_df is None:
                base_df = df
            else:
                join_key = config["joins"][table_name]
                base_df = base_df.join(df, on=join_key, how="left")

        # 集計
        gold_df = base_df.groupBy(config["group_by"]) \
            .agg(*[
                eval(agg_expr).alias(alias)
                for alias, agg_expr in config["aggregations"].items()
            ])

        # Gold テーブルへ書き込み
        gold_df.write.format("delta") \
            .mode("overwrite") \
            .save(f"{self.gold_path}{config['output_table']}")

        return gold_df

ストリーミングレイクハウス

# リアルタイムデータ取り込み
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
    .appName("StreamingLakehouse") \
    .getOrCreate()

# Kafkaからのストリーミング取り込み
kafka_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "sales_events") \
    .option("startingOffsets", "latest") \
    .load()

# JSONパース
parsed_stream = kafka_stream \
    .select(
        from_json(
            col("value").cast("string"),
            schema
        ).alias("data"),
        col("timestamp").alias("kafka_timestamp")
    ) \
    .select("data.*", "kafka_timestamp")

# Delta Lakeへのストリーミング書き込み
query = parsed_stream.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "s3://lakehouse/checkpoints/sales/") \
    .trigger(processingTime="1 minute") \
    .start("s3://lakehouse/bronze/sales/")

# Change Data Feed(CDC)の有効化
spark.sql("""
    ALTER TABLE delta.`s3://lakehouse/silver/customers/`
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# CDCストリームの読み込み
cdc_stream = spark.readStream \
    .format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 0) \
    .load("s3://lakehouse/silver/customers/")

# 変更タイプごとの処理
cdc_stream.filter(col("_change_type") == "update_postimage") \
    .writeStream \
    .format("delta") \
    .option("checkpointLocation", "s3://lakehouse/checkpoints/cdc/") \
    .start("s3://lakehouse/silver/customer_updates/")

コスト最適化

ストレージ最適化

# Delta Lake最適化
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "s3://lakehouse/silver/sales/")

# OPTIMIZE(小さなファイルの統合)
delta_table.optimize().executeCompaction()

# Z-ORDER(クエリ最適化のためのデータ配置)
delta_table.optimize() \
    .where("sale_date >= '2025-01-01'") \
    .executeZOrderBy("customer_id", "product_id")

# VACUUM(古いファイルの削除)
delta_table.vacuum(retentionHours=168)  # 7日間保持

# 自動最適化設定
spark.sql("""
    ALTER TABLE delta.`s3://lakehouse/silver/sales/`
    SET TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact' = 'true',
        'delta.targetFileSize' = '134217728'  -- 128MB
    )
""")

コンピュート最適化

compute_optimization:
  databricks:
    - name: "Photonエンジン有効化"
      impact: "2-8倍の高速化"
      cost: "追加コストあり"

    - name: "Serverless SQL Warehouse"
      impact: "アイドル時間のコスト削減"
      use_case: "間欠的なクエリワークロード"

    - name: "クラスター自動スケーリング"
      config:
        min_workers: 2
        max_workers: 10
        autoscale_mode: "optimized"

  snowflake:
    - name: "Warehouse自動サスペンド"
      config:
        auto_suspend: 60  # 秒
        auto_resume: true

    - name: "クエリ結果キャッシュ"
      impact: "繰り返しクエリのコスト削減"

    - name: "マルチクラスター"
      use_case: "同時実行の多いワークロード"

コストモニタリング

-- Databricks: コスト分析
SELECT
    workspace_id,
    cluster_name,
    DATE_TRUNC('day', usage_date) as date,
    SUM(usage_quantity) as dbu_consumed,
    SUM(usage_quantity * list_price) as estimated_cost
FROM system.billing.usage
WHERE usage_date >= DATEADD(day, -30, CURRENT_DATE)
GROUP BY 1, 2, 3
ORDER BY estimated_cost DESC;

-- Snowflake: コスト分析
SELECT
    WAREHOUSE_NAME,
    DATE_TRUNC('day', START_TIME) as date,
    SUM(CREDITS_USED) as credits_consumed,
    SUM(CREDITS_USED) * 3.00 as estimated_cost  -- $3/credit想定
FROM SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY
WHERE START_TIME >= DATEADD(day, -30, CURRENT_DATE())
GROUP BY 1, 2
ORDER BY estimated_cost DESC;

2025年の動向

主要トレンド

1. オープンフォーマットの標準化
   - Delta Lake、Iceberg、Hudiの相互運用性向上
   - Delta Lake UniForm(複数フォーマット対応)
   - Apache XTable(フォーマット変換)

2. AI/ML統合の深化
   - フィーチャーストアとの統合
   - ベクトル検索機能の追加
   - LLMワークロードのサポート

3. リアルタイム処理の強化
   - サブ秒レイテンシの実現
   - ストリーミングとバッチの完全統合
   - 増分処理の最適化

4. ガバナンス機能の充実
   - Unity Catalog/Polaris Catalogの普及
   - データリネージの自動化
   - コンプライアンス機能の強化

フォーマット収束

# Delta Lake UniForm(2025年)
# 1つのテーブルで複数フォーマット対応
spark.sql("""
    CREATE TABLE unified_sales (
        id BIGINT,
        amount DECIMAL(10,2),
        sale_date DATE
    )
    USING DELTA
    TBLPROPERTIES (
        'delta.universalFormat.enabledFormats' = 'iceberg,hudi'
    )
""")

# Deltaで書き込み、Icebergで読み込み可能
# これにより、ツール選択の自由度が向上

AI統合

# Databricks: Vector Search統合
from databricks.vector_search.client import VectorSearchClient

client = VectorSearchClient()

# ベクトルインデックス作成
index = client.create_delta_sync_index(
    endpoint_name="vector_search_endpoint",
    index_name="product_embeddings",
    source_table_name="catalog.schema.products",
    primary_key="product_id",
    embedding_source_column="description",
    embedding_model_endpoint_name="embedding_model"
)

# 類似検索
results = index.similarity_search(
    query_text="高性能ノートパソコン",
    columns=["product_id", "name", "price"],
    num_results=10
)

ベストプラクティス

成功のポイント

1. 段階的な移行
   - 既存DWHを一度に置き換えない
   - ユースケース単位で移行
   - 並行運用期間を設ける

2. ガバナンスの確立
   - データカタログの整備
   - アクセス制御の一元化
   - 品質基準の定義

3. コスト管理
   - ストレージ階層化
   - コンピュート最適化
   - 定期的なVACUUM/OPTIMIZE

4. チーム体制
   - プラットフォームチームの設置
   - データエンジニアの育成
   - ドメインチームとの協業

参考: Databricks Lakehouse Architecture

まとめ

2025年のデータレイクハウスは、データ基盤の新標準として確立されました。Delta Lake、Iceberg、Hudiといったオープンフォーマットの成熟により、ベンダーロックインを回避しながらACID、スキーマ進化、タイムトラベルといった高度な機能を利用できます。DatabricksとSnowflakeはそれぞれの強みを活かしながらレイクハウス機能を強化しており、組織のニーズに応じた選択が可能です。AI/ML統合、リアルタイム処理、ガバナンス機能の進化により、レイクハウスは今後もデータアーキテクチャの中核として発展を続けるでしょう。

この技術を体系的に学びたいですか?

未来学では東証プライム上場企業のITエンジニアが24時間サポート。月額24,800円から、退会金0円のオンラインIT塾です。

LINEで無料相談する
← 一覧に戻る