Data Observability in 2025 - Visualizing and Monitoring Data Quality

2026.01.12

What Is Data Observability?

Data observability is the practice of visualizing and monitoring the health of data pipelines end to end. In 2025, it is recognized as essential infrastructure for data-driven organizations.

Where traditional data quality management took a reactive approach of writing tests to detect problems after the fact, data observability proactively detects anomalies in the data so that teams can act before problems spread downstream.

The Five Pillars of Data Observability

1. Freshness

Monitors whether data is being updated at the expected frequency.

# Example freshness-monitoring implementation
from datetime import datetime, timedelta
from dataclasses import dataclass

@dataclass
class FreshnessCheck:
    table_name: str
    expected_update_interval: timedelta
    last_updated: datetime

    def is_stale(self) -> bool:
        """データが古くなっているかチェック"""
        age = datetime.now() - self.last_updated
        return age > self.expected_update_interval

    def staleness_score(self) -> float:
        """古さのスコアを計算(1.0 = 正常、2.0 = 2倍遅延)"""
        age = datetime.now() - self.last_updated
        return age / self.expected_update_interval

# Usage example
orders_freshness = FreshnessCheck(
    table_name="orders",
    expected_update_interval=timedelta(hours=1),
    last_updated=datetime.now() - timedelta(hours=3)
)

if orders_freshness.is_stale():
    alert(f"テーブル {orders_freshness.table_name} が古くなっています")

2. Volume

Monitors fluctuations in data volume and detects abnormal increases or decreases.

# Volume monitoring
class VolumeMonitor:
    def __init__(self, table_name: str, lookback_days: int = 30):
        self.table_name = table_name
        self.lookback_days = lookback_days
        self.historical_counts = []

    def add_daily_count(self, count: int):
        """日次カウントを追加"""
        self.historical_counts.append(count)
        if len(self.historical_counts) > self.lookback_days:
            self.historical_counts.pop(0)

    def detect_anomaly(self, current_count: int, threshold: float = 2.0) -> bool:
        """Zスコアベースの異常検知"""
        if len(self.historical_counts) < 7:
            return False

        mean = sum(self.historical_counts) / len(self.historical_counts)
        variance = sum((x - mean) ** 2 for x in self.historical_counts) / len(self.historical_counts)
        std_dev = variance ** 0.5

        if std_dev == 0:
            return current_count != mean

        z_score = abs(current_count - mean) / std_dev
        return z_score > threshold

3. Schema

Tracks changes to table structures and detects breaking changes.

# Example schema-change monitoring configuration
schema_monitoring:
  tables:
    - name: customers
      track:
        - column_additions
        - column_deletions
        - type_changes
        - nullable_changes
      alerts:
        breaking_changes: critical
        additive_changes: info

  notifications:
    slack_channel: "#data-alerts"
    pagerduty: true

# Schema-change detection implementation
from dataclasses import dataclass
from typing import Dict, List, Optional

@dataclass
class SchemaChange:
    change_type: str  # added, removed, modified
    column_name: str
    old_type: Optional[str] = None
    new_type: Optional[str] = None
    is_breaking: bool = False

class SchemaMonitor:
    def compare_schemas(
        self,
        old_schema: Dict[str, str],
        new_schema: Dict[str, str]
    ) -> List[SchemaChange]:
        """スキーマの差分を検出"""
        changes = []

        old_cols = set(old_schema.keys())
        new_cols = set(new_schema.keys())

        # Removed columns (breaking changes)
        for col in old_cols - new_cols:
            changes.append(SchemaChange(
                change_type="removed",
                column_name=col,
                old_type=old_schema[col],
                is_breaking=True
            ))

        # Added columns
        for col in new_cols - old_cols:
            changes.append(SchemaChange(
                change_type="added",
                column_name=col,
                new_type=new_schema[col]
            ))

        # Type changes
        for col in old_cols & new_cols:
            if old_schema[col] != new_schema[col]:
                changes.append(SchemaChange(
                    change_type="modified",
                    column_name=col,
                    old_type=old_schema[col],
                    new_type=new_schema[col],
                    is_breaking=True
                ))

        return changes

4. Distribution

Monitors the distribution of data values and detects statistical anomalies.

# Distribution monitoring
import numpy as np
from scipy import stats

class DistributionMonitor:
    def __init__(self, column_name: str):
        self.column_name = column_name
        self.baseline_stats = None
        self.baseline_data = None

    def calculate_stats(self, data: np.ndarray) -> dict:
        """基本統計量を計算"""
        return {
            "mean": np.mean(data),
            "std": np.std(data),
            "median": np.median(data),
            "percentile_25": np.percentile(data, 25),
            "percentile_75": np.percentile(data, 75),
            "null_ratio": np.isnan(data).sum() / len(data),
            "unique_ratio": len(np.unique(data)) / len(data)
        }

    def set_baseline(self, data: np.ndarray):
        """Set the baseline distribution."""
        self.baseline_data = data  # keep raw values for the KS test
        self.baseline_stats = self.calculate_stats(data)

    def detect_drift(self, current_data: np.ndarray, threshold: float = 0.05) -> dict:
        """分布のドリフトを検知(KSテスト使用)"""
        current_stats = self.calculate_stats(current_data)

        # Kolmogorov-Smirnov test
        ks_stat, p_value = stats.ks_2samp(
            self.baseline_data,
            current_data
        )

        return {
            "is_drifted": p_value < threshold,
            "ks_statistic": ks_stat,
            "p_value": p_value,
            "current_stats": current_stats,
            "baseline_stats": self.baseline_stats
        }

5. Lineage

Tracks data lineage and visualizes dependencies between assets.

# Data lineage implementation
from typing import Set, Dict, List
from dataclasses import dataclass, field

@dataclass
class DataAsset:
    name: str
    asset_type: str  # table, view, dashboard
    upstream: Set[str] = field(default_factory=set)
    downstream: Set[str] = field(default_factory=set)

class LineageGraph:
    def __init__(self):
        self.assets: Dict[str, DataAsset] = {}

    def add_edge(self, source: str, target: str):
        """依存関係を追加"""
        if source not in self.assets:
            self.assets[source] = DataAsset(name=source, asset_type="unknown")
        if target not in self.assets:
            self.assets[target] = DataAsset(name=target, asset_type="unknown")

        self.assets[source].downstream.add(target)
        self.assets[target].upstream.add(source)

    def get_impact_radius(self, asset_name: str) -> Set[str]:
        """影響範囲を取得(下流の全アセット)"""
        impacted = set()
        to_visit = [asset_name]

        while to_visit:
            current = to_visit.pop()
            if current in self.assets:
                for downstream in self.assets[current].downstream:
                    if downstream not in impacted:
                        impacted.add(downstream)
                        to_visit.append(downstream)

        return impacted

    def get_root_cause_candidates(self, asset_name: str) -> Set[str]:
        """根本原因の候補を取得(上流の全アセット)"""
        candidates = set()
        to_visit = [asset_name]

        while to_visit:
            current = to_visit.pop()
            if current in self.assets:
                for upstream in self.assets[current].upstream:
                    if upstream not in candidates:
                        candidates.add(upstream)
                        to_visit.append(upstream)

        return candidates

Comparing Major Tools

Data observability tools in 2025

┌────────────────────────────────────────────────────────────────────┐
│                       Tool Comparison Matrix                       │
├──────────────┬──────────────┬──────────┬──────────────┬────────────┤
│ Tool         │ Anomaly Det. │ Lineage  │ dbt Support  │ Pricing    │
├──────────────┼──────────────┼──────────┼──────────────┼────────────┤
│ Monte Carlo  │ ★★★★★        │ ★★★★★    │ ★★★★★        │ Enterprise │
│ Soda         │ ★★★★☆        │ ★★★☆☆    │ ★★★★★        │ OSS/Paid   │
│ Great Expect │ ★★★★☆        │ ★★☆☆☆    │ ★★★★☆        │ OSS        │
│ Bigeye       │ ★★★★★        │ ★★★★☆    │ ★★★★☆        │ Enterprise │
│ Atlan        │ ★★★☆☆        │ ★★★★★    │ ★★★★☆        │ Enterprise │
│ Elementary   │ ★★★★☆        │ ★★★★☆    │ ★★★★★        │ OSS/Paid   │
└──────────────┴──────────────┴──────────┴──────────────┴────────────┘

Monte Carlo

A comprehensive solution aimed at enterprises.

# Example Monte Carlo SDK usage
from monte_carlo_client import MonteCarloClient

mc = MonteCarloClient(api_key="your-api-key")

# Create a custom monitor
monitor = mc.create_monitor(
    monitor_type="custom_sql",
    name="daily_revenue_check",
    sql="""
        SELECT
            COUNT(*) as row_count,
            SUM(amount) as total_revenue
        FROM orders
        WHERE order_date = CURRENT_DATE - 1
    """,
    schedule="0 8 * * *",  # 毎日8時
    thresholds={
        "row_count": {"min": 1000, "max": 100000},
        "total_revenue": {"min": 10000}
    }
)

# Fetch open incidents
incidents = mc.get_incidents(
    status="open",
    severity=["high", "critical"],
    days=7
)

for incident in incidents:
    print(f"[{incident.severity}] {incident.table}: {incident.description}")

Soda

Its distinguishing feature is SQL-based data quality checks.

# soda/checks/orders.yml
checks for orders:
  # Freshness check
  - freshness(created_at) < 1h

  # Volume checks
  - row_count > 0
  - row_count change between -20% and 50%

  # Null checks
  - missing_count(order_id) = 0
  - missing_count(customer_id) = 0

  # Value validation
  - invalid_count(status) = 0:
      valid values: ["pending", "confirmed", "shipped", "delivered", "cancelled"]

  # Duplicate check
  - duplicate_count(order_id) = 0

  # Custom SQL metric
  - revenue_sum > 0:
      revenue_sum query: |
        SELECT SUM(amount) FROM orders
        WHERE order_date = CURRENT_DATE - 1

  # Distribution checks
  - avg(amount) between 50 and 500
  - stddev(amount) < 1000

# Run Soda
soda scan -d warehouse -c soda/configuration.yml soda/checks/orders.yml
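
The same checks can also be triggered from Python (for example, inside an orchestrator task) through Soda Core's programmatic Scan interface. A minimal sketch; the method names follow the soda-core package and should be verified against the installed version:

# Running the same checks programmatically with soda-core (sketch)
from soda.scan import Scan

scan = Scan()
scan.set_data_source_name("warehouse")
scan.add_configuration_yaml_file("soda/configuration.yml")
scan.add_sodacl_yaml_file("soda/checks/orders.yml")

exit_code = scan.execute()

# Fail the surrounding pipeline step if any check failed
if scan.has_check_fails():
    raise RuntimeError(scan.get_logs_text())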

Great Expectations

A Python-native data quality framework.

# Great Expectations setup
import great_expectations as gx

context = gx.get_context()

# Connect a data source
datasource = context.sources.add_snowflake(
    name="snowflake_warehouse",
    connection_string="snowflake://user:pass@account/db/schema"
)

# Create an Expectation Suite
suite = context.add_expectation_suite("orders_suite")

# Define expectations
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)

suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="amount",
        min_value=0,
        max_value=1000000
    )
)

suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeInSet(
        column="status",
        value_set=["pending", "confirmed", "shipped", "delivered", "cancelled"]
    )
)

suite.add_expectation(
    gx.expectations.ExpectTableRowCountToBeBetween(
        min_value=1000,
        max_value=1000000
    )
)

# Create a checkpoint
checkpoint = context.add_checkpoint(
    name="orders_daily_check",
    validations=[
        {
            "batch_request": {
                "datasource_name": "snowflake_warehouse",
                "data_asset_name": "orders"
            },
            "expectation_suite_name": "orders_suite"
        }
    ]
)

# Run the checkpoint
result = checkpoint.run()

Elementary (dbt-native)

An observability tool that integrates directly into dbt projects.

# models/schema.yml
version: 2

models:
  - name: orders
    description: "注文データ"
    config:
      elementary:
        timestamp_column: created_at

    columns:
      - name: order_id
        tests:
          - unique
          - not_null
          - elementary.volume_anomalies:
              sensitivity: 3

      - name: amount
        tests:
          - elementary.column_anomalies:
              column_anomalies:
                - zero_count
                - zero_percent
                - average
                - standard_deviation
              sensitivity: 3

      - name: status
        tests:
          - accepted_values:
              values: ["pending", "confirmed", "shipped", "delivered", "cancelled"]
          - elementary.dimension_anomalies:
              dimensions:
                - status
              timestamp_column: created_at

    tests:
      - elementary.freshness_anomalies:
          timestamp_column: created_at
          sensitivity: 3

      - elementary.all_columns_anomalies:
          sensitivity: 3

dbt Integration Patterns

Integration architecture for dbt + data observability

┌──────────────────────────────────────────────────────────┐
│             dbt + Observability Integration              │
├──────────────────────────────────────────────────────────┤
│                                                          │
│  ┌───────────┐    ┌───────────┐    ┌────────────────┐    │
│  │  dbt run  │───→│ dbt test  │───→│ Observability  │    │
│  │           │    │           │    │     Checks     │    │
│  └───────────┘    └───────────┘    └───────┬────────┘    │
│                                            │             │
│                                            ▼             │
│                                    ┌────────────────┐    │
│                                    │    Alerting    │    │
│                                    │     System     │    │
│                                    └───────┬────────┘    │
│                                            │             │
│            ┌───────────────┬───────────────┤             │
│            ▼               ▼               ▼             │
│       ┌─────────┐    ┌───────────┐   ┌───────────┐       │
│       │  Slack  │    │ PagerDuty │   │ Dashboard │       │
│       └─────────┘    └───────────┘   └───────────┘       │
│                                                          │
└──────────────────────────────────────────────────────────┘

Quality checks via dbt post-hooks

-- macros/quality_check.sql
{% macro run_quality_checks(model_name) %}
  {% set checks_query %}
    INSERT INTO analytics.quality_checks (
      model_name,
      check_timestamp,
      row_count,
      null_count,
      duplicate_count
    )
    SELECT
      '{{ model_name }}' as model_name,
      CURRENT_TIMESTAMP as check_timestamp,
      COUNT(*) as row_count,
      SUM(CASE WHEN id IS NULL THEN 1 ELSE 0 END) as null_count,
      COUNT(*) - COUNT(DISTINCT id) as duplicate_count
    FROM {{ ref(model_name) }}
  {% endset %}

  {% do run_query(checks_query) %}
{% endmacro %}

-- models/marts/orders_mart.sql
{{
  config(
    materialized='table',
    post_hook="{{ run_quality_checks(this.name) }}"
  )
}}

SELECT * FROM {{ ref('stg_orders') }}

Integrating with dbt exposures

# models/exposures.yml
version: 2

exposures:
  - name: revenue_dashboard
    type: dashboard
    maturity: high
    url: https://bi.company.com/dashboards/revenue
    description: "経営層向け収益ダッシュボード"
    owner:
      name: Data Team
      email: data@company.com

    depends_on:
      - ref('orders_mart')
      - ref('customers_mart')

    meta:
      observability:
        freshness_sla: "1 hour"
        importance: critical
        alert_channel: "#exec-data-alerts"

Anomaly Detection Patterns

Statistical anomaly detection

# Anomaly detection engine
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import List, Optional

import numpy as np

class AnomalyType(Enum):
    SPIKE = "spike"
    DROP = "drop"
    TREND_CHANGE = "trend_change"
    DISTRIBUTION_SHIFT = "distribution_shift"

@dataclass
class Anomaly:
    timestamp: str
    metric_name: str
    expected_value: float
    actual_value: float
    anomaly_type: AnomalyType
    severity: str
    confidence: float

class AnomalyDetector:
    def __init__(self, sensitivity: float = 3.0):
        self.sensitivity = sensitivity

    def detect_volume_anomaly(
        self,
        historical: List[int],
        current: int
    ) -> Optional[Anomaly]:
        """ボリューム異常検知"""
        if len(historical) < 7:
            return None

        mean = np.mean(historical)
        std = np.std(historical)

        if std == 0:
            return None

        z_score = (current - mean) / std

        if abs(z_score) > self.sensitivity:
            anomaly_type = AnomalyType.SPIKE if z_score > 0 else AnomalyType.DROP
            severity = "critical" if abs(z_score) > self.sensitivity * 1.5 else "warning"

            return Anomaly(
                timestamp=datetime.now().isoformat(),
                metric_name="row_count",
                expected_value=mean,
                actual_value=current,
                anomaly_type=anomaly_type,
                severity=severity,
                confidence=1 - (1 / abs(z_score))
            )

        return None

    def detect_freshness_anomaly(
        self,
        expected_interval_minutes: int,
        actual_delay_minutes: int
    ) -> Optional[Anomaly]:
        """鮮度異常検知"""
        delay_ratio = actual_delay_minutes / expected_interval_minutes

        if delay_ratio > self.sensitivity:
            severity = "critical" if delay_ratio > self.sensitivity * 2 else "warning"

            return Anomaly(
                timestamp=datetime.now().isoformat(),
                metric_name="freshness",
                expected_value=expected_interval_minutes,
                actual_value=actual_delay_minutes,
                anomaly_type=AnomalyType.DROP,
                severity=severity,
                confidence=min(delay_ratio / 10, 1.0)
            )

        return None

Machine learning-based anomaly detection

# ML-based anomaly detection
from typing import List

import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

class MLAnomalyDetector:
    def __init__(self, contamination: float = 0.05):
        self.model = IsolationForest(
            contamination=contamination,
            random_state=42
        )
        self.scaler = StandardScaler()
        self.is_fitted = False

    def fit(self, historical_data: pd.DataFrame):
        """履歴データでモデルを学習"""
        features = self._extract_features(historical_data)
        scaled_features = self.scaler.fit_transform(features)
        self.model.fit(scaled_features)
        self.is_fitted = True

    def detect(self, current_data: pd.DataFrame) -> List[dict]:
        """異常を検知"""
        if not self.is_fitted:
            raise ValueError("モデルが学習されていません")

        features = self._extract_features(current_data)
        scaled_features = self.scaler.transform(features)

        predictions = self.model.predict(scaled_features)
        scores = self.model.decision_function(scaled_features)

        anomalies = []
        for i, (pred, score) in enumerate(zip(predictions, scores)):
            if pred == -1:  # flagged as an anomaly
                anomalies.append({
                    "index": i,
                    "anomaly_score": -score,
                    "confidence": min(abs(score) * 2, 1.0)
                })

        return anomalies

    def _extract_features(self, df: pd.DataFrame) -> np.ndarray:
        """特徴量抽出"""
        return df[["row_count", "null_ratio", "distinct_count", "avg_value"]].values

Implementing Data Lineage

Building the lineage graph

# Lineage management system
from typing import Dict, List, Set
import json

class LineageManager:
    def __init__(self):
        self.graph = {}
        self.metadata = {}

    def register_asset(
        self,
        asset_id: str,
        asset_type: str,
        metadata: dict
    ):
        """アセットを登録"""
        self.graph[asset_id] = {
            "upstream": set(),
            "downstream": set()
        }
        self.metadata[asset_id] = {
            "type": asset_type,
            **metadata
        }

    def add_dependency(self, source: str, target: str):
        """依存関係を追加"""
        if source in self.graph and target in self.graph:
            self.graph[source]["downstream"].add(target)
            self.graph[target]["upstream"].add(source)

    def get_upstream(self, asset_id: str, depth: int = -1) -> Set[str]:
        """上流アセットを取得"""
        visited = set()
        to_visit = [(asset_id, 0)]

        while to_visit:
            current, current_depth = to_visit.pop(0)
            if current in visited:
                continue
            if depth != -1 and current_depth > depth:
                continue

            visited.add(current)

            if current in self.graph:
                for upstream in self.graph[current]["upstream"]:
                    to_visit.append((upstream, current_depth + 1))

        visited.discard(asset_id)
        return visited

    def get_downstream(self, asset_id: str, depth: int = -1) -> Set[str]:
        """下流アセットを取得"""
        visited = set()
        to_visit = [(asset_id, 0)]

        while to_visit:
            current, current_depth = to_visit.pop(0)
            if current in visited:
                continue
            if depth != -1 and current_depth > depth:
                continue

            visited.add(current)

            if current in self.graph:
                for downstream in self.graph[current]["downstream"]:
                    to_visit.append((downstream, current_depth + 1))

        visited.discard(asset_id)
        return visited

    def impact_analysis(self, asset_id: str) -> dict:
        """影響分析"""
        downstream = self.get_downstream(asset_id)

        impact = {
            "tables": [],
            "dashboards": [],
            "ml_models": []
        }

        for asset in downstream:
            asset_type = self.metadata.get(asset, {}).get("type", "unknown")
            if asset_type == "table":
                impact["tables"].append(asset)
            elif asset_type == "dashboard":
                impact["dashboards"].append(asset)
            elif asset_type == "ml_model":
                impact["ml_models"].append(asset)

        return impact

OpenLineage integration

# OpenLineage integration
import uuid
from datetime import datetime
from typing import List

from openlineage.client import OpenLineageClient
from openlineage.client.facet import SqlJobFacet
from openlineage.client.run import Dataset, Job, Run, RunEvent, RunState

client = OpenLineageClient(url="http://marquez:5000")

# Record a job execution
def record_job_execution(
    job_name: str,
    inputs: List[str],
    outputs: List[str],
    sql: str
):
    run = Run(runId=str(uuid.uuid4()))
    job = Job(
        namespace="data_platform",
        name=job_name,
        facets={"sql": SqlJobFacet(query=sql)}
    )
    # Producer URI identifying the emitting integration (placeholder value)
    producer = "https://github.com/example-org/data-platform"

    # Start event
    start_event = RunEvent(
        eventType=RunState.START,
        eventTime=datetime.now().isoformat(),
        run=run,
        job=job,
        producer=producer,
        inputs=[Dataset(namespace="warehouse", name=inp) for inp in inputs],
        outputs=[Dataset(namespace="warehouse", name=out) for out in outputs]
    )
    client.emit(start_event)

    # ... run the job ...

    # Completion event
    complete_event = RunEvent(
        eventType=RunState.COMPLETE,
        eventTime=datetime.now().isoformat(),
        run=run,
        job=job,
        producer=producer
    )
    client.emit(complete_event)

Trends in 2025

Trend 1: AI/ML-driven anomaly detection

Before:
・Static, threshold-based alerts
・Rule-based detection
・High false-positive rates

In 2025:
・Machine learning automatically tunes thresholds
・Seasonality and trends are learned automatically
・Context-aware alerting
・Accuracy improves through self-learning (a sketch follows below)
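
To make the seasonality point concrete: instead of a single global threshold, a detector can keep a baseline per seasonal bucket so that, say, weekend dips are not flagged. The following is a minimal, illustrative sketch using per-weekday statistics, not any particular vendor's algorithm:

# Seasonality-aware volume thresholds (illustrative sketch)
from collections import defaultdict
from datetime import date
from statistics import mean, stdev

class SeasonalVolumeDetector:
    def __init__(self, sensitivity: float = 3.0):
        self.sensitivity = sensitivity
        self.history = defaultdict(list)  # weekday -> observed daily counts

    def add_observation(self, day: date, count: int):
        self.history[day.weekday()].append(count)

    def is_anomalous(self, day: date, count: int) -> bool:
        baseline = self.history[day.weekday()]
        if len(baseline) < 4:  # not enough history for this weekday yet
            return False
        mu, sigma = mean(baseline), stdev(baseline)
        if sigma == 0:
            return count != mu
        return abs(count - mu) / sigma > self.sensitivity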

Trend 2: Integration with the semantic layer

# Semantic layer integration
semantic_layer:
  metrics:
    - name: monthly_recurring_revenue
      description: "月次経常収益"
      calculation: SUM(subscription_amount)
      observability:
        freshness: "< 1 hour"
        quality_threshold: 99.5%
        alert_on_drop: 10%

  dimensions:
    - name: customer_segment
      allowed_values: ["enterprise", "mid_market", "smb"]
      observability:
        track_new_values: true
        alert_on_null: true

Trend 3: DataOps integration

# GitHub Actions + data observability
name: Data Quality CI

on:
  push:
    paths:
      - 'dbt/**'

jobs:
  data-quality:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Setup dbt
        uses: dbt-labs/dbt-setup@v1

      - name: Run dbt
        run: dbt run --target ci

      - name: Run data tests
        run: dbt test --target ci

      - name: Run Soda checks
        uses: sodadata/soda-action@v1
        with:
          soda_library_version: 1.5
          configuration: soda/configuration.yml
          checks: soda/checks/*.yml

      - name: Upload quality report
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: quality-report
          path: target/quality_report.html

Trend 4: Data contracts

# Data contract definition
apiVersion: datacontract.com/v1
kind: DataContract
metadata:
  name: orders-contract
  version: "2.0"

schema:
  type: object
  properties:
    order_id:
      type: string
      description: "注文ID"
      required: true
      unique: true
    customer_id:
      type: string
      description: "顧客ID"
      required: true
    amount:
      type: number
      description: "注文金額"
      minimum: 0
      maximum: 10000000
    status:
      type: string
      enum: ["pending", "confirmed", "shipped", "delivered", "cancelled"]

quality:
  freshness:
    max_age: "1 hour"
  completeness:
    threshold: 99.5%
  accuracy:
    null_ratio:
      order_id: 0%
      customer_id: 0%
      amount: "< 1%"

sla:
  availability: 99.9%
  update_frequency: "hourly"
  support_response: "4 hours"

Adoption Best Practices

A phased adoption approach

Phase 1: Foundations (months 1-2)
├── Identify critical tables
├── Basic freshness and volume checks
└── Build the alerting foundation

Phase 2: Expansion (months 2-3)
├── Schema-change detection
├── Distribution checks
└── Lineage visualization

Phase 3: Advanced capabilities (months 3-6)
├── ML-based anomaly detection
├── Automated remediation
└── Data contracts

Alert design

# Alert level hierarchy
from enum import Enum

class AlertLevel(Enum):
    INFO = "info"           # notification only
    WARNING = "warning"     # investigation recommended
    CRITICAL = "critical"   # immediate response required
    EMERGENCY = "emergency" # escalation

alert_routing = {
    AlertLevel.INFO: ["slack:#data-info"],
    AlertLevel.WARNING: ["slack:#data-alerts", "email:data-team"],
    AlertLevel.CRITICAL: ["slack:#data-alerts", "pagerduty:data-oncall"],
    AlertLevel.EMERGENCY: ["pagerduty:data-oncall", "phone:data-lead"]
}

# Mitigating alert fatigue
alert_config = {
    "deduplication_window": "1 hour",
    "max_alerts_per_hour": 10,
    "grouping": ["table_name", "check_type"],
    "auto_resolve": True
}
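
Putting the routing map and the deduplication window together, a minimal dispatch sketch; send_to is a placeholder for real Slack, PagerDuty, or email integrations:

# Alert dispatch with level-based routing and deduplication (sketch)
from datetime import datetime, timedelta

def send_to(destination: str, message: str) -> None:
    """Placeholder: swap in real Slack / PagerDuty / email senders."""
    print(f"[{destination}] {message}")

class AlertDispatcher:
    def __init__(self, routing: dict, dedup_window: timedelta = timedelta(hours=1)):
        self.routing = routing
        self.dedup_window = dedup_window
        self._last_sent: dict = {}  # (table_name, check_type) -> last send time

    def dispatch(self, level: AlertLevel, table_name: str, check_type: str, message: str) -> bool:
        key = (table_name, check_type)
        now = datetime.now()
        last = self._last_sent.get(key)
        if last and now - last < self.dedup_window:
            return False  # suppressed within the deduplication window
        self._last_sent[key] = now
        for destination in self.routing.get(level, []):
            send_to(destination, message)
        return True

# Usage: AlertDispatcher(alert_routing).dispatch(AlertLevel.CRITICAL, "orders", "freshness", "orders is 3h stale")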

Summary

In 2025, data observability has evolved from guaranteeing data quality to protecting business value. Monitoring the five pillars (freshness, volume, schema, distribution, and lineage) dramatically improves the reliability of data pipelines.

Using tools such as Monte Carlo, Soda, Great Expectations, and Elementary, and integrating them into dbt workflows, enables proactive data quality management. AI/ML-driven anomaly detection, data contracts, and DataOps integration are the major trends of 2025.

References: Monte Carlo - Data Observability, Soda - Data Quality, Great Expectations
