BillyYccc


  • 首页

  • 关于

  • 标签

  • 分类

  • 归档

深入理解Vert.x Core(7)-Streams

发表于 2018-04-15 | 分类于 Vert.x

Reactive Programming

Vert.x宣称自己是一个反应式(reactive)的工具集,那么reactive在这里代表着什么呢?Wiki百科给出了反应式编程(Reactive Programming)的定义:

In computing, reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change.

在反应式编程中,我们需要处理的是数据流,我们可以基于任何事物来创建一条数据流,流就是一个按照时间顺序发生的事件记录序列,下图就是一个流的示例。

ShowImage

流上面可以记录三种类型的事件,包括

  • 一个值 element value
  • 一个错误 error
  • 流完成的事件 completed

流可以是无限延长下去的,也就是说,如果不主动去关闭流或者因为错误而终止,流会一直进行。

我们通常可以对流进行订阅(subscribe)操作,在下游对发射的流数据进行处理。

ShowImage

上图就是我们使用一个订阅者去消费流的过程,看起来很美好,但是这里出现了问题,如果Publisher发布数据的速度远远超过Subscriber消费的速度会怎样,数据会在Subscriber这里产生堆积,一直到产生OOM(OutOfMemory)错误。所以,该如何解决这个问题呢,我们需要一个流控机制(Flow Control),也就是背压(Back-Pressure)。之前的数据消费是一个推(push)的过程,Publisher有多少数据要发送就推给Subscriber,但现在我们使用一种类似拉(pull)的机制,由Subscriber去控制消费数据的数量,主动去找Publisher索取数据,然后由Publisher push回可控数量内的数据。

ShowImage

Vert.x已经为我们提供了开箱即用的实现(ReadStream与WriteStream),类似的JVM实现还有RxJava, Project Reactor, Akka Streams等,Vert.x也提供了对Reactive Streams标准的支持vertx-reactive-streams,这样就可以通过规范在不同流处理实现间进行互操作了。

阅读全文 »

深入理解Vert.x Core(6)-SharedData

发表于 2018-04-02 | 分类于 Vert.x

Vert.x中的共享数据(Shared Data)

在Vert.x中,提出了共享数据SharedData这个概念。要使用共享数据,可以调用
SharedData sharedData = vertx.sharedData(); 获取共享数据的一个对象。而在VertImpl 类中初始化SharedData 的操作是this.sharedData = new SharedDataImpl(this, clusterManager); ,我们先来看一下SharedData接口。

官方文档说明了SharedData 的使用目的:

共享数据可以让你在应用的不同部分之间共享数据,或者在同一个Vert.x实例(instance)中的不同应用之间共享数据,还可以在多个Vert.x实例的集群之间共享数据。

共享数据提供了以下:

  • 同步的共享map(本地模式)
  • 异步的maps(本地模式或者集群范围)
  • 异步的锁(本地模式或者集群范围)
  • 异步计数器(本地模式或者集群范围)

对应的,我们可以在SharedData接口中看到以下方法:

1
2
3
4
5
6
<K, V> LocalMap<K, V> getLocalMap(String name);
<K, V> void getAsyncMap(String name, Handler<AsyncResult<AsyncMap<K, V>>> resultHandler);
<K, V> void getClusterWideMap(String name, Handler<AsyncResult<AsyncMap<K, V>>> resultHandler);
void getLock(String name, Handler<AsyncResult<Lock>> resultHandler);
void getLockWithTimeout(String name, long timeout, Handler<AsyncResult<Lock>> resultHandler);
void getCounter(String name, Handler<AsyncResult<Counter>> resultHandler);

阅读全文 »

DDIA distilled(III)

发表于 2018-01-21 | 分类于 Reading

Derived Data

