マイクロサービスのための Tumblr 製フレームワーク "Colossus"

この記事は Scala Advent Calendar 2014 の 15 日目です。昨日は id:qtamaki さんの”「関数プログラミング 珠玉のアルゴリズムデザイン」をScalaで実装してみる”でした。

今日は、先日に TumblrOSS 化を発表した Scala 製のノンブロッキング I/O (NIO) フレームワーク "Colossus" を紹介したい。”高性能なマイクロサービスを構築するためのフレームワークを謳っており、まだ OSS 化されて日が浅いものの Tumblr ではすでに production で使われているとされる。また、Colossus 自体がアクターフレームワーク Akka のアクターとして実装されており、それを使った独自のスレッドモデルを提供している点も興味深い。

基本的なコンセプト

Tumblr が Colossus を開発した狙いについて、公式サイトの冒頭でこのように述べている。

Colossus は、ノンブロッキングネットワーク I/O を必要とする高性能なアプリケーション構築のための Scala 製軽量フレームワークである。Colossus は特に、低レイテンシのステートレスなマイクロサービス(たいていはデータベースやキャッシュを抽象化したものと大きく変わらない)に焦点を当てている。Colossus は、こうしたユースケースにおいて性能を最大化すると共に、インタフェースをクリーンかつ簡潔に保つことを狙っている。

つまり…どういうことだってばよ?

マイクロサービス・アーキテクチャでは、コンポーネント化された軽量サービス同士を連携させてアプリケーションを組み立てる。つまり、多くのマイクロサービスのビジネスロジックはこのようになっているはずだ:

  1. クライアントからコネクションが張られてリクエストが来る。
  2. 他のバックエンドサービス(データベースとか)にリクエストを飛ばしてレスポンスを待つ。
  3. バックエンドサービスから戻ってきたレスポンスを使ってレスポンスを組み立ててクライアントに返す。

Colossus を使って書くとこんな感じ(Colossus のドキュメントより)。

Service.serve[Http]("http-service", 9000){ context =>
  val redis = context.clientFor[Redis]("localhost", 6379)
  context.handle{ connection => 
    connection.become {
      case request @ Get on Root / "get" / key => redis.send(Commands.Get(key)).map{
        case BulkReply(data) => request.ok(data.utf8String)
        case NilReply => request.notFound("(nil)")
      }
      ...
    }
  }
}

このように、非同期計算を抽象化した型である FuturemapflatMap をつないでいくアプローチは、Scala プログラマにとって見慣れたものだ。

ところで、Colossus において redis.send の戻り値は Callback であり Scala 標準ライブラリTwitter 製ライブラリに含まれているような Future ではない。

CallbackFuture と同様のインタフェースを持つ(し、実際に Future に変換することもできる)が、スレッドセーフではない。これは、Callback は単一のワーカスレッド上で実行されることを前提にしているからだ(あと、Callback を呼び出す準備ができた際に execute() メソッドを明示的に呼び出す必要がある。この呼び出しは通常はフレームワーク側がやってくれる)。

これは、冒頭に挙げたようなマイクロサービスのユースケースを前提にするなら、基本的にはスレッド間で状態を共有する必要がないからだ。また、データベース接続などの複数のコネクション間で共有したい状態については、イベントループごとに保持してコネクション間で共有すればよい(一つのイベントループは多数のコネクションを処理するが、シングルスレッドなのでイベントループ内では競合を心配する必要がない。また、全てはノンブロッキングに処理されるので、あるコネクションがデータベースからの応答を待っている間に、後続のコネクションがブロックすることはない)。

Future を使う場合、多くの実装はマルチスレッド動作のために ExecutionContext 内で処理を実行するが、これによりわずかだがオーバヘッドが発生する。このオーバヘッドが問題になるようなリクエストの処理においては、Colossus では代わりにシングルスレッドで動作する Callback を使うことでレイテンシを削減できる。これは、リクエストのサイズが小さくステートレスな場合に特に有効だと言える。

アーキテクチャ

Colossus の Core Layer は Akka 上で動作するアクターであり、例えばイベントループは下記のような”自分にメッセージを送るアクター”として実装されている(実際のコードはこのあたりにある)。

receive message {
  case `Select` => {
    event_handlers = selector.select()
    ...
    self ! `Select`
  }
  ...
}

ドキュメントから読み取れるアーキテクチャを図示するとこんな感じか:

f:id:okapies:20141215011648j:plain

Colossus 全体を駆動するのが IOSystem で、作成時に ActorSystem と関連付けて指定した数の worker(イベントループ)を作成する。なお、例えばアプリケーションが複数の独立したサーバを実行するような場合は、一つの ActorSystem に複数の IOSystem を関連付けてもよい。

implicit val actor_system = ActorSystem("system")
val io_system = IOSystem("io-system", numWorkers = 4)

IOSystem は、コネクションがやってくるとそれを各 Worker に振り分ける。各 Worker はアクターであり、ひとたび割り当てられたコネクションはずっと同じ Worker に束縛され続ける。つまり、Worker 内のイベントループが呼び出すイベントハンドラはシングルスレッドで動作することが保証される。

では、イベントループ間やコネクション間で状態を共有したい場合や、ブロックするような処理を実行したい場合はどうするか? その場合は、アクターと Future を使ってイベントループの外側で処理すればよい。

プログラミングモデル

サーバのビジネスロジックの実装方法は、関数型っぽい書き方とオブジェクト指向っぽい書き方の二通りがある。まずは関数型っぽい方から:

Service.serve[Http]("service-name", 456){context: ServiceContext[Http] => 
  //everything here happens once per event-loop
  context.handle{ connecton: ConnectionContext[Http] => 
    //everything here happens once per connection
    connection.become {
      //partial function HttpRequest => Response[HttpResponse]
      case req => req.ok("Hello world!")
    }
  }
}

