バックプレッシャー - 過負荷制御とフロー制御のパターン

中級 | 10分 で読める | 2026.04.24

公式ドキュメント

この記事の要点

バックプレッシャー: プロデューサーがコンシューマーの処理速度に合わせることで過負荷を防ぐ
• キューの飽和やOOMを回避し、システム全体のスループットを最適化
• Reactive Streams、TCP、HTTP/2、Kafka など多くのシステムで採用

バックプレッシャー(Backpressure)は、データストリーム処理において、受信側(コンシューマー)の処理能力に応じて送信側(プロデューサー)の速度を制御する仕組みです。過負荷によるメモリ枯渇やシステムダウンを防ぎ、安定した処理を実現します。本記事では、バックプレッシャーの原理、実装パターン、主要フレームワークでの使い方を体系的に解説します。

概要

バックプレッシャーとは

システムにおいて、プロデューサー(データ送信側)がコンシューマー(データ受信側)より高速な場合、中間のキューが溢れる問題が発生します。バックプレッシャーは、コンシューマーが「もう処理できない」と通知し、プロデューサーに送信を停止・減速させる仕組みです。

sequenceDiagram
    participant P as Producer<br/>(高速)
    participant Q as Queue<br/>(有限)
    participant C as Consumer<br/>(低速)

    P->>Q: Data 1
    P->>Q: Data 2
    P->>Q: Data 3
    Q->>C: Data 1
    Note over C: 処理中...(遅い)
    P->>Q: Data 4
    P->>Q: Data 5
    Note over Q: キュー満杯!

    Q-->>P: Backpressure Signal<br/>(送信停止)
    Note over P: 待機

    C->>Q: Request 1
    Q->>C: Data 2
    Q-->>P: Resume Signal
    P->>Q: Data 6

注意: バックプレッシャーなしで無限にキューイングすると、メモリ枯渇(OOM)でシステムがクラッシュします。

バックプレッシャーが必要な理由

1. メモリ枯渇の防止

キューにデータが蓄積し続けると、ヒープメモリが枯渇します。

2. レイテンシの悪化

キューが深くなると、データが処理されるまでの待ち時間が増えます。

3. カスケード障害

上流の過負荷が下流に伝播し、システム全体がダウンします。

原則・定義

バックプレッシャーの基本戦略

ポイント: バックプレッシャーにはプル型(Pull-based)プッシュ型(Push-based)の2つのモデルがあります。

モデル説明
プル型コンシューマーが明示的に「次のデータをくれ」とリクエストReactive Streams, Iterator
プッシュ型プロデューサーが送信し、コンシューマーが「待て」と通知TCP フロー制御, HTTP/2

Reactive Streams の仕様

Reactive Streams は、非同期ストリーム処理の標準で、以下の4つのインターフェースを定義します。

public interface Publisher<T> {
  void subscribe(Subscriber<? super T> s);
}

public interface Subscriber<T> {
  void onSubscribe(Subscription s);
  void onNext(T t);
  void onError(Throwable t);
  void onComplete();
}

public interface Subscription {
  void request(long n); // バックプレッシャーのコア
  void cancel();
}

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}

コンシューマー(Subscriber)が request(n) で明示的に要求した分だけ、プロデューサー(Publisher)がデータを送信します。

構成要素

バックプレッシャー戦略の種類

戦略説明適用場面
Bufferingキューにバッファし、満杯時に待機一時的なバースト
Dropping新しいデータを破棄リアルタイム監視(古いデータは無価値)
Sampling一部のデータだけ取得メトリクス収集
Conflation最新データのみ保持(古いデータを上書き)UI更新
Blockingプロデューサーをブロック同期処理
Errorエラーを投げてストリームを終了厳格なSLA

実装例

1. Reactive Streams(RxJS)

import { Observable, Subscriber } from "rxjs";

// カスタムプロデューサー(バックプレッシャー対応)
const producer = new Observable<number>((subscriber: Subscriber<number>) => {
  let count = 0;
  let requested = 0;

  // コンシューマーからの request シグナルを受信
  const requestHandler = (n: number) => {
    requested += n;
    while (requested > 0 && count < 100) {
      subscriber.next(count++);
      requested--;
    }
    if (count >= 100) {
      subscriber.complete();
    }
  };

  // Reactive Streams の request を模擬(RxJS はデフォルトでバックプレッシャー非対応)
  // 実際は RxJS v7 では observeOn + bufferTime などで制御
  requestHandler(10); // 初期リクエスト

  return () => console.log("Unsubscribed");
});

producer.subscribe({
  next: (value) => {
    console.log(`Received: ${value}`);
    // 処理が遅い場合のシミュレーション
  },
  complete: () => console.log("Complete"),
});

