深入理解Vert.x Core(3)-Verticle

进入Verticle的世界

尽管官方文档说不会强迫你去使用Verticle这种模型,但是Verticle可以轻松给你带来scalability, 如果想进行水平扩展,增加部署的verticle instances就可以了。除此之外,使用Standard Verticle或者Worker Verticle能保证线程安全,而不需要你自己去处理一些多线程的问题,所以为什么不使用Verticle呢?

在进入Verticle之前,我们必须通过Vertx这一关,在概览中已经说过了,Vertx实例是Vert.x API的入口。那么我们接下来看看如何从Vertx中进入Verticle

部署一个Verticle

Vertx接口中有很多方法可以部署Verticle,我们分下组

A组:使用一个Verticle对象部署

1
2
3
4
void deployVerticle(Verticle verticle);
void deployVerticle(Verticle verticle, Handler<AsyncResult<String>> completionHandler);
void deployVerticle(Verticle verticle, DeploymentOptions options);
void deployVerticle(Verticle verticle, DeploymentOptions options, Handler<AsyncResult<String>> completionHandler);

B组:使用类名部署

1
2
void deployVerticle(Class<? extends Verticle> verticleClass, DeploymentOptions options);
void deployVerticle(Class<? extends Verticle> verticleClass, DeploymentOptions options, Handler<AsyncResult<String>> completionHandler);

C组:使用Supplier函数接口部署

1
2
void deployVerticle(Supplier<Verticle> verticleSupplier, DeploymentOptions options);
void deployVerticle(Supplier<Verticle> verticleSupplier, DeploymentOptions options, Handler<AsyncResult<String>> completionHandler);

D组:使用String部署

1
2
3
4
void deployVerticle(String name);
void deployVerticle(String name, Handler<AsyncResult<String>> completionHandler);
void deployVerticle(String name, DeploymentOptions options);
void deployVerticle(String name, DeploymentOptions options, Handler<AsyncResult<String>> completionHandler);

我们知道Vertx实例就是通过静态工厂方法生成的VertxImpl类的实例,可以参考这篇

VertxImpl类中看对应的实现代码,A组B组和C组都是通过deploymentManager.deployVerticle(verticleSupplier, options, completionHandler);部署Verticle的,而D组是通过deploymentManager.deployVerticle(name, options, completionHandler);。这里可以知道Vertx实例将部署Verticle的任务交给了deploymentManager去做。

A组通过Verticle对象部署的时候,instance 数量只能为1,可以看到如果instance不为1就会抛出异常,但是这个Verticle对象是我们自己来完成构造的。

1
2
3
if (options.getInstances() != 1) {
throw new IllegalArgumentException("Can't specify > 1 instances for already created verticle");
}

B组通过类名来部署,其实调用的就是C组的方法,只不过Verticle对象是通过反射机制使用默认构造器生成的

1
2
3
4
5
6
7
8
9
10
@Override
public void deployVerticle(Class<? extends Verticle> verticleClass, DeploymentOptions options, Handler<AsyncResult<String>> completionHandler) {
deployVerticle(() -> {
try {
return verticleClass.newInstance();
} catch (Exception e) {
throw new RuntimeException(e);
}
}, options, completionHandler);
}

A,B,C三组都能拿到Verticle的对象去部署,而D组则是通过字符串去部署的,而字符串这种方式也方便了Polyglot多语言Verticle的部署。

DeploymentManager部署管理器

使用对象部署Verticle

如果要使用Verticle Isolation Groups,必须使用D组这种方式去部署,因为自从Vert.x 3.0以后,使用了flat classpath,默认情况下Verticle与vertx的classloader是相同的,如果你自己初始化Verticle,则classloader无法隔离,所以在ABC三组中的DeploymentOptions使用isolated功能会抛出异常。

1
2
3
4
5
6
7
8
9
if (options.getExtraClasspath() != null) {
throw new IllegalArgumentException("Can't specify extraClasspath for already created verticle");
}
if (options.getIsolationGroup() != null) {
throw new IllegalArgumentException("Can't specify isolationGroup for already created verticle");
}
if (options.getIsolatedClasses() != null) {
throw new IllegalArgumentException("Can't specify isolatedClasses for already created verticle");
}

接着通过ClassLoader cl = getClassLoader(options, currentContext);获取到当前要部署的Verticle的Classloader。下面是getClassloader()这个方法

