深入理解Vert.x Core(5)-Vertx(II)

CloseHooks机制

CloseHooks是Vert.x中一项内部使用的特性,在Vertx实例或者Verticle关闭时,CloseHooks能实现自动清理(比如HTTP Server,Net Server…)的功能。

我们了解下CloseHooks实现的大概流程。

在我们的Verticle或者Vertx实例部署好以后,我们通常会使用vertx.createHttpServer()去部署一些HTTP Server或者Net Server,Net Client等等。这些都是可以在关联的Verticle或者Vertx实例关闭之后实现自动清理的资源。
下面以在Verticle上的Net Server为例
①通过vertx.createNetServer()创建一个NetServer,返回一个NetServerImpl实例
②在NetServerImpl构造器中,会将当前Verticle所在Context hook上这个NetServer。

1
2
3
4
5
6
7
8
9
public NetServerImpl(VertxInternal vertx, NetServerOptions options) {
this.vertx = vertx;
// ...
this.creatingContext = vertx.getContext();
if (creatingContext != null) {
// ...
creatingContext.addCloseHook(this);
}
}

NetServerImpl实现了Closeable接口,Closeable接口只有一个close方法。

ContextaddCloseHook()方法就是将实现了Closeable接口的资源添加到自己内部类CloseHookscloseHooks的这个Set中。

VerticleunDeploy()以后,会将自己持有的Context对象的CloseHooks对象执行run()方法,就是对每一个Set中Closeable对象调用close()方法,这样就能实现自动清理所有hook上的资源的功能。

这种机制其实是一种观察者模式的实现,VerticleContext或者Vertx实例是被观察者,可以添加观察者Closeable,在订阅的关系形成之后,一旦Verticle或者Vertx关闭,那么就会通知观察者(可以关闭的资源)去执行它们自己的close()方法,这样就可以实现自动清理的机制。

周期任务与延时任务机制

Vert.x提供了一些API用来执行周期任务或者延时任务,在Vertx接口中定义好了以下API

1
2
3
4
5
6
7
8
9
long setTimer(long delay, Handler<Long> handler);
TimeoutStream timerStream(long delay);
long setPeriodic(long delay, Handler<Long> handler);
TimeoutStream periodicStream(long delay);
boolean cancelTimer(long id);

我们可以使用这些API非常简单的执行一些延时任务、周期任务或者取消任务的操作。

实现

setTimer()与setPeriodic()

VertxImpl类中找到setTimer()setPeriodic()方法的实现。

1
2
3
4
5
6
7
public long setPeriodic(long delay, Handler<Long> handler) {
return scheduleTimeout(getOrCreateContext(), handler, delay, true);
}
public long setTimer(long delay, Handler<Long> handler) {
return scheduleTimeout(getOrCreateContext(), handler, delay, false);
}

这两个方法都是调用scheduleTimeout()方法,接着看这个方法的实现

1
2
3
4
5
6
7
8
9
10
private long scheduleTimeout(ContextImpl context, Handler<Long> handler, long delay, boolean periodic) {
if (delay < 1) {
throw new IllegalArgumentException("Cannot schedule a timer with delay < 1 ms");
}
long timerId = timeoutCounter.getAndIncrement();
InternalTimerHandler task = new InternalTimerHandler(timerId, handler, periodic, delay, context);
timeouts.put(timerId, task);
context.addCloseHook(task);
return timerId;
}

这个方法返回的是我们创建的任务的timerId,这个id可以用在cancelTimer()方法取消该任务。ID是通过原子long类型变量timeoutCounter生成,每次新增一个任务时,这个原子变量就会取值并且自增1。然后创建一个InternalTimerHandler对象,我们马上会介绍这个类。id与task对象组成的kv会被添加到timeouts这个Map中,并且context会增加一个属于task的CloseHook。

InternalTimerHandler

InternalTimerHandlerVertxImpl的内部类,实现了HandlerCloseable接口。我们可以看下这个类的结构

ShowImage

handle()

由于实现了Handler,那么就会有handle()方法,我们看handle()代码

1
2
3
4
5
6
7
8
9
10
11
12
public void handle(Void v) {
if (!cancelled.get()) {
try {
handler.handle(timerID);
} finally {
if (!periodic) {
// Clean up after it's fired
cleanupNonPeriodic();
}
}
}
}

这段代码不复杂,就是检查当前任务是否被取消,如果没有取消,就去执行持有的handler的逻辑,这个handler对象是在构造器传入的,通过this.handler = runnable;将我们最开始在调用API时写的handler保存到这个对象的handler对象中,handler执行完以后如果不是周期任务就执行cleanupNonPeriodic清理。