この記法は、実際には下記のような ConnectionHandler の実装と同じことをやっている(ServiceServer は ConnectionHandler の継承クラス):

import com.tumblr.colossus._
import protocols.Telnet._

class HelloWorldHandler(config: ServiceConfig, worker: WorkerRef) 
  extends ServiceServer[HttpRequest, HttpResponse](new HttpServerCodec, config, worker) {

  def processRequest(request: HttpRequest): Response[HttpResponse] = {
    req.ok("Hello World!")
  }

  def processFailure(request: HttpRequest, reason: Throwable) = {
    request.error(s"Error: $reason")
  }
}

テストサーバ、メトリクス、タスク

時間がないので下記を読んでください…。個人的には、最初のリリースからテスト関係や計測関係のユーティリティが含まれているのが素晴らしいなと思う。

まとめ

ドキュメントには他にも色々と紹介したい話が書いてあるんだけど、全然時間が足りなかった…。もしこれを読んで興味が湧いたら、読んでみたりコードを触ってみたりしてください。あと、Issue を見てるとまだまだ成熟してない感じなので、逆に言えば貢献のチャンスかも。

ところで、Scala 製のマイクロサービス向けフレームワークといえば、真っ先に Finagle の名前が思い浮かぶ。以前のエントリでも書いたように、TumblrScala を採用するに至った大きな理由の一つに Finagle の存在がある。

では、ここに至って Tumblr が独自のフレームワークの実装を始めたのはなぜか。特に、性能については折り紙付きの Netty の代わりに Java NIO + Akka で実装しているのは何故なのか。

一つは上で説明したように、特定のユースケースにおける性能を追求していく中で Future の”リッチな”スレッドモデルがオーバヘッドになったこと。そしてもう一つは、明示はされていないが、性能を追求して下のレイヤに手を入れていく際に、Netty の低レベル操作を中心とした API や複雑なスレッドモデルと密結合した Finagle が扱いにくかった、ということがあるのではないかと思う。例えば、Colossus のコンセプトとして掲げられている四つの原則の中にこんな項がある:

Keep it transparent - Choose how much magic you want! Colossus is built in several layers which allow you to build high-level abstractions without being walled off from taking full control.

実際、Finagle の Netty 3 依存について、2014 年度中としていた Netty 4 移行計画が 2015 2Q にズレこむなど、Finagle が Netty に対して相当に密結合している様子が見てとれる。

その一方で、大規模並列処理を実現するためのイベント駆動なミドルウェアやアプリケーションを構築する上で、Akka が使われるケースがかなり広がってきている。例えば、今回紹介した Colossus の他にも、ストレステストツールGatling の実装に Akka が使われているのは有名かと思う。

また、Akka 自身も Netty に相当するレイヤを Akka I/O として再実装すると共に、大規模ストリームをうまく扱うための背圧制御 (back-pressure) の仕組みを加えて "Reactive Stream" として標準化したりと、低レイヤについても手を打ってきている*1

結果として、分散並列化したいネットワーク I/O からビジネスロジックまでを、アクターモデルという同じ抽象の上で統一的に扱えるのが Akka というランタイムの強みとなってきており、そのような扱いやすさがミドルウェアやアプリケーションの作者から支持を集めつつあるのだとすれば、Netty 陣営の”苦境”は Akka 陣営にとってはチャンスなのかもしれない。

*1:念の為に書いておくと、Colossus 自身は今のところ Akka I/O は使っていない。ただし、似たような背圧制御の仕組みを独自で持っているので、この部分を移行するという話もあり得なくもなさそうな…?

"The Reactive Manifesto v2.0" 日本語訳

はじめに

いつの間にか "The Reactive Manifesto" のバージョンが上がって v2.0 になっていたので、さっくりと翻訳。従前よりかなりコンパクトになっている。マニフェストに署名したい方は、公式サイトの一番下の "Sign the manifesto" をクリックしてください。

v1.0 の日本語訳は id:kimito_k さんがこちらで公開されています。

追記【2015/03/16】: 公式サイトに掲載されました

追記【2014/12/27】: 公式へ Pull Request してマージしてもらいました。最新版は以下をご覧ください。

v2.0 での変更点

v2.0 になって変わった点についてはこのへんこのへんに記事がある。

リライトの結果として、リアクティブマニフェストが最終的に実現したい価値は”システムの即応性を保ち続けること”であり、そのために”耐障害性””弾力性”という二つの非機能が必要であり、それら三つを下支えするのが”メッセージ駆動”アーキテクチャである、という論理構成が明確になったと思う。

で、v1.0 で書かれていた細かい話はどこに行ったかというと、用語集という形で別ページに集約されている。

あと、特に明示されてないけど、”リアクティブアプリケーション (reactive application)”という呼称が全て取り除かれて”リアクティブシステム (reactive systems)”に置き換わっている。従来からある概念である”リアクティブプログラミング (reactive programming)”と混同しやすい、という批判に配慮した形なのかな。

The Reactive Manifesto v2.0

異なる分野で活動する組織が、同じようなソフトウェア構築のパターンを独立に発見している。このようなシステムはより堅牢で、より耐障害性があり、より柔軟で、より最新の要求を反映しやすくなっている。

こうした変化が起きているのは、近年、アプリケーションの要求が著しく変化してきているからだ。ほんの数年前、巨大アプリケーションは数十のサーバから構成され、数秒の応答時間と数時間のオフラインメンテナンスを許容し、データは数ギガバイトだった。今日のアプリケーションは、モバイル機器から数千のマルチコアプロセッサによって動作するクラウドベースのクラスタまで、あらゆる機器上に配備される。ユーザはミリ秒の応答時間と 100% の稼働率を期待する。データはペタバイト単位で測定される。昨日のソフトウェアアーキテクチャは、今日の要求を全く満たしていない。