1
2
3
4
5
6
7
8
9
10
private ClassLoader getClassLoader(DeploymentOptions options, ContextImpl parentContext) {
String isolationGroup = options.getIsolationGroup();
ClassLoader cl;
if (isolationGroup == null) {
cl = getCurrentClassLoader();
} else {
// 使用isolationGroup的时候(使用verticle实例部署不行)
}
return cl;
}

如果isolationGroup为null的时候,那么就调用getCurrentClassLoader()方法获取。

1
2
3
4
5
6
7
private ClassLoader getCurrentClassLoader() {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
if (cl == null) {
cl = getClass().getClassLoader();
}
return cl;
}

这个私有方法首先获取当前线程的上下文类加载器,如果不存在的话,那么就获取DeploymentManager类的类加载器,而DeploymentManager这个类是在VertxImpl类中进行初始化的,那么前面的cl就会被设置为Vertx instance的类加载器。

isolationGroup!=null 的情况在使用Verticle实例去部署的时候是不行的,因为在之前就会抛出异常了,要想使用必须使用字符串来部署Verticle。

确定了这个Verticle 的类加载器之后,就开始对实例做一些前期工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
int nbInstances = options.getInstances();
Set<Verticle> verticles = Collections.newSetFromMap(new IdentityHashMap<>());
for (int i = 0; i < nbInstances; i++) {
Verticle verticle;
try {
verticle = verticleSupplier.get();
} catch (Exception e) {
completionHandler.handle(Future.failedFuture(e));
return;
}
if (verticle == null) {
completionHandler.handle(Future.failedFuture("Supplied verticle is null"));
return;
}
verticles.add(verticle);
}
if (verticles.size() != nbInstances) {
completionHandler.handle(Future.failedFuture("Same verticle supplied more than once"));
return;
}

先通过options 拿到verticle的instance数量存在nbInstances 中。

然后通过Collections.newSetFromMap(new IdentityHashMap<>()); 创建一个Set<Verticle>,这个Set的元素会使用==操作符比较Verticle而不是equals()比较,verticleSupplier.get();通过supplier.get()获取的verticle进行==比较时返回都是false的。这样的话就将要部署的Verticle的所有instance都存在verticles这个Set中了,并且中间进行了检查操作。

接着将verticles转换成verticlesArray数组,并且将这个verticle类名保存为verticleClass,最后将部署工作委托给doDeploy()方法。

1
2
3
Verticle[] verticlesArray = verticles.toArray(new Verticle[verticles.size()]);
String verticleClass = verticlesArray[0].getClass().getName();
doDeploy("java:" + verticleClass, generateDeploymentID(), options, currentContext, currentContext, completionHandler, cl, verticlesArray);

看下doDeploy()方法的参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* doDeploy()方法注释,执行了实际上的verticle部署的逻辑
*
* @param identifier verticle的标识符
* @param deploymentID 这次部署的ID
* @param options 部署的选项DeploymentOptions
* @param parentContext 当前这次部署的父Context
* @param callingContext 当前执行这次部署所在的Context
* @param completionHandler 完成后的Handler
* @param tccl 给verticle指定的线程上下文类加载器
* @param verticles 即将部署的verticle数组
*/
private void doDeploy(String identifier, String deploymentID, DeploymentOptions options,
ContextImpl parentContext,
ContextImpl callingContext,
Handler<AsyncResult<String>> completionHandler,
ClassLoader tccl, Verticle... verticles) {}

回到刚才调用doDeploy()方法的地方,参数是一一对应的,但是会给每个verticle的类名前面加上java:组成标识符,deploymentID是通过随机的UUID生成的。

接着进入doDeploy() 方法,了解具体的执行逻辑是怎样的。

1
2
JsonObject conf = options.getConfig() == null ? new JsonObject() : options.getConfig().copy(); // Copy it
String poolName = options.getWorkerPoolName();

上面这两行代码拿到verticle的config与poolName。poolName影响了Verticle的WorkerPool的选择。

接下来两行代码建立了verticle的父子关系,

1
2
Deployment parent = parentContext.getDeployment();
DeploymentImpl deployment = new DeploymentImpl(parent, deploymentID, identifier, options);

Deployment接口

我们之前在这篇简单讲过verticle与context的关系,ContextImpl会持有一个Deployment 引用,现在我们来仔细分析下Deployment 的原理。

首先看Deployment的结构

ShowImage

Deployment是定义的接口,而DeploymentImpl是在DeploymentManager类中的一个实现接口的内部类。DeploymentImpl中的fields没有什么特别的,需要注意的是,VerticleHolderDeploymentManager中的一个静态内部类,被用来保存ContextImpl对象与Verticle对象之间的对应关系。

