Elixir/Phoenix 2025 - 高並行リアルタイムWeb開発

2026.01.12

Elixir/Phoenixとは

Elixirは、Erlang VM (BEAM) 上で動作する関数型プログラミング言語です。Phoenixは、Elixirで書かれた高性能Webフレームワークで、リアルタイムWebアプリケーションの構築に特化しています。2025年現在、Discord、Pinterest、PepsiCoなど多くの大規模サービスで採用され、その信頼性と拡張性が実証されています。

注目ポイント: Discordは1つのサーバーで500万以上の同時接続を処理しており、Elixir/Phoenixの圧倒的なスケーラビリティを証明しています。

flowchart TB
    subgraph BEAM["BEAM VM (Erlang Virtual Machine)"]
        subgraph Scheduler1["Scheduler 1"]
            P1["Process"]
            P2["Process"]
        end
        subgraph Scheduler2["Scheduler 2"]
            P3["Process"]
            P4["Process"]
        end
        subgraph SchedulerN["Scheduler N"]
            P5["Process"]
            P6["Process"]
        end
    end

    subgraph OTP["OTP (Open Telecom Platform)"]
        GenServer["GenServer"]
        Supervisor["Supervisor"]
        Application["Application"]
    end

    subgraph Phoenix["Phoenix Framework"]
        LiveView["LiveView"]
        Channels["Channels"]
        Ecto["Ecto"]
        PubSub["PubSub"]
    end

    BEAM --> OTP
    OTP --> Phoenix

BEAM VMの特徴

軽量プロセスと並行性

BEAM VMの最大の特徴は、軽量プロセスによる並行処理モデルです。OSスレッドとは異なり、各プロセスはわずか数KBのメモリしか消費せず、数百万のプロセスを同時に実行できます。

# 100万のプロセスを起動する例
defmodule MassiveProcesses do
  def start_processes(count) do
    1..count
    |> Enum.map(fn i ->
      spawn(fn ->
        # 各プロセスは独立して動作
        receive do
          {:message, msg} -> IO.puts("Process #{i}: #{msg}")
        end
      end)
    end)
  end

  def send_to_all(pids, message) do
    Enum.each(pids, fn pid ->
      send(pid, {:message, message})
    end)
  end
end

# 使用例
pids = MassiveProcesses.start_processes(1_000_000)
MassiveProcesses.send_to_all(pids, "Hello!")

プロセス間通信

Elixirのプロセスは共有メモリを持たず、メッセージパッシングで通信します。これにより、デッドロックやレースコンディションのリスクが大幅に軽減されます。

defmodule Counter do
  use GenServer

  # クライアントAPI
  def start_link(initial_value \\ 0) do
    GenServer.start_link(__MODULE__, initial_value, name: __MODULE__)
  end

  def increment do
    GenServer.call(__MODULE__, :increment)
  end

  def get_value do
    GenServer.call(__MODULE__, :get_value)
  end

  # サーバーコールバック
  @impl true
  def init(initial_value) do
    {:ok, initial_value}
  end

  @impl true
  def handle_call(:increment, _from, state) do
    {:reply, state + 1, state + 1}
  end

  @impl true
  def handle_call(:get_value, _from, state) do
    {:reply, state, state}
  end
end

OTPによる耐障害性設計

Supervisorツリー

OTP (Open Telecom Platform) は、Erlang/Elixirの強力なライブラリ群です。特にSupervisorパターンは「Let it crash」の思想に基づき、障害から自動的に回復するシステムを構築できます。

defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # データベース接続プール
      {MyApp.Repo, []},
      # PubSubシステム
      {Phoenix.PubSub, name: MyApp.PubSub},
      # Webサーバー
      MyAppWeb.Endpoint,
      # カスタムワーカー(最大3回再起動)
      %{
        id: MyApp.CriticalWorker,
        start: {MyApp.CriticalWorker, :start_link, []},
        restart: :permanent,
        max_restarts: 3,
        max_seconds: 5
      }
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
flowchart TB
    subgraph SupervisorTree["Supervisor Tree"]
        AppSup["Application Supervisor<br/>(one_for_one)"]

        subgraph Workers["Workers"]
            Repo["Repo<br/>(DB Pool)"]
            PubSub["PubSub"]
            Endpoint["Endpoint"]
            Worker["CriticalWorker"]
        end

        AppSup --> Repo
        AppSup --> PubSub
        AppSup --> Endpoint
        AppSup --> Worker
    end

    Crash["Worker Crash"] --> Worker
    Worker -.-> |"Auto Restart"| Worker

