Monday, February 25, 2013

Effective com.twitter.util.Future

Writing scala software that effective manages asynchronous execution through Future's is quite straightforward, once you understand the ins and outs of the particular framework. Twitter's implementation of Future in Scala is part of their util library. I wanted to understand how to best use them, and after lots of reading, source code examination, and experimentation, I have results I feel are worth sharing. There are other Future implementations out there, including one in Scala 2.10. I hope what I have to share here will transfer to other frameworks with some tweaks.

The Setup

Running the Scala REPL via sbt made the exploration substantially easier. I used the following build.sbt:
name := "Future Test"

version := "0.0"

organization := "org.sfmishras"

scalaVersion := "2.9.2"

resolvers += "twitter" at "http://maven.twttr.com/"

libraryDependencies +=  "com.twitter" % "util-core" % "6.0.5"

initialCommands in console := """
        import com.twitter.util._
        import java.lang.Thread
        import java.util.concurrent.{Executors, ExecutorService, TimeUnit}
"""

Run sbt console, and you will have everything you need to follow along.

Running Asynchronously

Perhaps a bit confusingly, not all futures run asynchronously. Hopefully by the time you get to the end of this post you will realize why this is so. The following calls all execute the function provided to the future immediately, in the current thread. So where the function calls Thread.sleep, execution will pause.
scala> var f = Future[Int] { 10 }
f: com.twitter.util.Future[Int] = com.twitter.util.ConstFuture@435fd27a

scala> f.get()
res0: Int = 10

scala> f = Future[Int] { Thread.sleep(5000); 10 }
f: com.twitter.util.Future[Int] = com.twitter.util.ConstFuture@5ea96ab7

Now we demonstrate asynchronous execution. Unlike Future, FutureTask instantiates and returns immediately. It has to be explicitly executed in an executor, and calling get() on that task waits for the sleep to complete. (You have to execute the get within five seconds of executing the task to see this in action!)
scala> val pool: ExecutorService = Executors.newFixedThreadPool(2)
pool: java.util.concurrent.ExecutorService = java.util.concurrent.ThreadPoolExecutor@22add54d[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]

scala> var ft = new FutureTask[Int]( { Thread.sleep(5000); 10 } )
ft: com.twitter.util.FutureTask[Int] = Promise@1343838719(state=Waiting(null,List()))

scala> pool.execute(ft); ft.get()
res1: Int = 10

Parallel Futures

The method select on a future runs this and its argument future in parallel. The result of select is a new future that holds the value of the first future to complete. Let's see how this works when we combine Future and FutureTask in different ways.
scala> ft = new FutureTask[Int]({ Thread.sleep(5000); 10 })
ft: com.twitter.util.FutureTask[Int] = Promise@1875826460(state=Waiting(null,List()))

scala> f = Future[Int] { Thread.sleep(5000); 20 }
f: com.twitter.util.Future[Int] = com.twitter.util.ConstFuture@2294eb7c

scala> ft.isDefined
res2: Boolean = false

scala> f.isDefined
res3: Boolean = true

scala> var sf = ft.select(f)
sf: com.twitter.util.Future[Int] = Promise@1353874654(state=Done(Return(20)))

scala> sf.isDefined
res4: Boolean = true

scala> sf.get()
res5: Int = 20
f.select(ft) produces the same outcome as ft.select(f), so I'm leaving that out of this writeup. It isn't useful to combine synchronous and asynchronous execution in this manner. We really see the benefit with two asynchronous executions. The result of select isn't defined until one of the futures complete execution. And the first to complete gives sf its value. Also note that sf can't itself be executed. Its value can only be observed.
scala> ft = new FutureTask[Int]({ Thread.sleep(5000); 10 })ft: com.twitter.util.FutureTask[Int] = Promise@698352755(state=Waiting(null,List()))