现在我们可以这么理解,因为一个Verticle instance对应一个唯一的Context,而Deployment类负责维护该Verticle的父子关系以及该Verticle的相关信息,Context实例又包含这个Verticle对应的Deployment对象,所以Context与Verticle还有Deployment之间的关系可以用下面这张图表示。

ShowImage

doDeploy()方法

再次回到doDeploy()方法,Deployment对象parent就是通过parentContext 获取的,同时为即将部署的verticle也创建一个DeploymentImpl对象deployment 。接下来创建两个原子变量deployCountfailureReported ,前者表示已经部署的verticle instance的数量,后者表示部署过程中是否有failure。

接下来进入循环,将传入doDeploy() 方法的verticles数组中所有verticle都进行部署操作。

1
2
WorkerExecutorImpl workerExec = poolName != null ? vertx.createSharedWorkerExecutor(poolName, options.getWorkerPoolSize()) : null;
WorkerPool pool = workerExec != null ? workerExec.getPool() : null;

上面代码为这个Verticle准备WorkerPool,如果在DeploymentOptions 中指定了poolName,那么会复用VertxImpl 中已经存在的同名pool(不存在就创建),如果不进行指定,那么pool这个时候为null。

1
2
ContextImpl context = options.isWorker() ? vertx.createWorkerContext(options.isMultiThreaded(), deploymentID, pool, conf, tccl) :
vertx.createEventLoopContext(deploymentID, pool, conf, tccl);

然后为这个Verticle创建Context,会根据DeploymentOptions中的选项选择对应的Context类型,可以跳进VertxImpl 类查看对应的创建方法,创建方法基本是根据参数进行new对应的Context操作,只有internalBlockingPoolVertx 实例创建的,而在workerPool 传入null时(刚才不指定poolName情况下),就使用Vertx 创建的workerPool

下面将新创建的Context 对象与deployment 对象关联起来。

1
2
context.setDeployment(deployment);
deployment.addVerticle(new VerticleHolder(verticle, context));

前面的准备工作都做好以后,后面就是verticle启动的操作了。

调用Verticle中init() 方法,将Vertx实例以及context对象和verticle绑定在一起,接下来执行verticle.start(startFuture); ,对verticle进行启动,并设置startFuture的handler。简化handler的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
startFuture.setHandler(ar -> {
if (ar.succeeded()) {
if (parent != null) {
parent.addChild(deployment);
deployment.child = true;
}
// metrics
deployments.put(deploymentID, deployment);
if (deployCount.incrementAndGet() == verticles.length) {
reportSuccess(deploymentID, callingContext, completionHandler);
}
} else if (failureReported.compareAndSet(false, true)) {
rollbackDeployment(callingContext, completionHandler, deployment, context, ar.cause());
}
});

如果startFuture是成功的,执行if中成功的逻辑。如果这次deployment是有parent的(不为null),那么会为该parent注册这个deployment 为child,并且将deploymentchild 设为true。同时将deploymentID与deployment组成的KV对添加到deployments 这个Map中保存(可以提供deploymentIDs的信息)。最后判断deployCount,是否所有数组中的verticle都部署完了,如果都部署成功了,那么就向completionHandler 报告成功。如果还有没部署完的verticle,就回到循环继续执行。

容错处理(rollbackDeployment)

在执行doDeploy() 方法过程中,做了一些容错处理。将代码简化,只保留处理错误的部分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
context.runOnContext(v -> {
try {
// some details
startFuture.setHandler(ar -> {
if (ar.succeeded()) {
// do something when succeeded
} else if (failureReported.compareAndSet(false, true)) {
rollbackDeployment(callingContext, completionHandler, deployment, context, ar.cause());
}
});
} catch (Throwable t) {
if (failureReported.compareAndSet(false, true))
rollbackDeployment(callingContext, completionHandler, deployment, context, t);
}
});

可以看到如果在start()方法过程中失败或者整个过程捕获到异常都会进行回滚操作。回滚操作通过rollbackDeployment()方法进行。rollbackDeployment()方法调用传入参数deploymentdoUndeployChildren() 方法。

doUndeployChildren()方法会检查调用该方法的deployment 中有无child deployment,如果有那么就对每个child再调用doUndeploy() 方法,如果没有就将completionHandler置为成功的Future

doUndeploy() 方法会继续检查调用方法的deployment 是否有child deployment, 如果有就调用doUndeployChildren() ,这样就又进入child deployment的卸载(undeploy)并继续往下层遍历,因此可以将verticle的层次结构当成树型结构。