再起動戦略

戦略説明ユースケース
:one_for_oneクラッシュした子プロセスのみ再起動独立したワーカー
:one_for_all1つがクラッシュすると全て再起動相互依存するワーカー
:rest_for_oneクラッシュした以降のワーカーを再起動順序依存のワーカー

Phoenix LiveView

リアルタイムUIの革新

Phoenix LiveViewは、サーバーサイドレンダリングとWebSocketを組み合わせ、JavaScriptをほとんど書かずにリアルタイムUIを構築できる革新的な機能です。

defmodule MyAppWeb.CounterLive do
  use MyAppWeb, :live_view

  @impl true
  def mount(_params, _session, socket) do
    if connected?(socket) do
      # クライアント接続後にリアルタイム更新を開始
      :timer.send_interval(1000, self(), :tick)
    end

    {:ok, assign(socket, count: 0, last_updated: DateTime.utc_now())}
  end

  @impl true
  def handle_event("increment", _params, socket) do
    {:noreply, update(socket, :count, &(&1 + 1))}
  end

  @impl true
  def handle_event("decrement", _params, socket) do
    {:noreply, update(socket, :count, &(&1 - 1))}
  end

  @impl true
  def handle_info(:tick, socket) do
    {:noreply, assign(socket, last_updated: DateTime.utc_now())}
  end

  @impl true
  def render(assigns) do
    ~H"""
    <div class="counter-container">
      <h1>リアルタイムカウンター</h1>

      <div class="counter-value">
        <%= @count %>
      </div>

      <div class="counter-controls">
        <button phx-click="decrement" class="btn btn-danger">
          -
        </button>
        <button phx-click="increment" class="btn btn-primary">
          +
        </button>
      </div>

      <p class="last-updated">
        最終更新: <%= Calendar.strftime(@last_updated, "%H:%M:%S") %>
      </p>
    </div>
    """
  end
end

LiveViewのリアルタイムチャット実装

より実践的な例として、リアルタイムチャットアプリケーションを実装してみましょう。

defmodule MyAppWeb.ChatLive do
  use MyAppWeb, :live_view

  alias MyApp.Chat

  @impl true
  def mount(%{"room_id" => room_id}, session, socket) do
    if connected?(socket) do
      # PubSubでルームに参加
      Phoenix.PubSub.subscribe(MyApp.PubSub, "chat:#{room_id}")
    end

    user = get_user_from_session(session)
    messages = Chat.list_messages(room_id)

    {:ok,
     socket
     |> assign(:room_id, room_id)
     |> assign(:user, user)
     |> assign(:messages, messages)
     |> assign(:new_message, "")}
  end

  @impl true
  def handle_event("send_message", %{"message" => content}, socket) do
    %{room_id: room_id, user: user} = socket.assigns

    case Chat.create_message(%{
           room_id: room_id,
           user_id: user.id,
           content: content
         }) do
      {:ok, message} ->
        # 全参加者にブロードキャスト
        Phoenix.PubSub.broadcast(
          MyApp.PubSub,
          "chat:#{room_id}",
          {:new_message, message}
        )

        {:noreply, assign(socket, new_message: "")}

      {:error, _changeset} ->
        {:noreply, put_flash(socket, :error, "メッセージを送信できませんでした")}
    end
  end

  @impl true
  def handle_info({:new_message, message}, socket) do
    {:noreply, update(socket, :messages, &(&1 ++ [message]))}
  end

  @impl true
  def render(assigns) do
    ~H"""
    <div class="chat-container" id="chat" phx-hook="ScrollToBottom">
      <div class="messages">
        <%= for message <- @messages do %>
          <div class={"message #{if message.user_id == @user.id, do: "own"}"}>
            <span class="author"><%= message.user.name %></span>
            <p class="content"><%= message.content %></p>
            <time><%= format_time(message.inserted_at) %></time>
          </div>
        <% end %>
      </div>

      <form phx-submit="send_message" class="message-form">
        <input
          type="text"
          name="message"
          value={@new_message}
          placeholder="メッセージを入力..."
          autocomplete="off"
        />
        <button type="submit">送信</button>
      </form>
    </div>
    """
  end

  defp format_time(datetime) do
    Calendar.strftime(datetime, "%H:%M")
  end

  defp get_user_from_session(session) do
    # セッションからユーザー情報を取得
    MyApp.Accounts.get_user(session["user_id"])
  end
