深入理解Vert.x Core(2)-Future

异步调用

Vert.x中几乎所有的API都是异步的,并且它们不会阻塞当前所在的线程。异步使得你可以安排做一项任务(调用函数),并且在任务结束后得到通知,在任务处理的阶段(函数处理过程)不必等待它完成,你可以在这个过程中做点别的事情。但是在实现异步API时,必须有一种机制能通知调用者关于这个任务执行的信息。

在Vert.x中,你可以用以下这几种方式完成异步调用操作:

  • Callback
  • Vert.x提供的Future
  • RxJava
  • Coroutines in Kotlin

Callback即回调,这种方式并不难理解,比如利用Vertx实例部署一个Verticle 这种代码就是基于回调的风格:

1
void deployVerticle(String name, Handler<AsyncResult<String>> completionHandler);

但是,在异步调用的Callback里面连续进行异步调用时,很快就会陷入”回调地狱”,你会发现代码非常的难以阅读,假如我们需要链式部署两个MyVerticle1和MyVerticle2,只有在MyVerticle1成功部署后才会去部署MyVerticle2,使用基于回调的方式,下面的代码是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
vertx.deployVerticle(MyVerticle1.class.getName(), res -> {
if (res.succeeded()) {
// MyVerticle1 deploy succeeded...
vertx.deployVerticle(MyVerticle2.class.getName(), ar -> {
if (ar.succeeded()) {
// MyVerticle2 deploy succeeded...
} else {
// MyVerticle2 deploy failed...
}
});
} else {
// MyVerticle1 deploy failed...
}
});

通过Vert.x自己实现的Future,在一定程度上解决了代码阅读性的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
Future<String> future1 = Future.future();
vertx.deployVerticle(MyVerticle1.class.getName(), future1.completer());
future1.compose(deploymentId -> {
Future<String> future2 = Future.future();
vertx.deployVerticle(MyVerticle2.class.getName(), future2.completer());
return future2;
}).setHandler(ar -> {
if (ar.succeeded()) {
// both verticles are OK
} else {
// one of the verticles is down
}
});

上面这两种方式在Vert.x Core中都有使用,RxjavaCoroutine属于Vert.x ext的高级特性,本文就省略了,但是他们都能使得你的代码阅读性与异步调用功能更加强大。

Future

Future表示一个可能已经完成或者可能还未完成的操作的结果。
概览一下与Future相关的类与接口:

ShowImage

那么先从Future接口开始吧,看一下结构

ShowImage

Future的创建

Future接口提供一组工厂方法

static <T> Future<T> future(Handler<Future<T>> handler)
static <T> Future<T> future()
static <T> Future<T> succeededFuture()
static <T> Future<T> succeededFuture(T result)
static <T> Future<T> failedFuture(Throwable t)
static <T> Future<T> failedFuture(String failureMessage)

这里就不多介绍每种工厂方法的用法了,直接看实现,他们都是通过factory工厂提供的。
找到初始化factory的代码
FutureFactory factory = ServiceHelper.loadFactory(FutureFactory.class);
FutureFactory是Vert.x提供的Future工厂的SPI接口,具体的实现类在$CLASSPATH/resources/MATA-INF/services/io.vertx.core.spi.FutureFactory中指定,可以知道指定的是io.vertx.core.impl.FutureFactoryImplServiceHelper类是一个帮助加载工厂的Helper类,在Vert.x 3.0之后,默认使用了flat classpath的类加载机制,如果在不指定isolation Groups的情况下,每个verticle的类加载器和vertx instance的类加载器是相同的。

现在继续看FutureFactoryImpl类,没有什么特别的地方,就是按照接口进行new对象操作并返回对应的future,每种Future等会再说。

Future的使用

从同步阻塞操作–>响应式

假设我们现在要部署一个Verticle,并且在部署成功后输出deploymentID,
先来看这样一段代码:

1
2
3
4
5
6
7
Future<String> future = Future.future();
vertx.deployVerticle(MyVerticle1.class.getName(), future.completer());
while (!future.isComplete()) {
// wait for the future to complete
System.out.println("future is not complete");
}
System.out.println("Future result: " + future.result());

要部署成功后输出这个deploymentID,用同步阻塞思维肯定是等这个verticle部署完以后再去输出ID值,也就是说在部署的这个过程中我们都在等待,这种方式是一种拉(pull)的方式,就是调用者找执行者要执行结果,调用者处于主动状态,执行者处于被动状态。