求められているのは、システムアーキテクチャに対する明快なアプローチであると我々は考える。そして、必要な側面の全ては既に独立に認識されている: 求めるものは、即応性と、耐障害性と、弾力性と、メッセージ駆動とを備えたシステムだ。我々はこれをリアクティブシステム (Reactive Systems) と呼ぶ。

リアクティブシステムとして構築されたシステムはより柔軟で、疎結合で、スケーラビリティがある。これによって開発が容易になるだけでなく、変更を受け入れやすくなる。これらは障害に対してより著しい耐性を持ち、たとえ障害が起きても災害を起こすことなく優雅に対処する。リアクティブシステムは高い即応性を持ち、ユーザに対して効果的な対話型フィードバックを与える。

リアクティブシステムとは:

  • 即応性 (Responsive): システムは可能な限りすみやかに応答する。即応性とは使い勝手と実用性の基盤だが、しかしそれだけではなく、問題が素早く検出され効果的に対処できることを意味する。即応性のあるシステムは、迅速で、かつ一貫した応答時間を提供することに主眼を置く。システムは応答時間に信頼性のある上限を確立し、一貫した品質のサービスを供給する。この一貫した挙動によってエラー処理が単純化され、エンドユーザの信頼を醸成し、さらなる相互作用を促す。
  • 耐障害性 (Resilient): システムは障害に直面しても即応性を保ち続ける。これが当てはまるのは高可用性のミッションクリティカルシステムだけではない — 耐障害性を持たないシステムは障害が起きると即応性を失う。耐障害性は、レプリケーション、封じ込め、隔離、そして委譲によって実現される。障害はそれぞれのコンポーネントに封じ込められ、コンポーネントは互いに隔離されるので、システムが部分的に故障してもシステム全体を危険に晒すことなしに回復することが保証される。各々のコンポーネントの回復処理は(外部の)他のコンポーネントに委譲され、また必要な場合はレプリケーションによって高可用性を保証する。コンポーネントのクライアントはコンポーネントの障害への対処に苦しめられることがなくなる。
  • 弾力性 (Elastic): システムはワークロードが変動しても即応性を保ち続ける。リアクティブシステムは、入力の提供に割り当てるリソースを増加あるいは減少させることで入力量の変化に反応する。これは、システムの中に競合する場所や中心的なボトルネックが存在しないように設計し、シャーディングしたりレプリケーションしたコンポーネント間に入力を分散させることを意味する。リアクティブシステムは関連するライブな性能測定を提供することで、予測的かつリアクティブなスケーリングアルゴリズムをサポートする。これらは、コモディティなハードウェアとソフトウェアプラットフォーム上で費用対効果の高い弾力性を実現する。
  • メッセージ駆動 (Message Driven): リアクティブシステムは非同期メッセージパッシングに依ってコンポーネント間の境界を確立する。これによって、疎結合性、隔離性、位置透過性を保証すると共に、エラーをメッセージとして委譲する手段を確保する。明示的なメッセージパッシングは負荷の管理と弾力性を可能とする。また、システム内にメッセージキューを作成して監視し、必要ならバックプレッシャーを適用することでフロー制御が可能になる。通信の手段として位置透過なメッセージングを使うことで、通信がクラスタを跨ぐ場合も単一のホスト内の場合も、同じ構成とセマンティクスで障害を管理できる。ノンブロッキング通信により、受信側はアクティブ時のみリソースを消費できるのでシステムのオーバヘッドを抑制できる。

大きなシステムはより小さなシステムからできているので、故にそれらの構成要素のリアクティブな性質に依存する。リアクティブシステムは設計原則を適用してリアクティブな性質をあらゆる規模で適用し、それら構成要素を合成できるようにする。世界最大級のシステムは、これらの性質に基づくアーキテクチャに依存することで数十億の人々のニーズを日々満たしている。こうした設計原則をその都度再発見するのをやめて、最初から自覚的に適用する時だ。

マイクロサービスが Scala を選ぶ3つの理由

 今年も開催される Scala Advent Calendar 2014 の 15 日目にエントリーしていて、ネタとしては先日 Tumblr が発表した "I/O and Microservice library for Scala" を謳う Colossus をやる予定なんだけど、前振りとして「なぜマイクロサービス化を進めるサービスは Scala を選ぶのか」という話をしてみるエントリ。ちなみに、Advent Calendar の前振りと書いたけど、とりあえず Scala をあまり知らない人向け。



そもそもマイクロサービスって何だっけ?

この記事とかよくまとまってると思います。

マイクロサービスへの移行と Scala

 成功を収めたウェブサービスが、自身のビジネスを持続的にスケールさせるため、巨大化・複雑化したモノリシックなアプリケーションを解体し、単機能のコンポーネントであるマイクロサービスからなるサービス指向アーキテクチャ (SOA) へと移行する…。近年、こんなストーリーを耳にする機会が多い。

 この際、ランタイムをスクリプト言語から JVM へ置き換えたり、特に使用言語として Scala を採用するケースが目立つ。以下は一例だが、名だたる有名サービスが Scala を使ったマイクロサービス化に取り組んでいることがわかる:

なぜ Scala が選ばれるのか?

 なぜ、マイクロサービス化で Scala が選ばれるのか? Scala 移行の事例を見ると、次の三つのポイントが指摘されることが多い。

  1. JVM 言語である
  2. 非同期 RPC フレームワーク "Finagle" の存在
  3. 静的型付き言語である

 以下に一つずつ見て行こう。

1. JVM 言語である

We were enamored by the level of performance that the JVM gave us. It wasn’t going to be easy to get our performance, reliability, and efficiency goals out of the Ruby VM, so we embarked on writing code to be run on the JVM instead. We estimated that rewriting our codebase could get us > 10x performance improvement, on the same hardware –– and now, today, we push on the order of 10 - 20K requests / sec / host.