end

Phoenix Channels

LiveViewに加えて、Phoenix Channelsは低レベルなWebSocket通信を提供します。モバイルアプリやカスタムクライアントとの連携に最適です。

defmodule MyAppWeb.GameChannel do
  use MyAppWeb, :channel

  alias MyApp.GameServer

  @impl true
  def join("game:" <> game_id, _params, socket) do
    case GameServer.join_game(game_id, socket.assigns.user_id) do
      {:ok, game_state} ->
        send(self(), :after_join)
        {:ok, game_state, assign(socket, :game_id, game_id)}

      {:error, reason} ->
        {:error, %{reason: reason}}
    end
  end

  @impl true
  def handle_info(:after_join, socket) do
    # 他のプレイヤーに参加を通知
    broadcast!(socket, "player_joined", %{
      user_id: socket.assigns.user_id,
      timestamp: DateTime.utc_now()
    })

    {:noreply, socket}
  end

  @impl true
  def handle_in("move", %{"position" => position}, socket) do
    %{game_id: game_id, user_id: user_id} = socket.assigns

    case GameServer.make_move(game_id, user_id, position) do
      {:ok, new_state} ->
        broadcast!(socket, "game_updated", new_state)
        {:reply, :ok, socket}

      {:error, reason} ->
        {:reply, {:error, %{reason: reason}}, socket}
    end
  end

  @impl true
  def terminate(_reason, socket) do
    GameServer.leave_game(socket.assigns.game_id, socket.assigns.user_id)
    :ok
  end
end

Ectoによるデータベース操作

Ectoは、Elixirの強力なデータベースラッパーおよびクエリ言語です。型安全性と表現力を両立しています。

defmodule MyApp.Blog.Post do
  use Ecto.Schema
  import Ecto.Changeset

  schema "posts" do
    field :title, :string
    field :body, :string
    field :published, :boolean, default: false
    field :view_count, :integer, default: 0
    field :published_at, :utc_datetime

    belongs_to :author, MyApp.Accounts.User
    has_many :comments, MyApp.Blog.Comment
    many_to_many :tags, MyApp.Blog.Tag, join_through: "posts_tags"

    timestamps()
  end

  def changeset(post, attrs) do
    post
    |> cast(attrs, [:title, :body, :published, :author_id])
    |> validate_required([:title, :body, :author_id])
    |> validate_length(:title, min: 5, max: 200)
    |> validate_length(:body, min: 100)
    |> maybe_set_published_at()
  end

  defp maybe_set_published_at(changeset) do
    if get_change(changeset, :published) == true do
      put_change(changeset, :published_at, DateTime.utc_now())
    else
      changeset
    end
  end
end

高度なクエリ

defmodule MyApp.Blog do
  import Ecto.Query
  alias MyApp.Repo
  alias MyApp.Blog.Post

  # 公開済み投稿を人気順で取得
  def list_popular_posts(limit \\ 10) do
    from(p in Post,
      where: p.published == true,
      order_by: [desc: p.view_count],
      limit: ^limit,
      preload: [:author, :tags]
    )
    |> Repo.all()
  end

  # タグ別の投稿数を集計
  def count_posts_by_tag do
    from(p in Post,
      join: t in assoc(p, :tags),
      where: p.published == true,
      group_by: t.name,
      select: {t.name, count(p.id)},
      order_by: [desc: count(p.id)]
    )
    |> Repo.all()
    |> Map.new()
  end

  # 全文検索(PostgreSQL)
  def search_posts(query_string) do
    from(p in Post,
      where:
        p.published == true and
          fragment(
            "to_tsvector('english', ? || ' ' || ?) @@ plainto_tsquery('english', ?)",
            p.title,
            p.body,
            ^query_string
          ),
      order_by: [desc: p.published_at]
    )
    |> Repo.all()
  end
