よく忘れるのでメモします。Scala です。
Future#collect とは
com.twitter.util.Future についている便利関数で、Seq[Future[A]]
を受け取り、Future[Seq[A]]
を返すことができる。別名 sequence
と呼ばれる処理をします。
この関数はリストになった Future を受け取って処理をしていくのですが、途中で1つでも Future が例外状態になった場合は、そこで処理が中断されます。もし、中断せずに処理を続行させたい場合には、Future#collectToTry
という関数を利用できます。
この関数のシグネチャ的に、並列処理を裏側でやってくれそうな気がするのですが、通常通り Future
を投げ込む限りでは並列処理はやってくれません。いくつか実験して調べてみましょう。
実験内容
下記の実験を今回は行います。
- タスクを3つ用意する(A, B, Cと名前をつける)
- 1秒に1回「ticking task N」(N には A, B, C のどれかが入る)と標準出力する。
- 期待値としては
- 逐次実行の場合は、A が終了してから B、B が終了してから C と順にタスクが走る。
- 並列実行の場合は、A, B, C が同時にスタートする。
実験1: 逐次実行することを確かめる
次のようなテストを書くと、その挙動を確かめることができます。テストライブラリには ScalaTest を使用しています。
import com.twitter.util.{Await, Future} import org.scalatest.FunSpecLike class ParalleliseTest extends FunSpecLike { describe("1: 逐次実行するパターン") { it("正しく実行されること") { val taskA = Future { for (_ <- 0 until 10) { println("ticking task A") Thread.sleep(1000) } } val taskB = Future { for (_ <- 1 until 10) { println("ticking task B") Thread.sleep(1000) } } val taskC = Future { for (_ <- 1 until 10) { println("ticking task C") Thread.sleep(1000) } } Await.result(Future.collect(Seq(taskA, taskB, taskC))) } } }
結果は下記のようになります。IntelliJ の計測だと、逐次実行しているので 29s かかって終了しているようです。
ticking task A ticking task A ticking task A ticking task A ticking task A ticking task A ticking task A ticking task A ticking task A ticking task A ticking task B ticking task B ticking task B ticking task B ticking task B ticking task B ticking task B ticking task B ticking task B ticking task B ticking task C ticking task C ticking task C ticking task C ticking task C ticking task C ticking task C ticking task C ticking task C
実験2: 並列実行することを確かめる
FuturePool を使用して処理を記述すると並列処理させることができます。これを利用して実装します。val pool = FuturePool.unboundedPool
にて、FuturePool を用意しています。
import com.twitter.util.{Await, Future, FuturePool} import org.scalatest.FunSpecLike class ParalleliseTest extends FunSpecLike { describe("2: 並列処理するパターン") { val pool = FuturePool.unboundedPool it("正しく実行されること") { val taskA = pool { for (_ <- 0 until 10) { println("ticking task A") Thread.sleep(1000) } } val taskB = pool { for (_ <- 1 until 10) { println("ticking task B") Thread.sleep(1000) } } val taskC = pool { for (_ <- 1 until 10) { println("ticking task C") Thread.sleep(1000) } } Await.result(Future.collect(Seq(taskA, taskB, taskC))) } } }
結果は下記のようになります。
ticking task A ticking task B ticking task C ticking task A ticking task C ticking task B ticking task A ticking task B ticking task C ticking task A ticking task C ticking task B ticking task A ticking task C ticking task B ticking task A ticking task B ticking task C ticking task A ticking task B ticking task C ticking task A ticking task B ticking task C ticking task A ticking task C ticking task B ticking task A
並列処理(というか、並行処理?)をしていることがわかります。IntelliJ の計測だと 10s 弱かかって終了しているようです。結果の Seq に詰め込まれる要素順はこの場合、必ずしも保証されないようなので注意が必要そうです。
余談: sequence 3兄弟
TwitterFuture には、Seq[Future[A]]
(あるいは、Seq[A]
と A => Future[B]
にリフトさせる関数)を受け取り Future[Seq[B]]
を返す関数が3つあります。
- collect
- collectToTry
- traverseSequentially
このうち、collect と collectToTry は Seq を受け取った後、中で iterator に変換します。Seq の数が増えてくると遅延評価を利用したくなりおそらく Stream
にしたくなると思うのですが、collect に Stream
を入れたとしても iterator にて一度展開されるため、遅延評価にならず意味がなくなってしまうように思います。
この点に注意が必要で、私も実際プロダクトで使っていて、メモリに一気に載せているような挙動を示していました。traverseSequentially にしたら、Stream らしい挙動を示しました。
collect は fs.iterator
にて確かに Iterator を呼び出しています。
def collect[A](fs: AnySeq[Future[A]]): Future[Seq[A]] = if (fs.isEmpty) emptySeq else { val result = new CollectPromise[A](fs) var i = 0 val it = fs.iterator while (it.hasNext && !result.isDefined) { it.next().respond(result.collectTo(i)) i += 1 } result }
def collectToTry[A](fs: AnySeq[Future[A]]): Future[Seq[Try[A]]] = { //unroll cases 0 and 1 if (fs.isEmpty) Nil else { val iterator = fs.iterator val h = iterator.next() if (iterator.hasNext) { val buf = Vector.newBuilder[Future[Try[A]]] buf.sizeHint(fs.size) buf += h.liftToTry buf += iterator.next().liftToTry while (iterator.hasNext) buf += iterator.next().liftToTry Future.collect(buf.result) } else { Future.collect(List(h.liftToTry)) } } }
一方でどうやら、traverseSequentially の場合はそのような事態は起こらないように見えます。as.foldLeft
をすれば、たとえば as が Stream だった場合は、Stream の foldLeft 実装が呼び出されるはずだからです。
def traverseSequentially[A,B](as: Seq[A])(f: A => Future[B]): Future[Seq[B]] = as.foldLeft(Future.value(Vector.empty[B])) { (resultsFuture, nextItem) => for { results <- resultsFuture nextResult <- f(nextItem) } yield results :+ nextResult }
まとめ
- Future#collect は逐次実行をする。
- FuturePool と合わせると並列実行になる。