深入理解Vert.x Core(7)-Streams

Reactive Programming

Vert.x宣称自己是一个反应式(reactive)的工具集,那么reactive在这里代表着什么呢?Wiki百科给出了反应式编程(Reactive Programming)的定义:

In computing, reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change.

在反应式编程中,我们需要处理的是数据流,我们可以基于任何事物来创建一条数据流,流就是一个按照时间顺序发生的事件记录序列,下图就是一个流的示例。

ShowImage

流上面可以记录三种类型的事件,包括

  • 一个值 element value
  • 一个错误 error
  • 流完成的事件 completed

流可以是无限延长下去的,也就是说,如果不主动去关闭流或者因为错误而终止,流会一直进行。

我们通常可以对流进行订阅(subscribe)操作,在下游对发射的流数据进行处理。

ShowImage

上图就是我们使用一个订阅者去消费流的过程,看起来很美好,但是这里出现了问题,如果Publisher发布数据的速度远远超过Subscriber消费的速度会怎样,数据会在Subscriber这里产生堆积,一直到产生OOM(OutOfMemory)错误。所以,该如何解决这个问题呢,我们需要一个流控机制(Flow Control),也就是背压(Back-Pressure)。之前的数据消费是一个推(push)的过程,Publisher有多少数据要发送就推给Subscriber,但现在我们使用一种类似拉(pull)的机制,由Subscriber去控制消费数据的数量,主动去找Publisher索取数据,然后由Publisher push回可控数量内的数据。

ShowImage

Vert.x已经为我们提供了开箱即用的实现(ReadStreamWriteStream),类似的JVM实现还有RxJava, Project Reactor, Akka Streams等,Vert.x也提供了对Reactive Streams标准的支持vertx-reactive-streams,这样就可以通过规范在不同流处理实现间进行互操作了。

Vert.x中的流式处理

Vert.x中内建的流支持在io.vertx.core.streams包中,我们先来看一下这个包中的接口。

StreamBase

StreamBase中只有一个StreamBase exceptionHandler(@Nullable Handler<Throwable> handler);方法,表示流遇到异常时执行的handler。

ReadStream

实现ReadStream接口的类产生的对象就可以被看做可以读取数据的流了,我们可以对这个数据流进行控制,ReadStream继承了StreamBase,包含了以下方法:

1
2
3
4
5
6
7
8
9
ReadStream<T> exceptionHandler(Handler<Throwable> handler);
ReadStream<T> handler(@Nullable Handler<T> handler);
ReadStream<T> pause();
ReadStream<T> resume();
ReadStream<T> endHandler(@Nullable Handler<Void> endHandler);

exceptionHandler方法设置流读取数据遇到异常时执行的handler,handler方法设置这个流读到一条数据后执行的handler,pause表示暂停读取数据,resume表示继续读取数据,endHandler表示流结束了并且所有数据读取完了之后执行的handler。

官方文档已经说明了一些实现了ReadStream接口的API,之前在这篇文章中提及到了TimeoutStream的使用。

WriteStream

ReadStream相对应,实现了WriteStream的类产生的对象代表着可以写入数据的流,WriteStream有以下方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
WriteStream<T> exceptionHandler(Handler<Throwable> handler);
WriteStream<T> write(T data);
void end();
default void end(T t) {
write(t);
end();
}
WriteStream<T> setWriteQueueMaxSize(int maxSize);
boolean writeQueueFull();
WriteStream<T> drainHandler(@Nullable Handler<Void> handler);

exceptionHandler设置写入数据时产生异常执行的handler,write方法表示将一个数据元素写入流中,write方法是非阻塞的,一旦调用立即返回,然后将需要写入的数据加入一个队列,然后这个真正的写入流的过程会异步发生。需要注意的是,如果需要写入的数据太多产生堆积,很可能会导致内存被耗尽。setWriteQueueMaxSize可以设置写入队列最大的长度,这个长度是由具体实现去做决定的,而writeQueueFull被用来检查写入队列是否已经排队排满了。end表示结束这个流。drainHandler表示当写入队列发生拥塞时,设置这个handler会进行以下操作:如果队列不再拥塞时,流接下来进行处理这个drainHandler的逻辑。

同样,官方文档也给出了一些实现了WriteStream接口的API。

Pump

官方文档给出了一个示例,介绍了如何将一个NetSocket所有读取到的入数据写到出口中,如下代码

1
2
3
4
5
6
7
8
9
10
11
server.connectHandler(sock -> {
sock.handler(buffer -> {
sock.write(buffer);
if (sock.writeQueueFull()) {
sock.pause();
sock.drainHandler(done -> {
sock.resume();
});
}
});
}).listen();

NetSocket收到数据后首先将读到的数据写到出口,并且检查写队列是否拥塞,如果拥塞就暂停下次读入,直到写队列不再拥塞,此时恢复socket的读取数据,并一直这样循环操作。上段代码其实实现了一段手动控制流量的机制,而Vert.x官方又提供了一种开箱即用的实现,只需要Pump.pump(sock, sock).start();,即可将sock读取到的数据控制地写入到sock出口中。Pump中文即泵的意思,好比用泵将读取的数据流抽取到写入的数据流中,而这个泵可以控制抽取的节奏(Flow Control流控)。下图就是一个流程图

ShowImage

直接看Pump接口与PumpImpl实现代码,使用Pump中静态方法创建一个泵,可以指定输入流ReadStream与输出流WriteStream,并且还可以指定输出流的写队列的最大长度。可以看到这里代码和刚才手动流控的代码基本是相似的,

1
2
3
4
5
6
7
8
9
10
11
12
13
PumpImpl(ReadStream<T> rs, WriteStream<T> ws) {
this.readStream = rs;
this.writeStream = ws;
drainHandler = v-> readStream.resume();
dataHandler = data -> {
writeStream.write(data);
incPumped();
if (writeStream.writeQueueFull()) {
readStream.pause();
writeStream.drainHandler(drainHandler);
}
};
}

不过这里多了一个incPumped方法,用来对抽取出去的数据进行计数,可以调用numberPumped获取已经抽取出去数据的数量。初始化好Pump之后,需要使用startstop方法进行开关,然后就可以由Pump去自动控制整个过程了。

总结

Vert.x已经为我们提供了内建Streams的支持,并且调用Rxi-fied API(或者通过Rxjava helper将内建stream转换为对应的rxjava实现)也能提供反应式支持,除此之外,Vert.x也支持了Reactive Streams规范标准,可以在Vert.x中使用多种方式来让你的API streaming化。

------ 本文结束 ------

版权声明


BillyYccc's blog by Billy Yuan is licensed under a Creative Commons BY-NC-SA 4.0 International License.
本文原创于BillyYccc's Blog,转载请注明原作者及出处!