微服务不是一个新事物,1970年就出现了,如今右变得流行。因为它可以快速构建或修改出有价值的产品。

我们都知道分布式系统很复杂很难构建,于是响应式应运而生。

但是什么是响应式(reactive)呢?牛津词典解释是“对刺激的反应”.

所以响应式系统根据外界变化调整自己的表现。(例如bootstrap根据屏幕的大小改变布局)

  • 响应式编程-数据驱动
  • 响应式系统-异步消息 


让我们看看RxJava(ReactiveXJava)是如何实现的。

observable.subscribe(   data -> { // onNext
    System.out.println(data);
  },
  error -> { // onError     error.printStackTrace();
  },
  () -> { // onComplete
    System.out.println("No more data");
  }
);

RxJava可以做什么,下边是一个上传下载的示例

// Asynchronous task downloading a document
Future<String> downloadTask = download(); // Create a single completed when the document is downloaded.
Single.from(downloadTask)
  // Process the content
  .map(content -> process(content))
  // Upload the document, this asynchronous operation   // just indicates its successful completion or failure.   .flatMapCompletable(payload -> upload(payload))
  .subscribe(
  () -> System.out.println("Document downloaded, updated
               and uploaded"),   t -> t.printStackTrace() );

自适应系统有两个特性:

  • 弹性-水平伸缩的能力(根据访问量增加减少实例)
  • 回调-处理失败的能力

当自适应系统面临负载峰值时,他会自动增加一个实例。

下一章我们将会看到Vert.x如果应对这些问题。

响应式微服务

我们构建一个微服务系统,每个服务器都可能出现各种问题,但是我们不能让它影响到整个系统。你的程序必须接受改变和有能力处理错误。

无源微服务是自治的。 他们可以适应周围服务的可用性或不可用性。 然而,自治与隔离配合。 活性微服务可以在本地处理失败,独立行动,并根据需要与其他人合作。 反应式微服务使用异步消息传递与其对等方进行交互。 它还接收消息并能够产生对这些消息的响应。

由于异步消息传递,反应式微服务可能会面临失败并相应地调整其行为。 不应该传播失败,而应该接近根本原因。 当微服务爆炸时,消费者微服务必须处理失败并且不传播它。 这种隔离原理是防止故障冒泡并破坏整个系统的关键特征。 恢复力不仅关乎管理失败,还关系到自我修复。 发生故障时,被动微服务应实施恢复或补偿策略。

最后,反应式微服务必须具有弹性,因此系统可以根据实例的数量来管理负载。 这意味着一系列限制,例如避免内存中状态,如果需要可以在实例之间共享状态,或者能够将消息路由到相同的实例以进行有状态服务。

什么是Vert.x

Vert.x是一个工具集,它可以用非阻塞的异步模型来开发响应式和分布式的系统。因为它是一个工具集不是一个框架,所以你可以像使用其他lib一样把它放到你的项目中。它不强制你如何构造或组织你的系统,你可以用任何方式使用它。它很灵活,你可以用它构建一个独立的应用,也可以把它嵌入到其他大型系统中。

站在开发者的角度,Vert.x是一套JAR文件。每一个Vert.x模块都是一个JAR文件你可以添加到你的CLASSPATH中。从HTTP服务到HTTP客户端到消息通信到更低层的协议例如TCP和UDP,Vert.x都提供了一整套模组来让你构建你想构建的应用。你可以选择任何模组作为Vert.x核心的插件来构建你的系统。下图抽象出一个Vert.x的生态系统。

Vert.x也提供了很多原料帮组你构建微服务系统。同时它也可以构建响应式微服务。

异步开发模式

所有用Vert.x构建的应用都是异步的。Vert.x应用是事件驱动和非阻塞性的。当关注的事情发生时你的应用会发生通知。让我们来看一个具体的示例。Vert.x提供了一个简单的方式来创建一个HTTP服务。每当收到HTTP请求这个HTTP服务都会接到通知。

vertx.createHttpServer()
    .requestHandler(request -> {
        // This handler will be called every time an HTTP
        // request is received at the server         request.response().end("hello Vert.x");
    })
    .listen(8080);

在例子中,我们设置了一个请求处理来接收HTTP请求(事件)和返回“hello Vert.x”(响应).

一个Handler是一个事件通知可以调用的方法。在示例中,这个方法被调用每当收到请求。需要注意的是这个方法没有返回结果。然后一个Handler可以提供一个结果。

除了很少的例外,没有API可以阻塞调用进程。如果一个结果可以立马被提供,它也将立马返回。否者,一个Handler将会过一会才会收到。Handler会被通知,当一个事件准备好被处理或者一个异步处理已经计算出了结果。

在传统编程中,你可能会这样写代码:

int res = compute(1,2);

在这个代码中你等待这个方法的结果。

当你切换到异步非阻塞开发模式,你可以在返回结果后调用Handler.

compute(1, 2, res -> {
    // Called with the result
});

这就像ajax那样。

在这个代码片段中,compute方法不返回任何结果,所以你不必等待,结果被计算出来后会自动调用Handler.

非常感谢异步开发模式,你可以处理高并发用很少的线程。在大多数情况,Vert.x调用你的Handler用一个叫做even loop的线程。如下图所示。它消费事件队列,并把事件发送到对应的Handler中进行处理。

线程模式的提出有一个很大的好处,简化了并发。因为只有一个线程,所以你每次只用调用一个线程而不用并发。同时,也有一个非常重要的规则你需要遵守:

      不要阻塞event loop

              --Vert.x 黄金准则


因为不会被阻塞,event loop可以在很短时间里转发大量事件。这被称作反应堆模式。

让我们想象一下,在一瞬间,你破坏了规则,在前边的代码片段中,Handler会一直被event loop调用。其他请求将会排队,等待这个Handler执行完成。这样做你将失去使用Vert.x带来的好处。

那么什么可以阻塞呢?

一个显然的例子是JDBC数据库连接。他们天生是阻塞的。长时间的计算也是阻塞的。举个例子,一个计算小数后点20万位π值的代码也是阻塞型的。不用担心,Vert.x也提供方法来处理这些阻塞型代码。

如下图,Vert.x提供了很多event loop

事件被不同的event loop 调度。然而,当一个Handler被某个event loop执行,它将会一直被这个event loop执行。

多个event loop可以被均衡到不同的CPU内核上。HTTP怎么工作呢?Vert.x只注册一次socket监听,然后把请求调度到不同的event loop上。

Verticles-基本代码块

Vert.x给你很大自由,如何实现你的应用和代码。但是他也提供了简单开始编写Vert.x的方法。Verticles是一大块可以被部署和运行的代码。一个应用,例如一个微服务,可能包含大量verticle 同时运行在同一个Vert.x实例中。

一个典型的verticle可以创建服务端或者客户端,注册一套Handles,或者包含一些业务逻辑。

常见的verticle在event loop上执行并且永远不会阻塞。Vert.x 确保verticle一直被同一个进程执行并且不会被并发,由此来避免同时构造。在java中,一个verticle是一个AbstractVerticle类的扩展

import io.vertx.core.AbstractVerticle;
public class MyVerticle extends AbstractVerticle {
    @Override
    public void start() throws Exception {
        // Executed when the verticle is deployed
    }
    @Override
    public void stop() throws Exception {
        // Executed when the verticle is un-deployed
    }
}

verticles可以创建服务端客户端还可以调用其他verticles,也能部署其他verticles,配置他们,设置创建实例的数量。这些实例会被关联到不同的event loop上,同时vert.x平衡这些实例的负载。

从回调到观察

正如前边看到的,Vert.x开发模式使用回调。在编排多个异步操作时,这种基于回调的开发模型倾向于生成复杂的代码。

例如,让我们看看我们将如何从数据库中检索数据。 首先,我们需要一个到数据库的连接,然后我们发送一个查询到数据库,处理结果并释放连接。 所有这些操作都是异步的。 使用回调函数,您可以使用Vert.x JDBC客户端编写以下代码:

client.getConnection(conn -> {
    if (conn.failed()) {/* failure handling */}
    else {
        SQLConnection connection = conn.result();         connection.query("SELECT * from PRODUCTS", rs -> {             
if (rs.failed()) {/* failure handling */}
            else {
                List<JsonArray> lines =                     rs.result().getResults();                 
for (JsonArray l : lines) {
                    System.out.println(new Product(l));
                }
                connection.close(done -> {
                    if (done.failed()) {/* failure handling */}
                });
            }
        });
    }
});

虽然仍然易于管理,但该示例显示回调可能会很快导致无法读取的代码。 您也可以使用Vert.x Futures处理异步操作。 与Java期货不同,Vert.x期货不受阻碍。 期货提供更高层次的组合操作员来构建操作序列或并行执行操作。 通常情况下,如下面的代码片段所示,我们构建未来以构建异步操作序列:

Future<SQLConnection> future = getConnection();
future
    .compose(conn -> {         connection.set(conn);
        // Return a future of ResultSet         
      return selectProduct(conn);
    })
    // Return a collection of products by mapping
    // each row to a Product
    .map(result -> toProducts(result.getResults()))
    .setHandler(ar -> {
        if (ar.failed()) { /* failure handling */ }
        else {
            ar.result().forEach(System.out::println);
        }
        connection.get().close(done -> {
            if (done.failed()) { /* failure handling */ }
        });
    });

然而,虽然期货使代码更具说明性,但我们正在检索一批中的所有行并处理它们。 这个结果可能很大,需要很多时间来检索。 同时,您不需要整个结果就可以开始处理它。 我们可以一行一行地处理每一行,只要你有它们。 幸运的是,Vert.x为这种开发模式挑战提供了答案,并为您提供了一种使用响应式编程开发模型实现响应式微服务的方法。 Vert.x提供RxJava API来:

•组合和协调异步任务

•将输入消息作为输入流进行响应

Let’s rewrite the previous code using theRxJava APIs:


// We retrieve a connection and cache it, // so we can retrieve the value later. 
Single<SQLConnection> connection = client.rxGetConnection();
 connection.flatMapObservable(conn ->
    conn
      // Execute the query
      .rxQueryStream("SELECT * from PRODUCTS")
      // Publish the rows one by one in a new Observable
      .flatMapObservable(SQLRowStream::toObservable)
      // Don't forget to close the connection
      .doAfterTerminate(conn::close)
  )
  // Map every row to a Product
  .map(Product::new)
  // Display the result one by one
  .subscribe(System.out::println);
除了提高可读性以外,响应式编程允许您在可用时立即订阅结果流和处理项。 借助Vert.x,您可以选择您喜欢的开发模式。 在这份报告中,我们将使用回调和RxJava。

我们开始编程吧!

是时候让你的手变脏了(落灰好久了,我的键盘早已饥渴难耐)。我们来用maven来开发我们的第一个Vert.x应用。当然你还可以用其他工具,例如Gradle,Ant.

键入命令:

mkdir my-first-vertx-app 
cd my-first-vertx-app
mvn io.fabric8:vertx-maven-plugin:1.0.5:setup \
  -DprojectGroupId=io.vertx.sample \
  -DprojectArtifactId=my-first-vertx-app \
  -Dverticle=io.vertx.sample.MyFirstVerticle

这个命令会自动生成vert.xmaven项目。

编写你的第一个verticle

修改MyFirstVerticle.java

package io.vertx.sample; import io.vertx.core.AbstractVerticle;
/**  * A verticle extends the AbstractVerticle class.
 */
public class MyFirstVerticle extends AbstractVerticle {
  @Override
  public void start() throws Exception {     // We create a HTTP server object     vertx.createHttpServer()
      // The requestHandler is called for each incoming
      // HTTP request, we print the name of the thread
      .requestHandler(req -> {         req.response().end("Hello from "
          + Thread.currentThread().getName());
      })
      .listen(8080); // start the server on port 8080
  }
}
mvn compile vertx:run

访问地址localhost:8080

当你运行了vertx:run每当你修改了代码,程序都会自动发布,你刷新浏览器就可以看大。

当前请求被eventloop 0处理,你可以打开新的页面,都是同一个event loop在处理。

使用RxJava

在pom文件中加入
<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-rx-java</artifactId>
</dependency>

修改pom


创建MyFirstRXverticle

package io.vertx.sample;
// We use the .rxjava. package containing the RX-ified APIs
import io.vertx.rxjava.core.AbstractVerticle; import io.vertx.rxjava.core.http.HttpServer; public class MyFirstRXVerticle extends AbstractVerticle {
  @Override
  public void start() {
    HttpServer server = vertx.createHttpServer();     
    // We get the stream of request as Observable     
    server.requestStream().toObservable()
      .subscribe(req ->
        // for each HTTP request, this method is called
        req.response().end("Hello from "
          + Thread.currentThread().getName())
      );
    // We start the server using rxListen returning a
    // Single of HTTP server. We need to subscribe to
    // trigger the operation
    server
      .rxListen(8090)
      .subscribe();
  }
}

把你的应用打包成Fat Jar

Vert.x Maven插件 支持把你的应用打包为fat jar.这样你可以很容易用命令

java -jar name.jar 来运行你的应用。

mvn clean package
cd target
java -jar my-first-vertx-app-1.0-SNAPSHOT.jar

这样你的应用又可以运行了。

fat jar自带依赖jar运行起来很方便。

日志,警告和其他生产要素

打包成fat jar 是一个很好的打包模式对微服务来讲,因为很方便部署和启动。但是部署到生产环境我们还需要做什么呢?一般来说我们还需要记录日志,加载外部配置,运行状态监控等等。

不用担心Vert.x提供了所有这些功能。你可以用Log4等来记录日志,可以用JMX来监控Vert.x运行。

总结

在这一章我们学习了响应式微服务和Vert.x.你同时也创建了你的第一个Vert.x应用。
本章绝非全面的指南, 只是简要介绍了主要概念。
如果您想进一步了解这些主题, 请查看以下资源:

•   Reactiveprogramming vs. Reactive systems

•   The Reactive Manifesto

•   RxJava website

•   ReactiveProgramming with RxJava

•   The Vert.xwebsite



Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