ログ集約の必要性
分散システムでは、複数のサービスからログを収集し、一元管理する必要があります。ログ集約により、問題の特定と分析が容易になります。
構造化ログの出力
Node.js (Pino)
// logger.ts
import pino from 'pino';

/**
 * Shared structured (JSON) logger for the service.
 *
 * - `level` is read from `LOG_LEVEL` and falls back to `info`. `||` is used
 *   on purpose so that an empty `LOG_LEVEL=` also falls back to the default.
 * - `formatters.level` writes the level as its text label (`"info"`).
 * - `timestamp` writes an ISO 8601 `timestamp` field. The Logstash `date`
 *   filter and the Promtail `timestamp` stage shown later in this article
 *   both read a field with that name and format.
 * - `base` fields are attached to every log line.
 */
export const logger = pino({
  level: process.env.LOG_LEVEL || 'info',
  formatters: {
    level: (label) => ({ level: label }),
  },
  // pino expects a partial JSON string here (leading comma included) and
  // splices it into each log line as-is.
  timestamp: () => `,"timestamp":"${new Date().toISOString()}"`,
  base: {
    service: 'my-service',
    env: process.env.NODE_ENV,
  },
});

// Usage examples
logger.info({ userId: '123', action: 'login' }, 'User logged in');

try {
  throw new Error('upstream request timed out');
} catch (err) {
  // Pass the caught error under the `err` key so it is logged together with
  // the request context.
  logger.error({ err, requestId: '456' }, 'Request failed');
}
Python (structlog)
# logger.py
import logging
import sys

import structlog

# structlog hands every rendered line to the standard library logger
# (LoggerFactory below), and filter_by_level asks that logger whether a level
# is enabled. The stdlib root logger defaults to WARNING and has no handler,
# so without this call logger.info(...) would be silently dropped.
logging.basicConfig(format="%(message)s", stream=sys.stdout, level=logging.INFO)

structlog.configure(
    processors=[
        # Drop events below the stdlib logger's level as early as possible.
        structlog.stdlib.filter_by_level,
        # Add a "level" key so downstream pipelines can filter on it.
        structlog.stdlib.add_log_level,
        # Add an ISO 8601 "timestamp" key.
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        # Must stay last: serializes the event dict to one JSON line.
        structlog.processors.JSONRenderer(),
    ],
    wrapper_class=structlog.stdlib.BoundLogger,
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
)

logger = structlog.get_logger()

# Usage example
logger.info("user_login", user_id="123", ip="192.168.1.1")

try:
    raise TimeoutError("upstream request timed out")
except Exception as e:
    logger.error("request_failed", error=str(e), request_id="456")
ELKスタック
docker-compose.yml
version: '3.8'

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
    environment:
      - discovery.type=single-node
      # Security (TLS + authentication) is switched off: local development only.
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
      - "9200:9200"
    volumes:
      - elasticsearch-data:/usr/share/elasticsearch/data

  logstash:
    image: docker.elastic.co/logstash/logstash:8.11.0
    ports:
      # Beats input (Filebeat).
      - "5044:5044"
      # JSON-over-TCP input declared in logstash.conf; it must be published
      # too, otherwise nothing outside the Compose network can reach it.
      - "5000:5000"
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
    depends_on:
      - elasticsearch

  kibana:
    image: docker.elastic.co/kibana/kibana:8.11.0
    ports:
      - "5601:5601"
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    depends_on:
      - elasticsearch

volumes:
  elasticsearch-data:
Logstash設定
# logstash.conf
input {
  beats {
    port => 5044
  }
  tcp {
    port => 5000
    codec => json
  }
}

