创建Vertx实例
官方文档上面写着这样一句话
If you’re embedding Vert.x then you simply create an instance as follows:
Vertx vertx = Vertx.vertx();
我们在使用Vert.x的时候一般将应用封装成多个Verticle,指定好MainVerticle并使用官方提供的Launcher,通过fat-jar打包,然后直接使用java -jar -xxx app-fat-jar-1.0.jar
执行,但是嵌入Vert.x使用的时候,比如你写个Main
方法去调用Vert.x的API,这种就属于嵌入使用,这时候就需要手动创建Vert.x instance。
那么在Vertx vertx = Vertx.vertx();
执行时到底发生了什么呢?
查看Vertx
接口的静态工厂方法,跟Future.future()
有些类似
找到factory
的初始化地方VertxFactory factory = ServiceHelper.loadFactory(VertxFactory.class);
和Future
一样,也是通过SPI接口暴露服务,找到真正的工厂实现类VertxFactoryImpl
,返回的是VertxImpl
实例。
VertxImpl
VertxImpl
可以说是Vert.x的大脑,Vert.x的组件都跟这个类有着联系。
Context相关
我们在这篇文章 中已经讲过了Context以及一些场景,在VertxImpl
类中,封装了一些创建或者获取Context
的接口。
Vertx.currentContext()
Vertx
接口中提供了静态方法currentContext()
,调用的是工厂的context()
方法,而默认的VertxFactoryImpl
实现类中context()
即是VertxImpl
的静态方法context()
。看这个方法的实现
只有两种情况,当前线程如果为Vertx线程,那么就返回当前线程的Context,如果不是Vertx线程就返回null。
getContext()
VertxImpl
中有实例方法getContext()
,和上面讲的context()
不同,context()
方法只是获取当前线程的context,而getContext()
首先会调用context()
获取当前线程的context,并且保证这个context是关联调用getContext()
方法的Vertx
实例的,否则就返回null。
createEventLoopContext()
这个方法包含四个参数,第一个就是Verticle的deploymentID,第二个是给EventLoopContext
执行阻塞任务用的workerPool,第三个参数就是一个自定义的Json config,最后一个是类加载器。
|
|
internalBlockingPool
会使用当前Vertx
实例的internalBlockingPool
。而如果传入参数workerPool
为null,那么workerPool
也会直接使用当前Vertx
实例的workerPool
。
createWorkerContext()
除了会根据第一个参数判断创建WorkerContext
还是MultiThreadedWorkerContext
,createWorkerContext()
与createEventLoopContext()
方法类似。
getOrCreateContext()
直接看代码
这个方法首先会调用getContext()
方法,获取当前线程下与这个Vertx
实例关联的context,如果不存在,就手动创建一个EventLoopContext
。
注意传给createEventLoopContext()
方法的deploymentID
,workerPool
都为null,那么这个EventLoopContext
的workerPool
就是Vertx
实例的workerPool
。
有一点需要注意,就是在context==null的情况下,创建一个context并不会改变任何当前已有的context状态。
runOnContext()
|
|
通过代码知道runOnContext()
会使用当前调用这个方法的Vertx
实例首先用getOrCreateContext()
获取一个ContextImpl
实例,然后再用Context
的runOnContext()
方法执行task。
举两种典型情况示例分析:
- 在
Verticle
中调用vertx运行这个方法
在Verticle
中调用,首先getOrCreateContext()
会获取到当前这个Verticle
绑定的Context
,接着就在这个Context
上执行task。具体执行的方法在讲Context
的时候已经讲过了,这里就不再赘述。
- 在Main方法中运行这个方法
在Main中运行,首先得手动创建一个Vertx
实例,注意当前所在线程为main
线程,因此调用getOrCreateContext()
方法会创建一个新的EventLoopContext
,接着用这个context执行task。
executeBlocking()
这个方法与runOnContext()
方法的结构类似,也是将具体要执行的任务委托给获取到的Context
或者新创建的Context
来完成。
需要注意的是,如果Context
是新创建的,那么这个context的workerPool
就是它所关联的Vertx
实例的workerPool
,执行阻塞任务的时候也会交给Vertx
实例的workerPool
去处理。
Verticle相关
在VertxImpl
中,如果是标准模式下部署Verticle
,这个工作就交给deploymentManager
去做了,而如果是HA高可用模式下部署Verticle
,就由HaManager
部署(其实还是委托给deploymentManager
去部署)。registerVerticleFactory()
与unregisterVerticleFactory()
等等与Verticle
相关的操作都是由deploymentManager
去完成的。
我们在这篇文章 中介绍了deploymentManager
部署Verticle
的详细过程。
Vert.x线程
Vert.x中主要使用了这两种线程EventLoop
以及Worker
,下面会介绍Vert.x中的线程。
VertxThread
我们先看VertxThread
的结构。
VertxThread
继承了Netty中的FastThreadLocalThread
,而FastThreadLocalThread
的父类就是JDK中的Thread
类。布尔值worker
表示该线程是EventLoop
线程还是Worker
线程。VertxThread
类提供了获取最大执行时间选项、设置线程当前的context等等方法。executeStart()
方法可以获取当前时间并存放在VertxThread#execStart
中保存,这个方法可以被用来度量任务执行时间的长短,从而得出任务是否阻塞了线程。executeEnd()
用来清理execStart
状态,一般在任务结束时调用。startTime()
方法就是获取execStart
的值。
VertxThreadFactory
接下来分析Vertx线程工厂的结构。
VertxThreadFactory
实现了ThreadFactory
接口,这个接口中只有一个方法,就是Thread newThread(Runnable r);
。
进入VertxThreadFactory
类中,首先看到prefix
与threadCount
,前者用来表示这个Vertx线程工厂创建的线程的前缀名,而threadCount
就是对创建的线程进行原子计数。它们组合在一起成为新创建的线程的名字weakMap
与unsetContext()
方法是为了在Verticle
卸载时,能将该Verticle
绑定的Context
所在的线程的所有绑定这个Context
相关信息清除。weakMap
的Key存放了VertxThread
对象,Value是一个无意义的值,当VertxThread
对象在其他地方没有引用的时候,weakMap
会自动进行清理。
newThread()
方法执行了以下几个步骤
- 创建新的Vertx线程,线程名字就是上面说的prefix+threadCount的组合
- 在
BlockedThreadChecker
中注册这个线程 - 在弱引用
weakMap
中存放这个线程 - 让这个线程成为用户线程而不是守护线程(daemon)
线程阻塞操作检测机制
在Vertx线程工厂创建Vertx线程时,会用BlockedThreadChecker
对象注册每个新创建的线程。
BlockedThreadChecker
类中,registerThread()
方法将VertxThread
对象添加到弱引用HashMap threads
中。而线程阻塞操作的检测逻辑是在构造方法中完成的,循环检测通过java.util.Timer
实现,Timer会运行一个固定间隔时间的task。这个间隔时间就是检测阻塞线程的周期,可以在VertxOptions#setBlockedThreadCheckInterval()
方法中进行设置,间隔时间就是构造器的第一个参数,而构造器的第二个参数是表示异常警告时间,就是如果任务执行时间超过了这个阈值,那么就不只是在Log日志打印信息,还会抛出异常,这个参数也可以在VertxOptions#setWarningExceptionTime()
方法中设置。
检测过程逻辑不复杂,对weakMap
中每个线程进行检查,通过获取当前时间now
减去线程中保留的任务启动时间Vertx#startTime()
得到任务当前正在执行已经占用线程的时间,再进行时间的相关判断逻辑。
在VertxImpl
类的初始化过程中,可以看到初始化了阻塞线程checker的代码
Vertx EventLoop机制
我们都知道在Vert.x中,一个EventLoopContext
只会绑定一个EventLoop
线程,而这个EventLoop
就是通过VertxImpl#getEventLoopGroup()#next()
获取到的。我们现在来仔细分析VertxImpl
中的EventLoopGroup
是怎么得到的。
首先先看这行代码
|
|
首先会准备好一个Vertx线程工厂,这个工厂的prefix为vert.x-eventloop-thread-
,然后是阻塞线程检测器checker
参数,worker
选项当然是false,还有VertxOptions
中EventLoop
最大执行时间(超过就发出警告)的选项,待会这个线程工厂会给EventLoopGroup
提供线程。
然后看到EventLoopGroup
创建的代码
|
|
注意到transport.eventLoopGroup()
方法,在Netty完成了对Native Transports(包括Epoll以及Kqueue)的支持后,Vert.x 3.5.0也加入了Native Transports的支持。如果在VertxOptions
中开启了setPreferNativeTransport(true)
选项,那么Vert.x就会去优先使用Native Transports而不是JDK原生的NIO。VertxImpl
的EventLoopGroup
会根据传输方式选择对应的Netty中的EventLoopGroup
。如果是Epoll,最后就是EpollEventLoopGroup
;如果是Kqueue,就是KQueueEventLoopGroup
,如果使用NIO,那么就是NioEventLoopGroup
。这些EventLoopGroup
的size都可以通过VertxOptions
去设置,线程就是由刚才创建的Vertx线程工厂提供。
除此之外,Vert.x提供了一个SystemProperty
去修改EventLoopGroup的IO Ratio
。
这里简单的提一下Netty的线程模型。在Netty的NioEventLoop
这个reactor
线程中,有两种事件需要处理,一种是IO事件,还有一种是Task任务。IO事件就包括读写操作read,write、connect、close、挂起等等,而Task任务就是我们封装提交给EventLoop
的Runnable
对象。在Netty的NioEventLoop
的run()
方法中,这个方法是无限循环的,大概有3个步骤。
- 进行SELECT操作,将所有关联到当前
EventLoop
的Channel
上的IO事件轮询出来 processSelectKeys()
,处理这些IO事件runAllTasks()
,处理任务队列
这个IO Ratio
就是EventLoop
一次循环处理IO事件与任务所占时间的比例,也就是第2步与第3步的占比,默认是1:1,即50。
那么我们之前讲过的EventLoopContext
的执行executeAsync()
方法都是封装成Runnable
对象提交给EventLoop
,为什么提交的方式是任务队列呢,我们知道不能阻塞EventLoop,所以我们在EventLoopContext
中执行的任务都是CPU占用时间短或者非阻塞的操作,通过任务队列的方式提交时,Netty的EventLoop会帮我们将任务队列中的所有任务串行化执行(顺序执行),不会产生并行执行的问题。
现在就能将EventLoopContext
与VertxImpl
中的EventLoopGroup
关联起来了,VertxImpl
这个大脑为EventLoopContext
提供(Netty中的)EventLoop
执行自定义Runnable
对象(任务)。
Vertx WorkerPool机制
可以看到WorkerPool
就是对ExecutorService
与Metrics
的封装。
那我们再来看一看VertxImpl
中的internalBlockingPool
与workerPool
,它们创建的代码如下
继续往上看,可以看到ExecutorService
的创建过程
发现WorkerPool
的线程池是由Executors.newFixedThreadPool
支撑的,这里简单的介绍一下FixedThreadPool
。
这个线程池创建后会复用一定数量的线程来处理一个共享的无边界任务队列,任何时候都只有固定数量(创建时指定的线程数)的线程处理任务,一旦需要处理的任务数量超过了线程数,那么多余无法处理的任务就会在队列中堆积一直到有可用的线程来处理它们。需要注意的是,这些线程会一直保留在线程池中除非线程池被显式关闭。
现在我们知道VertxOptions
中可以设置的workerPoolSize
其实就是这个VertxImpl
实例对象中FixedThreadPool
的线程数量,默认的值是20。internalBlockingPoolSize
也是同样的原理。
我们使用deploymentManager
部署Verticle
的过程中,会给每个Verticle
创建一个相关联的context,在调用VertxImpl
的三种创建Context方法时,都需要指定internalBlockingPool
,而这个internalBlockingPool
直接就是使用VertxImpl
实例的内部阻塞池。
除此之外,还需要给每种Context指定一个workerPool
。在DeploymentManager#doDeploy()
中,如果不去指定DeploymentOptions
的workerPoolName
,就不会根据名字去获取一个共享的workerExecutor
,那么workerPool
就会为null,所以此时使用VertxImpl
实例的workerPool
。而如果指定了workerPoolName
,就会使用VertxInternal#createSharedWorkerExecutor()
方法为这个Context单独创建一个WorkerExecutor
,而不是用VertxImpl
实例中的workerPool
。
下面谈一谈Vert.x中的SharedWorkerExecutor
。
SharedWorkerExecutor
WorkerExecutor
是Vert.x中的接口,表示在Vert.x中执行阻塞代码的executor
,这个接口提供了executeBlocking()
方法,类似Context
接口中的同名方法,但是WorkerExecutor
会使用一个独立的WorkerPool
。先看WorkerExecutor
的层次结构:
其实可以看到WorkerExecutorImpl
中包裹了一个WorkerPool
对象,而executeBlocking()
方法就是使用该WorkerExecutor关联的Vertx
当前所处的Context下执行executeBlocking()
方法,只不过workerPool
使用的是这个WorkerExecutor的WorkerPool。
那么我们的SharedWorkerExecutor
是如何实现的呢?VertxImpl
中有一个Key为String
,Value为SharedWorkerPool
的MapnamedWorkerPools
,这个Map就是用来保存SharedWorkerPool
实例的,字符串Key是我们的SharedWorkerExecutor
的名字。SharedWorkerPool
是VertxImpl
的内部类,在继承WorkerPool
的基础上加上了String字段name
以及一个引用计数refCount
。
我们看VertxImpl#createSharedWorkerExecutor()
方法代码
创建时首先通过namedWorkerPools
这个Map根据名字name去查询是否已经有这个SharedWorkerPool
了,如果不存在(即取出结果为null),那么就会重新创建一个FixedThreadPool
并用这个线程池去创建一个SharedWorkerPool
,然后将这个SharedWorkerPool
放入Map中;如果已经存在同名SharedWorkerPool
,直接将引用计数+1。最后就是用SharedWorkerPool
创建一个WorkerExecutorImpl
对象返回。
因此这个SharedWorkerExecutor
它们之间通过名字name来共享WorkerPool。
我们可以将WorkerExecutor
,WorkerPool
,FixedThreadPool
以及Context
,Verticle
等等的关系总结一下,用下面这张图比较直观。
还有一点,就是如果在DeployOptions
中只指定workerPoolSize
而不指定workerPoolName
,那么实际上Context使用的还是VertxImpl
中的workerPool
,而不是用设置的size重新创建的WorkerPool
,workerPoolSize
设置就没有生效,这个size实际上还是VertxImpl
的workerPool
的size。
小结
- Vertx线程可以绑定不同的Context,Context只能绑定一个线程。
- 不同的Vertx线程工厂为
EventLoopGroup
以及WorkerPool
提供线程。 - Vertx对象操纵
Verticle
的相关工作交给deploymentManager
完成。 EventLoopContext
中的任务是交给Netty的EventLoop
的任务队列串行执行。WorkerPool
底层使用FixedThreadPool
实现。- SharedWorkerExecutor是独立的,它们共享的是
WorkerPool
。