この記事の要点
• バックプレッシャー: プロデューサーがコンシューマーの処理速度に合わせることで過負荷を防ぐ
• キューの飽和やOOMを回避し、システム全体のスループットを最適化
• Reactive Streams、TCP、HTTP/2、Kafka など多くのシステムで採用
バックプレッシャー(Backpressure)は、データストリーム処理において、受信側(コンシューマー)の処理能力に応じて送信側(プロデューサー)の速度を制御する仕組みです。過負荷によるメモリ枯渇やシステムダウンを防ぎ、安定した処理を実現します。本記事では、バックプレッシャーの原理、実装パターン、主要フレームワークでの使い方を体系的に解説します。
概要
バックプレッシャーとは
システムにおいて、プロデューサー(データ送信側)がコンシューマー(データ受信側)より高速な場合、中間のキューが溢れる問題が発生します。バックプレッシャーは、コンシューマーが「もう処理できない」と通知し、プロデューサーに送信を停止・減速させる仕組みです。
sequenceDiagram
participant P as Producer<br/>(高速)
participant Q as Queue<br/>(有限)
participant C as Consumer<br/>(低速)
P->>Q: Data 1
P->>Q: Data 2
P->>Q: Data 3
Q->>C: Data 1
Note over C: 処理中...(遅い)
P->>Q: Data 4
P->>Q: Data 5
Note over Q: キュー満杯!
Q-->>P: Backpressure Signal<br/>(送信停止)
Note over P: 待機
C->>Q: Request 1
Q->>C: Data 2
Q-->>P: Resume Signal
P->>Q: Data 6
注意: バックプレッシャーなしで無限にキューイングすると、メモリ枯渇(OOM)でシステムがクラッシュします。
バックプレッシャーが必要な理由
1. メモリ枯渇の防止
キューにデータが蓄積し続けると、ヒープメモリが枯渇します。
2. レイテンシの悪化
キューが深くなると、データが処理されるまでの待ち時間が増えます。
3. カスケード障害
上流の過負荷が下流に伝播し、システム全体がダウンします。
原則・定義
バックプレッシャーの基本戦略
ポイント: バックプレッシャーにはプル型(Pull-based)とプッシュ型(Push-based)の2つのモデルがあります。
| モデル | 説明 | 例 |
|---|---|---|
| プル型 | コンシューマーが明示的に「次のデータをくれ」とリクエスト | Reactive Streams, Iterator |
| プッシュ型 | プロデューサーが送信し、コンシューマーが「待て」と通知 | TCP フロー制御, HTTP/2 |
Reactive Streams の仕様
Reactive Streams は、非同期ストリーム処理の標準で、以下の4つのインターフェースを定義します。
public interface Publisher<T> {
void subscribe(Subscriber<? super T> s);
}
public interface Subscriber<T> {
void onSubscribe(Subscription s);
void onNext(T t);
void onError(Throwable t);
void onComplete();
}
public interface Subscription {
void request(long n); // バックプレッシャーのコア
void cancel();
}
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}
コンシューマー(Subscriber)が request(n) で明示的に要求した分だけ、プロデューサー(Publisher)がデータを送信します。
構成要素
バックプレッシャー戦略の種類
| 戦略 | 説明 | 適用場面 |
|---|---|---|
| Buffering | キューにバッファし、満杯時に待機 | 一時的なバースト |
| Dropping | 新しいデータを破棄 | リアルタイム監視(古いデータは無価値) |
| Sampling | 一部のデータだけ取得 | メトリクス収集 |
| Conflation | 最新データのみ保持(古いデータを上書き) | UI更新 |
| Blocking | プロデューサーをブロック | 同期処理 |
| Error | エラーを投げてストリームを終了 | 厳格なSLA |
実装例
1. Reactive Streams(RxJS)
import { Observable, Subscriber } from "rxjs";
// カスタムプロデューサー(バックプレッシャー対応)
const producer = new Observable<number>((subscriber: Subscriber<number>) => {
let count = 0;
let requested = 0;
// コンシューマーからの request シグナルを受信
const requestHandler = (n: number) => {
requested += n;
while (requested > 0 && count < 100) {
subscriber.next(count++);
requested--;
}
if (count >= 100) {
subscriber.complete();
}
};
// Reactive Streams の request を模擬(RxJS はデフォルトでバックプレッシャー非対応)
// 実際は RxJS v7 では observeOn + bufferTime などで制御
requestHandler(10); // 初期リクエスト
return () => console.log("Unsubscribed");
});
producer.subscribe({
next: (value) => {
console.log(`Received: ${value}`);
// 処理が遅い場合のシミュレーション
},
complete: () => console.log("Complete"),
});
2. Node.js Streams(Backpressure 対応)
Node.js の Stream は、内部的にバックプレッシャーをサポートします。
import { Readable, Writable } from "stream";
// 高速プロデューサー
const fastProducer = new Readable({
read() {
for (let i = 0; i < 100; i++) {
const canContinue = this.push(`data-${i}\n`);
if (!canContinue) {
console.log("Backpressure! Pausing...");
break; // バックプレッシャー発生
}
}
},
});
// 低速コンシューマー
const slowConsumer = new Writable({
write(chunk, encoding, callback) {
console.log(`Processing: ${chunk.toString().trim()}`);
// 遅延をシミュレート
setTimeout(callback, 100);
},
});
fastProducer.pipe(slowConsumer);
slowConsumer.on("drain", () => {
console.log("Drained! Resuming...");
});
3. Go の Channel(Buffered Channel)
package main
import (
"fmt"
"time"
)
func producer(ch chan<- int) {
for i := 0; i < 100; i++ {
fmt.Printf("Producing: %d\n", i)
ch <- i // バッファが満杯ならブロック(バックプレッシャー)
}
close(ch)
}
func consumer(ch <-chan int) {
for value := range ch {
fmt.Printf("Consuming: %d\n", value)
time.Sleep(100 * time.Millisecond) // 遅い処理
}
}
func main() {
ch := make(chan int, 10) // バッファサイズ10
go producer(ch)
consumer(ch)
}
4. Akka Streams(Reactive Streams 実装)
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, OverflowStrategy}
implicit val system = ActorSystem("BackpressureExample")
implicit val materializer = ActorMaterializer()
Source(1 to 100)
.buffer(10, OverflowStrategy.backpressure) // バッファサイズ10、満杯時は待機
.throttle(1, 100.milliseconds) // 1秒に10個まで
.map { x =>
println(s"Processing: $x")
x * 2
}
.runWith(Sink.ignore)
実践メモ: Akka Streams や RxJava などの Reactive Streams 実装では、バックプレッシャーがデフォルトで組み込まれています。手動制御は不要です。
5. HTTP/2 のフロー制御
HTTP/2 は WINDOW_UPDATE フレームでバックプレッシャーを実現します。
// Node.js http2 サーバー(バックプレッシャー対応)
import http2 from "http2";
const server = http2.createServer();
server.on("stream", (stream, headers) => {
let count = 0;
const interval = setInterval(() => {
const canContinue = stream.write(`data: ${count++}\n`);
if (!canContinue) {
console.log("Backpressure! Pausing...");
clearInterval(interval);
stream.once("drain", () => {
console.log("Resumed!");
// 再開処理
});
}
if (count > 100) {
clearInterval(interval);
stream.end();
}
}, 10);
});
server.listen(3000);
メリット・デメリット
メリット
- 安定性: メモリ枯渇やクラッシュを防ぐ
- スループット最適化: システム全体の処理能力に合わせる
- レイテンシ削減: キューの深さを制限
- カスケード障害防止: 過負荷が伝播しない
デメリット
- 実装複雑: 手動実装は難しい
- デッドロック: 不適切な実装でデッドロックの可能性
- スループット低下: コンシューマーが遅いとプロデューサーも遅くなる
- デバッグ困難: バックプレッシャーの発生箇所を特定しづらい
ユースケース
1. ログ収集・ストリーミング(Kafka)
Kafka のコンシューマーは、fetch.max.bytes や max.poll.records でバックプレッシャーを制御します。
# Kafka Consumer 設定
fetch.max.bytes=52428800 # 50MB まで取得
max.poll.records=500 # 1回のポーリングで最大500件
2. WebSocket ストリーミング
WebSocket でリアルタイムデータを配信する際、クライアントの処理速度に合わせて送信を制御します。
import WebSocket from "ws";
const wss = new WebSocket.Server({ port: 8080 });
wss.on("connection", (ws) => {
let isPaused = false;
ws.on("message", (message) => {
if (message === "pause") {
isPaused = true;
} else if (message === "resume") {
isPaused = false;
}
});
setInterval(() => {
if (!isPaused && ws.readyState === WebSocket.OPEN) {
ws.send(`data: ${Date.now()}`);
}
}, 100);
});
3. ETL パイプライン
データベースからデータを読み取り、変換して別のデータベースに書き込む際、書き込み速度に合わせて読み取りを制御します。
4. リアルタイム分析
Apache Flink や Spark Streaming では、内部的にバックプレッシャーを管理し、データソースからの取得速度を調整します。
落とし穴
1. キューサイズの設定ミス
キューが大きすぎるとメモリを浪費し、小さすぎるとバックプレッシャーが頻発します。
// 悪い例: 無制限キュー
const queue = []; // メモリ枯渇のリスク
// 良い例: 有限キュー
const maxSize = 1000;
if (queue.length >= maxSize) {
// バックプレッシャー処理
}
2. 同期処理との混在
非同期バックプレッシャーと同期ブロッキングを混ぜると、デッドロックが発生します。
3. エラーハンドリングの欠如
バックプレッシャーが発生した際のエラーハンドリングがないと、データロスや例外が発生します。
4. Dropping 戦略の誤用
重要なデータに Dropping 戦略を使うと、データロスが発生します。メトリクスやログには適しますが、トランザクションデータには不適切です。
5. TCP のバックプレッシャー無視
アプリケーション層でバックプレッシャーを実装しても、TCP 層のバッファが溢れるとパケットロスが発生します。
比較表
バックプレッシャー戦略の比較
| 戦略 | データロス | メモリ消費 | レイテンシ | 適用場面 |
|---|---|---|---|---|
| Buffering | なし | 高い | 高い | 一時的なバースト |
| Dropping | あり | 低い | 低い | メトリクス・ログ |
| Sampling | あり | 低い | 低い | 高頻度イベント |
| Conflation | あり(古いデータ) | 低い | 低い | UI更新 |
| Blocking | なし | 低い | 高い | 同期処理 |
| Error | なし(中断) | 低い | - | 厳格なSLA |
フレームワークのバックプレッシャー対応
| フレームワーク | バックプレッシャー | 実装方式 |
|---|---|---|
| RxJava | ◯ | Reactive Streams |
| RxJS | △(一部) | bufferTime, throttle |
| Akka Streams | ◯ | Reactive Streams |
| Node.js Streams | ◯ | Backpressure API |
| Go Channels | ◯ | Buffered Channel |
| Kafka | ◯ | fetch.max.bytes |
| HTTP/2 | ◯ | WINDOW_UPDATE |
| TCP | ◯ | Flow Control (Window Size) |
ベストプラクティス
- Reactive Streams 準拠のライブラリを使う: RxJava、Akka Streams、Project Reactor
- キューサイズを監視: キューの深さをメトリクスで追跡
- 戦略を明示: Buffering / Dropping / Sampling のいずれかを明確に選択
- エラーハンドリング: バックプレッシャー発生時の挙動を定義
- 段階的なバックプレッシャー: サーキットブレーカーと組み合わせる
- 負荷テスト: 高負荷時の挙動を確認
- ログ: バックプレッシャー発生を記録
- ドキュメント: バックプレッシャー戦略をチームで共有
まとめ
バックプレッシャーは、分散システムやストリーム処理において、過負荷を防ぐための重要な技術です。
- 原理: コンシューマーの処理速度にプロデューサーを合わせる
- 戦略: Buffering、Dropping、Sampling、Conflation、Blocking、Error
- 実装: Reactive Streams、Node.js Streams、Go Channels、TCP、HTTP/2
- 採用例: Kafka、Akka Streams、RxJava、Flink
- 監視: キュー深さ、バックプレッシャー発生頻度
バックプレッシャーなしでストリーム処理を行うと、システムがクラッシュするリスクが高まります。適切な戦略を選択しましょう。
応用トピック
サーキットブレーカーとの組み合わせ
バックプレッシャーとサーキットブレーカーを組み合わせ、過負荷時に上流をフェイルファストで切断します。
class BackpressureWithCircuitBreaker {
private queue: any[] = [];
private maxSize = 1000;
private failureCount = 0;
private circuitOpen = false;
push(item: any): boolean {
if (this.circuitOpen) {
throw new Error("Circuit breaker is OPEN");
}
if (this.queue.length >= this.maxSize) {
this.failureCount++;
if (this.failureCount > 10) {
this.circuitOpen = true; // サーキットブレーカー発動
}
return false; // バックプレッシャー
}
this.queue.push(item);
this.failureCount = 0;
return true;
}
}
Adaptive Backpressure
コンシューマーの処理速度を動的に測定し、バッファサイズを自動調整します。
Load Shedding(負荷削減)
バックプレッシャーが限界に達したら、優先度の低いリクエストを破棄します。
Rate Limiting との違い
- Rate Limiting: プロデューサー側で送信レートを制限
- Backpressure: コンシューマー側からのシグナルで制限
両者を組み合わせることで、より堅牢なシステムになります。
参考リソース
- Reactive Streams Specification
- The Reactive Manifesto
- Node.js Stream API - Backpressure
- Akka Streams Documentation
- RxJava Backpressure
- HTTP/2 Flow Control
- Designing Data-Intensive Applications - Chapter 11
関連記事
- サーキットブレーカーパターン - 過負荷時のフェイルファストとバックプレッシャーの組み合わせ
- レート制限 - プロデューサー側での送信速度制御