而在响应式编程中,恰好相反,调用者处于被动状态,执行者处于主动状态,执行结果有了之后,由执行者告诉调用者执行完了,这个时候调用者就可以用执行结果做一些他本来需要做的事情了,这种方式是一种推(push)的方式。看下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
Future<String> future = Future.future();
vertx.deployVerticle(MyVerticle1.class.getName(), future.completer());
future.setHandler(new Handler<AsyncResult<String>>() {
@Override
public void handle(AsyncResult<String> ar) {
if (ar.succeeded()) {
System.out.println(ar.result());
} else {
// fail
}
}
});

用Lambda表达式替换Handler匿名内部类

1
2
3
4
5
6
7
future.setHandler(ar -> {
if (ar.succeeded()) {
System.out.println(ar.result());
} else {
// fail
}
});

这样就是我们常见的用法了。Future 其实就相当于调用者在执行者处做了一次贷款,只不过调用者在执行者那边做完一些必要的工作之前暂时还是拿不到钱,不过执行者此时给了调用者一个贷款账单,而调用者此时可以拿着这个贷款账单计划好做一些事情,比如钱怎么用等等…一旦执行者那边任务执行完了那么就会通知调用者,这个时候调用者就会按照之前计划好的把这些钱花掉。

FutureImpl

result(), cause(), succeeded(), failed()

这几个方法都继承自AsyncResult接口,都是读取Future对象对应的内部状态result,throwable,succeeded,failed并返回。

isComplete()

表示future是否完成,succeededfailed默认都为false

1
2
3
public synchronized boolean isComplete() {
return failed || succeeded;
}

completer()

可以看到Future接口还继承了Handler<AsyncResult<T>>接口,因此可以把Future当作Handler使用,而Vert.x提供了completer()方法返回自身的引用,这样你就不用在参数处去写if,else来判断回调了。比如下面的示例代码,在HTTP服务器监听的回调参数就直接将future自身传递进去httpServerFuture.completer()

1
2
3
4
5
6
7
8
9
10
11
12
13
Vertx vertx = Vertx.vertx();
Future<HttpServer> httpServerFuture = Future.future();
HttpServer httpServer = vertx.createHttpServer();
httpServer
.requestHandler(req -> req.response().end("Enjoy Vert.x!"))
.listen(8080, "localhost", httpServerFuture.completer());
httpServerFuture.setHandler(ar -> {
if (ar.succeeded()) {
System.out.println("HTTP Server is up!");
} else {
System.out.println("HTTP Server failed to start!");
}
});

tryComplete()

直接看代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public boolean tryComplete(T result) {
Handler<AsyncResult<T>> h;
synchronized (this) {
if (succeeded || failed) {
return false;
}
this.result = result;
succeeded = true;
h = handler;
}
if (h != null) {
h.handle(this);
}
return true;
}

tryComplete(T result)会尝试给当前这个future设置一个result,先判断这个future是否已经完成了,如果已经完成了,那么直接返回false,这次tryComplete操作就结束了。如果future还没完成,就将参数result设置成该future的result,并且将布尔值succeeded置为true,拿到future对应的handler,如果handler不为null,那么就通知这个handler,future已经完成了,该handler做后面的工作了(回调)。并且最后返回true。

tryComplete()重载方法就是直接调用tryComplete(null)

tryFail()

tryFail()主要的逻辑跟tryComplete()类似,目的是尝试给当前这个future设置一个failure。不同之处在于是将future的result的值置为传入参数的throwable对象,并且将置布尔值failed设为true。

complete(), fail()

这两个方法都有重载方法,它们的作用都是设置结果或者错误,在已经了解tryComplete()tryFail()之后,这两个方法都很简单,看下逻辑就知道了。

1
2
3
4
5
6
@Override
public void complete(T result) {
if (!tryComplete(result)) {
throw new IllegalStateException("Result is already complete: " + (succeeded ? "succeeded" : "failed"));
}
}
1
2
3
4
5
6
@Override
public void fail(Throwable cause) {
if (!tryFail(cause)) {
throw new IllegalStateException("Result is already complete: " + (succeeded ? "succeeded" : "failed"));
}
}

因为tryComplete()tryFail()如果该future已经完成的话,那么它们均返回false,否则返回true,而complete()fail()就是完全按前面两个方法逻辑来的。

setHandler()

这个方法前面用过好几次了,就是为你的future的结果设置一个handler,通俗点讲就是设置在任务结束之后将执行的handler。

