マイクロサービスのための Tumblr 製フレームワーク "Colossus"

この記事は Scala Advent Calendar 2014 の 15 日目です。昨日は id:qtamaki さんの”「関数プログラミング 珠玉のアルゴリズムデザイン」をScalaで実装してみる”でした。

今日は、先日に TumblrOSS 化を発表した Scala 製のノンブロッキング I/O (NIO) フレームワーク "Colossus" を紹介したい。”高性能なマイクロサービスを構築するためのフレームワークを謳っており、まだ OSS 化されて日が浅いものの Tumblr ではすでに production で使われているとされる。また、Colossus 自体がアクターフレームワーク Akka のアクターとして実装されており、それを使った独自のスレッドモデルを提供している点も興味深い。

基本的なコンセプト

Tumblr が Colossus を開発した狙いについて、公式サイトの冒頭でこのように述べている。

Colossus は、ノンブロッキングネットワーク I/O を必要とする高性能なアプリケーション構築のための Scala 製軽量フレームワークである。Colossus は特に、低レイテンシのステートレスなマイクロサービス(たいていはデータベースやキャッシュを抽象化したものと大きく変わらない)に焦点を当てている。Colossus は、こうしたユースケースにおいて性能を最大化すると共に、インタフェースをクリーンかつ簡潔に保つことを狙っている。

つまり…どういうことだってばよ?

マイクロサービス・アーキテクチャでは、コンポーネント化された軽量サービス同士を連携させてアプリケーションを組み立てる。つまり、多くのマイクロサービスのビジネスロジックはこのようになっているはずだ:

  1. クライアントからコネクションが張られてリクエストが来る。
  2. 他のバックエンドサービス(データベースとか)にリクエストを飛ばしてレスポンスを待つ。
  3. バックエンドサービスから戻ってきたレスポンスを使ってレスポンスを組み立ててクライアントに返す。

Colossus を使って書くとこんな感じ(Colossus のドキュメントより)。

Service.serve[Http]("http-service", 9000){ context =>
  val redis = context.clientFor[Redis]("localhost", 6379)
  context.handle{ connection => 
    connection.become {
      case request @ Get on Root / "get" / key => redis.send(Commands.Get(key)).map{
        case BulkReply(data) => request.ok(data.utf8String)
        case NilReply => request.notFound("(nil)")
      }
      ...
    }
  }
}

このように、非同期計算を抽象化した型である FuturemapflatMap をつないでいくアプローチは、Scala プログラマにとって見慣れたものだ。

ところで、Colossus において redis.send の戻り値は Callback であり Scala 標準ライブラリTwitter 製ライブラリに含まれているような Future ではない。

CallbackFuture と同様のインタフェースを持つ(し、実際に Future に変換することもできる)が、スレッドセーフではない。これは、Callback は単一のワーカスレッド上で実行されることを前提にしているからだ(あと、Callback を呼び出す準備ができた際に execute() メソッドを明示的に呼び出す必要がある。この呼び出しは通常はフレームワーク側がやってくれる)。

これは、冒頭に挙げたようなマイクロサービスのユースケースを前提にするなら、基本的にはスレッド間で状態を共有する必要がないからだ。また、データベース接続などの複数のコネクション間で共有したい状態については、イベントループごとに保持してコネクション間で共有すればよい(一つのイベントループは多数のコネクションを処理するが、シングルスレッドなのでイベントループ内では競合を心配する必要がない。また、全てはノンブロッキングに処理されるので、あるコネクションがデータベースからの応答を待っている間に、後続のコネクションがブロックすることはない)。

Future を使う場合、多くの実装はマルチスレッド動作のために ExecutionContext 内で処理を実行するが、これによりわずかだがオーバヘッドが発生する。このオーバヘッドが問題になるようなリクエストの処理においては、Colossus では代わりにシングルスレッドで動作する Callback を使うことでレイテンシを削減できる。これは、リクエストのサイズが小さくステートレスな場合に特に有効だと言える。

アーキテクチャ

Colossus の Core Layer は Akka 上で動作するアクターであり、例えばイベントループは下記のような”自分にメッセージを送るアクター”として実装されている(実際のコードはこのあたりにある)。

receive message {
  case `Select` => {
    event_handlers = selector.select()
    ...
    self ! `Select`
  }
  ...
}

ドキュメントから読み取れるアーキテクチャを図示するとこんな感じか:

f:id:okapies:20141215011648j:plain

Colossus 全体を駆動するのが IOSystem で、作成時に ActorSystem と関連付けて指定した数の worker(イベントループ)を作成する。なお、例えばアプリケーションが複数の独立したサーバを実行するような場合は、一つの ActorSystem に複数の IOSystem を関連付けてもよい。

