読者です 読者をやめる 読者になる 読者になる

Hadoop/Storm の統合を実現する Twitter の SummingBird

TwitterSummingBird を正式リリースして早二ヶ月。「日本語の紹介記事がほとんど出てないな」と気付いたので、調査がてらまとめてみました。

SummingBird とは?

MapReduce なプログラムを書くための Scala/Java ライブラリ。最大の特徴は、ひとたび SummingBird で書いたジョブは Hadoop でも Storm でも同じように実行できること。

SummingBird では、Hadoop を使う「バッチモード」と、Storm を使う「リアルタイムモード」に加えて、二つを同時に実行する「ハイブリッドモード」がある。ハイブリッドモードでは、ジョブの作者が特に配慮しなくても、バッチとリアルタイムの処理結果を自動的にマージできる

ハイブリッドモードでは、同じジョブを Hadoop と Storm で同時に実行できるので、Hadoop の耐障害性と Storm のリアルタイム性を両立できる。

例えば、Storm からリアルタイムな結果を得ながら、仮に Storm がコケた場合でも、あとから Hadoop が同じデータを処理して結果を復旧できる。また、Storm の苦手な処理(数ヶ月分の過去データを再計算したり、データベースにランダム書き込みしたり)は Hadoop で実行するといったことができる。

ちなみに、こうしたバッチ処理とリアルタイム処理を組み合わせる考え方は、Storm の作者である Nathan Marz (@nathanmarz) 氏が提唱する "Lambda Architecture" に基づいている。このへんにご本人による解説記事があるので、興味がある人はどうぞ。

ロジックの書き方

SummingBird の MapReduce DSL は、基本的には Scala 標準のコレクションライブラリ API に準拠している。例えば、単語の数え上げ (word count) を、普通の Scala コレクションの変換処理として書くとこうなる。

def wordCount(source: Iterable[String], store: MutableMap[String, Long]) =
  source.flatMap { sentence =>
    toWords(sentence).map(_ -> 1L)
  }.foreach { case (k, v) => store.update(k, store.get(k) + v) }

これを SummingBird で書くと以下のようになる。つまり、(1) source から入力される文 (sentence) を toWords で単語に分解して、(2) 各単語を ([word], 1L) のタプルに変換し、(3) sumByKey でキーごとに集約して、store の値に加算したあとに保存する。

