時系列データベース 2025 - IoT・監視データの最適解

2026.01.12

時系列データベースの重要性

2025年、IoTデバイスの爆発的増加、クラウドネイティブアプリケーションの監視需要、金融取引のリアルタイム分析など、時系列データの処理はあらゆる産業で不可欠となりました。従来のRDBMSでは対応困難な大規模時系列データを効率的に処理するため、専用の時系列データベース(TSDB)が急速に普及しています。

時系列データの特性と課題

データ特性

graph TB
    subgraph 時系列データの特性
        subgraph 特徴
            A1[タイムスタンプ必須]
            A2[追記中心 Write-heavy]
            A3[時間順序でのアクセス]
            A4[古いデータの価値低下]
            A5[集計・ダウンサンプリング需要]
        end
        subgraph 課題
            B1[高スループット書き込み]
            B2[効率的な圧縮]
            B3[時間範囲クエリの最適化]
            B4[データ保持ポリシー管理]
        end
    end

従来RDBMSとの比較

項目従来RDBMS時系列DB
書き込み性能数千/秒数百万/秒
圧縮率1-2x10-50x
時間範囲クエリインデックス依存ネイティブ最適化
データ保持手動管理自動ポリシー
集計関数汎用時系列特化

主要製品比較

2025年の市場動向

製品特徴適用領域
TimescaleDBPostgreSQL拡張、SQL完全互換エンタープライズ、分析
InfluxDB専用設計、Flux言語監視、IoT
QuestDB超高速、SQL対応金融、リアルタイム
ClickHouseOLAP最適化、列指向大規模分析

TimescaleDB

PostgreSQLの拡張として動作し、既存のSQL知識をそのまま活用できます。

-- TimescaleDB: ハイパーテーブル作成
CREATE TABLE sensor_data (
    time        TIMESTAMPTZ NOT NULL,
    device_id   TEXT NOT NULL,
    temperature DOUBLE PRECISION,
    humidity    DOUBLE PRECISION,
    pressure    DOUBLE PRECISION
);

-- ハイパーテーブルに変換(自動パーティショニング)
SELECT create_hypertable('sensor_data', 'time');

-- 時間ベースのパーティション設定(7日間隔)
SELECT set_chunk_time_interval('sensor_data', INTERVAL '7 days');

-- 空間パーティショニングも追加(デバイスID別)
SELECT add_dimension('sensor_data', 'device_id', number_partitions => 4);
-- データ挿入
INSERT INTO sensor_data (time, device_id, temperature, humidity, pressure)
VALUES
    (NOW(), 'sensor-001', 23.5, 65.2, 1013.25),
    (NOW(), 'sensor-002', 24.1, 62.8, 1012.80),
    (NOW(), 'sensor-003', 22.8, 68.5, 1014.10);

-- 時間バケットによる集計(1時間ごと)
SELECT
    time_bucket('1 hour', time) AS hour,
    device_id,
    AVG(temperature) AS avg_temp,
    MAX(temperature) AS max_temp,
    MIN(temperature) AS min_temp,
    COUNT(*) AS readings
FROM sensor_data
WHERE time > NOW() - INTERVAL '24 hours'
GROUP BY hour, device_id
ORDER BY hour DESC, device_id;

継続的集計(Continuous Aggregates)

-- 継続的集計ビューの作成
CREATE MATERIALIZED VIEW hourly_device_stats
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', time) AS bucket,
    device_id,
    AVG(temperature) AS avg_temperature,
    AVG(humidity) AS avg_humidity,
    AVG(pressure) AS avg_pressure,
    COUNT(*) AS sample_count
FROM sensor_data
GROUP BY bucket, device_id;

-- リフレッシュポリシー設定(1時間ごと、2時間遅延)
SELECT add_continuous_aggregate_policy('hourly_device_stats',
    start_offset => INTERVAL '3 hours',
    end_offset => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour'
);

-- 集計データのクエリ(高速)
SELECT * FROM hourly_device_stats
WHERE bucket > NOW() - INTERVAL '7 days'
ORDER BY bucket DESC;

InfluxDB

専用設計の時系列データベースで、独自のFlux言語による強力なデータ処理が特徴です。

// InfluxDB Line Protocol でのデータ書き込み
// 形式: measurement,tag=value field=value timestamp