2. Node.js Streams(Backpressure 対応)

Node.js の Stream は、内部的にバックプレッシャーをサポートします。

import { Readable, Writable } from "stream";

// 高速プロデューサー
const fastProducer = new Readable({
  read() {
    for (let i = 0; i < 100; i++) {
      const canContinue = this.push(`data-${i}\n`);
      if (!canContinue) {
        console.log("Backpressure! Pausing...");
        break; // バックプレッシャー発生
      }
    }
  },
});

// 低速コンシューマー
const slowConsumer = new Writable({
  write(chunk, encoding, callback) {
    console.log(`Processing: ${chunk.toString().trim()}`);
    // 遅延をシミュレート
    setTimeout(callback, 100);
  },
});

fastProducer.pipe(slowConsumer);

slowConsumer.on("drain", () => {
  console.log("Drained! Resuming...");
});

3. Go の Channel(Buffered Channel)

package main

import (
	"fmt"
	"time"
)

func producer(ch chan<- int) {
	for i := 0; i < 100; i++ {
		fmt.Printf("Producing: %d\n", i)
		ch <- i // バッファが満杯ならブロック(バックプレッシャー)
	}
	close(ch)
}

func consumer(ch <-chan int) {
	for value := range ch {
		fmt.Printf("Consuming: %d\n", value)
		time.Sleep(100 * time.Millisecond) // 遅い処理
	}
}

func main() {
	ch := make(chan int, 10) // バッファサイズ10

	go producer(ch)
	consumer(ch)
}

4. Akka Streams(Reactive Streams 実装)

import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, OverflowStrategy}

implicit val system = ActorSystem("BackpressureExample")
implicit val materializer = ActorMaterializer()

Source(1 to 100)
  .buffer(10, OverflowStrategy.backpressure) // バッファサイズ10、満杯時は待機
  .throttle(1, 100.milliseconds) // 1秒に10個まで
  .map { x =>
    println(s"Processing: $x")
    x * 2
  }
  .runWith(Sink.ignore)

実践メモ: Akka Streams や RxJava などの Reactive Streams 実装では、バックプレッシャーがデフォルトで組み込まれています。手動制御は不要です。

5. HTTP/2 のフロー制御

HTTP/2 は WINDOW_UPDATE フレームでバックプレッシャーを実現します。

// Node.js http2 サーバー(バックプレッシャー対応)
import http2 from "http2";

const server = http2.createServer();

server.on("stream", (stream, headers) => {
  let count = 0;

  const interval = setInterval(() => {
    const canContinue = stream.write(`data: ${count++}\n`);
    if (!canContinue) {
      console.log("Backpressure! Pausing...");
      clearInterval(interval);
      stream.once("drain", () => {
        console.log("Resumed!");
        // 再開処理
      });
    }

    if (count > 100) {
      clearInterval(interval);
      stream.end();
    }
  }, 10);
});

server.listen(3000);

メリット・デメリット

メリット

  1. 安定性: メモリ枯渇やクラッシュを防ぐ
  2. スループット最適化: システム全体の処理能力に合わせる
  3. レイテンシ削減: キューの深さを制限
  4. カスケード障害防止: 過負荷が伝播しない

デメリット

  1. 実装複雑: 手動実装は難しい
  2. デッドロック: 不適切な実装でデッドロックの可能性
  3. スループット低下: コンシューマーが遅いとプロデューサーも遅くなる
  4. デバッグ困難: バックプレッシャーの発生箇所を特定しづらい

ユースケース

1. ログ収集・ストリーミング(Kafka)

Kafka のコンシューマーは、fetch.max.bytes や max.poll.records でバックプレッシャーを制御します。

# Kafka Consumer 設定
fetch.max.bytes=52428800  # 50MB まで取得
max.poll.records=500      # 1回のポーリングで最大500件

2. WebSocket ストリーミング

WebSocket でリアルタイムデータを配信する際、クライアントの処理速度に合わせて送信を制御します。

import WebSocket from "ws";

const wss = new WebSocket.Server({ port: 8080 });

wss.on("connection", (ws) => {
  let isPaused = false;

  ws.on("message", (message) => {
    if (message === "pause") {
      isPaused = true;
    } else if (message === "resume") {
      isPaused = false;
    }
  });

  setInterval(() => {
    if (!isPaused && ws.readyState === WebSocket.OPEN) {
      ws.send(`data: ${Date.now()}`);
    }
  }, 100);
});

3. ETL パイプライン

データベースからデータを読み取り、変換して別のデータベースに書き込む際、書き込み速度に合わせて読み取りを制御します。

4. リアルタイム分析