存储与处理数据的系统大致上可以分为两个大类:

  • Systems of record
    记录系统(system of record),也被认为是事实的来源(source of truth),保存着权威版本的数据。每一个事实(fact)都被准确地表达了一次(这个表达representation的过程被称为normalized)。

  • Derived data systems
    衍生系统(derived system)中的数据是另外一个系统中已有的数据以某种方式转换或处理得出的结果。从技术角度来说,衍生数据是冗余的,因为它重复了已有的信息。如果你丢失掉了衍生数据,你可以通过源数据再重新得到这份数据。从一个单一的数据源衍生出不同的数据集,可以从不同角度来理解这份数据,这个过程被称为denormalized。

我们现在来区分三种不同类型的系统:

  • Services(online systems)
    一个服务等待着客户端的一个请求或者指令的到达,之后尽可能快地处理然后发送响应。响应时间(response time)通常是服务性能的衡量标准,可用性也非常重要。

  • Batch processing systems(offline systems)
    一个批处理系统一次接收大量的输入数据,运行一个任务(job),处理数据并生成一些数据。这种任务通常会花费一段时间,因此用户一般不会等待任务的完成。衡量批处理的性能指标是吞吐量(throughput)。

  • Stream processing systems(near-real-time systems)
    流处理介于在线(online)处理与离线(offline/batch)处理之间,因此有时被叫做近实时(near-real-time)处理。类似批处理系统,流处理消费输入数据并生成输出数据,不同之处在于,流处理任务在事件发生后短暂时间内就开始处理,而批处理一次操作一个固定的输入数据集。

阅读全文 »

DDIA distilled(II)

发表于 2018-01-15 | 分类于 Reading

分布式数据

我们有以下一些理由去将数据库分布在多台机器上:

  • 可伸缩性Scalability
    如果数据量、读写负载大到超过单台机器所能达到的上限,就可以将负载分摊到多台机器上。

  • 容错能力/高可用性Fault tolerance/high availability
    如果单台机器或者部分机器出现故障,我们需要多台机器提供的冗余(redundancy)能力确保服务依然可用。

  • 延迟Latency
    如果用户分布在世界各地,我们期望用户能够访问离他们地理位置最近的服务器或者数据中心,这样可以减少网络延时。

通常有两种方式将数据分布到多个节点

  • 复制Replication
    将相同的数据拷贝到多个不同的节点上,复制能够提供冗余能力,保证服务的高可用性,并且还能提升性能。

  • 分区Partitioning
    将一个大的数据库划分为更小的子集,这些子集被称作分区(partition),不同的分区就会被分配到不同的节点上(这种方式也被称为分片sharding)。

阅读全文 »

DDIA distilled(I)

发表于 2018-01-12 | 分类于 Reading

最近读完了 Designing Data-Intensive Applications,这本书从实际工程的角度出发,全方位地剖析了现代系统与数据的关系和各种技术背后的设计思想,为我们分析了每种设计天生所带来的优势劣势以及为了适用不同使用场景上所作出的权衡。此系列文章是对该书内容的一些总结和精炼。

前言

近些年服务端或者后端系统都被存储与处理数据这两个目标带来的问题围绕。我们可以将这样的一个应用称作数据密集型应用(Data-intensive applications),这个应用的首要挑战是数据,包括数据量,数据复杂性以及数据变化的速度。

以下一些条件推动了构建数据库、分布式系统或者基于前二者的应用上的发展。

  • 互联网公司有着海量数据与流量需要处理,使得他们必须找出一种能高效处理如此大规模数据的新工具
  • 业务需要更加敏捷,为了快速响应市场必须缩短开发周期并且保持数据模型的灵活
  • 开源软件行业更加成功,并且被大量使用在商业环境中
  • CPU的速度增长减缓,“The free lunch is over”, 而多核处理器与网络却在高速发展,并行处理成为了唯一增加性能的方式
  • IaaS使得构建在多台甚至跨地理区域的多台机器的系统更加容易
  • 服务对高可用性要求更高

过去的十年间,为了帮助数据密集型应用响应存储与处理数据的需要,NoSQL, Big Data等等新兴技术出现了爆炸式增长,消息队列、缓存、搜索索引、批处理与流处理框架等等工具被使用在实战中。