scala> var ft2 = FutureTask[Int]({ Thread.sleep(3000); 20 })
ft2: com.twitter.util.FutureTask[Int] = Promise@1215135919(state=Waiting(null,List()))

scala> sf = ft2.select(ft)
sf: com.twitter.util.Future[Int] = Promise@621087996(state=Interruptible(List(),<function1>))

scala> sf.isDefined
res8: Boolean = false

scala> pool.execute(sf)
<console>:17: error: type mismatch;
 found   : com.twitter.util.Future[Int]
 required: java.lang.Runnable
              pool.execute(sf)
                           ^

scala> pool.execute(ft); pool.execute(ft2)

scala> sf.isDefined
res11: Boolean = true

scala> sf.get()
res12: Int = 20

Another way to set up the above experiment is through a FuturePool. This data structure takes an ordinary function, and turns it into a future that's running asynchronously.
scala> val fp = FuturePool(pool)
fp: com.twitter.util.ExecutorServiceFuturePool = com.twitter.util.ExecutorServiceFuturePool@3ea32dba

scala> sf = (fp { Thread.sleep(5000); 10 }).select(fp { Thread.sleep(3000); 20 })
sf: com.twitter.util.Future[Int] = Promise@1324160278(state=Interruptible(List(),<function1>))

// More than 3 seconds later...

scala> sf.isDefined
res13: Boolean = true

scala> sf.get()
res14: Int = 20

Serial Futures


We now consider two forms of serial execution: passing the result of a future into another computation, and taking over from the execution of a future in case it fails. At this point we have a reasonable understanding of the behavior of Future vs FutureTask, so we will not explore possible combinations of these.

The result of a future can be passed into a new future using flatMap. This is equivalent to simple function composition. We have the new future continue execution in the same thread as f1. If f1 throws, the function argument of flatMap isn't executed.
scala> var f1 = Future { 10 }
f1: com.twitter.util.Future[Int] = com.twitter.util.ConstFuture@2a21d0b1

scala> (f1 flatMap { x: Int => Future { x + 5 } }).get()
res15: Int = 15

Now, suppose we want our computation to be timebound. ft can thus fail through timeout or exception. The within method creates a timebound future that throws after the timer has expired. The example below thus also covers the ordinary exception situation.
scala> ft = new FutureTask[Int]({ Thread.sleep(5000); 10 })
ft: com.twitter.util.FutureTask[Int] = Promise@1003224409(state=Waiting(null,List()))

scala> f = ft.within(new JavaTimer(false), Duration(1, TimeUnit.SECONDS))
f: com.twitter.util.Future[Int] = Promise@759077407(state=Transforming(List(),Promise@1003224409(state=Waiting(<function1>,List()))))

scala> f.rescue({ case _: Throwable => Future { 20 } })
res16: com.twitter.util.Future[Int] = Promise@694580545(state=Done(Return(20)))

scala> pool.execute(ft)

scala> f.isDefined
res18: Boolean = true

scala> f.get()
com.twitter.util.TimeoutException: 1.seconds
    at com.twitter.util.Future$$anonfun$2.apply$mcV$sp(Future.scala:568)
    at com.twitter.util.JavaTimer$$anon$1.run(Timer.scala:160)
    at java.util.TimerThread.mainLoop(Timer.java:555)
    at java.util.TimerThread.run(Timer.java:505)


scala> res16.get()
res20: Int = 20

Putting It Together

We've explored setting up parallel and serial combinations of futures in Scala using Twitter's implementation. And we have used get() at various points to examine results. Note that in practice, you never really have to call get all that often. Doing so will put your application in a synchronous wait, precisely what you were trying to avoid. Most of the time, if you do this right, you should be able to accumulate results from futures into concurrent data structures directly. For instance, if you were trying to fill entries of a map, you could have futures write directly into a concurrent hash map, and pick up the result when they all completed. There is a lot more to this that I don't understand, for instance, where one might want to use a Promise directly.