Constructor

我们注意到之前传递给InternalTimerHandler构造器参数包括了任务ID,任务要执行的Handler,是否为周期任务的布尔值,执行间隔(延时),以及调用的context。我们直接看构造器中代码

1
2
3
4
5
6
7
EventLoop el = context.nettyEventLoop();
Runnable toRun = () -> context.runOnContext(this);
if (periodic) {
future = el.scheduleAtFixedRate(toRun, delay, delay, TimeUnit.MILLISECONDS);
} else {
future = el.schedule(toRun, delay, TimeUnit.MILLISECONDS);
}

通过调用的context获取关联的EventLoop(Netty),将context.runOnContext(this)这个方法封装成一个Runnable对象,this指的就是自己InternalTimerHandler对象,因为这个对象也是一个Handler对象,然后将Runnable对象提交给EventLoop的任务队列处理,如果是周期任务就使用scheduleAtFixedRate()提交,延时任务就使用schedule()提交。这些任务提交后的返回值是一个ScheduledFuture对象,保存在future对象中。

cancel()

cancel()方法就是通过刚才保存的future对象来控制取消,可以调用Futurecancel()方法来取消任务。

cancelTimer

cancelTimer(long id) 这个API就是委托InternalTimerHandler对象的cancel()方法完成取消任务。

1
2
3
4
5
6
7
8
9
public boolean cancelTimer(long id) {
InternalTimerHandler handler = timeouts.remove(id);
if (handler != null) {
handler.context.removeCloseHook(handler);
return handler.cancel();
} else {
return false;
}
}

timerStream与periodicStream

除了直接创建任务之外,还可以使用提供的API创建一个延时流或者周期流。注意这个流只能执行一个你指定的任务(延时任务或者周期任务),但是你可以对流进行一些常用的操作。

开启TimeoutStream的代码,返回的是一个TimeoutStreamImpl对象。

1
2
3
4
5
6
7
8
9
@Override
public TimeoutStream timerStream(long delay) {
return new TimeoutStreamImpl(delay, false);
}
@Override
public TimeoutStream periodicStream(long delay) {
return new TimeoutStreamImpl(delay, true);
}

