Vert.x中的共享数据(Shared Data)
在Vert.x中,提出了共享数据SharedData
这个概念。要使用共享数据,可以调用SharedData sharedData = vertx.sharedData();
获取共享数据的一个对象。而在VertImpl
类中初始化SharedData
的操作是this.sharedData = new SharedDataImpl(this, clusterManager);
,我们先来看一下SharedData
接口。
官方文档说明了SharedData
的使用目的:
共享数据可以让你在应用的不同部分之间共享数据,或者在同一个Vert.x实例(instance)中的不同应用之间共享数据,还可以在多个Vert.x实例的集群之间共享数据。
共享数据提供了以下:
- 同步的共享map(本地模式)
- 异步的maps(本地模式或者集群范围)
- 异步的锁(本地模式或者集群范围)
- 异步计数器(本地模式或者集群范围)
对应的,我们可以在SharedData
接口中看到以下方法:
LocalMap
首先是最简单的KV实现LocalMap
,使用sharedData
对象获取LocalMap
的实现如下
localMaps
的定义在这private final ConcurrentMap<String, LocalMap<?, ?>> localMaps = new ConcurrentHashMap<>();
。
可以看到localMaps
只是一个ConcurrentHashMap
,存储了字符串name
与LocalMap
的映射关系,我们这样即可以通过这个字符串去找寻我们所需要的LocalMap
,这样就可以在应用的不同部分进行共享数据,仅仅只需要一个字符串去标识即可。而sharedData
对象拿到字符串后,会判断当前是否已经存在这个LocalMap
,不存在就创建一个新的本地KV同步实现。
而LocalMap
接口继承了java.util
的Map
接口,将实现委托给了一个ConcurrentHashMap
实例去处理,API在使用上大同小异,但是有一个显著不同的地方需要注意。即这个Map只支持Immutable类型的Key或者Value,这是为了确保在Vert.x实例中的多个不同EventLoop线程之间不会去共享可变状态,通过拷贝状态保证了共享数据时的线程安全,这也意味着不需要同步关键字或者锁来保护状态。Checker
类中会去检查Key与Value类型是否满足官方定义Immutable的类型,如果你要使用自己的Immutable类型,只要将其标注Sharable
接口就行了。
AsyncMap
异步map支持本地模式与集群模式,如果当前vertx实例没有开启集群模式(clusterManager对象为null),那么getAsyncMap()
方法得到的是LocalAsyncMapImpl
,否则和getClusterWideMap()
方法一样通过clusterManager
获取集群模式的Map。
Local模式
和LocalMap
相似,内部实现是一个ConcurrentHashMap
,并且Key与Value都需要满足Immutable条件。差别就是整个过程是异步的,提供了一个回调参数handler
;除此之外,还增加了可以设置TTL参数的put()
方法。
集群模式
集群模式获取的AsyncMap
仅仅是具体cluster实现的一个包装类,因此集群模式下的AsyncMap
的具体表现取决于使用的ClusterManager。在SPI包中已经定义了ClusterManager
的方法,对应Hazelcast, Infinispan等ClusterManager都有实现,后面的异步锁与异步计数器的集群模式都是类似的,都由具体实现决定。
异步锁
异步锁可以通过getLock()
与getLockWithTimeout()
获取,前者获取异步锁会设置一个10秒的默认超时时间。
Local模式
本地异步锁的实现在AsynchronousLock
类中,如果能直接获取到锁,那么直接获取,获取锁的执行都是在当前Context上完成的,否则加入到队列中等待,对这个锁的排队过程通过一个LinkedList
队列实现。队列元素类型为LockWaiter
,这是一个静态内部类,里面保存了要获取的锁对象,当前执行的vert.x context,获取锁的回调,是否超时以及是否获取过的信息。当锁排队加入队列时,会使用vertx.setTimer()
方法给该锁计时,如果超时那么将timeOut
布尔置为true并且回调返回一个失败的结果。每次锁执行release()
操作后,就会从队列中poll
一个新的获取锁请求出来执行,如果当前锁没有更多的请求了,就将owned
置为false,彻底释放这个锁。
异步计数器
异步计数器通过getCounter()
方法获取。
Local模式
这个异步计数器其实只是对java.util.concurrent.atomic
中AtomicLong
的一个封装,JDK已经实现了一个原子Long类型,而它所有获取或者增减的操作都是线程安全的。异步的回调操作会确保在当前context下执行。
小结
Vert.x SharedData
可以为我们提供跨Vertx实例节点的集群间(或者单一Vertx实例节点)的(同步/异步)共享KV数据结构、异步锁、异步计数器,而本地模式具体实现基于了JDK中的工具类,实现简单易懂,集群模式则依赖具体集群实现。