在如此多技术中选出适合自身需求的工具成为新的烦恼,但是只需要记住“There are no silver bullets.” 这一条经典法则即可。这些不同的工具不过是不同目标导致设计理念上差异化带来的结果,因此它们有自己独特的优势,也一定会因为一些局限性做出了trade-offs。

阅读全文 »

深入理解Vert.x Core(5)-Vertx(II)

发表于 2017-12-25 | 分类于 Vert.x

CloseHooks机制

CloseHooks是Vert.x中一项内部使用的特性,在Vertx实例或者Verticle关闭时,CloseHooks能实现自动清理(比如HTTP Server,Net Server…)的功能。

我们了解下CloseHooks实现的大概流程。

在我们的Verticle或者Vertx实例部署好以后,我们通常会使用vertx.createHttpServer()去部署一些HTTP Server或者Net Server,Net Client等等。这些都是可以在关联的Verticle或者Vertx实例关闭之后实现自动清理的资源。
下面以在Verticle上的Net Server为例
①通过vertx.createNetServer()创建一个NetServer,返回一个NetServerImpl实例
②在NetServerImpl构造器中,会将当前Verticle所在Context hook上这个NetServer。

1
2
3
4
5
6
7
8
9
public NetServerImpl(VertxInternal vertx, NetServerOptions options) {
this.vertx = vertx;
// ...
this.creatingContext = vertx.getContext();
if (creatingContext != null) {
// ...
creatingContext.addCloseHook(this);
}
}

NetServerImpl实现了Closeable接口,Closeable接口只有一个close方法。

Context的addCloseHook()方法就是将实现了Closeable接口的资源添加到自己内部类CloseHooks中closeHooks的这个Set中。

③Verticle在unDeploy()以后,会将自己持有的Context对象的CloseHooks对象执行run()方法,就是对每一个Set中Closeable对象调用close()方法,这样就能实现自动清理所有hook上的资源的功能。

这种机制其实是一种观察者模式的实现,Verticle的Context或者Vertx实例是被观察者,可以添加观察者Closeable,在订阅的关系形成之后,一旦Verticle或者Vertx关闭,那么就会通知观察者(可以关闭的资源)去执行它们自己的close()方法,这样就可以实现自动清理的机制。

阅读全文 »

深入理解Vert.x Core(4)-Vertx(I)

发表于 2017-12-15 | 分类于 Vert.x

创建Vertx实例

官方文档上面写着这样一句话

If you’re embedding Vert.x then you simply create an instance as follows:
Vertx vertx = Vertx.vertx();

我们在使用Vert.x的时候一般将应用封装成多个Verticle,指定好MainVerticle并使用官方提供的Launcher,通过fat-jar打包,然后直接使用java -jar -xxx app-fat-jar-1.0.jar 执行,但是嵌入Vert.x使用的时候,比如你写个Main 方法去调用Vert.x的API,这种就属于嵌入使用,这时候就需要手动创建Vert.x instance。

那么在Vertx vertx = Vertx.vertx(); 执行时到底发生了什么呢?

查看Vertx 接口的静态工厂方法,跟Future.future()有些类似

1
2
3
static Vertx vertx() {
return factory.vertx();
}

找到factory的初始化地方
VertxFactory factory = ServiceHelper.loadFactory(VertxFactory.class);
和Future一样,也是通过SPI接口暴露服务,找到真正的工厂实现类VertxFactoryImpl,返回的是VertxImpl实例。

阅读全文 »

深入理解Vert.x Core(3)-Verticle

发表于 2017-12-07 | 分类于 Vert.x

进入Verticle的世界

尽管官方文档说不会强迫你去使用Verticle这种模型,但是Verticle可以轻松给你带来scalability, 如果想进行水平扩展,增加部署的verticle instances就可以了。除此之外,使用Standard Verticle或者Worker Verticle能保证线程安全,而不需要你自己去处理一些多线程的问题,所以为什么不使用Verticle呢?