1
2
3
4
5
6
7
8
9
10
11
public Future<T> setHandler(Handler<AsyncResult<T>> handler) {
boolean callHandler;
synchronized (this) {
this.handler = handler;
callHandler = isComplete();
}
if (callHandler) {
handler.handle(this);
}
return this;
}

看下代码逻辑,就是设置好handler之后,如果future执行完之后,那么立即就执行handler。

handle()

handle()方法就是将future通过参数asyncResult设置出一个结果(成功或失败)。

1
2
3
4
5
6
7
8
@Override
public void handle(AsyncResult<T> asyncResult) {
if (asyncResult.succeeded()) {
complete(asyncResult.result());
} else {
fail(asyncResult.cause());
}
}

现在,我们再回头去看SucceededFutureFailedFuture就十分简单了。

Future的操作符

compose()

这里有两个重载方法,分别是
default <U> Future<U> compose(Handler<T> handler, Future<U> next)
default <U> Future<U> compose(Function<T, Future<U>> mapper)
方法①的作用: 调用compose()方法的future完成的时候,接着去调用handler回调,如果成功就将next这个future也置为完成状态。看代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
default <U> Future<U> compose(Handler<T> handler, Future<U> next) {
setHandler(ar -> {
if (ar.succeeded()) {
try {
handler.handle(ar.result());
} catch (Throwable err) {
if (next.isComplete()) {
throw err;
}
next.fail(err);
}
} else {
next.fail(ar.cause());
}
});
return next;
}

2-15行表示设置第一个future的handler,12-14行表示如果第一个future失败了,那么就会将next这个future也置为失败状态。6-11行表示如果参数中的那个handler失败了,也会将next这个future置为失败。

方法②的作用:调用compose()方法的future完成的时候,执行参数中的function,然后返回一个future,只有当这个返回的future完成的时候,这次compose操作才算完成。看代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
default <U> Future<U> compose(Function<T, Future<U>> mapper) {
if (mapper == null) {
throw new NullPointerException();
}
Future<U> ret = Future.future();
setHandler(ar -> {
if (ar.succeeded()) {
Future<U> apply;
try {
apply = mapper.apply(ar.result());
} catch (Throwable e) {
ret.fail(e);
return;
}
apply.setHandler(ret);
} else {
ret.fail(ar.cause());
}
});
return ret;
}

6-19行表示设置前面一个future的handler,9-15行设置mapper中future的对应的handler。两个future必须都能完成才不会使得compose失败。

map()

map()一共有三个重载方法

default <U> Future<U> map(Function<T, U> mapper)

default <V> Future<V> map(V value)

default <V> Future<V> mapEmpty()

mapper 这个Function 接受调用的future的result value作用input参数,执行转换操作后,返回一个包含转换后值的future

②将调用的future的result value传入进行转换并得到一个新的future

③将调用的future的result value传入得到新的future,并且result为null

recover()

default Future<T> recover(Function<Throwable, Future<T>> mapper)

recover() 将一个失败的future通过mapper进行操作,如果mapper中得到的是一次成功的future,那么recover() 得到的就是这个mapper返回的新的成功的future。

otherwise()

default Future<T> otherwise(Function<Throwable, T> mapper)

default Future<T> otherwise(T value)

default Future<T> otherwiseEmpty()

otherwise()recover() 操作有些类似,只不过recover() 的mapper返回类型是Future<T>, 而otherwise() 的返回类型直接就是T ,其他的逻辑是类似的,如果调用的future成功,就直接使用该future的result value,如果失败了,那么就使用mapper的result value。②是在失败情况下使用参数T 的value,③是使用null作为失败时的result value。

CompositeFuture

CompositeFuture 接口提供了一些静态工厂方法:

static CompositeFuture all(List<Future> futures)

static CompositeFuture any(List<Future> futures)

static CompositeFuture join(List<Future> futures)

all() 方法是指所有Future都成功时,才返回一个成功的CompositeFuture ,任意一个Future失败时就会返回一个失败的CompositeFuture

any() 方法是指所有Future之中任意一个成功时,就返回一个成功的CompositeFuture,不会去等待其他Future。只有所有Future都失败时,才会返回一个失败的CompositeFuture

join() 方法同样也是在所有Future都成功时才返回一个成功的CompositeFuture ,但是它与all() 的区别是任何一个Future失败时,它会等所有Future都完成后才会再返回一个失败的CompositeFuture

join()

下面以join() 为例,详细剖析join() 的实现。

我们先看join是怎么使用的,下面是一段调用代码

