Don't Repeat Yourself

Don't Repeat Yourself (DRY) is a principle of software development aimed at reducing repetition of all kinds. -- wikipedia

TwitterFuture の Future#collect と並列処理

よく忘れるのでメモします。Scala です。

Future#collect とは

com.twitter.util.Future についている便利関数で、Seq[Future[A]] を受け取り、Future[Seq[A]] を返すことができる。別名 sequence と呼ばれる処理をします。

この関数はリストになった Future を受け取って処理をしていくのですが、途中で1つでも Future が例外状態になった場合は、そこで処理が中断されます。もし、中断せずに処理を続行させたい場合には、Future#collectToTry という関数を利用できます。

この関数のシグネチャ的に、並列処理を裏側でやってくれそうな気がするのですが、通常通り Future を投げ込む限りでは並列処理はやってくれません。いくつか実験して調べてみましょう。

実験内容

下記の実験を今回は行います。

  • タスクを3つ用意する(A, B, Cと名前をつける)
  • 1秒に1回「ticking task N」(N には A, B, C のどれかが入る)と標準出力する。
  • 期待値としては
    • 逐次実行の場合は、A が終了してから B、B が終了してから C と順にタスクが走る。
    • 並列実行の場合は、A, B, C が同時にスタートする。

実験1: 逐次実行することを確かめる

次のようなテストを書くと、その挙動を確かめることができます。テストライブラリには ScalaTest を使用しています。

import com.twitter.util.{Await, Future}
import org.scalatest.FunSpecLike

class ParalleliseTest extends FunSpecLike {

  describe("1: 逐次実行するパターン") {
    it("正しく実行されること") {
      val taskA = Future {
        for (_ <- 0 until 10) {
          println("ticking task A")
          Thread.sleep(1000)
        }
      }

      val taskB = Future {
        for (_ <- 1 until 10) {
          println("ticking task B")
          Thread.sleep(1000)
        }
      }

      val taskC = Future {
        for (_ <- 1 until 10) {
          println("ticking task C")
          Thread.sleep(1000)
        }
      }

      Await.result(Future.collect(Seq(taskA, taskB, taskC)))
    }
  }
}

結果は下記のようになります。IntelliJ の計測だと、逐次実行しているので 29s かかって終了しているようです。

ticking task A
ticking task A
ticking task A
ticking task A
ticking task A
ticking task A
ticking task A
ticking task A
ticking task A
ticking task A
ticking task B
ticking task B
ticking task B
ticking task B
ticking task B
ticking task B
ticking task B
ticking task B
ticking task B
ticking task B
ticking task C
ticking task C
ticking task C
ticking task C
ticking task C
ticking task C
ticking task C
ticking task C
ticking task C

実験2: 並列実行することを確かめる

FuturePool を使用して処理を記述すると並列処理させることができます。これを利用して実装します。val pool = FuturePool.unboundedPool にて、FuturePool を用意しています。

import com.twitter.util.{Await, Future, FuturePool}
import org.scalatest.FunSpecLike

class ParalleliseTest extends FunSpecLike {

  describe("2: 並列処理するパターン") {
    val pool = FuturePool.unboundedPool

    it("正しく実行されること") {
      val taskA = pool {
        for (_ <- 0 until 10) {
          println("ticking task A")
          Thread.sleep(1000)
        }
      }

      val taskB = pool {
        for (_ <- 1 until 10) {
          println("ticking task B")
          Thread.sleep(1000)
        }
      }

      val taskC = pool {
        for (_ <- 1 until 10) {
          println("ticking task C")
          Thread.sleep(1000)
        }
      }

      Await.result(Future.collect(Seq(taskA, taskB, taskC)))
    }
  }
}

結果は下記のようになります。

ticking task A
ticking task B
ticking task C
ticking task A
ticking task C
ticking task B
ticking task A
ticking task B
ticking task C
ticking task A
ticking task C
ticking task B
ticking task A
ticking task C
ticking task B
ticking task A
ticking task B
ticking task C
ticking task A
ticking task B
ticking task C
ticking task A
ticking task B
ticking task C
ticking task A
ticking task C
ticking task B
ticking task A

並列処理(というか、並行処理?)をしていることがわかります。IntelliJ の計測だと 10s 弱かかって終了しているようです。結果の Seq に詰め込まれる要素順はこの場合、必ずしも保証されないようなので注意が必要そうです。

余談: sequence 3兄弟

TwitterFuture には、Seq[Future[A]] (あるいは、Seq[A]A => Future[B] にリフトさせる関数)を受け取り Future[Seq[B]] を返す関数が3つあります。

  • collect
  • collectToTry
  • traverseSequentially

このうち、collect と collectToTry は Seq を受け取った後、中で iterator に変換します。Seq の数が増えてくると遅延評価を利用したくなりおそらく Stream にしたくなると思うのですが、collect に Stream を入れたとしても iterator にて一度展開されるため、遅延評価にならず意味がなくなってしまうように思います。

この点に注意が必要で、私も実際プロダクトで使っていて、メモリに一気に載せているような挙動を示していました。traverseSequentially にしたら、Stream らしい挙動を示しました。

collect は fs.iterator にて確かに Iterator を呼び出しています。

def collect[A](fs: AnySeq[Future[A]]): Future[Seq[A]] =
    if (fs.isEmpty) emptySeq
    else {
      val result = new CollectPromise[A](fs)
      var i = 0
      val it = fs.iterator

      while (it.hasNext && !result.isDefined) {
        it.next().respond(result.collectTo(i))
        i += 1
      }

      result
    }

実装はこちら

  def collectToTry[A](fs: AnySeq[Future[A]]): Future[Seq[Try[A]]] = {
    //unroll cases 0 and 1
    if (fs.isEmpty) Nil
    else {
      val iterator = fs.iterator
      val h = iterator.next()
      if (iterator.hasNext) {
        val buf = Vector.newBuilder[Future[Try[A]]]
        buf.sizeHint(fs.size)
        buf += h.liftToTry
        buf += iterator.next().liftToTry
        while (iterator.hasNext) buf += iterator.next().liftToTry
        Future.collect(buf.result)
      } else {
        Future.collect(List(h.liftToTry))
      }
    }
  }

実装はこちら

一方でどうやら、traverseSequentially の場合はそのような事態は起こらないように見えます。as.foldLeft をすれば、たとえば as が Stream だった場合は、Stream の foldLeft 実装が呼び出されるはずだからです。

def traverseSequentially[A,B](as: Seq[A])(f: A => Future[B]): Future[Seq[B]] =
    as.foldLeft(Future.value(Vector.empty[B])) { (resultsFuture, nextItem) =>
      for {
        results    <- resultsFuture
        nextResult <- f(nextItem)
      } yield results :+ nextResult
    }

実装はこちら

まとめ

  • Future#collect は逐次実行をする。
  • FuturePool と合わせると並列実行になる。