implicit val actor_system = ActorSystem("system")
val io_system = IOSystem("io-system", numWorkers = 4)

IOSystem は、コネクションがやってくるとそれを各 Worker に振り分ける。各 Worker はアクターであり、ひとたび割り当てられたコネクションはずっと同じ Worker に束縛され続ける。つまり、Worker 内のイベントループが呼び出すイベントハンドラはシングルスレッドで動作することが保証される。

では、イベントループ間やコネクション間で状態を共有したい場合や、ブロックするような処理を実行したい場合はどうするか? その場合は、アクターと Future を使ってイベントループの外側で処理すればよい。

プログラミングモデル

サーバのビジネスロジックの実装方法は、関数型っぽい書き方とオブジェクト指向っぽい書き方の二通りがある。まずは関数型っぽい方から:

Service.serve[Http]("service-name", 456){context: ServiceContext[Http] => 
  //everything here happens once per event-loop
  context.handle{ connecton: ConnectionContext[Http] => 
    //everything here happens once per connection
    connection.become {
      //partial function HttpRequest => Response[HttpResponse]
      case req => req.ok("Hello world!")
    }
  }
}

この記法は、実際には下記のような ConnectionHandler の実装と同じことをやっている(ServiceServer は ConnectionHandler の継承クラス):

import com.tumblr.colossus._
import protocols.Telnet._

class HelloWorldHandler(config: ServiceConfig, worker: WorkerRef) 
  extends ServiceServer[HttpRequest, HttpResponse](new HttpServerCodec, config, worker) {

  def processRequest(request: HttpRequest): Response[HttpResponse] = {
    req.ok("Hello World!")
  }

  def processFailure(request: HttpRequest, reason: Throwable) = {
    request.error(s"Error: $reason")
  }
}

テストサーバ、メトリクス、タスク

時間がないので下記を読んでください…。個人的には、最初のリリースからテスト関係や計測関係のユーティリティが含まれているのが素晴らしいなと思う。

まとめ

ドキュメントには他にも色々と紹介したい話が書いてあるんだけど、全然時間が足りなかった…。もしこれを読んで興味が湧いたら、読んでみたりコードを触ってみたりしてください。あと、Issue を見てるとまだまだ成熟してない感じなので、逆に言えば貢献のチャンスかも。

ところで、Scala 製のマイクロサービス向けフレームワークといえば、真っ先に Finagle の名前が思い浮かぶ。以前のエントリでも書いたように、TumblrScala を採用するに至った大きな理由の一つに Finagle の存在がある。

では、ここに至って Tumblr が独自のフレームワークの実装を始めたのはなぜか。特に、性能については折り紙付きの Netty の代わりに Java NIO + Akka で実装しているのは何故なのか。

一つは上で説明したように、特定のユースケースにおける性能を追求していく中で Future の”リッチな”スレッドモデルがオーバヘッドになったこと。そしてもう一つは、明示はされていないが、性能を追求して下のレイヤに手を入れていく際に、Netty の低レベル操作を中心とした API や複雑なスレッドモデルと密結合した Finagle が扱いにくかった、ということがあるのではないかと思う。例えば、Colossus のコンセプトとして掲げられている四つの原則の中にこんな項がある:

Keep it transparent - Choose how much magic you want! Colossus is built in several layers which allow you to build high-level abstractions without being walled off from taking full control.

実際、Finagle の Netty 3 依存について、2014 年度中としていた Netty 4 移行計画が 2015 2Q にズレこむなど、Finagle が Netty に対して相当に密結合している様子が見てとれる。

その一方で、大規模並列処理を実現するためのイベント駆動なミドルウェアやアプリケーションを構築する上で、Akka が使われるケースがかなり広がってきている。例えば、今回紹介した Colossus の他にも、ストレステストツールGatling の実装に Akka が使われているのは有名かと思う。

また、Akka 自身も Netty に相当するレイヤを Akka I/O として再実装すると共に、大規模ストリームをうまく扱うための背圧制御 (back-pressure) の仕組みを加えて "Reactive Stream" として標準化したりと、低レイヤについても手を打ってきている*1

結果として、分散並列化したいネットワーク I/O からビジネスロジックまでを、アクターモデルという同じ抽象の上で統一的に扱えるのが Akka というランタイムの強みとなってきており、そのような扱いやすさがミドルウェアやアプリケーションの作者から支持を集めつつあるのだとすれば、Netty 陣営の”苦境”は Akka 陣営にとってはチャンスなのかもしれない。

*1:念の為に書いておくと、Colossus 自身は今のところ Akka I/O は使っていない。ただし、似たような背圧制御の仕組みを独自で持っているので、この部分を移行するという話もあり得なくもなさそうな…?