深入理解Vert.x Core(4)-Vertx(I)

创建Vertx实例

官方文档上面写着这样一句话

If you’re embedding Vert.x then you simply create an instance as follows:
Vertx vertx = Vertx.vertx();

我们在使用Vert.x的时候一般将应用封装成多个Verticle,指定好MainVerticle并使用官方提供的Launcher,通过fat-jar打包,然后直接使用java -jar -xxx app-fat-jar-1.0.jar 执行,但是嵌入Vert.x使用的时候,比如你写个Main 方法去调用Vert.x的API,这种就属于嵌入使用,这时候就需要手动创建Vert.x instance。

那么在Vertx vertx = Vertx.vertx(); 执行时到底发生了什么呢?

查看Vertx 接口的静态工厂方法,跟Future.future()有些类似

1
2
3
static Vertx vertx() {
return factory.vertx();
}

找到factory的初始化地方
VertxFactory factory = ServiceHelper.loadFactory(VertxFactory.class);
Future一样,也是通过SPI接口暴露服务,找到真正的工厂实现类VertxFactoryImpl,返回的是VertxImpl实例。

VertxImpl

VertxImpl可以说是Vert.x的大脑,Vert.x的组件都跟这个类有着联系。

Context相关

我们在这篇文章 中已经讲过了Context以及一些场景,在VertxImpl 类中,封装了一些创建或者获取Context的接口。

Vertx.currentContext()

Vertx接口中提供了静态方法currentContext(),调用的是工厂的context()方法,而默认的VertxFactoryImpl实现类中context()即是VertxImpl的静态方法context()。看这个方法的实现

1
2
3
4
5
6
7
public static Context context() {
Thread current = Thread.currentThread();
if (current instanceof VertxThread) {
return ((VertxThread) current).getContext();
}
return null;
}

只有两种情况,当前线程如果为Vertx线程,那么就返回当前线程的Context,如果不是Vertx线程就返回null。

getContext()

VertxImpl中有实例方法getContext(),和上面讲的context()不同,context()方法只是获取当前线程的context,而getContext()首先会调用context()获取当前线程的context,并且保证这个context是关联调用getContext()方法的Vertx实例的,否则就返回null。

createEventLoopContext()

这个方法包含四个参数,第一个就是Verticle的deploymentID,第二个是给EventLoopContext执行阻塞任务用的workerPool,第三个参数就是一个自定义的Json config,最后一个是类加载器。

1
2
3
public EventLoopContext createEventLoopContext(String deploymentID, WorkerPool workerPool, JsonObject config, ClassLoader tccl) {
return new EventLoopContext(this, internalBlockingPool, workerPool != null ? workerPool : this.workerPool, deploymentID, config, tccl);
}

internalBlockingPool会使用当前Vertx实例的internalBlockingPool。而如果传入参数workerPool为null,那么workerPool也会直接使用当前Vertx实例的workerPool

createWorkerContext()

除了会根据第一个参数判断创建WorkerContext还是MultiThreadedWorkerContextcreateWorkerContext()createEventLoopContext()方法类似。

getOrCreateContext()

直接看代码

1
2
3
4
5
6
7
8
public ContextImpl getOrCreateContext() {
ContextImpl ctx = getContext();
if (ctx == null) {
// We are running embedded - Create a context
ctx = createEventLoopContext(null, null, new JsonObject(), Thread.currentThread().getContextClassLoader());
}
return ctx;
}

这个方法首先会调用getContext()方法,获取当前线程下与这个Vertx实例关联的context,如果不存在,就手动创建一个EventLoopContext

注意传给createEventLoopContext()方法的deploymentID,workerPool都为null,那么这个EventLoopContextworkerPool就是Vertx实例的workerPool

有一点需要注意,就是在context==null的情况下,创建一个context并不会改变任何当前已有的context状态。

runOnContext()

1
2
3
4
public void runOnContext(Handler<Void> task) {
ContextImpl context = getOrCreateContext();
context.runOnContext(task);
}

通过代码知道runOnContext()会使用当前调用这个方法的Vertx实例首先用getOrCreateContext()获取一个ContextImpl实例,然后再用ContextrunOnContext()方法执行task。

举两种典型情况示例分析:

  • Verticle中调用vertx运行这个方法

Verticle中调用,首先getOrCreateContext()会获取到当前这个Verticle绑定的Context,接着就在这个Context上执行task。具体执行的方法在讲Context的时候已经讲过了,这里就不再赘述。

  • 在Main方法中运行这个方法

在Main中运行,首先得手动创建一个Vertx实例,注意当前所在线程为main线程,因此调用getOrCreateContext()方法会创建一个新的EventLoopContext,接着用这个context执行task。

executeBlocking()

这个方法与runOnContext()方法的结构类似,也是将具体要执行的任务委托给获取到的Context或者新创建的Context来完成。