New Tweets per second record, and how!

 近年、様々なサービスが、自社サービスの再構築にあたって、大規模なトラフィックに耐える性能・信頼性・効率性の要件を達成するために Java VM (JVM) を選んだと証言している。JVM は、過去 10 年以上にも渡ってサンやオラクル等によって大きな開発リソースが投じられてきたこともあって、高い性能と安定性を実現している。

 つまり、JVM 言語として作られた Scala は、当然その恩恵を受けることができる。

 また I/O 性能についても、JVMVM 内のプログラムから OS の低レベル I/O を直接叩ける NIO (Non-blocking I/O) API を備えている。よって、クライアント−サービス間に加えて、内部サービス間の I/O 性能が特に重要となるマイクロサービス・アーキテクチャとも相性が良い。

 また、採用面で技術者の確保が容易である点もしばしば挙げられる。

Changed to a JVM centric approach for hiring and speed of development reasons.

Tumblr Architecture - 15 Billion Page Views a Month and Harder to Scale than Twitter - High Scalability -

 高負荷環境での JVM の運用は GC(ガベージ・コレクション)との戦いになると言われるが、他の言語ランタイムに比べて、そのあたりのチューニングのノウハウを持った人材を確保しやすいということもあるのかもしれない。

2. Finagle の存在

Finagle was a compelling factor in choosing Scala. It is a library from Twitter. It handles most of the distributed issues like distributed tracing, service discovery, and service registration. You don’t have to implement all this stuff. It just comes for free.

Tumblr Architecture - 15 Billion Page Views a Month and Harder to Scale than Twitter - High Scalability -

 Scala 採用の理由として、Twitter が開発した Scala 製非同期 RPC フレームワーク "Finagle" の存在を挙げるサービスは多い。Finagle の興味深い点はたくさんあるが、マイクロサービスのためのフレームワーク、という観点から言うと以下の三つ(性能、プログラミングモデル、運用ツールとの連携)が挙げられる。

性能

 Finagle は、先に挙げた NIO フレームワークの定番である NettyScala ラッパーであり、高い性能を誇る。また、Twitter 自身が Netty の開発に大きくコミットしており、Finagle の開発とも密接に連携している。

 Finagle の高性能は、”バルス祭り”の大規模トラフィックにも耐えうるシステムの構築に大きく貢献した。

Our new stack has enabled us to reach new records in throughput and as of this writing our record tweets per second is 143,199.

Netty at Twitter with Finagle

プログラミングモデル

 マイクロサービス・アーキテクチャでは、各サービスの実装にあたって、必然的に他の内部サービスに対する非同期 RPC のコーディングが必要になるが、このスタイルには厄介な点がいくつもある。

  • ある RPC の結果を使って次の RPC をリクエストするような逐次処理や、並列にリクエストした複数の RPC の結果が揃うのを待って集約するような並列処理を書こうとすると、どうしてもコードが煩雑になる。
  • リクエストした処理が長時間戻ってこない場合、スレッドをブロックしないようにプログラムする必要がある。
  • あらゆるリモートへのリクエストは、想定外の理由で失敗する可能性がある(ネットワーク障害、リモートホストの障害、等々)。したがって、エラー処理やリトライ、タイムアウト等を考慮したコーディングが必要になる。

 Finagle は、非同期 RPC を FutureService/Filter というインタフェースで抽象化する。これらの API は、オブジェクト指向言語であると同時に関数型言語である Scala の特徴をよく活かしたものになっている。

 FutureService、そして Filter を組み合わせると、例えば、以下のようにマイクロサービスを組み合わせてサービスを作る際に、逐次処理と並列処理が絡み合った複雑な非同期 RPC 処理をクリーンかつシンプル、そして安全に記述できる(引用元)。

  1. 認証サービス (AuthService) に問い合わせて、ユーザ認証を行う
  2. タイムラインサービス(TimelineService) に問い合わせて、指定したユーザの Tweet ID の一覧を受け取る
  3. ツイートサービス (TweetService) に各 ID に対応するツイート本文を並列に問い合わせて、全ての結果が戻ってきたら集約してクライアントに返す
val timelineSvc = Thrift.newIface[TimelineService](...) // #1
val tweetSvc = Thrift.newIface[TweetService](...)
val authSvc = Thrift.newIface[AuthService](...)
  
val authFilter = Filter.mk[Req, AuthReq, Res, Res] { (req, svc) => // #2
  authSvc.authenticate(req) flatMap svc(_)
}
  
val apiService = Service.mk[AuthReq, Res] { req =>
  timelineSvc(req.userId) flatMap { tl =>
    val tweets = tl map tweetSvc.getById(_)
    Future.collect(tweets) map tweetsToJson(_)
  }
} //#3
Http.serve(":80", authFilter andThen apiService) // #4
  
// #1 サービスごとのクライアントを作成する
// #2 入ってくるリクエストを認証する Filter を作成する
// #3 認証されたタイムラインリクエストを JSON に変換して返す service を作成する
// #4 認証 filter と service を使って 80 番ポートで動作する HTTP サーバを開始する

 ここではエラー処理が明示的に書かれていないが、上記の処理のいずれかが失敗した段階(認証が失敗した場合とか、タイムラインサービスへの問い合わせがタイムアウトした場合とか)で処理全体が失敗するようになっている。もちろん、明示的に復旧処理を書くこともできる。

 Service の実装は一般的な HTTP (REST) だけではなく Thrift も使える*1し、MemcachedMySQL、あるいは Redis などのデータベース向けプロトコルも用意されている。

 このように様々なプロトコルService として抽象化しているため、アプリケーション非依存な機能である Filter を様々なプロトコルに対して直交的に適用できる。Filter で追加できる機能には、認証やリトライ、そして後述するモニタリングやトレースといったものがある。

