Reactive Programming
Vert.x宣称自己是一个反应式(reactive)的工具集,那么reactive在这里代表着什么呢?Wiki百科给出了反应式编程(Reactive Programming)的定义:
In computing, reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change.
在反应式编程中,我们需要处理的是数据流,我们可以基于任何事物来创建一条数据流,流就是一个按照时间顺序发生的事件记录序列,下图就是一个流的示例。
流上面可以记录三种类型的事件,包括
- 一个值 element value
- 一个错误 error
- 流完成的事件 completed
流可以是无限延长下去的,也就是说,如果不主动去关闭流或者因为错误而终止,流会一直进行。
我们通常可以对流进行订阅(subscribe)操作,在下游对发射的流数据进行处理。
上图就是我们使用一个订阅者去消费流的过程,看起来很美好,但是这里出现了问题,如果Publisher发布数据的速度远远超过Subscriber消费的速度会怎样,数据会在Subscriber这里产生堆积,一直到产生OOM(OutOfMemory)错误。所以,该如何解决这个问题呢,我们需要一个流控机制(Flow Control),也就是背压(Back-Pressure)。之前的数据消费是一个推(push)的过程,Publisher有多少数据要发送就推给Subscriber,但现在我们使用一种类似拉(pull)的机制,由Subscriber去控制消费数据的数量,主动去找Publisher索取数据,然后由Publisher push回可控数量内的数据。
Vert.x已经为我们提供了开箱即用的实现(ReadStream
与WriteStream
),类似的JVM实现还有RxJava, Project Reactor, Akka Streams等,Vert.x也提供了对Reactive Streams标准的支持vertx-reactive-streams,这样就可以通过规范在不同流处理实现间进行互操作了。
Vert.x中的流式处理
Vert.x中内建的流支持在io.vertx.core.streams
包中,我们先来看一下这个包中的接口。
StreamBase
StreamBase
中只有一个StreamBase exceptionHandler(@Nullable Handler<Throwable> handler);
方法,表示流遇到异常时执行的handler。
ReadStream
实现ReadStream
接口的类产生的对象就可以被看做可以读取数据的流了,我们可以对这个数据流进行控制,ReadStream
继承了StreamBase
,包含了以下方法:
|
|
exceptionHandler
方法设置流读取数据遇到异常时执行的handler,handler
方法设置这个流读到一条数据后执行的handler,pause
表示暂停读取数据,resume
表示继续读取数据,endHandler
表示流结束了并且所有数据读取完了之后执行的handler。
官方文档已经说明了一些实现了ReadStream
接口的API,之前在这篇文章中提及到了TimeoutStream
的使用。
WriteStream
与ReadStream
相对应,实现了WriteStream
的类产生的对象代表着可以写入数据的流,WriteStream
有以下方法:
|
|
exceptionHandler
设置写入数据时产生异常执行的handler,write
方法表示将一个数据元素写入流中,write
方法是非阻塞的,一旦调用立即返回,然后将需要写入的数据加入一个队列,然后这个真正的写入流的过程会异步发生。需要注意的是,如果需要写入的数据太多产生堆积,很可能会导致内存被耗尽。setWriteQueueMaxSize
可以设置写入队列最大的长度,这个长度是由具体实现去做决定的,而writeQueueFull
被用来检查写入队列是否已经排队排满了。end
表示结束这个流。drainHandler
表示当写入队列发生拥塞时,设置这个handler会进行以下操作:如果队列不再拥塞时,流接下来进行处理这个drainHandler
的逻辑。
同样,官方文档也给出了一些实现了WriteStream
接口的API。
Pump
官方文档给出了一个示例,介绍了如何将一个NetSocket
所有读取到的入数据写到出口中,如下代码
NetSocket
收到数据后首先将读到的数据写到出口,并且检查写队列是否拥塞,如果拥塞就暂停下次读入,直到写队列不再拥塞,此时恢复socket的读取数据,并一直这样循环操作。上段代码其实实现了一段手动控制流量的机制,而Vert.x官方又提供了一种开箱即用的实现,只需要Pump.pump(sock, sock).start();
,即可将sock
读取到的数据控制地写入到sock
出口中。Pump中文即泵的意思,好比用泵将读取的数据流抽取到写入的数据流中,而这个泵可以控制抽取的节奏(Flow Control流控)。下图就是一个流程图
直接看Pump
接口与PumpImpl
实现代码,使用Pump
中静态方法创建一个泵,可以指定输入流ReadStream
与输出流WriteStream
,并且还可以指定输出流的写队列的最大长度。可以看到这里代码和刚才手动流控的代码基本是相似的,
不过这里多了一个incPumped
方法,用来对抽取出去的数据进行计数,可以调用numberPumped
获取已经抽取出去数据的数量。初始化好Pump
之后,需要使用start
与stop
方法进行开关,然后就可以由Pump
去自动控制整个过程了。
总结
Vert.x已经为我们提供了内建Streams的支持,并且调用Rxi-fied API(或者通过Rxjava helper将内建stream转换为对应的rxjava实现)也能提供反应式支持,除此之外,Vert.x也支持了Reactive Streams规范标准,可以在Vert.x中使用多种方式来让你的API streaming化。