深入理解Vert.x Core(6)-SharedData

Vert.x中的共享数据(Shared Data)

在Vert.x中,提出了共享数据SharedData这个概念。要使用共享数据,可以调用
SharedData sharedData = vertx.sharedData(); 获取共享数据的一个对象。而在VertImpl 类中初始化SharedData 的操作是this.sharedData = new SharedDataImpl(this, clusterManager); ,我们先来看一下SharedData接口。

官方文档说明了SharedData 的使用目的:

共享数据可以让你在应用的不同部分之间共享数据,或者在同一个Vert.x实例(instance)中的不同应用之间共享数据,还可以在多个Vert.x实例的集群之间共享数据。

共享数据提供了以下:

  • 同步的共享map(本地模式)
  • 异步的maps(本地模式或者集群范围)
  • 异步的锁(本地模式或者集群范围)
  • 异步计数器(本地模式或者集群范围)

对应的,我们可以在SharedData接口中看到以下方法:

1
2
3
4
5
6
<K, V> LocalMap<K, V> getLocalMap(String name);
<K, V> void getAsyncMap(String name, Handler<AsyncResult<AsyncMap<K, V>>> resultHandler);
<K, V> void getClusterWideMap(String name, Handler<AsyncResult<AsyncMap<K, V>>> resultHandler);
void getLock(String name, Handler<AsyncResult<Lock>> resultHandler);
void getLockWithTimeout(String name, long timeout, Handler<AsyncResult<Lock>> resultHandler);
void getCounter(String name, Handler<AsyncResult<Counter>> resultHandler);

LocalMap

首先是最简单的KV实现LocalMap,使用sharedData对象获取LocalMap的实现如下

1
2
3
public <K, V> LocalMap<K, V> getLocalMap(String name) {
return (LocalMap<K, V>) localMaps.computeIfAbsent(name, n -> new LocalMapImpl<>(n, localMaps));
}

localMaps的定义在这private final ConcurrentMap<String, LocalMap<?, ?>> localMaps = new ConcurrentHashMap<>();
可以看到localMaps只是一个ConcurrentHashMap,存储了字符串nameLocalMap的映射关系,我们这样即可以通过这个字符串去找寻我们所需要的LocalMap,这样就可以在应用的不同部分进行共享数据,仅仅只需要一个字符串去标识即可。而sharedData对象拿到字符串后,会判断当前是否已经存在这个LocalMap,不存在就创建一个新的本地KV同步实现。

LocalMap接口继承了java.utilMap接口,将实现委托给了一个ConcurrentHashMap实例去处理,API在使用上大同小异,但是有一个显著不同的地方需要注意。即这个Map只支持Immutable类型的Key或者Value,这是为了确保在Vert.x实例中的多个不同EventLoop线程之间不会去共享可变状态,通过拷贝状态保证了共享数据时的线程安全,这也意味着不需要同步关键字或者锁来保护状态。Checker类中会去检查Key与Value类型是否满足官方定义Immutable的类型,如果你要使用自己的Immutable类型,只要将其标注Sharable接口就行了。

AsyncMap

异步map支持本地模式与集群模式,如果当前vertx实例没有开启集群模式(clusterManager对象为null),那么getAsyncMap()方法得到的是LocalAsyncMapImpl ,否则和getClusterWideMap()方法一样通过clusterManager获取集群模式的Map。

Local模式

LocalMap相似,内部实现是一个ConcurrentHashMap,并且Key与Value都需要满足Immutable条件。差别就是整个过程是异步的,提供了一个回调参数handler;除此之外,还增加了可以设置TTL参数的put()方法。

集群模式

集群模式获取的AsyncMap仅仅是具体cluster实现的一个包装类,因此集群模式下的AsyncMap的具体表现取决于使用的ClusterManager。在SPI包中已经定义了ClusterManager的方法,对应Hazelcast, Infinispan等ClusterManager都有实现,后面的异步锁与异步计数器的集群模式都是类似的,都由具体实现决定。

异步锁

异步锁可以通过getLock()getLockWithTimeout()获取,前者获取异步锁会设置一个10秒的默认超时时间。

Local模式

本地异步锁的实现在AsynchronousLock类中,如果能直接获取到锁,那么直接获取,获取锁的执行都是在当前Context上完成的,否则加入到队列中等待,对这个锁的排队过程通过一个LinkedList队列实现。队列元素类型为LockWaiter,这是一个静态内部类,里面保存了要获取的锁对象,当前执行的vert.x context,获取锁的回调,是否超时以及是否获取过的信息。当锁排队加入队列时,会使用vertx.setTimer()方法给该锁计时,如果超时那么将timeOut布尔置为true并且回调返回一个失败的结果。每次锁执行release()操作后,就会从队列中poll一个新的获取锁请求出来执行,如果当前锁没有更多的请求了,就将owned置为false,彻底释放这个锁。

异步计数器

异步计数器通过getCounter()方法获取。

Local模式

这个异步计数器其实只是对java.util.concurrent.atomicAtomicLong的一个封装,JDK已经实现了一个原子Long类型,而它所有获取或者增减的操作都是线程安全的。异步的回调操作会确保在当前context下执行。

小结

Vert.x SharedData可以为我们提供跨Vertx实例节点的集群间(或者单一Vertx实例节点)的(同步/异步)共享KV数据结构、异步锁、异步计数器,而本地模式具体实现基于了JDK中的工具类,实现简单易懂,集群模式则依赖具体集群实现。

------ 本文结束 ------

版权声明


BillyYccc's blog by Billy Yuan is licensed under a Creative Commons BY-NC-SA 4.0 International License.
本文原创于BillyYccc's Blog,转载请注明原作者及出处!