end

パフォーマンス特性

ベンチマーク比較

フレームワークリクエスト/秒レイテンシ (P99)同時接続数
Phoenix 1.7180,0002.1ms2,000,000+
Rails 712,00045ms50,000
Django 415,00038ms60,000
Express.js85,0008ms500,000
Go Gin200,0001.8ms1,000,000+

メモリ効率

# プロセスのメモリ使用量を確認
defmodule MemoryAnalyzer do
  def analyze_processes do
    Process.list()
    |> Enum.map(fn pid ->
      info = Process.info(pid, [:memory, :message_queue_len, :registered_name])
      {pid, info}
    end)
    |> Enum.sort_by(fn {_pid, info} -> -info[:memory] end)
    |> Enum.take(10)
  end

  def system_memory do
    %{
      total: :erlang.memory(:total),
      processes: :erlang.memory(:processes),
      system: :erlang.memory(:system),
      atom: :erlang.memory(:atom),
      binary: :erlang.memory(:binary),
      ets: :erlang.memory(:ets)
    }
  end
end

2025年の動向

Phoenix 1.8の新機能

2025年にリリース予定のPhoenix 1.8では、以下の新機能が期待されています。

  • LiveView Streams強化: 大規模リストの効率的な更新
  • コンポーネントの遅延ロード: 初期ロード時間の短縮
  • 型システム統合: Elixir 1.18の型機能との連携
  • Bumblebee統合: 機械学習モデルのシームレスな組み込み

Elixir 1.18の型システム

# Elixir 1.18での型アノテーション(実験的)
defmodule MyApp.Calculator do
  @spec add(number(), number()) :: number()
  def add(a, b), do: a + b

  @spec divide(number(), number()) :: {:ok, float()} | {:error, :division_by_zero}
  def divide(_a, 0), do: {:error, :division_by_zero}
  def divide(a, b), do: {:ok, a / b}
end

Nx/Bumblebeeによる機械学習

defmodule MyApp.Sentiment do
  def analyze(text) do
    {:ok, model} = Bumblebee.load_model({:hf, "distilbert-base-uncased-finetuned-sst-2-english"})
    {:ok, tokenizer} = Bumblebee.load_tokenizer({:hf, "distilbert-base-uncased"})

    serving = Bumblebee.Text.text_classification(model, tokenizer)

    Nx.Serving.run(serving, text)
  end
end

# 使用例
MyApp.Sentiment.analyze("This product is amazing!")
# => %{predictions: [%{label: "POSITIVE", score: 0.9998}]}

採用事例

Discord

Discordは、Elixirを活用して500万以上の同時接続を単一サーバーで処理しています。特にリアルタイムメッセージングとボイスチャネル管理にElixirの並行処理能力が活かされています。

Pinterest

Pinterestは、通知システムにElixirを採用し、1日あたり数十億のプッシュ通知を処理しています。

PepsiCo

PepsiCoは、サプライチェーン管理システムにElixir/Phoenixを採用し、リアルタイムな在庫追跡と需要予測を実現しています。

まとめ

Elixir/Phoenixは、2025年においても最先端のリアルタイムWeb開発プラットフォームとして進化を続けています。

主な強み

  • 圧倒的な並行処理能力: BEAMの軽量プロセスによる数百万の同時接続
  • 耐障害性: OTPのSupervisorパターンによる自動復旧
  • 開発者体験: LiveViewによるシンプルなリアルタイムUI開発
  • パフォーマンス: 低レイテンシ、高スループット
  • 保守性: 関数型プログラミングによる予測可能なコード

適したユースケース

  • リアルタイムチャット・メッセージング
  • IoTデータ処理
  • ゲームサーバー
  • 金融取引システム
  • ライブストリーミング

Elixir/Phoenixは、特に高い並行性と耐障害性が求められるシステムにおいて、その真価を発揮します。学習曲線は存在しますが、関数型プログラミングとOTPの概念を習得することで、堅牢でスケーラブルなシステムを効率的に構築できるようになります。

参考リンク

この技術を体系的に学びたいですか?

未来学では東証プライム上場企業のITエンジニアが24時間サポート。月額24,800円から、退会金0円のオンラインIT塾です。

LINEで無料相談する
← 一覧に戻る