sensor_data,device_id=sensor-001,location=tokyo temperature=23.5,humidity=65.2 1704067200000000000
sensor_data,device_id=sensor-002,location=osaka temperature=24.1,humidity=62.8 1704067200000000000
sensor_data,device_id=sensor-003,location=nagoya temperature=22.8,humidity=68.5 1704067200000000000
// Flux言語によるクエリ例
from(bucket: "iot_metrics")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "sensor_data")
  |> filter(fn: (r) => r._field == "temperature")
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
  |> yield(name: "hourly_avg")

// 異常検知: 標準偏差を超えるデータポイント
from(bucket: "iot_metrics")
  |> range(start: -7d)
  |> filter(fn: (r) => r._measurement == "sensor_data")
  |> filter(fn: (r) => r._field == "temperature")
  |> movingAverage(n: 24)
  |> map(fn: (r) => ({
      r with
      anomaly: if r._value > 30.0 or r._value < 15.0
               then true
               else false
    }))
  |> filter(fn: (r) => r.anomaly == true)
# InfluxDB Python クライアント
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from datetime import datetime

# クライアント初期化
client = InfluxDBClient(
    url="http://localhost:8086",
    token="your-token",
    org="your-org"
)

write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()

# データ書き込み
point = Point("sensor_data") \
    .tag("device_id", "sensor-001") \
    .tag("location", "tokyo") \
    .field("temperature", 23.5) \
    .field("humidity", 65.2) \
    .time(datetime.utcnow())

write_api.write(bucket="iot_metrics", record=point)

# バッチ書き込み(高性能)
points = [
    Point("sensor_data")
        .tag("device_id", f"sensor-{i:03d}")
        .field("temperature", 20 + (i % 10))
        .field("humidity", 50 + (i % 30))
    for i in range(1000)
]
write_api.write(bucket="iot_metrics", record=points)

# Fluxクエリ実行
query = '''
from(bucket: "iot_metrics")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "sensor_data")
  |> mean()
'''

result = query_api.query(query)
for table in result:
    for record in table.records:
        print(f"{record.get_field()}: {record.get_value()}")

QuestDB

超高速なインジェストとSQL互換性を両立した時系列データベースです。金融データやリアルタイム分析に最適です。

-- QuestDB: テーブル作成
CREATE TABLE trades (
    timestamp TIMESTAMP,
    symbol SYMBOL,
    side SYMBOL,
    price DOUBLE,
    quantity DOUBLE
) timestamp(timestamp) PARTITION BY DAY WAL;

-- 高速バルクインサート
INSERT INTO trades
SELECT
    timestamp_sequence('2025-01-01', 1000000L) AS timestamp,
    rnd_symbol('BTC-USD', 'ETH-USD', 'XRP-USD') AS symbol,
    rnd_symbol('buy', 'sell') AS side,
    rnd_double() * 50000 AS price,
    rnd_double() * 100 AS quantity
FROM long_sequence(10000000);

-- SAMPLE BY による高速集計
SELECT
    timestamp,
    symbol,
    first(price) AS open,
    max(price) AS high,
    min(price) AS low,
    last(price) AS close,
    sum(quantity) AS volume
FROM trades
WHERE timestamp > dateadd('d', -7, now())
SAMPLE BY 1h
ALIGN TO CALENDAR;
-- ASOF JOIN: 最近傍結合(金融データで重要)
CREATE TABLE quotes (
    timestamp TIMESTAMP,
    symbol SYMBOL,
    bid DOUBLE,
    ask DOUBLE
) timestamp(timestamp) PARTITION BY DAY;

-- トレードと最新の気配値を結合
SELECT
    t.timestamp,
    t.symbol,
    t.price,
    q.bid,
    q.ask,
    t.price - q.bid AS slippage
FROM trades t
ASOF JOIN quotes q ON (t.symbol = q.symbol);

-- LATEST ON: 各シンボルの最新値
SELECT * FROM trades
LATEST ON timestamp PARTITION BY symbol;

ClickHouse

列指向OLAPデータベースで、大規模な時系列分析に威力を発揮します。

