一 介绍
在很多次场景下,外部请求需要查询Zuul后端的多个微服务。举个例子,一个电影售票手机APP,在购票订单页上,既需要查询“电影微服务”获得电影相关信息,又需要查询“用户微服务”获得当前用户的信息。如果让手机端直接请求各个微服务(即使使用Zuul进行转发),那么网络开销、流量耗费、耗费时长可能都无法令人满意。那么对于这种场景,可使用Zuul聚合微服务请求——手机APP只需发送一个请求给Zuul,由于Zuul请求用户微服务以及电影微服务,并组织好数据给手机APP。
使用这种方式,手机端只须发送一次请求即可,简化了客户端侧的开发;不仅如此,由于Zuul、用户微服务、电影微服务一般都在同一局域网,因此速度非常快,效率会非常高。
下面围绕以上这个场景,来编写代码。
在本例中,使用了RxJava结合Zuul来实现微服务请求的聚合。
二 新建项目microservice-gateway-zuul-aggregation
三 修改启动类
package com.itmuch.cloud.study;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.cloud.netflix.zuul.EnableZuulProxy;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;

@SpringBootApplication
@EnableZuulProxy
public class ZuulApplication {
  public static void main(String[] args) {
    SpringApplication.run(ZuulApplication.class, args);
  }

  @Bean
  @LoadBalanced
  public RestTemplate restTemplate() {
    return new RestTemplate();
  }
}
四 创建实体类
package com.itmuch.cloud.study;
import java.math.BigDecimal;
public class User {
  private Long id;
  private String username;
  private String name;
  private Integer age;
  private BigDecimal balance;
  public Long getId() {
    return id;
  }
  public void setId(Long id) {
    this.id = id;
  }
  public String getUsername() {
    return username;
  }
  public void setUsername(String username) {
    this.username = username;
  }
  public String getName() {
    return name;
  }
  public void setName(String name) {
    this.name = name;
  }
  public Integer getAge() {
    return age;
  }
  public void setAge(Integer age) {
    this.age = age;
  }
  public BigDecimal getBalance() {
    return balance;
  }
  public void setBalance(BigDecimal balance) {
    this.balance = balance;
  }
}
五 创建聚合服务
package com.itmuch.cloud.study;

import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import rx.Observable;

@Service
public class AggregationService {
  @Autowired
  private RestTemplate restTemplate;

  @HystrixCommand(fallbackMethod = "fallback")
  public Observable<User> getUserById(Long id) {
    // 创建一个被观察者
    return Observable.create(observer -> {
      // 请求用户微服务的/{id}端点
      User user = restTemplate.getForObject("http://microservice-provider-user/{id}", User.class, id);
      observer.onNext(user);
      observer.onCompleted();
    });
  }

  @HystrixCommand(fallbackMethod = "fallback")
  public Observable<User> getMovieUserByUserId(Long id) {
    return Observable.create(observer -> {
      // 请求电影微服务的/user/{id}端点
      User movieUser = restTemplate.getForObject("http://microservice-consumer-movie/user/{id}", User.class, id);
      observer.onNext(movieUser);
      observer.onCompleted();
    });
  }

  public User fallback(Long id) {
    User user = new User();
    user.setId(-1L);
    return user;
  }
}
六 创建Controller,在Controller中聚合多个请求
package com.itmuch.cloud.study;

import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import rx.Observable;
import rx.Observer;

import java.util.HashMap;

@RestController
public class AggregationController {
  public static final Logger LOGGER = LoggerFactory.getLogger(ZuulApplication.class);

  @Autowired
  private AggregationService aggregationService;

  @GetMapping("/aggregate/{id}")
  public DeferredResult<HashMap<String, User>> aggregate(@PathVariable Long id) {
    Observable<HashMap<String, User>> result = this.aggregateObservable(id);
    return this.toDeferredResult(result);
  }

  public Observable<HashMap<String, User>> aggregateObservable(Long id) {
    // 合并两个或者多个Observables发射出的数据项,根据指定的函数变换它们
    return Observable.zip(
            this.aggregationService.getUserById(id),
            this.aggregationService.getMovieUserByUserId(id),
            (user, movieUser) -> {
              HashMap<String, User> map = Maps.newHashMap();
              map.put("user", user);
              map.put("movieUser", movieUser);
              return map;
            }
    );
  }

  public DeferredResult<HashMap<String, User>> toDeferredResult(Observable<HashMap<String, User>> details) {
    DeferredResult<HashMap<String, User>> result = new DeferredResult<>();
    // 订阅
    details.subscribe(new Observer<HashMap<String, User>>() {
      @Override
      public void onCompleted() {
        LOGGER.info("完成...");
      }

      @Override
      public void onError(Throwable throwable) {
        LOGGER.error("发生错误...", throwable);
      }

      @Override
      public void onNext(HashMap<String, User> movieDetails) {
        result.setResult(movieDetails);
      }
    });
    return result;
  }
}
七 微服务聚合测试
1 启动eureka
2 启动user微服务
3 启动movie微服务
4 启动聚合微服务
5 访问http://localhost:8040/aggregate/1
{"movieUser":{"id":1,"username":"account1","name":"张三","age":20,"balance":100.00},"user":{"id":1,"username":"account1","name":"张三","age":20,"balance":100.00}}
说明已成功聚合了用户微服务以及电影微服务的RESTful API
八 Hystrix容错测试
1 停止user微服务
2 停止movie微服务
{"movieUser":{"id":-1,"username":null,"name":null,"age":null,"balance":null},"user":{"id":-1,"username":null,"name":null,"age":null,"balance":null}}
说明fallback方法正常被触发,能够正常回退。
Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