前言

在上篇介绍了Eureka的原理,关于Eureka集群部署类似于ZK,需要把集群中每个节点信息都配置到配置文件中,在云原生时代,K8S部署Eureka时每个节点要有不同的配置文件,所以需要用statfulset的方式部署。上篇文章介绍过集群部署的三个方案及其优劣,如下所示:

  • 1双节点自动配置
    * 采用脚本方案,启动查数据库,设置defaultZone,但是此种方式挂了重启重新分配ip时集群其他节点不会动态更新
  • 2采用配置文件方式,使用服务名来配置集群,每个节点是一个服务,client也配置服务名
    * 使用K8S服务名配置,设置eureka为DNS注册方式
    * eureka可以动态加载配置文件,可实现扩缩容,但是扩缩容后微服务不感知,除非也修改微服务的配置文件
  • 3重新实现eureka获取defaultZone的方式
    * 重新实现获取defaultZone,让其读取数据库,实现动态加载
    本文详细介绍第三种方案

方案介绍

要想实现自动扩容,要考虑以下几点:

  • 1要有心跳更新,过期自动剔除,这样就需要eureka节点定期注册心跳
  • 2eureka server集群中的每个节点定期更新配置组成集群
  • 3每个微服务的eureka client也要定期更新server配置,防止server扩缩容后client不感知
  • 4既然不用配置文件,就需要在eureka server启动时就去连接数据库注册自己并获得其他节点配置。微服务中的client启动时去数据库拿server配置。

实现的目标:eureka server无论是扩缩容还是故障K8S飘逸节点后(更换IP)后都能重新组合成新的集群,无需更改任何eureka server和微服务的配置。把eureka server完全当做一个无状态节点部署,无需K8S做任何改动。

实现过程

创建表结构,用于自动注册并更新心跳