運用ツールとの連携

 RPC ベースの分散システムへ移行する際の厄介事は、運用・監視の面でもたくさんある。例えば、以下のうちいずれを欠いても、多数のノードで構成される複雑なマイクロサービス群の運用やデバッグは難しくなる。

 また、個々の開発者が、担当するマイクロサービスに対してこれらのツールを簡単に組み込める必要がある。

 Finagle には、ZooKeeper や分散トレースシステムの Zipkin 等と連携する機能が最初から組み込まれている。つまり、必要な設定を追加してやるだけで、ZooKeeper を使ってクラスタを組み、リクエスト数などのメトリクスをリアルタイムに監視し、各ノードでリクエストの処理にかかった時間を Zipkin へ自動的に集約して可視化したりできる。

3. 静的型付き言語である

運用が必要なシステムで1万、2万行越えだすと、静的型付けであることによる保守性の高さは、結果的にコンパイル時間等を払拭できるほどの安全性、生産性、心の平安を生むと思っていて、要は静的型付けである事は非常に価値があって、特にテストが難しいテンプレート(twirl)の静的型付けは素晴しいという事を言いたかった。

ScalaMatsuri 2014 で「国技と Scala」というタイトルで発表しました - sandbox

 マイクロサービス化の究極的な目的は、ビジネス要求の変化に合わせて継続的にサービスを更新できる体制を作ることにある。つまり、Scala の静的型付け (static typing) が提供する保守性(≒安心してコードを書き換えられる性質)は、常に変化を必要とするようなシステムでこそ大きなメリットがある。

余談

 後日に紹介予定の Colossus は、TumblrFinagle に相当するものを自分たちの要件に合わせて作ったものだと考えると良さそう。

 また、Finagle が Netty ベースであるのに対し、Colossus は Scala 言語の開発元が作っている Akkaアクターモデルの分散フレームワーク)ベースで、さらに Akka 自身が Netty に相当するレイヤ (Akka I/O) の再実装を進めていたりと、そうした「フレームワーク同士の競争」という野次馬的な面でも興味深い。

*1:Twitter の内部サービス同士のやりとりは主に Thrift を使っているようだ。

Future/Promise はいつモナドになったのか

「非同期計算をモナドで合成し、依存関係に従ってパイプライン化する」というアイデアはいつ誰が提案したのか、というのを調べてみたけどよく分からなかった記録。網羅的な調べ方はしてないので、何か知ってる人がいたら教えてください。

明示的 vs. 暗黙的

id:xuwei さんに教えて頂いた Wikipedia の記事によると「まだ完了していない計算結果へのプロキシオブジェクト」というコンセプトが FuturePromise と名付けられたのは 1976〜1977 年頃らしい。

1976 年に出た Daniel P. Friedman と David Wise の論文や Peter Hibbard の論文で言及されていた Promise(あるいは Eventual)は明示的 (explicit) に使うものだった。つまり、Java の(Completable じゃない方の)Future のように、promise から値を取り出すのに get のようなメソッドを呼ぶ必要があるということ。

一方で、アクターモデルの研究者である Henry Baker と Carl Hewitt による論文(1977 年)が言及している Future は、そのまま普通の参照のように使える暗黙的 (implicit) のものとされていた。

Promise Pipelining

次に、Promise 同士をつなぎあわせてパイプライン化しましょうというアイデアの登場は、Promise の発明から約 10 年後、1988 年の Barbara Liskov と Liuba Shrira の論文を待つことになる。また、同様のアイデアは、一部のアレゲな紳士諸君に著名な Xanadu プロジェクトの一環として Mark S. Miller*1 や Dean Tribble らからも 1989 年に提出されている。

ただ、これらの Promise Pipelining に関する論文の主眼は、同一マシン上に配置した Promise 間のネットワーク通信を削減したりして性能を向上させることなので、「非同期計算の合成」という点に限れば、もっと以前からアイデアはあったのかもしれない。

また、当時、これらの論文のまともな形の実装が世に出ることは無かったらしい。Wikipedia では、Promise Pipelining を実装した処理系として、後に Miller らが 1997 年に発表した E 言語 や、同様に Tribble らが 1996 年に発表した Joule 言語が挙げられている。

ちなみに、E 言語での記法はこんな感じ(x <- a() は「x にメッセージ a() を送る」と読ます):

t3 := (x <- a()) <- c(y <- b())

これを展開するとこうなる:

t1 := x <- a();
t2 := y <- b();
t3 := t1 <- c(t2);

どことなく Reactive Programming っぽい…?

JavaScript の Deferred

「Deferred に A => Deferred[B] を受け取って Deferred[B] を返すメソッド then() を持たせましょう」みたいなのは JavaScript ではかなり以前からあり、CommonJS では、先行例として MochiKitDojo's Deferred挙げている。少なくとも 2006 年頃にはあった模様。Twisted の Deferred は「明示的」なスタイルなのでちょっと違うかな。

最近は、Promises/A+ 仕様として標準化されて Thenable と呼ばれている。

Future/Promise モナド

結論から言うと、少なくとも Scala の実装においては、「Future/Promise はモナドである」というようなことを書いてる論文等があったり、それを参照して実装したりした、というわけではなさそう。

まず、Akka において公開レポジトリで辿れる最古の Future 実装を見ると完全に「明示的」なスタイルで、mapflatMap も見当たらない。

sealed trait Future[T] {
  def await : Future[T]
  def awaitBlocking : Future[T]
  def isCompleted: Boolean
  def isExpired: Boolean
  def timeoutInNanos: Long
  def result: Option[T]
  def exception: Option[Throwable]
}