需要注意的是,如果Context是新创建的,那么这个context的workerPool就是它所关联的Vertx实例的workerPool,执行阻塞任务的时候也会交给Vertx实例的workerPool去处理。

Verticle相关

VertxImpl中,如果是标准模式下部署Verticle,这个工作就交给deploymentManager去做了,而如果是HA高可用模式下部署Verticle,就由HaManager部署(其实还是委托给deploymentManager去部署)。registerVerticleFactory()unregisterVerticleFactory()等等与Verticle相关的操作都是由deploymentManager去完成的。

我们在这篇文章 中介绍了deploymentManager部署Verticle的详细过程。

Vert.x线程

Vert.x中主要使用了这两种线程EventLoop以及Worker,下面会介绍Vert.x中的线程。

VertxThread

我们先看VertxThread的结构。

ShowImage

VertxThread继承了Netty中的FastThreadLocalThread,而FastThreadLocalThread的父类就是JDK中的Thread类。布尔值worker表示该线程是EventLoop线程还是Worker线程。VertxThread类提供了获取最大执行时间选项、设置线程当前的context等等方法。executeStart()方法可以获取当前时间并存放在VertxThread#execStart中保存,这个方法可以被用来度量任务执行时间的长短,从而得出任务是否阻塞了线程。executeEnd()用来清理execStart状态,一般在任务结束时调用。startTime()方法就是获取execStart的值。

VertxThreadFactory

接下来分析Vertx线程工厂的结构。

ShowImage

VertxThreadFactory实现了ThreadFactory接口,这个接口中只有一个方法,就是Thread newThread(Runnable r);
进入VertxThreadFactory类中,首先看到prefixthreadCount,前者用来表示这个Vertx线程工厂创建的线程的前缀名,而threadCount就是对创建的线程进行原子计数。它们组合在一起成为新创建的线程的名字
weakMapunsetContext()方法是为了在Verticle卸载时,能将该Verticle绑定的Context所在的线程的所有绑定这个Context相关信息清除。weakMap的Key存放了VertxThread对象,Value是一个无意义的值,当VertxThread对象在其他地方没有引用的时候,weakMap会自动进行清理。

newThread()方法执行了以下几个步骤

1
2
3
4
5
6
7
8
9
10
public Thread newThread(Runnable runnable) {
VertxThread t = new VertxThread(runnable, prefix + threadCount.getAndIncrement(), worker, maxExecTime);
if (checker != null) {
checker.registerThread(t);
}
addToMap(t);
// we want to prevent the JVM from exiting until Vert.x instances are closed
t.setDaemon(false);
return t;
}

  1. 创建新的Vertx线程,线程名字就是上面说的prefix+threadCount的组合
  2. BlockedThreadChecker中注册这个线程
  3. 在弱引用weakMap中存放这个线程
  4. 让这个线程成为用户线程而不是守护线程(daemon)

线程阻塞操作检测机制

在Vertx线程工厂创建Vertx线程时,会用BlockedThreadChecker 对象注册每个新创建的线程。

BlockedThreadChecker 类中,registerThread() 方法将VertxThread 对象添加到弱引用HashMap threads中。而线程阻塞操作的检测逻辑是在构造方法中完成的,循环检测通过java.util.Timer 实现,Timer会运行一个固定间隔时间的task。这个间隔时间就是检测阻塞线程的周期,可以在VertxOptions#setBlockedThreadCheckInterval() 方法中进行设置,间隔时间就是构造器的第一个参数,而构造器的第二个参数是表示异常警告时间,就是如果任务执行时间超过了这个阈值,那么就不只是在Log日志打印信息,还会抛出异常,这个参数也可以在VertxOptions#setWarningExceptionTime() 方法中设置。

检测过程逻辑不复杂,对weakMap 中每个线程进行检查,通过获取当前时间now 减去线程中保留的任务启动时间Vertx#startTime() 得到任务当前正在执行已经占用线程的时间,再进行时间的相关判断逻辑。

VertxImpl 类的初始化过程中,可以看到初始化了阻塞线程checker的代码

1
checker = new BlockedThreadChecker(options.getBlockedThreadCheckInterval(), options.getWarningExceptionTime());

Vertx EventLoop机制

我们都知道在Vert.x中,一个EventLoopContext只会绑定一个EventLoop 线程,而这个EventLoop 就是通过VertxImpl#getEventLoopGroup()#next() 获取到的。我们现在来仔细分析VertxImpl 中的EventLoopGroup是怎么得到的。

首先先看这行代码

1
eventLoopThreadFactory = new VertxThreadFactory("vert.x-eventloop-thread-", checker, false, options.getMaxEventLoopExecuteTime());

