上一篇文章,也就是去年(我真懒)写了一遍关于JVM上的协程文章。当时是基于Quasar来实现Java的协程,虽然能达到类似Golang的效果,当时需要在运行时加载一个修改字节码的Jar,用起来不是很舒服,毕竟Java出来的时候协程这个概念还没火,所以需要一些方法hack一下。

今年Google钦定了一个Andriod上的语言Kotlin,一下子把这个JVM新生的语言捧火了,这个语言亲爹是Jetbrain,干爹现在又是Google,真的是火的不行,当然目前也是在Andriod那边火,毕竟他们一直用的是阉割版的Java8。

JVM上的又一个新秀kotlin

Kotlin今年在Google大会上已经被捧起来了,各方面的资料犹如汗牛充栋,大家可以随便搜索到,我这里就简单的介绍一下他的优势。

  • 简约,支持局部变量类型推导,有完善的data class,函数扩展,智能转换(省代码),类委托,等等
  • 安全,比如空引用判断,NPE异常在Java里很常见,Kotlin可以通过在类型后面加?让程序在编译期间进行检查。
  • 通用,koltin除了可以跑在JVM上,也可以编译成javascript代码跑在node上。
  • 无缝兼容Java,Jetbrains的目标是让他们100%兼容(可以互相调用),这点比太阳系最骚语言Scala要强。

另外还有两点能体现出kotlin会比scala更流行的地方。

  • IDE支持力度,亲爹就是做IDE出身的,可以看到kotlin-plugin更新速度是kotlin编译器以及语言是一样快的。
  • kotlin直接编译成Java字节码,所以编译速度上也会比Scala要快一点

本篇文章并不打算详细介绍这门语言,有兴趣的可以自己去官网看一下文档,有Java8基础的同学估计1天就能入门,有Scala基础的同学更好入门了,别想太多就好。

kotlin的coroutines

上面说了一堆kotlin的优势,但是最大的优势是有牛人为他设计并实现了coroutines,为此kotlin引入了suspend关键字,作用在函数或者lambda上。
上两篇文章已经介绍了协程的概念以及大致的原理,这里就不啰嗦了,简单的介绍kotlin-coroutines概念后,直接上例子。

1
2
3
4
5
6
7
8
fun main(args: Array<String>) {
launch(CommonPool) {
delay(1000L) // 延迟1S
println("World!") // 打印World
}
println("Hello,") // 在launch块外面则是Main线程
Thread.sleep(2000L) // 主线程休眠2S
}

上面代码最终会输出Hello, World。大家可能对上面的一些方法与函数不太了解,我这里简单说一下。
launch(CommonPool){}中的launch其实是一个Builder方法,用他来构建一个跑coroutines的环境,同时接受一个参数,这个参数用来表示协程将跑在哪些线程里,CommonPool默认建议用Java7的forkJoinPool,这个线程池是特别优化过的支持work-stealing算法,与Golang的协程调度器很像,(当然还是有点不一样的),forkJoinPool默认数目是CPU * 2,这样可以最大限度使用CPU资源。
好了现在我们通过launch这个Builder方法以及CommonPool这个参数构造了一个可以跑协程的代码块,这里注意,所有协程的逻辑只能跑在launch {}块里,超出这个范围就是主线程了。
下面在这个代码块里我们执行了协程库提供的方法delay,这里的delay只停止某一个协程,而不是整个线程。所以外面的Thread.sleep不会阻塞到CommonPool里的协程,所以你会在1秒后看到World!