その後 map追加されたりしたけど、ちゃんとモナドであることを意識した実装になったのはこの辺(2011 年 2 月)。下記のディスカッションによると、提案者の当初のモチベーションとしては、Akka に Scalaz の(ような?)型クラスを導入したいということだったらしい。

ちなみに、Twitter-Util の Future は、2010 年 8 月の公開当初からモナディックになってる。Marius さんは、Twitter の非同期処理系の API について Concurrent ML を参照しているとか言ってたので、あるいはそっち方面からアイデアを得たのかなぁ…?

追記: Scalaz の昔のコードを調べたらもっと古い実装があった(2009 年 5 月)。Scala 界隈だとこれが最古? どちらにせよ、リファクタリングの一環で自然に入ってきた的なノリを感じる。

まとめ

Haskell 方面の歴史を知ってれば一発なのかもしれないけど、Haskell の Future/Promise モナドに相当する型クラスってどれなんですかね…。Continuation モナド

追記

この Par モナドに成功/失敗の文脈を組み合わせると近いのかな。

追記2

*1:Wikipedia によると、今は Google のひとで、ECMAScript の仕様策定をやってる TC39 のメンバーでもあるらしい。

Java/Scala で風景から歩行者を消してみる

一昨日くらいにホッテントリ入りしてた記事↓を見て、

Export["result.jpg",
 Image[Mean[Map[ImageData,
    Import["movie.mov", "ImageList"]]]]]

このくらいのコードで済むなら Java/Scala でもすぐに書けるかも? と思ってやってみた。

理想

ヤッター、こんなに簡単にできたよー^^

import opencv._                               // ← ん?
System.loadLibrary(Core.NATIVE_LIBRARY_NAME)  // ← んんん???
saveImage("result.jpg", loadVideo("movie.mov")(mean))

現実

Isolator requires Java bindings for OpenCV. 

$ curl -OL https://github.com/Itseez/opencv/archive/2.4.9.zip -o opencv-2.4.9.zip
$ unzip opencv-2.4.9.zip
$ cd opencv-2.4.9
$ mkdir build
$ cd build/
$ cmake -DBUILD_SHARED_LIBS=OFF ..
$ make -j8
import org.opencv.core.{Core, CvType, Mat, Scalar, Size}
import org.opencv.highgui.{Highgui, VideoCapture}

package object opencv {
  ...
  def mean(frames: Iterator[Mat]): Mat =
    if (frames.hasNext) {
      val head = convertTo(CvType.CV_64FC3)(frames.next)
      val (count, out) =
        frames.
          map(convertTo(CvType.CV_64FC3)).
          foldLeft((1, head)) { case ((cnt, sum), f) => (cnt + 1, add(sum, f)) }

      divide(out, new Scalar(count, count, count))
    } else {
      new Mat
    }

  def loadVideo[A](filename: String)(f: Iterator[Mat] => A): A = {
    val cap = new VideoCapture(filename)
    try {
      f(Iterator.continually(nextFrame(cap)).takeWhile(_ != None).map(_.get))
    } finally {
      cap.release()
    }
  }
  ...
  def saveImage(filename: String, m: Mat) = Highgui.imwrite(filename, m)
}

^^;;;

考察

https://raw.githubusercontent.com/okapies/isolator/master/examples/mean-opencv.jpg https://raw.githubusercontent.com/okapies/isolator/master/examples/mean-opencv.jpg

Mathematica、画像や動画の読み書きをネイティブサポートしているのはホントにいいなぁ、という感想。

なんか、動画を扱える Pure Java の成熟したソリューションって未だにあんまりないらしくて、したがって OpenCVJava から叩くのが一番堅実な選択肢ということになり、その道の先には jar をソースからビルドする楽しい作業が待ってたり、コーディング時もリソースの明示的な解放をサボると一瞬でメモリが爆裂したりと、色々めんどくさい…。

コード

今回書いたコードは GitHub に置いておきますので、世紀末ごっこして遊ぶなり改造するなりどうぞ。

あと、mean のアルゴリズムはもう少しマシな方法があるように思うので、OpenCV に詳しい方、どなたかご教示くださいませ…。

追記

@colspan一晩でやってくれました

非同期ストリーム処理の標準化を目指す "Reactive Streams" とは

TL でこんなのが流れてたので少し調べてみた。

Reactive Streams って?

JVM 上でのノンブロッキングなバックプレッシャーを持つ非同期ストリーム処理の標準の提案”(公式サイトより)。

ざっくり言うと、既にある JVM ベースの様々な非同期ストリーム処理フレームワーク実装の共通部分を括りだして API 化、SPI 化しようというもの。最終的には JSR での標準化を目指している。

ここで言う”非同期ストリーム処理”とは、(広義の)リアルタイム性が求められるデータ処理中心のアプリケーション、より具体的にはビデオストリーミングや数百万ユーザのトランザクション処理など。

下のインタビュー記事では、Reactive Streams を策定する理由として、API 策定による相互運用性の向上と共に、バックプレッシャー (back pressure) による流量制御の必要性が挙げられている。

以上から分かるように、この "API" はユーザが直接使うものではなく、各フレームワークが、この API を使ってエンドユーザ向けの DSL を提供するためのもの。これにより、異なるフレームワーク間の相互運用性を担保できる。逆に言うと、ユーザがデータ変換や分割・結合を記述する方法は守備範囲ではない

なお、この仕様が出てきた背景としては、去年の秋くらいから "Reactive Manifesto"日本語訳)というキャンペーンが始まっていて、Reactive Streams はその流れに連なっていると思われる。

どんな仕様なの?

概要がこの辺に、より詳細な仕様がこの辺に書かれている。