首先会准备好一个Vertx线程工厂,这个工厂的prefix为vert.x-eventloop-thread-,然后是阻塞线程检测器checker参数,worker选项当然是false,还有VertxOptionsEventLoop最大执行时间(超过就发出警告)的选项,待会这个线程工厂会给EventLoopGroup提供线程。

然后看到EventLoopGroup创建的代码

1
eventLoopGroup = transport.eventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory, NETTY_IO_RATIO);

注意到transport.eventLoopGroup()方法,在Netty完成了对Native Transports(包括Epoll以及Kqueue)的支持后,Vert.x 3.5.0也加入了Native Transports的支持。如果在VertxOptions中开启了setPreferNativeTransport(true) 选项,那么Vert.x就会去优先使用Native Transports而不是JDK原生的NIO。VertxImplEventLoopGroup会根据传输方式选择对应的Netty中的EventLoopGroup。如果是Epoll,最后就是EpollEventLoopGroup;如果是Kqueue,就是KQueueEventLoopGroup,如果使用NIO,那么就是NioEventLoopGroup。这些EventLoopGroup的size都可以通过VertxOptions去设置,线程就是由刚才创建的Vertx线程工厂提供。

除此之外,Vert.x提供了一个SystemProperty去修改EventLoopGroup的IO Ratio

这里简单的提一下Netty的线程模型。在Netty的NioEventLoop这个reactor线程中,有两种事件需要处理,一种是IO事件,还有一种是Task任务。IO事件就包括读写操作read,write、connect、close、挂起等等,而Task任务就是我们封装提交给EventLoopRunnable对象。在Netty的NioEventLooprun()方法中,这个方法是无限循环的,大概有3个步骤。

  1. 进行SELECT操作,将所有关联到当前EventLoopChannel上的IO事件轮询出来
  2. processSelectKeys(),处理这些IO事件
  3. runAllTasks(),处理任务队列

这个IO Ratio就是EventLoop一次循环处理IO事件与任务所占时间的比例,也就是第2步与第3步的占比,默认是1:1,即50。

那么我们之前讲过的EventLoopContext的执行executeAsync()方法都是封装成Runnable对象提交给EventLoop,为什么提交的方式是任务队列呢,我们知道不能阻塞EventLoop,所以我们在EventLoopContext中执行的任务都是CPU占用时间短或者非阻塞的操作,通过任务队列的方式提交时,Netty的EventLoop会帮我们将任务队列中的所有任务串行化执行(顺序执行),不会产生并行执行的问题。

现在就能将EventLoopContextVertxImpl中的EventLoopGroup关联起来了,VertxImpl这个大脑为EventLoopContext提供(Netty中的)EventLoop执行自定义Runnable对象(任务)。

Vertx WorkerPool机制

可以看到WorkerPool就是对ExecutorServiceMetrics的封装。

ShowImage

那我们再来看一看VertxImpl中的internalBlockingPoolworkerPool,它们创建的代码如下

1
2
internalBlockingPool = new WorkerPool(internalBlockingExec, internalBlockingPoolMetrics);
workerPool = new WorkerPool(workerExec, workerPoolMetrics);

继续往上看,可以看到ExecutorService的创建过程

1
2
3
4
5
6
ExecutorService workerExec = Executors.newFixedThreadPool(options.getWorkerPoolSize(),
new VertxThreadFactory("vert.x-worker-thread-", checker, true, options.getMaxWorkerExecuteTime()));
PoolMetrics workerPoolMetrics = metrics != null ? metrics.createMetrics(workerExec, "worker", "vert.x-worker-thread", options.getWorkerPoolSize()) : null;
ExecutorService internalBlockingExec = Executors.newFixedThreadPool(options.getInternalBlockingPoolSize(),
new VertxThreadFactory("vert.x-internal-blocking-", checker, true, options.getMaxWorkerExecuteTime()));
PoolMetrics internalBlockingPoolMetrics = metrics != null ? metrics.createMetrics(internalBlockingExec, "worker", "vert.x-internal-blocking", options.getInternalBlockingPoolSize()) : null;

发现WorkerPool的线程池是由Executors.newFixedThreadPool支撑的,这里简单的介绍一下FixedThreadPool

这个线程池创建后会复用一定数量的线程来处理一个共享的无边界任务队列,任何时候都只有固定数量(创建时指定的线程数)的线程处理任务,一旦需要处理的任务数量超过了线程数,那么多余无法处理的任务就会在队列中堆积一直到有可用的线程来处理它们。需要注意的是,这些线程会一直保留在线程池中除非线程池被显式关闭。

现在我们知道VertxOptions中可以设置的workerPoolSize其实就是这个VertxImpl实例对象中FixedThreadPool的线程数量,默认的值是20。internalBlockingPoolSize也是同样的原理。