使用字符串部署Verticle

使用字符串去部署Verticle时,调用DeploymentManager 公有方法deployVerticle()代码如下

1
2
3
ContextImpl callingContext = vertx.getOrCreateContext();
ClassLoader cl = getClassLoader(options, callingContext);
doDeployVerticle(identifier, generateDeploymentID(), options, callingContext, callingContext, cl, completionHandler);
isolated类加载器

如果使用字符串部署,那么isolatedGroup 是允许的,当isolationGroup == null 时classloader跟之前是一样的,沿用当前Vertx实例对应的类加载器。isolationGroup!=null 时,首先会先从classloaders 这个弱引用Map取Key为isolationGroup 这个字符串的Classloader,如果取到了就直接返回,没取到就要进行一些操作。

首先用getCurrentClassLoader() 方法获取classloader,并且将DeploymentOptions 中所有classpath[String类型]转换成URL,添加到一个列表urls中, 然后通过URL类加载器的getURLs() 方法得到current 类加载器(Vertx实例的类加载器)的URL(打印输出就是本地依赖的URL),并且将这些URL也添加到urls 列表中。接下来用这个urls 列表、current 类加载器和DeploymentOptions 中的isolatedClasses 列表作为参数new一个新的IsolatingClassLoader

可以看一看这个自定义的IsolatingClassLoaderloadClass() 方法,先调用findLoadedClass(name) 找到系统加载的这个类,如果已经加载过直接返回这个类,没加载(为null)的话由我们现在来执行加载。然后用isIsolatedClass(name)来判断是否为我们设置的隔离类,看isIsolatedClass() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private boolean isIsolatedClass(String name) {
if (isolatedClasses != null) {
for (String isolated : isolatedClasses) {
if (isolated.endsWith(".*")) {
String isolatedPackage = isolated.substring(0, isolated.length() - 1);
String paramPackage = name.substring(0, name.lastIndexOf('.') + 1);
if (paramPackage.startsWith(isolatedPackage)) {
// Matching package
return true;
}
} else if (isolated.equals(name)) {
return true;
}
}
}
return false;
}

由于我们在这个类加载器的构造器已经传入了isolatedClasses 的String组成的List,因此我们只要将列表中的String与要加载的类名进行一一比较就可以了,列表有的使用了通配符,我们需要将尾缀去掉再比较,如果是类名直接进行字符串equals() 方法比较就可以了,这样我们就能判断要加载的类是否属于isolatedClasses 。如果不是,那么加载任务就让URLCLassLoader去决定(通常是双亲委派加载),如果属于隔离类,接着会判断是不是Vert.x相关的类或者系统类,如果是这些类就交给Parent类加载器去加载,包括以下这些类:

1
2
3
4
5
6
7
8
9
10
private boolean isVertxOrSystemClass(String name) {
return
name.startsWith("java.") ||
name.startsWith("javax.") ||
name.startsWith("sun.*") ||
name.startsWith("com.sun.") ||
name.startsWith("io.vertx.core") ||
name.startsWith("io.netty.") ||
name.startsWith("com.fasterxml.jackson");
}

如果是我们自定义的类,那么就由我们的isolatingClassLoader类加载器去加载这个类。

回到DeploymentManager ,classloader的准备工作完成后,就将一些必要的参数传给重载方法doDeployVerticle() 。 这个重载方法只有三行代码,就是解析Verticle工厂,拿到VerticleFactory 的列表的迭代器后交给下面的重载方法继续处理。

1
2
3
List<VerticleFactory> verticleFactories = resolveFactories(identifier);
Iterator<VerticleFactory> iter = verticleFactories.iterator();
doDeployVerticle(iter, null, identifier, deploymentID, options, parentContext, callingContext, cl, completionHandler);
VerticleFactory
  • VerticleFactory SPI接口

先看SPI接口结构,测试相关的Factory省略掉,Vert.x Core中只有一个JavaVerticleFactory实现类。

ShowImage

requiresResolve()方法表示是否需要对标识符(identifier)解析(调用resolve()方法),resolve()方法就是解析实际的操作。prefix()方法表示工厂的前缀,blockingCreate()指明createVerticle()方法是否是阻塞操作的(放入EventLoop or Worker Thread)。

注:在Vert.x中,提供了一些已经实现的VerticleFactory,比如Vert.x Service Factory,或者JSVerticleFactory等等。它们都是实现了VerticleFactory的SPI接口并进行了一些修改。

  • 加载VerticleFactories