現状、SPI コンポーネントの構成要素は以下の三つ。普通の Pub/Sub だけど、バックプレッシャーを伝える方法が定義されているのが特徴?

  • Publisher
    • Subscriber からの要求に応じて、潜在的に無制個の順序付けされた要素を提供する。Publisher は複数の Subscriber に配信することができ、処理率に応じて配分する。
    • これ以上、要素を提供できないときは Subscriber の onComplete メソッドを呼ぶ。
  • Subscriber
    • 一つ以上の Publisher を購読して、順序付けされた要素ストリームを受け取る。Producer が Subscriber に要素を渡すときは onNext コールバックを呼ぶ。Subscriber は、Producer をブロックせずに非同期処理するかキューイングする必要がある。
  • Subscription
    • Subscriber から Producer へ要求を伝えるときに使う。Subscriber が Subscription の requestMore(int) メソッドを呼ぶと、Publisher は 制限時間 T が経つ前に最高 N 回まで onNext メソッドを呼び出せる。

API レベルでは、上記の Publisher に対応する型として Producer が、Subscriber に対応するものとして Consumer が定義される。また、両者を組み合わせて入力と出力の両方を行う Processor も提供される。

【追記】組み合わせるとこんな感じ?

誰が関わってるの?

Scala 言語Akka フレームワークの開発をしている Typesafe 社が中心となって仕様策定を進めている。Akka 開発者による解説記事はこちら

それ以外にも、以下のような人々やプロダクトが関わっている。

JVM エコシステムの各方面で実績のある人物・プロダクトが、ズラッと一堂に会しているのが印象的。特に、Oracle の人や Doug Lea 氏が関わっている辺りを見ても、標準化に対する本気が伺える。(元)Microsoft の人が一枚噛んでるのも興味深い。

批判とか

Reactive Streams の”思想的根拠”である Reactive Manifesto に対する批判。たしかに、”そもそも "Reactive" って何だよ” というのがよく分からない感じはある。

ただ、Akka というプロダクトは明らかに Erlang/OTP に対するリスペクトから出てきたものだし、本人達もそれを隠しているわけではない(例えば、Akka の公式サイトの名前は "Let it crash")。アクターモデルを広めたいという意図こそあれ、Erlang の成果を横取りしようとしている、というのは考え過ぎじゃないかなぁと。【追記】: @pokarim さんから「(引用した発言に)そのような意図はない」とご指摘を受けたので訂正します。失礼しました。

まとめ

非同期ストリーム処理フレームワークは乱立気味、というか思い切り乱立しているので、使う側からすると「標準化してくれるのは助かるなぁ」という感想。

とりあえず、これから出てくる実装を開発者が各々の立場から調べて、フレームワークの相互運用性や流量制御といったフィーチャーが、自分たちのユースケースに対してどの程度メリットがあるのか判断していけば良いのではないかと思う。

あと、そういえば Storm に声は掛かってないんだろうか? 明らかにこのモデルに乗っけられそうだけど。

Hadoop/Storm の統合を実現する Twitter の SummingBird

TwitterSummingBird を正式リリースして早二ヶ月。「日本語の紹介記事がほとんど出てないな」と気付いたので、調査がてらまとめてみました。

SummingBird とは?

MapReduce なプログラムを書くための Scala/Java ライブラリ。最大の特徴は、ひとたび SummingBird で書いたジョブは Hadoop でも Storm でも同じように実行できること。

SummingBird では、Hadoop を使う「バッチモード」と、Storm を使う「リアルタイムモード」に加えて、二つを同時に実行する「ハイブリッドモード」がある。ハイブリッドモードでは、ジョブの作者が特に配慮しなくても、バッチとリアルタイムの処理結果を自動的にマージできる

ハイブリッドモードでは、同じジョブを Hadoop と Storm で同時に実行できるので、Hadoop の耐障害性と Storm のリアルタイム性を両立できる。

例えば、Storm からリアルタイムな結果を得ながら、仮に Storm がコケた場合でも、あとから Hadoop が同じデータを処理して結果を復旧できる。また、Storm の苦手な処理(数ヶ月分の過去データを再計算したり、データベースにランダム書き込みしたり)は Hadoop で実行するといったことができる。

ちなみに、こうしたバッチ処理とリアルタイム処理を組み合わせる考え方は、Storm の作者である Nathan Marz (@nathanmarz) 氏が提唱する "Lambda Architecture" に基づいている。このへんにご本人による解説記事があるので、興味がある人はどうぞ。

ロジックの書き方

SummingBird の MapReduce DSL は、基本的には Scala 標準のコレクションライブラリ API に準拠している。例えば、単語の数え上げ (word count) を、普通の Scala コレクションの変換処理として書くとこうなる。

def wordCount(source: Iterable[String], store: MutableMap[String, Long]) =
  source.flatMap { sentence =>
    toWords(sentence).map(_ -> 1L)
  }.foreach { case (k, v) => store.update(k, store.get(k) + v) }

これを SummingBird で書くと以下のようになる。つまり、(1) source から入力される文 (sentence) を toWords で単語に分解して、(2) 各単語を ([word], 1L) のタプルに変換し、(3) sumByKey でキーごとに集約して、store の値に加算したあとに保存する。

