Apache Icebergとは
Apache Icebergは、Netflixによって開発されたオープンテーブルフォーマットです。大規模な分析テーブルに対してACIDトランザクション、スキーマ進化、タイムトラベルなどの機能を提供し、データレイクハウスアーキテクチャの中核技術として急速に普及しています。
2025年、Apache Icebergは事実上の業界標準としての地位を確立し、Snowflake、Databricks、AWS、Googleなど主要ベンダーがフルサポートを表明しました。
Iceberg V3仕様の新機能
2025年にリリースされたV3仕様は、パフォーマンスのボトルネックを解消する革新的な機能を導入しました。
Deletion Vectors(削除ベクトル)
行レベルの更新パフォーマンスを劇的に向上させる機能です。従来の「読み込み-変更-書き込み」操作を回避し、効率的なデータ更新を実現します。
-- V3のDeletion Vectorsを活用した効率的な更新
UPDATE iceberg_catalog.db.orders
SET status = 'shipped'
WHERE order_id = 12345;
-- 従来: データファイル全体を書き換え
-- V3: 削除ベクトルのみを記録(高速)
Default Column Values(デフォルト列値)
スキーマ進化時の摩擦を解消します。列追加時にデフォルト値を指定でき、メタデータのみの変更で完了します。
-- デフォルト値付きの列追加(V3)
ALTER TABLE iceberg_catalog.db.users
ADD COLUMN subscription_tier STRING DEFAULT 'free';
-- 既存データファイルは変更不要
-- 新しい行のみにデフォルト値が適用
Row-Level Lineage(行レベル系統追跡)
監査やCDC(Change Data Capture)パイプラインのために、行の履歴を追跡します。
-- 行の追加・変更時刻をメタデータで追跡
SELECT
_row_id,
_row_created_at,
_row_last_modified_at,
*
FROM iceberg_catalog.db.transactions;
リッチデータ型
V3では表現力豊かな型システムが導入されました:
-- VARIANT型(半構造化データ用)
CREATE TABLE events (
event_id BIGINT,
payload VARIANT, -- JSONライクなデータを格納
location GEOMETRY, -- 地理空間データ
created_at TIMESTAMP(9) -- ナノ秒精度
) USING iceberg;
-- VARIANTへのクエリ
SELECT
event_id,
payload:user_id::STRING as user_id,
payload:action::STRING as action
FROM events;
Delta Lake・Hudiとの比較
データレイクハウスの三大フォーマット(Iceberg、Delta Lake、Hudi)はそれぞれ異なる強みを持っています。
機能比較
| 機能 | Apache Iceberg | Delta Lake | Apache Hudi |
|---|---|---|---|
| 開発元 | Netflix | Databricks | Uber |
| スキーマ進化 | 完全サポート | サポート | サポート |
| パーティション進化 | 隠れたパーティショニング | 限定的 | 限定的 |
| タイムトラベル | スナップショット単位 | バージョン単位 | インスタント単位 |
| エンジン互換性 | 最も広範 | Spark中心 | Spark/Flink |
| 行レベル更新 | Merge-on-Read | Copy-on-Write | Merge-on-Read |
| メタデータ管理 | 分散マニフェスト | トランザクションログ | タイムライン |
ユースケース別推奨
┌─────────────────────────────────────────────────────────────┐
│ ユースケース │ 推奨フォーマット │
├─────────────────────────────────────────────────────────────┤
│ マルチエンジン分析 │ Apache Iceberg │
│ Sparkバッチ処理中心 │ Delta Lake │
│ リアルタイムCDC/ストリーミング │ Apache Hudi │
│ スキーマ頻繁変更 │ Apache Iceberg │
│ 大規模アドホッククエリ │ Apache Iceberg │
└─────────────────────────────────────────────────────────────┘
Apache XTableによる相互運用
フォーマット選択に迷う場合、Apache XTable(Incubating)が解決策を提供します:
# XTableによるフォーマット変換
java -jar xtable-utilities.jar \
--sourceFormat iceberg \
--targetFormat delta \
--tablePath s3://bucket/iceberg_table \
--outputPath s3://bucket/delta_table
主要ベンダーのサポート状況
Databricks
Unity Catalog管理のIcebergテーブルがDatabricks Runtime 16.4 LTS以降でサポートされています。
# DatabricksでのIcebergテーブル作成
spark.sql("""
CREATE TABLE unity_catalog.schema.sales (
sale_id BIGINT,
product_name STRING,
amount DECIMAL(10,2),
sale_date DATE
)
USING iceberg
PARTITIONED BY (days(sale_date))
""")
# 外部Icebergカタログへのアクセス
spark.sql("""
CREATE CATALOG aws_glue_catalog
USING iceberg
OPTIONS (
'type' = 'glue',
'warehouse' = 's3://my-warehouse'
)
""")
Snowflake
2025年10月にIcebergテーブルへの書き込みサポートがGA(一般提供)となりました。
-- SnowflakeでのIcebergテーブル作成
CREATE ICEBERG TABLE my_db.my_schema.customer_data (
customer_id INT,
name STRING,
email STRING,
created_at TIMESTAMP
)
CATALOG = 'SNOWFLAKE'
EXTERNAL_VOLUME = 'my_s3_volume'
BASE_LOCATION = 'customer_data/';
-- 外部Icebergカタログとの連携
CREATE CATALOG INTEGRATION aws_glue_iceberg
CATALOG_SOURCE = GLUE
GLUE_AWS_ROLE_ARN = 'arn:aws:iam::123456789:role/SnowflakeGlueRole'
GLUE_CATALOG_ID = '123456789012'
TABLE_FORMAT = ICEBERG;
-- RESTカタログ統合(Polaris、Unity Catalog等)
CREATE CATALOG INTEGRATION polaris_iceberg
CATALOG_SOURCE = ICEBERG_REST
TABLE_FORMAT = ICEBERG
CATALOG_NAMESPACE = 'my_namespace'
REST_CONFIG = (
CATALOG_URI = 'https://polaris.example.com/api/catalog',
WAREHOUSE = 'my_warehouse'
);
AWS
AWS Glue、EMR、Athenaで包括的なIcebergサポートを提供しています。
# AWS GlueでのIcebergジョブ
from awsglue.context import GlueContext
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.glue_catalog.warehouse", "s3://my-bucket/warehouse") \
.config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
.config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
.getOrCreate()
# Icebergテーブルへの書き込み
df.writeTo("glue_catalog.database.table") \
.using("iceberg") \
.partitionedBy("date") \
.createOrReplace()
Google Cloud
BigLakeを通じてIcebergテーブルをサポートし、BigQueryとの統合を実現しています。
-- BigQueryからIcebergテーブルへのアクセス
CREATE EXTERNAL TABLE my_project.my_dataset.iceberg_table
WITH CONNECTION `my_project.us.biglake_connection`
OPTIONS (
format = 'ICEBERG',
uris = ['gs://my-bucket/iceberg-warehouse/table'],
metadata_cache_mode = 'AUTOMATIC'
);
実装例
Apache Sparkでの利用
from pyspark.sql import SparkSession
# Spark + Iceberg設定
spark = SparkSession.builder \
.appName("Iceberg Demo") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.demo.type", "hadoop") \
.config("spark.sql.catalog.demo.warehouse", "s3://bucket/warehouse") \
.getOrCreate()
# テーブル作成(隠れたパーティショニング)
spark.sql("""
CREATE TABLE demo.db.events (
event_id BIGINT,
event_type STRING,
user_id BIGINT,
event_data STRING,
event_time TIMESTAMP
)
USING iceberg
PARTITIONED BY (days(event_time), bucket(16, user_id))
""")
# データ挿入
spark.sql("""
INSERT INTO demo.db.events VALUES
(1, 'click', 100, '{"page": "home"}', timestamp '2025-01-10 10:00:00'),
(2, 'purchase', 100, '{"product": "laptop"}', timestamp '2025-01-10 11:30:00'),
(3, 'click', 200, '{"page": "product"}', timestamp '2025-01-11 09:15:00')
""")
# タイムトラベルクエリ
spark.sql("""
SELECT * FROM demo.db.events
VERSION AS OF 'snap-1234567890'
""")
# スナップショット履歴の確認
spark.sql("""
SELECT * FROM demo.db.events.snapshots
""")
# Merge操作(V3 Deletion Vectors活用)
spark.sql("""
MERGE INTO demo.db.events t
USING updates s
ON t.event_id = s.event_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
Trinoでの利用
-- Trinoカタログ設定(etc/catalog/iceberg.properties)
-- connector.name=iceberg
-- iceberg.catalog.type=glue
-- iceberg.file-format=PARQUET
-- hive.metastore.glue.region=ap-northeast-1
-- テーブルクエリ
SELECT
event_type,
COUNT(*) as event_count,
COUNT(DISTINCT user_id) as unique_users
FROM iceberg.db.events
WHERE event_time >= TIMESTAMP '2025-01-01 00:00:00'
GROUP BY event_type
ORDER BY event_count DESC;
-- メタデータクエリ
SELECT * FROM iceberg.db.events$snapshots;
SELECT * FROM iceberg.db.events$manifests;
SELECT * FROM iceberg.db.events$files;
-- パーティションプルーニングの確認
EXPLAIN ANALYZE
SELECT * FROM iceberg.db.events
WHERE event_time BETWEEN TIMESTAMP '2025-01-10' AND TIMESTAMP '2025-01-11';
Apache Flinkでの利用
// Flink + Iceberg設定
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Icebergカタログ登録
tableEnv.executeSql("""
CREATE CATALOG iceberg_catalog WITH (
'type' = 'iceberg',
'catalog-type' = 'hive',
'uri' = 'thrift://hive-metastore:9083',
'warehouse' = 's3://bucket/warehouse'
)
""");
// ストリーミングソースからIcebergへの書き込み
tableEnv.executeSql("""
CREATE TABLE kafka_events (
event_id BIGINT,
event_type STRING,
user_id BIGINT,
event_data STRING,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'events',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json'
)
""");
// ストリーミングインジェスト
tableEnv.executeSql("""
INSERT INTO iceberg_catalog.db.events
SELECT * FROM kafka_events
""");
-- Flink SQLクライアントでの操作
-- カタログ作成
CREATE CATALOG iceberg_catalog WITH (
'type' = 'iceberg',
'catalog-type' = 'rest',
'uri' = 'http://iceberg-rest:8181',
'warehouse' = 's3://bucket/warehouse'
);
USE CATALOG iceberg_catalog;
-- Upsertモードでのストリーミング
CREATE TABLE target_table (
id BIGINT,
name STRING,
updated_at TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'format-version' = '2',
'write.upsert.enabled' = 'true'
);
INSERT INTO target_table
SELECT id, name, updated_at
FROM source_stream;
パフォーマンス最適化
コンパクション(Compaction)
小さなファイルを大きなファイルに統合し、クエリパフォーマンスを向上させます。
-- Sparkでのコンパクション
CALL demo.system.rewrite_data_files(
table => 'db.events',
strategy => 'binpack',
options => map(
'target-file-size-bytes', '134217728', -- 128MB
'min-input-files', '5',
'max-concurrent-file-group-rewrites', '10'
)
);
-- ソート戦略でのコンパクション(読み取り最適化)
CALL demo.system.rewrite_data_files(
table => 'db.events',
strategy => 'sort',
sort_order => 'user_id, event_time'
);
-- Z-orderでの最適化(複数次元クエリ向け)
CALL demo.system.rewrite_data_files(
table => 'db.events',
strategy => 'sort',
sort_order => 'zorder(user_id, event_type, event_time)'
);
パーティション設計
-- 低カーディナリティ列でパーティショニング
CREATE TABLE demo.db.orders (
order_id BIGINT,
customer_id BIGINT,
product_id BIGINT,
amount DECIMAL(10,2),
order_time TIMESTAMP,
region STRING
)
USING iceberg
PARTITIONED BY (
days(order_time), -- 日単位パーティション
region -- リージョン別
);
-- パーティション進化(既存データに影響なし)
ALTER TABLE demo.db.orders
ADD PARTITION FIELD bucket(8, customer_id);
メタデータ最適化
-- 古いスナップショットの期限切れ
CALL demo.system.expire_snapshots(
table => 'db.events',
older_than => TIMESTAMP '2024-12-01 00:00:00',
retain_last => 10
);
-- 孤立ファイルの削除
CALL demo.system.remove_orphan_files(
table => 'db.events',
older_than => TIMESTAMP '2024-12-01 00:00:00'
);
-- マニフェストの書き換え
CALL demo.system.rewrite_manifests(
table => 'db.events'
);
ストリーミングワークロード最適化
# ストリーミング時の推奨設定
spark.conf.set("spark.sql.iceberg.handle-timestamp-without-timezone", "true")
spark.conf.set("spark.sql.iceberg.check-ordering", "false")
# コールドパーティションのみコンパクション
from datetime import datetime, timedelta
cold_partition_filter = f"event_time < '{(datetime.now() - timedelta(hours=1)).isoformat()}'"
spark.sql(f"""
CALL demo.system.rewrite_data_files(
table => 'db.events',
where => "{cold_partition_filter}"
)
""")
2025年の動向
Iceberg V4の展望
コミュニティはV4仕様の準備を開始しています:
- Single-File Commits: メタデータ変更を単一ファイルに統合
- Native Index Support: インデックスの正式サポート
- Formal Caching Model: キャッシュモデルの標準化
- JDK 17ベースライン: 最小要件の引き上げ
Apache Polaris
新しいインキュベーションプロジェクトとして、Icebergテーブルのカタログ・ガバナンス層を提供:
# Polaris設定例
polaris:
catalog:
name: my_catalog
storage:
type: s3
bucket: my-iceberg-warehouse
region: ap-northeast-1
auth:
type: oauth2
issuer: https://auth.example.com
governance:
column-level-access: enabled
row-level-security: enabled
AWS re:Invent 2025発表
- AWS GlueのIceberg V3サポート
- Icebergベースのマテリアライズドビュー
- Lake Formation認可との統合強化
- DuckDBとの統合
ストリーミング統合の進化
Apache KafkaとFlinkとの統合が進み、リアルタイムとバッチ分析の統一が実現:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Kafka │───▶│ Flink │───▶│ Iceberg │
│ (Source) │ │ (Process) │ │ (Sink) │
└─────────────┘ └─────────────┘ └─────────────┘
│
┌────────────────────────┼────────────────────────┐
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Spark │ │ Trino │ │ Snowflake │
│ (Batch) │ │ (Query) │ │ (Analyze) │
└─────────────┘ └─────────────┘ └─────────────┘
ベストプラクティス
テーブル設計
- パーティション戦略: 低カーディナリティ列を選択(月/日単位の時間、リージョン等)
- ファイルサイズ: 128MB〜256MBを目標に
- メタデータ管理: 定期的なスナップショット期限切れを設定
運用
# 推奨コンパクションスケジュール
# - ストリーミング: 1時間ごと(コールドパーティションのみ)
# - バッチ: 日次(オフピーク時間)
# - スナップショット期限切れ: 週次
# モニタリング指標
# - ファイル数/パーティション
# - 平均ファイルサイズ
# - メタデータファイルサイズ
# - クエリスキャン効率
まとめ
Apache Icebergは2025年、データレイクハウスの事実上の標準として確立されました。V3仕様の革新的機能、主要ベンダーの包括的サポート、そして活発なコミュニティにより、モダンデータ基盤構築の最適解となっています。
マルチエンジン互換性、柔軟なスキーマ進化、強力なパフォーマンス最適化機能を活用し、スケーラブルで将来性のあるデータアーキテクチャを構築しましょう。