Java CompletableFuture 详解

Future是Java 5添加的类,用来描述一个异步计算的结果。你可以使用isDone方法检查计算是否完成,或者使用get阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel方法停止任务的执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class BasicFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(10);
Future<Integer> f = es.submit(() ->{
// 长时间的异步计算
// ……
// 然后返回结果
return 100;
});
// while(!f.isDone())
// ;
f.get();
}
}

虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?

很多语言,比如Node.js,采用回调的方式实现异步编程。Java的一些框架,比如Netty,自己扩展了Java的 Future接口,提供了addListener等多个扩展方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
future.addListener(new ChannelFutureListener()
{
@Override
public void operationComplete(ChannelFuture future) throws Exception
{
if (future.isSuccess()) {
// SUCCESS
}
else {
// FAILURE
}
}
});

Google guava也提供了通用的扩展Future:ListenableFutureSettableFuture 以及辅助类Futures等,方便异步编程。

1
2
3
4
5
6
7
8
9
10
11
final String name = ...;
inFlight.add(name);
ListenableFuture<Result> future = service.query(name);
future.addListener(new Runnable() {
public void run() {
processedCount.incrementAndGet();
inFlight.remove(name);
lastProcessed.set(name);
logger.info("Done with {0}", name);
}
}, executor);

Scala也提供了简单易用且功能强大的Future/Promise异步编程模式

作为正统的Java类库,是不是应该做点什么,加强一下自身库的功能呢?

在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

下面我们就看一看它的功能吧。

阅读全文

Web框架性能基准测试 (Round 12)

以前我发布过 techempower的 第11轮的测试第9轮的测试,现在第12轮的测试出来了,结果肯定又会出人意料。

techempower的测试有好几个case,我们以每个request包含12个数据库的插入操作为例,看看各个web框架的性能,以TPS为指标排序(每秒返回的response多的在前面,性能越好)

go fasthttp + postgresql居然排到了第一,啧啧,以前前几名都是C/C++的框架排第一,这次C++ web框架wt排在了第二。

nodejs + Mysql表现不俗,排在了第6位。

undertow edge + Postgres排在了第8位

依然没有netty的测试,我期待Netty的表现,应该比undertow要高吧。

Go原生web框架的表现不好,排名很低。

Java生态圈的框架如Spring、dropwizard等表现的不温不火。

排名靠前的Web框架

异步编程模型的说明

前几日在看到Alexander Temerev在github上创建了一个项目skynet,用来比较各语言的并发编程的性能,当时觉得这个项目挺有趣,也就翻译整理了一下,写了一篇文章:1百万线程的性能,并且分享在开发者头条上。

本身这个项目涉及的语言很多,测试代码也不一定完全合理,因此这个项目的issue中也有很多讨论,也有很多贡献者提供了其它语言的测试代码,或者完善了现有的测试代码。

很多读者也对这个项目感兴趣,也在我的文章中留言表达了自己的观点,我很赞赏这些有意义的观点。

这个项目的描述是"Skynet 1M threads microbenchmark",我也采用了这样的描述作为文章的标题。当然大家都知道这个项目比较的并不是1百万线程的性能,
而是各个语言中为并发编程实现的类似线程的编程模型,这个各个语言为并发编程而做的努力。

当然,每个编程语言实现的并发编程模型也不尽相同,我们没办法用一个统一的概念称呼它们,姑且叫做"线程”吧。

各种语言的编程模型从内存实现上可以分为两类:

  1. 基于共享内存的模型:采用单一的统一的内存镜像,并发单元通过共享内存进行通讯,比如Java中的线程
  2. 基于消息传递的模型:每个并发单元包含自有的内存,并发单元通过消息交换进行通讯,比如go channel,Scala Actor等

从实现上来说,至少有三种模型实现:

  1. 基于线程的实现: 大部分的操作系统(轻量级的进程、内核级、用户级)、Java、C、C++都是这种实现
  2. 基于Actor的实现:Scala, Erlang等
  3. 基于Coroutine的实现:Haskell, Python等

当然,有些语言也不止一种实现,比如Python还有Pykka,它是一种actor的实现。 Java也可以使用Akka Actor实现Actor模型,比如Go实现了goroutine和CSP模型(channel)。

Paul Butcher写了一本书,叫七周七并发模型, 对并发编程想了解的不妨看看。

参考文档:

  1. https://en.wikipedia.org/wiki/Light-weight_process
  2. https://en.wikipedia.org/wiki/Coroutine
  3. https://en.wikipedia.org/wiki/Green_threads
  4. http://tutorials.jenkov.com/java-concurrency/concurrency-models.html
  5. https://en.wikipedia.org/wiki/Concurrency_%28computer_science%29#Models
  6. https://en.wikipedia.org/wiki/Communicating_sequential_processes
  7. http://www.amazon.com/Seven-Concurrency-Models-Weeks-Programmers/dp/1937785653
  8. http://berb.github.io/diploma-thesis/original/056_other.html
  9. http://grid.cs.gsu.edu/~tcpp/curriculum/sites/default/files/Programming%20with%20Concurrency%20-%20Threads%20Actors%20and%20Coroutines.pptx

Ignite vs Hazelcast

内存数据网格HazelcastIgnite是大家非常熟悉的两种分布式内存数据网格工具。