TimeoutStreamImpl类实现了TimeoutStream接口与Handler<Long>接口。`

TimeoutStream继承了ReadStream接口,提供一些常用的流操作的接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
TimeoutStream exceptionHandler(Handler<Throwable> handler);
@Override
TimeoutStream handler(Handler<Long> handler);
@Override
TimeoutStream pause();
@Override
TimeoutStream resume();
@Override
TimeoutStream endHandler(Handler<Void> endHandler);
void cancel();

handle()

我们看handle()方法,如果流没有停止,那么就会使用该对象持有的handler去执行逻辑,如果是延时任务并且注册了endHandler,那么执行完handler后还会执行endHandler。

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public synchronized void handle(Long event) {
try {
if (!paused) {
handler.handle(event);
}
} finally {
if (!periodic && endHandler != null) {
endHandler.handle(null);
}
}
}

handler()

handler()方法为这个TimeoutStream注册一个handler,当触发了时间事件时(延时或者周期),这个注册的handler就会执行。如果注册的handler是null,那么就取消这个任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public synchronized TimeoutStream handler(Handler<Long> handler) {
if (handler != null) {
if (id != null) {
throw new IllegalStateException();
}
this.handler = handler;
id = scheduleTimeout(getOrCreateContext(), this, delay, periodic);
} else {
cancel();
}
return this;
}

看实现就知道一个TimeoutStream是不可以注册多个任务的,会抛出IllegalStateException。设置任务的工作还是交给了scheduleTimeout()这个方法去处理。和之前有点不同的是传入参数handler是TimeoutStreamImpl对象,通过多态机制TimeoutStreamImpl在这里扮演了handler的角色。

TimeoutStreamImpl上面写了一些注释

This class is optimised for performance when used on the same event loop that is was passed to the handler with. However it can be used safely from other threads. The internal state is protected using the synchronized keyword. If always used on the same event loop, then we benefit from biased locking which makes the overhead of synchronized near zero.

尽管大量使用了synchronized 关键字,但由于JDK1.6后偏向锁的优势,如果总是在一个EventLoop线程上,而该EventLoop线程如果没有被其他线程抢占锁,这时偏向锁是偏向模式,那么这个线程再进行请求锁时,就不用再一次进行同步操作,这样开销会降低从而避免了性能由于重量级锁急剧下降,同时偏向锁也能保证程序的线程安全执行。

Netty任务队列机制

上面说到了我们的延时任务或者周期任务都是通过提交给EventLoop 实现的,下面以NioEventLoop为例介绍Netty中的任务队列的机制。

我们先大致看一下NioEventLoop的结构

ShowImage

可以看到层次非常多,我们针对任务队列自底向上进行分析。

execute()执行任务

execute()方法之前在vert.x context接触过,就是将一个任务封装成Runnable对象提交到任务队列。execute()方法在Executor接口中定义,我们在SingleThreadEventExecutor找到它的实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}

实际上execute()方法只是执行了addTask(task)添加任务。

添加任务代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (!offerTask(task)) {
reject(task);
}
}
final boolean offerTask(Runnable task) {
if (isShutdown()) {
reject();
}
return taskQueue.offer(task);
}

我们的任务都被通过offer()操作提交到队列taskQueue中了,而这个任务队列的定义是private final Queue<Runnable> taskQueue;,任务队列维护着一组Runnable对象。

而在一次NioEventLoop循环中,即无限循环run()方法逻辑中,调用runAllTasks()方法去执行任务队列中的任务,而这些任务就是通过pollTaskFrom(taskQueue)从任务队列中取出Runnable对象,最后调用safeExecute(task); 直接run()执行。这些从任务队列取出任务去执行的逻辑即runAllTasks()都是在SingleThreadEventExecutor类中实现的。

schedule调度执行任务

EventLoop中,由于NioEventLoop实现了ScheduledExecutorService接口,因此有该接口的调度任务的方法。

1
2
3
4
5
6
7
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);

这几个调度执行任务的方法都是可以在EventLoop上调用的。Schedule具体的实现是在AbstractScheduledEventExecutor中完成的。下面我们以Vert.x中的周期任务与延时任务为例分析scheduleAtFixedRate()schedule()的实现。

添加任务

我们关注上面调度方法①与④的实现部分,在AbstractScheduledEventExecutor 中找到对应实现。

方法①的返回值如下

1
2
return schedule(new ScheduledFutureTask<Void>(
this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));

方法④的返回值如下

1
2
3
return schedule(new ScheduledFutureTask<Void>(
this, Executors.<Void>callable(command, null),
ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));

它们都返回了一个接受参数ScheduledFutureTask 的重载方法,我们先看重载方法的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
scheduledTaskQueue().add(task);
} else {
execute(new Runnable() {
@Override
public void run() {
scheduledTaskQueue().add(task);
}
});
}
return task;
}

这个重载方法就是将刚才传入的ScheduledFutureTask对象加入到调度任务队列scheduledTaskQueue中,并且返回这个任务。注意到Netty中很多方法都是命令与查询不分离的,即方法不仅仅是返回结果,还有对结果的操作,这个重载方法就干了命令与查询两件事。

ScheduledFutureTask

接下来我们看一看刚才加入到调度任务队列中的ScheduledFutureTask是干什么用的。
我们先从构造器开始,先看周期任务创建的ScheduledFutureTask调用的构造器

1
2
3
4
5
6
7
8
9
10
11
ScheduledFutureTask(
AbstractScheduledEventExecutor executor,
Callable<V> callable, long nanoTime, long period) {
super(executor, callable);
if (period == 0) {
throw new IllegalArgumentException("period: 0 (expected: != 0)");
}
deadlineNanos = nanoTime;
periodNanos = period;
}

首先会接收一个AbstractScheduledEventExecutor对象作为第一个参数,指定了该任务执行的Executor,然后是一个Callable对象,这个Callable对象是将我们提交的Runnable对象通过Executors.<Void>callable(command, null)转换而得,这个command代表我们需要执行的任务,而null值代表结果(由于Runnable执行无法直接获取结果,转换成Callable后用null代替结果)。第三个参数是该周期任务下一次执行(也有可能是第一次)的开始时间,在调用处使用ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay))静态方法计算而得,第四个参数是周期任务的周期时间,注意时间的单位都会被转换成纳秒,并且用nanoTime方式表示。

我们可以看到ScheduledFutureTask的作用,就是将一个调度任务执行的Executor,执行时间,周期信息保存在该对象中。

同样的,对于定时任务创建ScheduledFutureTask

1
2
3
4
5
6
7
8
ScheduledFutureTask(
AbstractScheduledEventExecutor executor,
Callable<V> callable, long nanoTime) {
super(executor, callable);
deadlineNanos = nanoTime;
periodNanos = 0;
}

第一个参数还是一样,表示该定时任务执行的Executor,Callable对象通过toCallable(runnable, result)转换而得,第三个参数是定时任务执行的实际nanoTime时间,周期时间当然是0,这些信息同样也被保存在ScheduledFutureTask对象中。

现在我们了解了ScheduledFutureTask的作用,接下来分析执行过程。

执行调度任务

回到NioEventLoop 类的循环run() 方法,注意到任务队列处理的地方,会根据ioRatio 进行判断,如果ioRatio 是100,那么就执行runAllTasks() 无参方法;否则就执行runAllTasks(ioTime * (100 - ioRatio) / ioRatio); 方法。

我们先看runAllTasks() 无参方法,选取关键部分代码

1
2
3
4
5
6
do {
fetchedAll = fetchFromScheduledTaskQueue();
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

在分析上面代码之前,我们先注意fetchFromScheduledTaskQueue() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
if (!taskQueue.offer(scheduledTask)) {
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}

fetchFromScheduledTaskQueue() 方法就是将调度任务队列的任务抽取出来,然后加入到普通任务队列中,如果普通任务队列满了,调度任务就退回原来的调度任务队列,接着继续循环一直到调度任务队列取不出来为止。

这里要特别关注抽取方法pollScheduledTask() 方法,因为这个方法就是用来确定我们的调度任务是否满足时间触发条件的,在这个方法中会将要取出的调度任务的下一次运行时间(定时运行时间) 与当前时间进行比较,如果满足了时间条件,那么才会把这个调度任务从队列中取出。而我们的调度任务中已经保存了周期任务的下一次执行时间(或者定时任务的定时时间)的信息。

1
2
3
4
if (scheduledTask.deadlineNanos() <= nanoTime) {
scheduledTaskQueue.remove();
return scheduledTask;
}

然后回到之前runAllTasks() 方法,现在知道了fetchedAll 表示调度任务队列中可以取出任务是否全部被取出,而上面代码的目的就是将调度任务队列所有能取出的任务放入普通任务队列中,然后runAllTasksFrom(taskQueue) 执行所有任务。

还有一点要想起的是,我们的周期任务是怎么周期运行的呢?还要回到ScheduledFutureTask 类,找到run() 方法,可以见到如果周期时间periodNanos被设置为0,那么这个任务就是个定时任务,那么仅仅执行task.call() 一次即可;而如果周期时间不为0,那么任务就是周期任务了,这个时候除了执行task.call(),还会修改deadlineNanos,把下一次执行时间累加上这个周期任务的周期时间,接着把修改好的任务再插入到调度任务队列中去,这样我们的周期任务也能周期执行了,可以对照下面的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
if (!isCancelled()) {
task.call();
if (!executor().isShutdown()) {
long p = periodNanos;
if (p > 0) {
deadlineNanos += p;
} else {
deadlineNanos = nanoTime() - p;
}
if (!isCancelled()) {
// scheduledTaskQueue can never be null as we lazy init it before submit the task!
Queue<ScheduledFutureTask<?>> scheduledTaskQueue =
((AbstractScheduledEventExecutor) executor()).scheduledTaskQueue;
assert scheduledTaskQueue != null;
scheduledTaskQueue.add(this);
}
}
}

再来分析ioRatio 不为100的情况,即调用runAllTasks(long timeoutNanos) 方法。基本的逻辑都一样,首先将满足条件的调度任务从调度任务队列取出到普通任务队列,接着执行普通任务。
不过这里多了一步检查的逻辑,它会对每个执行过的任务进行计数,每当执行的任务计数到达64时(用位与来实现),就会将执行这些任务的总时间与分配的时间进行对比,如果超出分配时间了,那么这次循环就不再执行任务队列中的任务,这个分配的时间是由io事件执行时间(processSelectedKeys()这个方法执行时间)与ioRatio计算所得,代码ioTime * (100 - ioRatio) / ioRatio 比较直观。需要提醒的是,ioRatio 被设置为100的时候,意味着我们取消了超时检测的机制,这时EventLoop会处理所有当前任务队列中的任务。

小结

在本文中一开始介绍了Vert.x的CloseHooks 的机制,接着对Vert.x周期任务,定时任务的源码进行了探索,还对Netty中事件循环的任务调度处理机制进行了仔细的分析。现在我们应该明白了Vert.x中周期任务与定时任务的原理。

------ 本文结束 ------

版权声明


BillyYccc's blog by Billy Yuan is licensed under a Creative Commons BY-NC-SA 4.0 International License.
本文原创于BillyYccc's Blog,转载请注明原作者及出处!