def wordCount[P <: Platform[P]]
  (source: Producer[P, String], store: P#Store[String, Long]) =
    source.flatMap { sentence =>
      toWords(sentence).map(_ -> 1L) // (1) + (2)
    }.sumByKey(store) // (3)

一見、なじみ深い SQL ライクな DSL とは雰囲気が違うけど、これも SQL と同様に集合演算の考え方に基づいているので、概念的には大きな差はない。

データストリームとプラットフォームの抽象化

SummingBird の核となるコンセプトは ProducerPlatform だ。

Producerデータストリームを抽象化したものmap, filter, flatMap, merge, leftJoin などのメソッドが定義されており、これらを使ってデータストリームに対する処理を宣言的に記述する。

trait Producer[P <: Platform[P], +T] {
  def merge[U >: T](r: Producer[P, U]): Producer[P, U] = ...
  def collect[U](fn: PartialFunction[T,U]): Producer[P, U] = ...
  def filter(fn: T => Boolean): Producer[P, T] = ...
  def lookup[U >: T, V](service: P#Service[U, V]): KeyedProducer[P, U, Option[V]] = ...
  def map[U](fn: T => U): Producer[P, U] = ...
  def flatMap[U](fn: T => TraversableOnce[U]): Producer[P, U] = ...
  ...
}

なお、Producer のメソッドを呼び出しただけでは何の処理も走らない点に注意。PlatformインスタンスProducer を渡して初めて、データストリームに対する処理が実行される。

Platform は、Hadoop や Storm のようなストリーミング MapReduce ライブラリを抽象化したものPlatformの各インスタンスは、Producerで記述した処理を、特定の MapReduce 処理系で実行する方法を実装している。

trait Platform[P <: Platform[P]] {
  type Source[+T]
  type Store[-K, V]
  type Sink[-T]
  type Service[-K, V]
  type Plan[T]

  def plan[T](completed: Producer[P, T]): Plan[T]
}

例えば、Storm の実装はこうなる。

  type Source[+T]: Spout[(Long, T)]
  type Store[-K, V]: StormStore[K, V]
  type Sink[-T]: (T => Future[Unit])
  type Service[-K, V]: StormService[K, V]
  type Plan[T]: StormTopology

Platform が抽象化されているということは、つまり、SummingBird は Hadoop や Storm 以外のフレームワークにも適用できるということ。実際に、今後のバージョンで SparkAkka などのサポートが予定されている。

関連プロジェクト

SummingBird 自身は一万行程度のライブラリだが、多くの関連プロジェクトの成果の上に成り立っている。いずれも Twitter 自身が開発して OSS 化しており、SummingBird とは独立に利用できる。

  • Algebird: Scala 向けの抽象代数学ライブラリ(後述)。元々は Scala 向け Hadoop ラッパー Scalding の一部だった。
  • Bijection: 異なる型のオブジェクトを相互に変換するライブラリ。型クラスによる拡張をサポートし、異なるプラットフォームやクライアント上でシリアライズ方式を共有できる。
  • Chill: シリアライズライブラリ Kryo のラッパー。Storm、Scala、Hadoop 向けのモジュールを提供しているほか、Spark でも使われている。
  • Tormenta: Storm の Scala 拡張で、Spout に対して型安全性やマッピング、フィルタリング等の機能を提供する。
  • Storehaus: キーバリューストアに対する非同期クライアントを実装するためのライブラリ。(SummingBird における)Storm のリアルタイム集計処理は Storehaus の MergeableStore トレイトを使っており、Memcached や Redis などに対応している。

モノイドと並列処理

最初の紹介で「バッチとリアルタイムの処理結果を自動的にマージできる」と書いたが、この特性を支えるのが、上で紹介した Algebird が提供するモノイド半群の実装だ。

モノイドは、我々が日常的に使う「足し算」や「掛け算」の概念を抽象化したもので、結合律(と単位律)を満たす「足し算のようなもの」を総称する概念だ。「のようなもの」というのは、足されるのは「数」とは限らないから。身近な例では「リストの結合」もモノイドだ。

いきなり「結合律」と言われても何のこっちゃという感じだが、例えば、

a1 + a2 + a3

という足し算があったとしよう。これを (a1 + a2) + a3 という順番で足しても a1 + (a2 + a3) という順番で足しても、結果は変わらない。これが結合律。

つまり、この性質を満たす演算はどこからどの順番で足し合わせても同じ結果になるので、並列処理と相性がよい。また、その結果として、オンライン処理の結果とオフライン処理の結果も簡単にマージできる。

Algebird は、データ分析用途に使えるモノイドや半群の実装を多数提供している。例えば、HyperLogLogBloomFilter のようなアルゴリズムを SummingBird でそのまま利用できる。

trait Monoid[V] {
  def zero: V
  def plus(l: V, r: V): V
}

ユーザが自分のモノイドを実装することもできる。やることは Monoidzeroplus を実装して、あらゆる値で結合律を満たすことを確認するテストを書くだけ。

assertEquals(m.plus(a, m.plus(b,c)), m.plus(m.plus(a,b), c)

【11/15追記】 モノイドの自作については、yukitos さんが翻訳されている "Understanding monoids" シリーズがとても参考になります。

まとめ(というか余談)

この半年ほど、色々な Web 企業が公開している OSS プロダクトのソースコードを調べる機会があったのですが、個人的には、エンジニアリングの水準は Twitter が頭一つ抜けているなぁという印象を持っています。

Twitter 製 OSS プロダクトのレベルの高さを感じるのは、コードがよく整理されていて再利用性が高いのが一つ。もう一つは、アカデミックな知見の応用に積極的に取り組んでいる点です。

それも、単に学問的な純粋さを追求するのではなく、抽象化のレベルが適切で、理論と実践のバランスが注意深くはかられています。Tumblr や LinkedIn、Pinterest などの企業で Twitter 製プロダクトが続々と採用されているのも、開発生産性に加えて性能面での優秀さも一因でしょう。

あと、いち Scala ファンとしては、データ分析分野での Scala 活用事例としても注目しています。

なぜ SummingBird は、従来の SQL ライクな記法ではなく Scala DSL を採用したのか。その理由は今のところ特に語られていませんが、型安全な API や、型クラスによる拡張性の担保、あるいは非同期処理の実装を容易にする Future の活用など、SummingBird では Scala の機能がうまく活かされていると思います。

Scala 製分散データ処理フレームワークの代表格である Spark の方も、Databricks 創業や Cloudera との提携 といった明るいニュースが続いていますし、「データ分析用言語としての Scala」が盛り上がってきている感があります。この調子で、今後の躍進を期待したいところです。

参考文献