filter {
  # Only events whose "message" is still a raw JSON string need parsing here.
  if [message] =~ /^\{/ {
    # Move the raw text into @metadata (never sent to outputs) before parsing.
    # This drops the raw copy on success, yet keeps a "message" key that the
    # parsed JSON itself may contain.
    mutate {
      rename => { "message" => "[@metadata][raw_message]" }
    }
    json {
      source => "[@metadata][raw_message]"
    }
    # If parsing failed, put the original line back so it is not lost.
    if "_jsonparsefailure" in [tags] {
      mutate {
        rename => { "[@metadata][raw_message]" => "message" }
      }
    }
  }

  # Use the application's own ISO 8601 "timestamp" field as the event time.
  date {
    match => [ "timestamp", "ISO8601" ]
    target => "@timestamp"
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    # One index per day.
    index => "logs-%{+YYYY.MM.dd}"
  }
}
Filebeat設定
# filebeat.yml
filebeat.inputs:
  # "filestream" replaces the deprecated "log" input type.
  - type: filestream
    # Every filestream input needs a unique id.
    id: app-logs
    enabled: true
    paths:
      - /var/log/app/*.log
    parsers:
      # Decode each line as JSON.
      - ndjson:
          # Empty target: put the decoded keys at the root of the event
          # (same effect as json.keys_under_root: true).
          target: ""
          # Add an error key to the event when a line cannot be decoded.
          add_error_key: true

output.logstash:
  hosts: ["logstash:5044"]

processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~
Grafana Loki
docker-compose.yml
version: '3.8'

services:
  loki:
    image: grafana/loki:2.9.0
    ports:
      - "3100:3100"
    command: -config.file=/etc/loki/local-config.yaml
    volumes:
      - loki-data:/loki

  promtail:
    image: grafana/promtail:2.9.0
    volumes:
      - ./promtail-config.yml:/etc/promtail/config.yml
      # Host log directory that Promtail tails.
      - /var/log:/var/log
    command: -config.file=/etc/promtail/config.yml
    # Start after Loki, the push target configured in promtail-config.yml.
    depends_on:
      - loki

  grafana:
    image: grafana/grafana:10.2.0
    ports:
      - "3000:3000"
    environment:
      # WARNING: anonymous visitors get the Admin role. Anyone who can reach
      # port 3000 has full control of Grafana. Local development only.
      - GF_AUTH_ANONYMOUS_ENABLED=true
      - GF_AUTH_ANONYMOUS_ORG_ROLE=Admin
    volumes:
      - grafana-data:/var/lib/grafana
    depends_on:
      - loki

volumes:
  loki-data:
  grafana-data:
Promtail設定
# promtail-config.yml
server:
  http_listen_port: 9080
  grpc_listen_port: 0

# File in which Promtail records how far it has read each log file.
positions:
  filename: /tmp/positions.yaml

clients:
  - url: http://loki:3100/loki/api/v1/push

scrape_configs:
  - job_name: app-logs
    static_configs:
      - targets:
          - localhost
        labels:
          job: app
          # The LogQL queries and the alert rule in this article select
          # {app="my-service"}, so the stream must carry that label.
          app: my-service
          __path__: /var/log/app/*.log
    pipeline_stages:
      # Parse each line as JSON and extract the fields used below.
      - json:
          expressions:
            level: level
            timestamp: timestamp
            message: message
      # Promote the extracted "level" to a Loki label.
      - labels:
          level:
      # Use the application's own timestamp as the entry time.
      - timestamp:
          source: timestamp
          format: RFC3339
アプリケーションからの直接送信
// Winston + Loki
import winston from 'winston';
import LokiTransport from 'winston-loki';
// Transport that pushes each log entry to Loki, labelled with the service name.
const lokiTransport = new LokiTransport({
  host: 'http://loki:3100',
  labels: { app: 'my-service' },
  json: true,
});

// Winston logger whose only transport is the Loki one above.
const logger = winston.createLogger({ transports: [lokiTransport] });
AWS CloudWatch Logs
AWS SDK
// cloudwatch-logger.ts
import { CloudWatchLogsClient, PutLogEventsCommand } from '@aws-sdk/client-cloudwatch-logs';
const client = new CloudWatchLogsClient({ region: 'ap-northeast-1' });
/**
 * Sends structured log events to a CloudWatch Logs stream.
 *
 * Every event is serialized with `JSON.stringify` and stamped with the time
 * of this call. Events are sent in consecutive `PutLogEvents` requests of at
 * most 10,000 events each. An empty `events` array is a no-op, because
 * `PutLogEvents` requires at least one event.
 *
 * The per-request byte-size limit is not checked here; very large events may
 * still be rejected by the service.
 *
 * @param logGroupName - Name of the target log group.
 * @param logStreamName - Name of the target log stream.
 * @param events - JSON-serializable values, one per log event.
 * @throws Whatever `client.send` rejects with; remaining batches are not sent.
 */
async function sendLogs(
  logGroupName: string,
  logStreamName: string,
  events: readonly unknown[],
): Promise<void> {
  if (events.length === 0) {
    return;
  }

  // Maximum number of log events accepted by a single PutLogEvents request.
  const maxEventsPerRequest = 10000;
  // One timestamp for the whole call, so all events share the same time.
  const timestamp = Date.now();

  for (let start = 0; start < events.length; start += maxEventsPerRequest) {
    const batch = events.slice(start, start + maxEventsPerRequest);
    const command = new PutLogEventsCommand({
      logGroupName,
      logStreamName,
      logEvents: batch.map((event) => ({
        timestamp,
        message: JSON.stringify(event),
      })),
    });
    await client.send(command);
  }
}
// Usage example: send one structured event to the 'api-server' stream of the
// '/my-app/production' log group.
// NOTE(review): this uses top-level `await`, so it presumably runs in an ES
// module or inside an async function — confirm.
await sendLogs('/my-app/production', 'api-server', [
  { level: 'info', message: 'Request received', requestId: '123' },
]);
CloudWatch Logs Insights クエリ
# Search for error logs (case-insensitive text match, newest 100 first)
fields @timestamp, @message
| filter @message like /error/i
| sort @timestamp desc
| limit 100

# Request statistics per hour
# Fields of JSON log events are discovered automatically, so `duration` can be
# used directly. This works regardless of key order or extra keys in the event.
fields @timestamp, @message
| filter ispresent(duration)
| stats avg(duration), max(duration), count() by bin(1h)

# Logs for a specific user
fields @timestamp, @message
| filter @message like /"userId":"123"/
| sort @timestamp desc
LogQLクエリ(Loki)
# Search for error logs: lines of the app="my-service" streams containing "error"
{app="my-service"} |= "error"
# Parse JSON logs and filter: keep lines whose extracted level is "error"
{app="my-service"} | json | level="error"
# Statistics: per-second rate of lines containing "error" over 5-minute windows, summed over all matching streams
sum(rate({app="my-service"} |= "error" [5m]))
# Filter on a specific field: keep lines whose extracted userId is "123"
{app="my-service"} | json | userId="123"
アラート設定
Loki アラートルール(Ruler 形式)
# alerts.yml
# One rule group containing a single alert on the rate of error log lines.
groups:
  - name: log-alerts
    rules:
      - alert: HighErrorRate
        # Per-second rate of app="my-service" lines containing "error",
        # measured over 5-minute windows and summed across streams.
        expr: sum(rate({app="my-service"} |= "error" [5m])) > 10
        # The condition must hold for 5 minutes before the alert fires.
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value }} per second"
関連記事
- ロギングベストプラクティス - ログ設計
- 監視とオブザーバビリティ - 監視戦略
- パフォーマンステスト - 負荷テスト
まとめ
ログ集約は、分散システムの可観測性に不可欠です。構造化ログを出力し、適切なツールで収集・分析することで、問題の早期発見と解決が可能になります。
← 一覧に戻る