在进入Verticle之前,我们必须通过Vertx这一关,在概览中已经说过了,Vertx实例是Vert.x API的入口。那么我们接下来看看如何从Vertx中进入Verticle。

阅读全文 »

深入理解Vert.x Core(2)-Future

发表于 2017-12-04 | 分类于 Vert.x

异步调用

Vert.x中几乎所有的API都是异步的,并且它们不会阻塞当前所在的线程。异步使得你可以安排做一项任务(调用函数),并且在任务结束后得到通知,在任务处理的阶段(函数处理过程)不必等待它完成,你可以在这个过程中做点别的事情。但是在实现异步API时,必须有一种机制能通知调用者关于这个任务执行的信息。

在Vert.x中,你可以用以下这几种方式完成异步调用操作:

  • Callback
  • Vert.x提供的Future
  • RxJava
  • Coroutines in Kotlin

Callback即回调,这种方式并不难理解,比如利用Vertx实例部署一个Verticle 这种代码就是基于回调的风格:

1
void deployVerticle(String name, Handler<AsyncResult<String>> completionHandler);

但是,在异步调用的Callback里面连续进行异步调用时,很快就会陷入”回调地狱”,你会发现代码非常的难以阅读,假如我们需要链式部署两个MyVerticle1和MyVerticle2,只有在MyVerticle1成功部署后才会去部署MyVerticle2,使用基于回调的方式,下面的代码是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
vertx.deployVerticle(MyVerticle1.class.getName(), res -> {
if (res.succeeded()) {
// MyVerticle1 deploy succeeded...
vertx.deployVerticle(MyVerticle2.class.getName(), ar -> {
if (ar.succeeded()) {
// MyVerticle2 deploy succeeded...
} else {
// MyVerticle2 deploy failed...
}
});
} else {
// MyVerticle1 deploy failed...
}
});

通过Vert.x自己实现的Future,在一定程度上解决了代码阅读性的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
Future<String> future1 = Future.future();
vertx.deployVerticle(MyVerticle1.class.getName(), future1.completer());
future1.compose(deploymentId -> {
Future<String> future2 = Future.future();
vertx.deployVerticle(MyVerticle2.class.getName(), future2.completer());
return future2;
}).setHandler(ar -> {
if (ar.succeeded()) {
// both verticles are OK
} else {
// one of the verticles is down
}
});

上面这两种方式在Vert.x Core中都有使用,Rxjava与Coroutine属于Vert.x ext的高级特性,本文就省略了,但是他们都能使得你的代码阅读性与异步调用功能更加强大。

阅读全文 »

深入理解Vert.x Core(1)-Context

发表于 2017-11-27 | 分类于 Vert.x

Context

在Vert.x核心组件概览一文中已经谈到了Context的基本概念,在更加深入地了解VertxImpl,Verticle的内部原理等等之前,有必要了解Context的一些内部机制。本文将对Vert.x的Context进行深入分析。

先来看下Context的结构

ShowImage

最顶层的是Context接口,下面是ContextInternal接口,这个接口是提供给Vert.x内部使用的API,ContextImpl是一个抽象类,包含了Context的基本属性以及方法,再往下层就是Context的实现类,其实看到源码可以知道Vert.x中一共有四种Context,但是这个BenchmarkContext的目的不是给用户使用的,它主要被用来对Vert.x一些关键部分进行性能测试,比如Json的编解码、HTTP头部编码、HTTP Handler处理HttpRequest和HttpResponse的速度等等…可以参考这里。就是说,真正在Vert.x中应用的Context就三种。三种Context使用的场景在这里就不再介绍,可以参考对应Verticle的用途。

阅读全文 »
12…5
Billy Yuan

Billy Yuan

48 日志
6 分类
15 标签
GitHub E-Mail Twitter
© 2018 Billy Yuan
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.3