Apache Flink や Spark Streaming では、内部的にバックプレッシャーを管理し、データソースからの取得速度を調整します。

落とし穴

1. キューサイズの設定ミス

キューが大きすぎるとメモリを浪費し、小さすぎるとバックプレッシャーが頻発します。

// 悪い例: 無制限キュー
const queue = []; // メモリ枯渇のリスク

// 良い例: 有限キュー
const maxSize = 1000;
if (queue.length >= maxSize) {
  // バックプレッシャー処理
}

2. 同期処理との混在

非同期バックプレッシャーと同期ブロッキングを混ぜると、デッドロックが発生します。

3. エラーハンドリングの欠如

バックプレッシャーが発生した際のエラーハンドリングがないと、データロスや例外が発生します。

4. Dropping 戦略の誤用

重要なデータに Dropping 戦略を使うと、データロスが発生します。メトリクスやログには適しますが、トランザクションデータには不適切です。

5. TCP のバックプレッシャー無視

アプリケーション層でバックプレッシャーを実装しても、TCP 層のバッファが溢れるとパケットロスが発生します。

比較表

バックプレッシャー戦略の比較

戦略データロスメモリ消費レイテンシ適用場面
Bufferingなし高い高い一時的なバースト
Droppingあり低い低いメトリクス・ログ
Samplingあり低い低い高頻度イベント
Conflationあり(古いデータ)低い低いUI更新
Blockingなし低い高い同期処理
Errorなし(中断)低い-厳格なSLA

フレームワークのバックプレッシャー対応

フレームワークバックプレッシャー実装方式
RxJavaReactive Streams
RxJS△(一部)bufferTime, throttle
Akka StreamsReactive Streams
Node.js StreamsBackpressure API
Go ChannelsBuffered Channel
Kafkafetch.max.bytes
HTTP/2WINDOW_UPDATE
TCPFlow Control (Window Size)

ベストプラクティス

  1. Reactive Streams 準拠のライブラリを使う: RxJava、Akka Streams、Project Reactor
  2. キューサイズを監視: キューの深さをメトリクスで追跡
  3. 戦略を明示: Buffering / Dropping / Sampling のいずれかを明確に選択
  4. エラーハンドリング: バックプレッシャー発生時の挙動を定義
  5. 段階的なバックプレッシャー: サーキットブレーカーと組み合わせる
  6. 負荷テスト: 高負荷時の挙動を確認
  7. ログ: バックプレッシャー発生を記録
  8. ドキュメント: バックプレッシャー戦略をチームで共有

まとめ

バックプレッシャーは、分散システムやストリーム処理において、過負荷を防ぐための重要な技術です。

  • 原理: コンシューマーの処理速度にプロデューサーを合わせる
  • 戦略: Buffering、Dropping、Sampling、Conflation、Blocking、Error
  • 実装: Reactive Streams、Node.js Streams、Go Channels、TCP、HTTP/2
  • 採用例: Kafka、Akka Streams、RxJava、Flink
  • 監視: キュー深さ、バックプレッシャー発生頻度

バックプレッシャーなしでストリーム処理を行うと、システムがクラッシュするリスクが高まります。適切な戦略を選択しましょう。

応用トピック

サーキットブレーカーとの組み合わせ

バックプレッシャーとサーキットブレーカーを組み合わせ、過負荷時に上流をフェイルファストで切断します。

class BackpressureWithCircuitBreaker {
  private queue: any[] = [];
  private maxSize = 1000;
  private failureCount = 0;
  private circuitOpen = false;

  push(item: any): boolean {
    if (this.circuitOpen) {
      throw new Error("Circuit breaker is OPEN");
    }

    if (this.queue.length >= this.maxSize) {
      this.failureCount++;
      if (this.failureCount > 10) {
        this.circuitOpen = true; // サーキットブレーカー発動
      }
      return false; // バックプレッシャー
    }

    this.queue.push(item);
    this.failureCount = 0;
    return true;
  }
}

Adaptive Backpressure

コンシューマーの処理速度を動的に測定し、バッファサイズを自動調整します。

Load Shedding(負荷削減)

バックプレッシャーが限界に達したら、優先度の低いリクエストを破棄します。

Rate Limiting との違い

  • Rate Limiting: プロデューサー側で送信レートを制限
  • Backpressure: コンシューマー側からのシグナルで制限

両者を組み合わせることで、より堅牢なシステムになります。

参考リソース

関連記事

この技術を体系的に学びたいですか?

未来学では東証プライム上場企業のITエンジニアが24時間サポート。月額24,800円から、退会金0円のオンラインIT塾です。

メールで無料相談する
← 一覧に戻る