-- ClickHouse: テーブル作成
CREATE TABLE metrics (
    timestamp DateTime64(3),
    metric_name LowCardinality(String),
    host LowCardinality(String),
    datacenter LowCardinality(String),
    value Float64
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (metric_name, host, timestamp)
TTL timestamp + INTERVAL 90 DAY;

-- 集計テーブル(マテリアライズドビュー)
CREATE MATERIALIZED VIEW metrics_hourly
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (metric_name, host, timestamp)
AS SELECT
    toStartOfHour(timestamp) AS timestamp,
    metric_name,
    host,
    count() AS count,
    sum(value) AS sum_value,
    min(value) AS min_value,
    max(value) AS max_value
FROM metrics
GROUP BY timestamp, metric_name, host;
-- 高速分析クエリ
SELECT
    toStartOfInterval(timestamp, INTERVAL 5 MINUTE) AS interval,
    metric_name,
    quantile(0.95)(value) AS p95,
    quantile(0.99)(value) AS p99,
    avg(value) AS avg_value
FROM metrics
WHERE timestamp > now() - INTERVAL 1 HOUR
  AND metric_name IN ('cpu_usage', 'memory_usage', 'disk_io')
GROUP BY interval, metric_name
ORDER BY interval DESC;

-- ウィンドウ関数による異常検知
SELECT
    timestamp,
    host,
    value,
    avg(value) OVER (
        PARTITION BY host
        ORDER BY timestamp
        ROWS BETWEEN 10 PRECEDING AND CURRENT ROW
    ) AS moving_avg,
    value - moving_avg AS deviation
FROM metrics
WHERE metric_name = 'cpu_usage'
  AND timestamp > now() - INTERVAL 1 HOUR
HAVING abs(deviation) > 20;

データ圧縮とパーティショニング

圧縮戦略

compression_strategies:
  timescaledb:
    method: "ネイティブ圧縮"
    compression_ratio: "10-20x"
    config: |
      -- 圧縮ポリシー設定
      ALTER TABLE sensor_data SET (
        timescaledb.compress,
        timescaledb.compress_segmentby = 'device_id'
      );
      -- 7日以上経過したデータを自動圧縮
      SELECT add_compression_policy('sensor_data', INTERVAL '7 days');

  influxdb:
    method: "TSM (Time-Structured Merge Tree)"
    compression_ratio: "10-50x"
    features:
      - "Run-length encoding"
      - "Delta encoding for timestamps"
      - "Snappy/Zstd compression"

  questdb:
    method: "列指向 + SIMD最適化"
    compression_ratio: "5-15x"
    features:
      - "Memory-mapped files"
      - "Zero-copy読み取り"
      - "SIMD命令による並列処理"

  clickhouse:
    method: "LZ4/ZSTD + 特殊コーデック"
    compression_ratio: "10-40x"
    config: |
      -- 列ごとの圧縮コーデック指定
      CREATE TABLE optimized_metrics (
        timestamp DateTime64(3) CODEC(DoubleDelta, LZ4),
        value Float64 CODEC(Gorilla, LZ4),
        host LowCardinality(String) CODEC(ZSTD(3))
      )

パーティショニング設計

-- TimescaleDB: 時間 + 空間パーティション
SELECT create_hypertable(
    'sensor_data',
    'time',
    partitioning_column => 'device_id',
    number_partitions => 8,
    chunk_time_interval => INTERVAL '1 day'
);

-- データ保持ポリシー(90日後に自動削除)
SELECT add_retention_policy('sensor_data', INTERVAL '90 days');

-- 階層化ストレージ(古いデータを安価なストレージへ)
SELECT add_tiered_storage_policy(
    'sensor_data',
    move_after => INTERVAL '30 days',
    tiered_storage_name => 's3_archive'
);

監視・IoTでの活用例

Prometheusメトリクス連携

# Prometheus + TimescaleDB 構成
prometheus:
  remote_write:
    - url: "http://promscale:9201/write"
      queue_config:
        max_samples_per_send: 10000
        batch_send_deadline: 5s

  remote_read:
    - url: "http://promscale:9201/read"
      read_recent: true
-- Prometheusメトリクスのクエリ(Promscale)
SELECT
    time_bucket('5 minutes', time) AS bucket,
    jsonb_extract_path_text(labels, 'instance') AS instance,
    avg(value) AS avg_cpu
FROM prom_data.cpu_usage_seconds_total
WHERE time > NOW() - INTERVAL '1 hour'
  AND jsonb_extract_path_text(labels, 'job') = 'kubernetes'
GROUP BY bucket, instance
ORDER BY bucket DESC;

IoTセンサーデータ収集

# IoTゲートウェイからのデータ収集
import asyncio
from datetime import datetime
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

class IoTDataCollector:
    def __init__(self, influx_url: str, token: str, org: str, bucket: str):
        self.client = InfluxDBClientAsync(
            url=influx_url,
            token=token,
            org=org
        )
        self.bucket = bucket
        self.write_api = self.client.write_api()

    async def collect_sensor_data(self, device_id: str, data: dict):
        """センサーデータを収集して書き込み"""
        point = Point("sensor_reading") \
            .tag("device_id", device_id) \
            .tag("device_type", data.get("type", "unknown")) \
            .tag("location", data.get("location", "unknown")) \
            .field("temperature", data.get("temperature")) \
            .field("humidity", data.get("humidity")) \
            .field("battery_level", data.get("battery")) \
            .time(datetime.utcnow())

        await self.write_api.write(bucket=self.bucket, record=point)

    async def batch_collect(self, readings: list):
        """バッチ収集(高スループット)"""
        points = []
        for reading in readings:
            point = Point("sensor_reading") \
                .tag("device_id", reading["device_id"]) \
                .field("value", reading["value"]) \
                .time(reading.get("timestamp", datetime.utcnow()))
            points.append(point)

        # バッチ書き込み
        await self.write_api.write(bucket=self.bucket, record=points)

# 使用例
async def main():
    collector = IoTDataCollector(
        influx_url="http://localhost:8086",
        token="your-token",
        org="iot-org",
        bucket="sensor_data"
    )

    # 単一データ収集
    await collector.collect_sensor_data("sensor-001", {
        "type": "environmental",
        "location": "building-a",
        "temperature": 23.5,
        "humidity": 65.2,
        "battery": 85
    })

    # バッチ収集(1000センサーからのデータ)
    readings = [
        {"device_id": f"sensor-{i:04d}", "value": 20 + (i % 15)}
        for i in range(1000)
    ]
    await collector.batch_collect(readings)

asyncio.run(main())

Grafanaダッシュボード連携

{
  "dashboard": {
    "title": "IoT Sensor Monitoring",
    "panels": [
      {
        "title": "Temperature Heatmap",
        "type": "heatmap",
        "datasource": "TimescaleDB",
        "targets": [
          {
            "rawSql": "SELECT time_bucket('5 minutes', time) AS time, device_id, AVG(temperature) as value FROM sensor_data WHERE $__timeFilter(time) GROUP BY 1, 2 ORDER BY 1",
            "format": "time_series"
          }
        ]
      },
      {
        "title": "Device Status",
        "type": "stat",
        "datasource": "InfluxDB",
        "targets": [
          {
            "query": "from(bucket: \"iot_metrics\") |> range(start: -5m) |> filter(fn: (r) => r._measurement == \"device_status\") |> last()"
          }
        ]
      }
    ]
  }
}

金融データ処理

-- QuestDB: 高頻度取引データ分析
-- VWAP(出来高加重平均価格)計算
SELECT
    timestamp,
    symbol,
    sum(price * quantity) / sum(quantity) AS vwap,
    sum(quantity) AS total_volume
FROM trades
WHERE timestamp > dateadd('h', -1, now())
SAMPLE BY 1m
ALIGN TO CALENDAR;

-- ボラティリティ計算
SELECT
    symbol,
    stddev(price) AS volatility,
    max(price) - min(price) AS range,
    count(*) AS trade_count
FROM trades
WHERE timestamp > dateadd('d', -1, now())
GROUP BY symbol;

-- オーダーブックスナップショット
SELECT
    timestamp,
    symbol,
    sum(CASE WHEN side = 'bid' THEN quantity ELSE 0 END) AS bid_depth,
    sum(CASE WHEN side = 'ask' THEN quantity ELSE 0 END) AS ask_depth,
    max(CASE WHEN side = 'bid' THEN price END) AS best_bid,
    min(CASE WHEN side = 'ask' THEN price END) AS best_ask
FROM order_book
LATEST ON timestamp PARTITION BY symbol;

2025年の動向

主要トレンド

1. クラウドネイティブ化の加速
   → マネージドサービスの成熟
   → Kubernetes上での運用標準化
   → サーバーレス時系列DBの登場

2. AI/ML統合
   → 組み込み異常検知
   → 予測分析機能
   → 自動最適化(Auto-tuning)

3. エッジコンピューティング対応
   → 軽量版TSDBの普及
   → エッジ-クラウド同期
   → オフライン対応

4. マルチモーダルデータ
   → 時系列 + 空間データ
   → イベントストリームとの統合
   → GraphDB連携

5. コスト最適化
   → 階層化ストレージの標準化
   → 圧縮技術の進化
   → ダウンサンプリング自動化

製品別2025年アップデート

製品2025年の主要更新
TimescaleDBTimescale Cloud強化、AI統合、マルチノード改善
InfluxDBInfluxDB 3.0 (IOx)、Parquet対応、無制限カーディナリティ
QuestDBクラウド版GA、レプリケーション、JIT最適化
ClickHouseClickHouse Cloud成熟、Keeper改善、S3統合強化

選定ガイドライン

selection_criteria:
  timescaledb:
    recommended_for:
      - "既存PostgreSQLスキルの活用"
      - "複雑なJOIN・分析クエリ"
      - "トランザクション要件あり"
    not_recommended_for:
      - "超高頻度書き込み(100万+/秒)"

  influxdb:
    recommended_for:
      - "監視・メトリクス収集"
      - "IoTデータ収集"
      - "Telegraf/Grafanaエコシステム"
    not_recommended_for:
      - "複雑なリレーショナルクエリ"

  questdb:
    recommended_for:
      - "金融・取引データ"
      - "超低レイテンシ要件"
      - "高頻度インジェスト"
    not_recommended_for:
      - "大規模クラスター構成"

  clickhouse:
    recommended_for:
      - "大規模OLAP分析"
      - "ログ・イベント分析"
      - "ペタバイト級データ"
    not_recommended_for:
      - "単純な時系列クエリのみ"

ベストプラクティス

パフォーマンス最適化

-- TimescaleDB: インデックス戦略
-- 時間範囲クエリ用(自動作成)
CREATE INDEX ON sensor_data (time DESC);

-- デバイス別クエリ用
CREATE INDEX ON sensor_data (device_id, time DESC);

-- 複合条件用
CREATE INDEX ON sensor_data (device_id, time DESC)
WHERE temperature > 30;  -- 部分インデックス

-- クエリプラン確認
EXPLAIN ANALYZE
SELECT time_bucket('1 hour', time), avg(temperature)
FROM sensor_data
WHERE time > NOW() - INTERVAL '7 days'
  AND device_id = 'sensor-001'
GROUP BY 1;

運用監視

-- TimescaleDB: チャンク情報確認
SELECT
    hypertable_name,
    chunk_name,
    range_start,
    range_end,
    pg_size_pretty(total_bytes) AS size,
    is_compressed
FROM timescaledb_information.chunks
WHERE hypertable_name = 'sensor_data'
ORDER BY range_start DESC
LIMIT 10;

-- 圧縮状況確認
SELECT
    hypertable_name,
    compressed_heap_size,
    uncompressed_heap_size,
    compressed_heap_size::float / uncompressed_heap_size AS compression_ratio
FROM timescaledb_information.compression_settings;

まとめ

2025年の時系列データベースは、IoT・監視・金融など多様なユースケースに対応する成熟したソリューションとなりました。TimescaleDBのPostgreSQL互換性、InfluxDBの専用設計による使いやすさ、QuestDBの超高速処理、ClickHouseの大規模分析能力など、それぞれの強みを理解し適切に選定することが重要です。データ圧縮・パーティショニング・保持ポリシーを適切に設計することで、大規模な時系列データを効率的に管理できます。クラウドネイティブ化とAI統合が進む中、時系列データベースは今後も進化を続けていくでしょう。

この技術を体系的に学びたいですか?

未来学では東証プライム上場企業のITエンジニアが24時間サポート。月額24,800円から、退会金0円のオンラインIT塾です。

LINEで無料相談する
← 一覧に戻る