我们使用deploymentManager部署Verticle 的过程中,会给每个Verticle 创建一个相关联的context,在调用VertxImpl 的三种创建Context方法时,都需要指定internalBlockingPool ,而这个internalBlockingPool 直接就是使用VertxImpl 实例的内部阻塞池。

除此之外,还需要给每种Context指定一个workerPool 。在DeploymentManager#doDeploy() 中,如果不去指定DeploymentOptionsworkerPoolName ,就不会根据名字去获取一个共享的workerExecutor ,那么workerPool 就会为null,所以此时使用VertxImpl 实例的workerPool 。而如果指定了workerPoolName ,就会使用VertxInternal#createSharedWorkerExecutor() 方法为这个Context单独创建一个WorkerExecutor ,而不是用VertxImpl 实例中的workerPool

下面谈一谈Vert.x中的SharedWorkerExecutor

SharedWorkerExecutor

WorkerExecutor是Vert.x中的接口,表示在Vert.x中执行阻塞代码的executor,这个接口提供了executeBlocking()方法,类似Context接口中的同名方法,但是WorkerExecutor会使用一个独立的WorkerPool。先看WorkerExecutor的层次结构:

ShowImage

其实可以看到WorkerExecutorImpl中包裹了一个WorkerPool对象,而executeBlocking()方法就是使用该WorkerExecutor关联的Vertx当前所处的Context下执行executeBlocking()方法,只不过workerPool使用的是这个WorkerExecutor的WorkerPool。

那么我们的SharedWorkerExecutor是如何实现的呢?
VertxImpl中有一个Key为String,Value为SharedWorkerPool的MapnamedWorkerPools,这个Map就是用来保存SharedWorkerPool实例的,字符串Key是我们的SharedWorkerExecutor的名字。SharedWorkerPoolVertxImpl的内部类,在继承WorkerPool的基础上加上了String字段name以及一个引用计数refCount

我们看VertxImpl#createSharedWorkerExecutor()方法代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public synchronized WorkerExecutorImpl createSharedWorkerExecutor(String name, int poolSize, long maxExecuteTime) {
if (poolSize < 1) {
throw new IllegalArgumentException("poolSize must be > 0");
}
if (maxExecuteTime < 1) {
throw new IllegalArgumentException("maxExecuteTime must be > 0");
}
SharedWorkerPool sharedWorkerPool = namedWorkerPools.get(name);
if (sharedWorkerPool == null) {
ExecutorService workerExec = Executors.newFixedThreadPool(poolSize, new VertxThreadFactory(name + "-", checker, true, maxExecuteTime));
PoolMetrics workerMetrics = metrics != null ? metrics.createMetrics(workerExec, "worker", name, poolSize) : null;
namedWorkerPools.put(name, sharedWorkerPool = new SharedWorkerPool(name, workerExec, workerMetrics));
} else {
sharedWorkerPool.refCount++;
}
ContextImpl context = getOrCreateContext();
WorkerExecutorImpl namedExec = new WorkerExecutorImpl(this, sharedWorkerPool, true);
context.addCloseHook(namedExec);
return namedExec;
}

创建时首先通过namedWorkerPools这个Map根据名字name去查询是否已经有这个SharedWorkerPool了,如果不存在(即取出结果为null),那么就会重新创建一个FixedThreadPool并用这个线程池去创建一个SharedWorkerPool,然后将这个SharedWorkerPool放入Map中;如果已经存在同名SharedWorkerPool,直接将引用计数+1。最后就是用SharedWorkerPool创建一个WorkerExecutorImpl对象返回。

因此这个SharedWorkerExecutor它们之间通过名字name来共享WorkerPool。

我们可以将WorkerExecutor,WorkerPool,FixedThreadPool以及Context,Verticle等等的关系总结一下,用下面这张图比较直观。

ShowImage

还有一点,就是如果在DeployOptions中只指定workerPoolSize而不指定workerPoolName,那么实际上Context使用的还是VertxImpl中的workerPool,而不是用设置的size重新创建的WorkerPoolworkerPoolSize设置就没有生效,这个size实际上还是VertxImplworkerPool的size。

小结

  1. Vertx线程可以绑定不同的Context,Context只能绑定一个线程。
  2. 不同的Vertx线程工厂为EventLoopGroup以及WorkerPool提供线程。
  3. Vertx对象操纵Verticle的相关工作交给deploymentManager完成。
  4. EventLoopContext中的任务是交给Netty的EventLoop的任务队列串行执行。
  5. WorkerPool底层使用FixedThreadPool实现。
  6. SharedWorkerExecutor是独立的,它们共享的是WorkerPool
------ 本文结束 ------

版权声明


BillyYccc's blog by Billy Yuan is licensed under a Creative Commons BY-NC-SA 4.0 International License.
本文原创于BillyYccc's Blog,转载请注明原作者及出处!