def wordCount[P <: Platform[P]]
  (source: Producer[P, String], store: P#Store[String, Long]) =
    source.flatMap { sentence =>
      toWords(sentence).map(_ -> 1L) // (1) + (2)
    }.sumByKey(store) // (3)

一見、なじみ深い SQL ライクな DSL とは雰囲気が違うけど、これも SQL と同様に集合演算の考え方に基づいているので、概念的には大きな差はない。

データストリームとプラットフォームの抽象化

SummingBird の核となるコンセプトは ProducerPlatform だ。

Producerデータストリームを抽象化したものmap, filter, flatMap, merge, leftJoin などのメソッドが定義されており、これらを使ってデータストリームに対する処理を宣言的に記述する。

trait Producer[P <: Platform[P], +T] {
  def merge[U >: T](r: Producer[P, U]): Producer[P, U] = ...
  def collect[U](fn: PartialFunction[T,U]): Producer[P, U] = ...
  def filter(fn: T => Boolean): Producer[P, T] = ...
  def lookup[U >: T, V](service: P#Service[U, V]): KeyedProducer[P, U, Option[V]] = ...
  def map[U](fn: T => U): Producer[P, U] = ...
  def flatMap[U](fn: T => TraversableOnce[U]): Producer[P, U] = ...
  ...
}

なお、Producer のメソッドを呼び出しただけでは何の処理も走らない点に注意。PlatformインスタンスProducer を渡して初めて、データストリームに対する処理が実行される。

Platform は、Hadoop や Storm のようなストリーミング MapReduce ライブラリを抽象化したものPlatformの各インスタンスは、Producerで記述した処理を、特定の MapReduce 処理系で実行する方法を実装している。

trait Platform[P <: Platform[P]] {
  type Source[+T]
  type Store[-K, V]
  type Sink[-T]
  type Service[-K, V]
  type Plan[T]

  def plan[T](completed: Producer[P, T]): Plan[T]
}

例えば、Storm の実装はこうなる。

  type Source[+T]: Spout[(Long, T)]
  type Store[-K, V]: StormStore[K, V]
  type Sink[-T]: (T => Future[Unit])
  type Service[-K, V]: StormService[K, V]
  type Plan[T]: StormTopology

Platform が抽象化されているということは、つまり、SummingBird は Hadoop や Storm 以外のフレームワークにも適用できるということ。実際に、今後のバージョンで SparkAkka などのサポートが予定されている。

関連プロジェクト

SummingBird 自身は一万行程度のライブラリだが、多くの関連プロジェクトの成果の上に成り立っている。いずれも Twitter 自身が開発して OSS 化しており、SummingBird とは独立に利用できる。

  • Algebird: Scala 向けの抽象代数学ライブラリ(後述)。元々は Scala 向け Hadoop ラッパー Scalding の一部だった。
  • Bijection: 異なる型のオブジェクトを相互に変換するライブラリ。型クラスによる拡張をサポートし、異なるプラットフォームやクライアント上でシリアライズ方式を共有できる。
  • Chill: シリアライズライブラリ Kryo のラッパー。Storm、Scala、Hadoop 向けのモジュールを提供しているほか、Spark でも使われている。
  • Tormenta: Storm の Scala 拡張で、Spout に対して型安全性やマッピング、フィルタリング等の機能を提供する。
  • Storehaus: キーバリューストアに対する非同期クライアントを実装するためのライブラリ。(SummingBird における)Storm のリアルタイム集計処理は Storehaus の MergeableStore トレイトを使っており、Memcached や Redis などに対応している。

モノイドと並列処理

最初の紹介で「バッチとリアルタイムの処理結果を自動的にマージできる」と書いたが、この特性を支えるのが、上で紹介した Algebird が提供するモノイド半群の実装だ。

モノイドは、我々が日常的に使う「足し算」や「掛け算」の概念を抽象化したもので、結合律(と単位律)を満たす「足し算のようなもの」を総称する概念だ。「のようなもの」というのは、足されるのは「数」とは限らないから。身近な例では「リストの結合」もモノイドだ。

いきなり「結合律」と言われても何のこっちゃという感じだが、例えば、

a1 + a2 + a3

という足し算があったとしよう。これを (a1 + a2) + a3 という順番で足しても a1 + (a2 + a3) という順番で足しても、結果は変わらない。これが結合律。

つまり、この性質を満たす演算はどこからどの順番で足し合わせても同じ結果になるので、並列処理と相性がよい。また、その結果として、オンライン処理の結果とオフライン処理の結果も簡単にマージできる。

Algebird は、データ分析用途に使えるモノイドや半群の実装を多数提供している。例えば、HyperLogLogBloomFilter のようなアルゴリズムを SummingBird でそのまま利用できる。

trait Monoid[V] {
  def zero: V
  def plus(l: V, r: V): V
}

ユーザが自分のモノイドを実装することもできる。やることは Monoidzeroplus を実装して、あらゆる値で結合律を満たすことを確認するテストを書くだけ。

assertEquals(m.plus(a, m.plus(b,c)), m.plus(m.plus(a,b), c)

【11/15追記】 モノイドの自作については、yukitos さんが翻訳されている "Understanding monoids" シリーズがとても参考になります。

まとめ(というか余談)

この半年ほど、色々な Web 企業が公開している OSS プロダクトのソースコードを調べる機会があったのですが、個人的には、エンジニアリングの水準は Twitter が頭一つ抜けているなぁという印象を持っています。

Twitter 製 OSS プロダクトのレベルの高さを感じるのは、コードがよく整理されていて再利用性が高いのが一つ。もう一つは、アカデミックな知見の応用に積極的に取り組んでいる点です。

それも、単に学問的な純粋さを追求するのではなく、抽象化のレベルが適切で、理論と実践のバランスが注意深くはかられています。Tumblr や LinkedIn、Pinterest などの企業で Twitter 製プロダクトが続々と採用されているのも、開発生産性に加えて性能面での優秀さも一因でしょう。

あと、いち Scala ファンとしては、データ分析分野での Scala 活用事例としても注目しています。

なぜ SummingBird は、従来の SQL ライクな記法ではなく Scala DSL を採用したのか。その理由は今のところ特に語られていませんが、型安全な API や、型クラスによる拡張性の担保、あるいは非同期処理の実装を容易にする Future の活用など、SummingBird では Scala の機能がうまく活かされていると思います。

Scala 製分散データ処理フレームワークの代表格である Spark の方も、Databricks 創業や Cloudera との提携 といった明るいニュースが続いていますし、「データ分析用言語としての Scala」が盛り上がってきている感があります。この調子で、今後の躍進を期待したいところです。

参考文献