下面我们再看一个例子.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
fun main(args: Array<String>) = runBlocking<Unit> {
val time = measureTimeMillis {
val one = async(CommonPool) { doSomethingUsefulOne() }
val two = async(CommonPool) { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}
suspend fun doSomethingUsefulOne(): Int {
delay(1000L)
return 13
}
suspend fun doSomethingUsefulTwo(): Int {
delay(1000L)
return 29
}

这里又多了一个新的coroutines环境Builder方法async,通过async方法调用一个标注suspend的fun可以得到一个返回值奥。
这里suspend的方法其实是告诉koltin编译器,我这里是一个可以中断的方法,用于coroutines环境,这样所有的其他的coroutines方法都可以用在这个函数内部比如delay()。现在我们回到async方法的返回值val one,这个里kotlin已经做了类型推断,所以我们没有定义类型,查看async方法的返回类型你会发现这里的one类型应该是Deferred,再具体一点应该是val one:Deferred<Int>。有点像Future<Integer>,但是注意一下println方法里的方法,我们这里可以直接的以同步的方式获取结果one.await(),而Future则需要再包一个回调,稍显麻烦。
await()方法被调用时会阻塞一个协程,并对其求值,如果没有发生异常则将其结果同步返回到引用的变量上,这就是coroutines的魅力,你可以以同步的方式来完成异步的逻辑,同时避免了回调地域,且逻辑非常符合人类的认知。
koltin的coroutines还提供了ChannelMutex, 和Select这些非常眼熟吧,具体由于篇幅关系我就不提了,大家有兴趣看官方文档,我这边只是抛个砖而已。

当koltin-coroutines遇到vert.x

kotlin-coroutines单独使用,没有任何问题,一般用在一些提供异步API的地方比如Java8的CompletableFuture,AIO等,也可以和Reactive库配合使用,官方默认提供了一些适配Rx1,Rx2,UI层面上也提供了一些适配比如Andriod,JavaFx等。coroutines本质上就是异步回调的代替者。
那JVM的全异步库vert.x岂不是和kotlin-coroutines是绝配,因为之前倒腾过vertx-sync,所以决定尝试一下用kotlin-coroutines包装vert.x的handler。因为kotlin-cortouines的封装以及API非常优雅,几乎没有多少行代码(两百多行有效代码),就把项目跑起来了,后来又经过社区各方面的牛人修修补补,以及语义上的规范,API的完善,已经进了vert.x官方的仓库,会与vert.x-3.5会一起发布。
下面给出几个例子,各位就知道vertx-lang-kotlin-coroutines有多优雅了,下面假设各位都熟悉vert.x了。

1
2
3
4
5
6
7
8
9
10
11
12
//标准vert.x timerAPI写法
vertx.setTimer(1000, tid -> {
System.out.println("Event fired from timer")
});
//使用coroutines
launch(vertx.dispatcher()) {
awaitEvent<Long> { handler ->
vertx.setTimer(1000, handler)
}
println("Event fired from timer")
}

标准的vert.x timerAPI是异步的,且逻辑也是写在回调Handler里的,所以代码看起来会是嵌套的,但是上面的coroutines例子你会发现没有这个问题,所有的逻辑都是在变量timerId后面,也就是说逻辑是线性的,直觉符合人类的思考方式,你会觉得上面的会阻塞1秒下面才会执行。

这里的launch接受的参数与标准的coroutines不一样,vertx.dispatcher()内部会返回一个包含vertxcontextScheduledExecutorService对象,随即在launch块里的代码都会跑在这个vertx的context下,从而保证了所有的协程都跑在vertx的EventLoop下,熟悉vert.x朋友应该知道vertx的context是在verticle启动的时候就已经分配给指定的EventLoop了,所以保证了所有的Handler都会安顺序跑在一个线程里面。
但是只把协程跑在vert.x的EventLoop里是不够的,我们还需要想办法把vert.x的回调型API改成同步的,所以这里你会看到一个新的方法awaitEvent<Long>,通过这个方法可以handler以协程的方式进行处理,从而可以直接得到一个返回值,使之变成同步返回,有兴趣的朋友可以看一下这个方法的源码,里面调用了kotlin-coroutines的APIsuspendCancellableCoroutine,是通过suspend进行修饰的。

如果各位kotlin用的也比较熟悉,上面的例子还可以更简单,直接变成一行。awaitEvent<Long>{vertx.setTimer(1000, it)}

OK,通过改变launch的参数,使coroutines可以跑在EventLoop下,通过包装vert.x的API使之可以通过协程同步返回。完成这两个功能vert.x就可以完美的和协程融合了。下面我们再看一个稍微复杂的例子

1
2
3
4
5
6
7
8
9
10
11
val consumer = vertx.eventBus().localConsumer<String>("a.b.c")
consumer.handler { message ->
println("Consumer received: ${message.body()}")
message.reply("pong")
}
// Send a message and wait for a reply
val reply = awaitResult<Message<String>> { h ->
vertx.eventBus().send("a.b.c", "ping", h)
}
println("Reply received: ${reply.body()}")

上面的例子用了vert.x的经典的消息总线例子ping-pong。
我们先注册了一个consumer,总线地址是a.b.c,同时给了handler用于监听总线上的消息,如果收到消息则立刻给个返回pong。接着,通过vert.x发送了一个ping消息到这个总线上,同时也注册了一个handler用于接收回答消息,也就是consumer里的hander会reply过来的pong。这里你会注意到我们用了awaitResult这个API包装了send的最后一个参数,这样send方法里的Handler参数可以同步的返回给reply,也就是说通过协程变成了同步。然后我们就可以直接打印出reply里的一些数据,整体逻辑也是线性的。
细心的朋友会发现,这里是通过awaitResult而不是awaitEvent。这两者的区别很简单,awaitResutl会有一个确定的返回值,awaitEvent一般没有确定的返回值,有也是void类型。这两个方法包装了vert.x的两个核心API,Handler<Void>Handler<AsyncResult<T>>,这样一说我觉得大家应该能够理解了。
上面只是接受一个消息,实践中可能不会这么用,另外kotlin-coroutines还提供了Channel类似于golang的channel,我们可以把消息总线以channelstream的方式来表达。

1
2
3
4
5
6
7
8
9
10
11
val adapter = vertx.receiveChannelHandler<Message<Int>>()
vertx.eventBus().localConsumer<Int>("a.b.c").handler(adapter)
// Send 15 messages
for (i in 0..15) vertx.eventBus().send("a.b.c", i)
// Receive the first 10 messages
for (i in 0..10) {
val message = adapter.receive()
println("Received: ${message.body()}")
}

这里的vertx.receiveChannelHandler是kotlin的方法扩展,也就是说直接附加到Vertx这个类里的,且不需要继承或者重写Vertx接口。不得不说这样的扩展也方便也很优雅。这个方法也很简单,就是把Vertx的ReadStrem接口通过kotlin-cortouines的ReceiveChannel包装了一下,使之变成同步。
然后我们将这个adapter变量以handler的参数传给consumer就可以了。跳过send方法不看,我们看到通过adapterreceive()方法,我们可以把消息流看成一个迭代器,直接遍历即可,是不是很符合我们之前的代码风格跟习惯呢。
这里我们一次只遍历10个,实践中我们可以再加一个定时器,定时从消息流里取数据,或者直接while(true),这个取决于具体项目与你的用法。
coroutines,还提供了包装Rx以及FutureAPI的功能,vert.x里默认有自己的Future实现,我们也提供了相应的转换,而且非常简单。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
val httpServerFuture = Future.future<HttpServer>()
vertx.createHttpServer()
.requestHandler { req -> req.response().end("Hello!") }
.listen(8000, httpServerFuture)
val httpServer = httpServerFuture.await()
println("HTTP server port: ${httpServer.actualPort()}")
val result = CompositeFuture.all(httpServerFuture, httpServerFuture).await()
if (result.succeeded()) {
println("The server is now running!")
} else {
result.cause().printStackTrace()
}

上面的例子其实只有一个重点,就是httpServerFuture.await()。前面的代码我们很熟悉,定义了一个Future,然后接收随后的Future值,通过setHandler()方法,但这样也是需要传一个回调handler的。这里通过扩展Future的方法Future.await直接将结果以同步的方式返回出来,瞬间变得简单多了。同理一样可以用于CompositeFuture的结果处理。

另外大家可能对异常捕获,以及超时处理的方式可能感兴趣,这里再贴一个例子,以解大家之惑。

1
2
3
4
5
6
7
8
9
10
11
launch(vertx.dispatcher()) {
val async = testContext.async()
try {
withTimeout(250) {
awaitEvent<Long> { h -> vertx.setTimer(500, h) }
}
Assert.fail()
} catch (e: CancellationException) {
testContext.assertTrue(Context.isOnEventLoopThread())
}
}

这里的withTimeout(250)是kotlin-croutines提供的方法,vert.x可以无缝兼容,另外因为是同步的书写方式,大家可以直接用try catch去捕获异常,上面的例子就是通过超时机制,产生了一个异常,然后同步的捕获,这与同步的书写方式一样,而且也只发生在vert.x的eventLoop线程里面。

写在最后

这篇主要介绍了Kotlin,以及他的衍生项目coroutines,然后重点讲了vert.x与kotlin-coroutines的结合,而且这个项目也是得到官方支持的,并且也有kotlin-coroutines作者指点加持,所以项目还是比较可靠的。kotlin-coroutines这个项目本身的野心也是很大的,从他对第三方库兼容性来看,目标JVM平台的所有的异步库都想包装一下。从最新的kotlin-coroutines发布来看,还带了actor模型实现以及自己的asynchronousIO库,而且还为了支持Java9做了模块化。所以虽然官方一直说这个库处于experiment,但是作者也在一次演讲中表示完全可以用于生产环境,因为外部API已经稳定,不稳定的是内部API,而这个内部API一般不会对直接用户有影响。所以vert.x可以无缝的升级kotlin-coroutines,且不需要改变API。vertx-lang-kotlin-coroutines项目的测试也很完善,且测试本身也包含了很多使用方法。另外官方也提供很多例子。我上面大部分例子也是引用这里。