DeploymentManager类的构造器中,除了传递Vertx实例引用就是加载VerticleFactories了。看一下loadVerticleFactories()方法

1
2
3
4
5
6
7
private void loadVerticleFactories() {
Collection<VerticleFactory> factories = ServiceHelper.loadFactories(VerticleFactory.class);
factories.forEach(this::registerVerticleFactory);
VerticleFactory defaultFactory = new JavaVerticleFactory();
defaultFactory.init(vertx);
defaultFactories.add(defaultFactory);
}

首先用ServiceHelper去加载所有VerticleFactory的实现类,默认情况下是没有的(除非使用了Vert.x Service Factory等等),接着为每个加载到的VerticleFactory执行一次注册操作。然后DeploymentManager会提供默认的VerticleFactory,而这个就是JavaVerticleFactory,之后将这个Factory添加到列表defaultFactories中。

  • 注册VerticleFactory

这个流程不是很复杂,先获取当前要注册的VerticleFactoryprefixprefix的用处在官方文档已经说的很明白了,就是被用来查询对应的VerticleFactory。然后将由prefix与要注册的VerticleFactory组成的一对KV添加到verticleFactories这个Map中,最后执行init()方法。

  • 注销VerticleFactory

注销流程跟注册流程类似,将需要注销的VerticleFactory从MapverticleFactories移除。

  • 解析VerticleFactory

resolveFactories()方法的注释已经把步骤给写出来了,也像文档写的那样。
1.根据参数标识符找到前缀,比如你的identifier是java:MyVerticle.java,那么就会用冒号前面的java作为查询关键词。
2.如果不存在冒号,就去找标识符后缀,就是点号后面的字符串(类似文件扩展名),如果identifier是YourVerticle.java,那么就会用点后面的java作为查询关键词。
3.都不存在就用默认的factory(java)。

找到关键词以后,就去DeploymentManager中的MapverticleFactories用关键词作为key查询对应的List<VerticleFactory>,最后list作为解析方法的返回。

doDeployVerticle()

现在看剩余的最后一个重载doDeployVerticle()方法。
注意到传入方法的第一个参数是根据当前这个要部署的Verticle标识符解析出的Verticle工厂列表的迭代器。

直接上流程图,这样看更加直观。

ShowImage

首先可以知道中间过程如果出错,最后回调只会报告最后一次错误,因为会不断尝试取下一个工厂调用doDeployVerticle方法,到最后一个方法时(迭代器中无更多的工厂元素),就会向handler报告上一次错误。

doDeployVerticle这个方法主要做了什么事情呢?可以看到最后具体的部署过程还是交给前面讲的doDeploy()方法去做的。在doDeployVerticle()方法中,主要完成的任务是:基于传递给它的标识符,根据相关联的Verticle工厂去创建Verticle对象,然后将这些创建好的Verticle对象交给doDeploy()方法去基于对象部署。

卸载Verticle

Vertx根据deploymentId去undeploy Verticle时,也是交给deploymentManager完成的。首先部署管理器会根据deploymentId找到id对应的deployment对象,而这个deployment对象会执行自己的doUndeploy()方法,将它所在的context绑定的所有verticle(包括child verticle)也一同undeploy,doUndeploy()之前已经简单的介绍过了,可以将verticle的父子关系当做树的层次关系,然后对每个节点进行深度遍历(类似DFS的思想)。

小结

  1. Vertx对象的deployVerticle()方法其实是委托给它自己对象中的deploymentManager对象去完成的。Vertx中的registerVerticleFactory方法其实就是给deploymentManager对象注册工厂。

  2. 部署Verticle总的来说有两种方式,基于对象部署或者基于字符串部署,基于对象部署是用在java中的,由调用者去提供Verticle对象;基于字符串部署用处比较广,不仅可以按组隔离部署Verticle,还可以进行多语言支持或者基于service的方式,这种会用Verticle工厂去创建需要的verticle对象,然后再交给前者去用对象部署。

  3. verticle的信息自己并不保存,每个verticle instance都相关联一个context,context会保证verticle的线程安全性,verticle的信息在context中;context中还有一个deployment对象,用来维护verticle部署相关的信息(父子verticle关联,部署实例数量等等)。

------ 本文结束 ------

版权声明


BillyYccc's blog by Billy Yuan is licensed under a Creative Commons BY-NC-SA 4.0 International License.
本文原创于BillyYccc's Blog,转载请注明原作者及出处!