Hazelcast 是一款基于 Java的内存数据网格,它的名称和公司的名称相同。hazelcast支持分布式队列,集合,map,线程池,锁,支持事务处理,分布式的监听和事件,支持动态增加集群节点,动态备份数据,动态failover等。

关于Apache Ignite 的中文介绍可以参考李玉珏写的Apache Ignite(一):简介以及和Coherence、Gemfire、Redis等的比较等系列文章。Ignite来源于尼基塔·伊万诺夫于2007年创建的GridGain系统公司开发的GridGain软件,2015年1月,GridGain通过Apache 2.0许可进入Apache的孵化器进行孵化,很快就于8月25日毕业并且成为Apache的顶级项目,9月28日即发布了1.4.0版,2016年1月初发布了1.5.0版,迭代速度很快。

两个产品背后的公司Hazelcast和GridGain都有风投的背影。所以产品在开源免费的基础上还会提供商业版的支持。

我没有在实际产品中使用过这两款产品,仅仅关注过这一类的产品,所以并不完全了解它们的详细特性,但是最近的一些有趣的争论引起了我的兴趣,特地跟踪了多个帖子,弄清楚了争论的来龙去脉,特地整理了一下,也算作为我的性能系列的文章的一部分吧。

最近的事件是这两个产品背后的公司进行了激烈的性能之争。

起因是GridGain发布了一篇性能报告:GridGain vs. Hazelcast Benchmarks, 它比较了最新的GridGain Community Edition 1.5.0 和 最新的Hazelcast 3.6-EA2的性能,测试数据显示Ignite的性能要好于Hazelcast。相关的测试代码可以参照yardstick-ignite yardstick-hazelcast

进一步GridGain还到Hazelcast的用户讨论组中踢馆子,他们把测试结果和代码发布在Hazelcast的邮件列表中,请Hazelcast的人review和提意见。嚣张啊!
Hazelcast的CEO Luck把这个帖子从邮件列表中删除了,并说:

我们认为你在我的地盘上发布这样的性能数据是不合适的。 我们将删除这个帖子,请发布在你的地盘上。

当然,这也不是GridGain第一次踢馆子,在2015初Apache孵化器Ignite项目的导师Konstantin Boudnik就到Tachyon 的邮件列表中比较这两个项目的缓存特性差异,也被认为是营销惨遭删帖。

阅读全文

Kafka Connect简介

Kafka 0.9+增加了一个新的特性Kafka Connect,可以更方便的创建和管理数据流管道。它为Kafka和其它系统创建规模可扩展的、可信赖的流数据提供了一个简单的模型,通过connectors可以将大数据从其它系统导入到Kafka中,也可以从Kafka中导出到其它系统。Kafka Connect可以将完整的数据库注入到Kafka的Topic中,或者将服务器的系统监控指标注入到Kafka,然后像正常的Kafka流处理机制一样进行数据流处理。而导出工作则是将数据从Kafka Topic中导出到其它数据存储系统、查询系统或者离线分析系统等,比如数据库、Elastic SearchApache Ignite等。

Kafka Connect特性包括:

  • Kafka connector通用框架,提供统一的集成API
  • 同时支持分布式模式和单机模式
  • REST 接口,用来查看和管理Kafka connectors
  • 自动化的offset管理,开发人员不必担心错误处理的影响
  • 分布式、可扩展
  • 流/批处理集成

阅读全文

1百万线程的性能

瑞士的金融软件工程师和创业者Alexander Temerev在github上创建了一个项目skynet ,用来测试各语言(框架)的多线程并行计算的性能,并得到了一些有用的数据。本文翻译整理自这个项目的说明。

测试并行性能的代码逻辑很简单:创建一个actor(goroutine,或者其它语言中类似的并发库),它会创建10个子actor,然后每个子actor再创建10个子actor,一直这样创建下去,直到创建了1百万个actor,每个actor包含一个唯一的数字(0到999999)。然后最底层的actor把它们的数字返回给父actor,父actor计算总和后再把结果返回给它的父actor,一直返回直到根actor,这样根actor的包含数字就是0到999999的和,结果应该为499999500000。

所以测试代码的逻辑就是并行计算0到999999的和,测试各种语言的并行库性能。

当然标题是不准确的,只是借用了这个项目的名称,1百万的线程不太可能在一台机器上创建,Alexander Temerev比较的是一些语言的并发框架的实现,聪明的读者应该明白项目性能测试的是什么东西,
它包括了Scala Actor、Scala-Future、Go、erlang、haskell、C# Core、C# TPL、RxJava、node-bluebird、python、rust等语言/框架的测试代码。所以大家不必吐槽标题中的"线程",其实比较的是异步编程模型的性能。

阅读全文

Scala Async 库

在我以前的文章中,我介绍了Scala Future and PromiseFuture代表一个异步计算,你可以设置你的回调函数或者利用Await.result等待获取异步计算的结果,你还可以组合多个future为一个新的futurePromise让你可以控制是否完成计算还是抛出异常,它的future方法返回一个Future对象,completesuccessfailure允许你完成计算。如果想要同步操作,可以使用Await.result等待Future完成或者超时,对于没有实现Awaitable的代码块,可以使用blocking方法实现同步执行。

阅读全文