1
2
3
4
5
6
7
8
9
Future<String> future1 = Future.future();
Future<Integer> future2 = Future.future();
CompositeFuture.join(future1, future2).setHandler(ar -> {
System.out.println(ar.result().list());
});
future1.complete("Vert.x!");
future2.complete(666);

这段代码运行之后,控制台会输出[Vert.x!, 666] ,现在来一步步看CompositeFutureImpl 源码,看下这个join()到底是如何运作的。

首先,静态工厂方法join() 会调用CompositeFutureImpl 类的公有静态方法join(Future<?>... results) ,这个公有静态方法又会去调用类中的私有静态方法join(Function<CompositeFuture, Throwable> pred, Future<?>... results)。传入的第一个参数是ALL , 这个ALL就是个Function,接受一个CompositeFuture进入,返回一个Throwable,继续看实现。

1
2
3
4
5
6
7
8
9
private static final Function<CompositeFuture, Throwable> ALL = cf -> {
int size = cf.size();
for (int i = 0;i < size;i++) {
if (!cf.succeeded(i)) {
return cf.cause(i);
}
}
return null;
};

这个CompositeFuture#succeeded(int index)表示Composite对应index的future是否成功,那么这段代码就很明确是干什么了,对CompositeFuture中每个future进行检查,只要有一个不成功就会返回该future的异常,只有所有future都成功了,这个Function才会返回null。

回到join()私有静态方法,看它的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private static CompositeFuture join(Function<CompositeFuture, Throwable> pred, Future<?>... results) {
CompositeFutureImpl composite = new CompositeFutureImpl(results);
int len = results.length;
for (int i = 0; i < len; i++) {
results[i].setHandler(ar -> {
// 简化逻辑
// CompositeFuture每个Future回调对应的handler
});
}
if (len == 0) {
composite.setCompleted(null);
}
return composite;
}

注意这里将每个future设置handler的部分简化掉了,这样便于理解,因为这个时候这些handler还不会去执行,我们先看主干部分,发现没有什么特别的,就是会给每个future设置一个handler。

假设这个时候我们的future1完成了,现在触发它对应的handler了,来看之前简化的那部分逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
results[i].setHandler(ar -> {
Handler<AsyncResult<CompositeFuture>> handler = null;
synchronized (composite) {
composite.count++;
if (!composite.isComplete() && composite.count == len) {
// Take decision here
Throwable failure = pred.apply(composite);
handler = composite.setCompleted(failure);
}
}
if (handler != null) {
handler.handle(composite);
}
});

future1对这个composite进行同步操作,确保不会被多个future同时更改,count由于初始化后值为0,这里composite.count++后变为1,由于count并不等于len,而且handler也为空,因此这次handler回调就执行完了。

注:看完后与all()或者any()对比,不同之处在于前者任一future一旦出现错误,就会将整个CompositeFuture置为失败并不去执行其他future的handler了,而这一不同特性在这里就体现了,join()每一次循环完不管future结果都会去执行下一个future的handler。any()在这一块也是同样的道理。

到了future2变成完成状态时,还是执行handler部分,这次不同的是,count这时的值与len相同了,这意味着所有的future都执行到了,这个时候就会去判断是否有失败的future,通过Throwable failure = pred.apply(composite);这一行代码,而我们实例代码的future1future2都成功了,因此failure被设置为null。
接着执行handler = composite.setCompleted(failure);,看下setCompleted()的实现

1
2
3
4
5
6
7
8
9
10
private Handler<AsyncResult<CompositeFuture>> setCompleted(Throwable cause) {
synchronized (this) {
if (completed) {
return null;
}
this.completed = true;
this.cause = cause;
return handler != null ? handler : NO_HANDLER;
}
}

completed被设为true,cause这时被设为null,由于handler不为空(我们通过setHandler()方法设置了打印结果),返回的就是我们设置的handler。

回去之前join()里面回调的地方,接着往下走,由于handler不为空,那么现在就执行它,结果就是控制台打印出我们future结果的list了。

由于all()any()相对join()来说逻辑更简单一些,这里就省略了,但是基本思路是差不多的,了解它们的目的后对照join()源码不难理解。

小结

Rxjava和Coroutine是Vert.x解决异步操作的高级特性,但是Future与Callback是Vert.x内部使用的机制,尽管将来有可能被JDK的CompletableFuture替代,仍然有必要进行更详细的了解。

------ 本文结束 ------

版权声明


BillyYccc's blog by Billy Yuan is licensed under a Creative Commons BY-NC-SA 4.0 International License.
本文原创于BillyYccc's Blog,转载请注明原作者及出处!