/* for eureka */
CREATE TABLE `paas_eureka` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'pk',
  `gmt_create` datetime NOT NULL COMMENT '创建时间',
  `gmt_modified` datetime NOT NULL COMMENT '修改时间',
  `ip` varchar(64) NOT NULL DEFAULT '' COMMENT 'eureka pod ip',
  `server_port` int(10) unsigned DEFAULT NULL COMMENT 'eureka port',
  `env_type` varchar(64) NOT NULL DEFAULT '' COMMENT '环境信息',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_ip_port` (`ip`,`server_port`)
) ENGINE=InnoDB AUTO_INCREMENT=0 CHARSET=utf8mb4 COMMENT='服务发现地址表';

eureka心跳更新

以下在eureka server中开发

  • 添加定时程序,用于注册自身和心跳更新
/**
 *  @author: zhangjikuan
 *  @Date: 2020/12/22 11:14
 *  @Description:
 */
@Service
@EnableScheduling
public class PaasEurekaHeartBeatDaemon {

    @Value("${paas.eureka.heartbeat.update.intervalSecond}")
    private Long intervalSecond;
    @Value("${paas.eureka.heartbeat.delete.intervalMin}")
    private Long intervalMin;
    @Value("${server.port}")
    private Integer port;

    @Autowired
    private PaasEurekaService paasEurekaService;

    private static final Logger logger = LoggerFactory.getLogger(PaasEurekaHeartBeatDaemon.class);

    /**
     * @Description  定时更新heartbeat执行器
     * @Author  zhangjikuan
     * @Date   2020/12/22 11:26
     * @Param  []
     * @Return      void
     * @Exception
     */
    public void heartbeatDaemonExecutor() {
        logger.info("set heartbeat executor: intervalSecond={}", intervalSecond);
        ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1,
                new BasicThreadFactory.Builder().namingPattern("eureka-heartbeat-thread-pool-%d").daemon(true).build());
        // sync heartbeat
        executor.scheduleWithFixedDelay(this::paasEurekaHeartbeatTask, 0, intervalSecond, TimeUnit.SECONDS);
    }

    /**
    * @Description 更新heartbeat,删除过期数据
    * @Author  zhangjikuan
    * @Date   2020/12/22 17:42
    * @Param
    * @Return
    * @Exception
    */
    private void paasEurekaHeartbeatTask() {
        String envType = System.getenv("ENV_TYPE");
        String ip = System.getenv("POD_IP");
        if (StringUtils.isBlank(envType) || StringUtils.isBlank(ip)) {
            logger.info("ENV_TYPE or POD_IP not in env, not update");
            return;
        }
        logger.info("update paas_eureka heartbeat, ip={}, port={}, envType={}", ip, port, envType);
        paasEurekaService.setPaasEureka(ip, port, envType);
        paasEurekaService.deletePaasEurekaUseEnvInterval(envType, intervalMin);
    }

}
  • 关于paasEurekaService是普通的代码数据更新代码,此处忽略
  • application里添加定时程序的调用
@EnableEurekaServer
@SpringBootApplication
@EnableScheduling
public class EurekaServerApplication implements CommandLineRunner {

    @Autowired
    private PaasEurekaHeartBeatDaemon paasEurekaHeartBeatDaemon;

    public static void main(String[] args) {
        SpringApplication.run(EurekaServerApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        paasEurekaHeartBeatDaemon.heartbeatDaemonExecutor();
    }
}

eureka peer动态更新

以上操作只完成了eureka启动后注册自身到心跳表并定时更新心跳,下面介绍eureka server重新实现配置更新代码,实现配置自动更新

  • 重新实现EurekaClientConfigBean.getEurekaServerServiceUrls
@Override
    public List<String> getEurekaServerServiceUrls(String myZone) {
        String serviceUrls = this.serviceUrl.get(myZone);
        // add by zhangjikuan
        String zone = myZone;
        if (serviceUrls == null || serviceUrls.isEmpty()) {
            serviceUrls = this.serviceUrl.get(DEFAULT_ZONE);
            zone = DEFAULT_ZONE;
        }
        // get url from metadata, add by zhangjikuan
        try {
            String newUrls = paasEurekaService.getEurekaServerServiceUrlsExceptSelves();
            if (!StringUtils.isEmpty(newUrls) && !newUrls.equals(serviceUrls)) {
                logger.info("SourceCode: ServiceUrls={}, metaUrls={}", serviceUrls, newUrls);
                serviceUrls = newUrls;
                Map<String, String> map = new HashMap<>(4);
                map.put(zone, serviceUrls);
                setServiceUrl(map);
            } else {
                logger.info("SourceCode: No set, ServiceUrls={}, metaUrls={}", serviceUrls, newUrls);
            }
        } catch (Exception e) {
            logger.error("SourceCode: set new ServiceUrls error");
            e.printStackTrace();
        }
        // end
        if (!StringUtils.isEmpty(serviceUrls)) {
            final String[] serviceUrlsSplit = StringUtils
                    .commaDelimitedListToStringArray(serviceUrls);
            List<String> eurekaServiceUrls = new ArrayList<>(serviceUrlsSplit.length);
            for (String eurekaServiceUrl : serviceUrlsSplit) {
                if (!endsWithSlash(eurekaServiceUrl)) {
                    eurekaServiceUrl += "/";
                }
                eurekaServiceUrls.add(eurekaServiceUrl.trim());
            }
            return eurekaServiceUrls;
        }

        return new ArrayList<>();
    }

注意在上面的get方法中如果url有变化会去setServiceUrl(map);防止其他地方使用到了this.url

  • setService里加点log便于调试
public void setServiceUrl(Map<String, String> serviceUrl) {
        logger.info("SourceCode: set new ServiceUrls={}", serviceUrl);
        this.serviceUrl = serviceUrl;
    }

eureka client动态更新

  • 以上代码在eureka client上,也就是每个微服务中从配置文件拿server集群是相同的,所以以上代码的改动也要在每个微服务里做相同改造。这样每个我服务也会动态从数据库中拿server配置,实现集群自动联动。

eureka server的相关配置如下

eureka.instance.preferIpAddress=true
eureka.client.registerWithEureka=true
eureka.client.fetchRegistry=true
eureka.server.enableSelfPreservation=false
eureka.server.peerEurekaNodesUpdateIntervalMs=120000

eureka client的相关配置如下

eureka.instance.preferIpAddress=true
eureka.client.healthcheck.enabled=true
eureka.client.registryFetchIntervalSeconds=30
eureka.client.eureka-service-url-poll-interval-seconds=120
Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