Scala Future and Promise

Futures 和 Promises是Scala的语言的功能加强: SIP-14。官方文档很好的介绍了这个功能。
Future提供了一个漂亮的方式提供并行执行代码的能力,高效且非阻塞。Future可以并发地执行,可以提供更快,异步,非阻塞的并发代码。
通常,future和promise都是非阻塞的执行,可以通过回调函数来获得结果。但是,你也可以通过阻塞的方式串行的执行Future。
本文主要编译于官方文档,以及参考了其它一些网上公开的资料。

Future

一个Future会持有一个值,这个值在将来的某个时间点才可用,这个值通常是其它运算的结果:

  1. 如果计算还没有完成,这个Future就还没有完成.
  2. 如果计算完成(得到一个结果或者发生异常),这个Future就已经完成.

因此,即使Future已经完成,也可能是下面的两种情况:

  1. 计算完成时返回一个值, 则future成功的完成并计算出一个结果.
  2. 计算完成时发生异常, 则future失败了,并会提供一个异常对象.

Future有一个很重要的属性就是它只可能指派一次。一旦一个Future对象给定了一个值或者异常, 它就会变成不可变了, 它的结果绝不会再被更改。
而Promise和Future的概念非常相像,但是你可以写返回结果或者异常。

最简单的创建Future对象的方式就是使用future此方法定义如下:

1
def future[T](body: =>T)(implicit execctx: ExecutionContext): Future[T] = Future[T](body)

了解它最好的方式就是看一个代码:

1
2
3
4
5
6
import scala.concurrent._
import ExecutionContext.Implicits.global
val session = socialNetwork.createSessionFor("user", credentials)
val f: Future[List[Friend]] = future {
session.getFriends()
}

session.getFriends()从服务器获取当前用户的朋友,这是一个耗时的操作。这段代码把这个操作封装成Future对象,可以异步的执行。返回Future[List[Friend]]对象作为占位符。一旦服务器返回朋友列表,则这个Future对象就完成了,而且返回结果也可以得到。
如果获取朋友列表时出现异常呢,如:

1
2
3
4
val session = null
val f: Future[List[Friend]] = future {
session.getFriends
}

那么这个Furture失败了,返回一个异常。

获取计算结果

前面所讲,可以根据Future得到结果。下面我们就演示如何或者计算结果或者异常。

异步方式

可以通过回调的方式实现完全的异步。 Future提供了onSuccessonFailure的回调方法,或者更抽象的onComplete方法。
onComplete方法的回调函数类型为:Try[T] => U,可以应用在Success[T]或者Failure[T]上。

Try[T]类型类似Option[T] 或者Either[T, S],它可以是Success[T]或者Failure[T],因此Try[T]可以看作Either[Throwable, T]。
举例说明:

1
2
3
4
5
6
7
8
import scala.util.{Success, Failure}
val f: Future[List[String]] = future {
session.getRecentPosts
}
f onComplete {
case Success(posts) => for (post <- posts) println(post)
case Failure(t) => println("An error has occured: " + t.getMessage)
}

或者也可以这样:

1
2
3
4
5
6
7
8
9
val f: Future[List[String]] = future {
session.getRecentPosts
}
f onFailure {
case t => println("An error has occured: " + t.getMessage)
}
f onSuccess {
case posts => for (post <- posts) println(post)
}

你需要知道的是当计算结果完成时回调函数才会被调用。而且不会担保哪个线程会执行回调函数,但是回调函数最终肯定会被执行。
你还可以执行多个回调函数,尽管不担保它们的执行顺序:

1
2
3
4
5
6
7
8
9
10
@volatile var totalA = 0
val text = future {
"na" * 16 + "BATMAN!!!"
}
text onSuccess {
case txt => totalA += txt.count(_ == 'a')
}
text onSuccess {
case txt => totalA += txt.count(_ == 'A')
}

回调函数执行完后就会从这个Future对象中移除,可以被GC.

同步方式

尽管大部分情况我们都首选异步回调的方式处理Future,但是有些特殊的情况我们还是希望用同步阻塞的方式执行。
录入上面的代码:

1
2
3
4
5
import scala.util.{Success, Failure}
val f: Future[List[String]] = future {
session.getRecentPosts
}
Await.result(f, 0 nanos)

通过Await.result可以同步阻塞的获取结果,或者超时,或者抛出异常。
Await.ready等待结果完成,不返回结果。

对于其它没有像Future实现Awaitable trait的代码,可以通过下面的代码实现阻塞:

1
2
3
blocking {
potentiallyBlockingCall()
}

复合操作

对于其它严重依赖回调的框架,如node.js,你经常会看到future串联起来(chain)。一个future依赖另外一个future。
如果按照前面的代码,不是很方便:

1
2
3
4
5
6
7
8
9
10
11
12
13
val rateQuote = future {
connection.getCurrentValue(USD)
}
rateQuote onSuccess { case quote =>
val purchase = future {
if (isProfitable(quote)) connection.buy(amount, quote)
else throw new Exception("not profitable")
}
purchase onSuccess {
case _ => println("Purchased " + amount + " USD")
}
}

上面的代码演示当有利可图时,买入美元。
我们不得不在onSuccess中嵌套第二个Future。而且purchase的scope也限制在了onSuccess方法中。想象一下如果有四五重的嵌套的话,
代码将变得惨不忍睹。

为结果这些问题,future提供了一些选择符(combinators), 如map

1
2
3
4
5
6
7
8
9
10
val rateQuote = future {
connection.getCurrentValue(USD)
}
val purchase = rateQuote map { quote =>
if (isProfitable(quote)) connection.buy(amount, quote)
else throw new Exception("not profitable")
}
purchase onSuccess {
case _ => println("Purchased " + amount + " USD")
}

map第一个future rateQuote 计算成功的值,生成第二个future。当 rateQuote失败时,这时没有值可以map,purchase 自动失败,异常和rateQuote的异常一样。
future还有flatMap, filter 和 foreach 等选择符。
另外future还提供了failed映射函数,它返回一个值为Throwable的future,如果先前的future成功,则failed返回的Future会抛出NoSuchElementException

Promise

前面主要讲Future,通过future方法来创建一个Future对象。 我们还可以通过Promise的方式来创建future。
Future是一个只读的,值还没有计算的占位符。而Promise可以看作一个可写的,单次指派的容器,可以完成一个future。也就是说Promise可以通过调用success方法是一个Future成功完成,或者failure方法抛出异常。或者更抽象的complete
如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import scala.concurrent.{ future, promise }
import scala.concurrent.ExecutionContext.Implicits.global
val p = promise[T]
val f = p.future
val producer = future {
val r = produceSomething()
p success r
continueDoingSomethingUnrelated()
}
val consumer = future {
startDoingSomething()
f onSuccess {
case r => doSomethingWithResult()
}
}

因为Promise是单次指派的,你不应该调用success或者failure两次,否则会抛出IllegalStateException.

参考文档

  1. http://docs.scala-lang.org/sips/completed/futures-promises.html
  2. http://docs.scala-lang.org/sips/completed/futures-promises.html
  3. https://code.csdn.net/DOC_Scala/chinese_scala_offical_document/file/Futures-and-Promises-cn.md
  4. http://programmers.stackexchange.com/questions/207136/what-is-the-difference-between-a-future-and-a-promise