Snowflake 需要为每个服务实例配置唯一的 workerId 和 dataCenterId,可以通过 k8s 的配置文件或 ZooKeeper 的唯一节点来分配。

本文基于redis setnx  redisTemplate.opsForValue().setIfAbsent 实现

通过心跳 设置有效时间 保持服务,服务停止则释放资源

set(key,val,expire) 

同一个服务最多可注册 1024 个服务实例(编号 0-1023)

/**
 * 
 * 服务是指的 同一个服务不同实例
 * 当前服务连接不到redis时, 重新连接后当前workerId可能已释放或被其他服务实例占用
 * 默认心跳时间30s, 过期时间100s(代码中 EXPIRE_TIME = 100); 过期时间>2*心跳时间, 因此短暂断开连接时redis中的workerId不会被其他服务实例占用
 * 断开连接: 重置uuid (使redis中的key-uuid 失效), 重置snowflake(当前服务的id生成不可用), 重新获取workerId
 * 缩短重试时间, 当恢复连接后可快速重新生成workerId
 *
 * redis 数据丢失(如 flushDB)后, 至少需等待一个心跳时间才可以注册其他服务实例
 * 若未等待一个心跳时间就有其他服务实例注册, 可能出现多个服务实例共用一个workerId; 下一次心跳时由于uuid不一致, 原实例会自动重新注册
 */

gitee: https://gitee.com/tg_seahorse/demo-cloud/tree/develop/demo-snowflake

server:
  port: 8091


spring:
  application:
    name: demo-snowflake


  redis:
    host: 192.168.116.128
    port: 6379
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

 

主要实现类

package com.tg.demo.snowflake.service;

import cn.hutool.core.lang.Snowflake;
import cn.hutool.core.util.IdUtil;
import com.tg.demo.snowflake.config.SnowflakeProperties;
import com.tg.demo.snowflake.beatheart.SnowflakeBeatHeart;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;

/**
 * Claims a unique Snowflake worker slot for this service instance through Redis.
 *
 * <p>Every instance of the same application competes for one of the keys
 * {@code sw:<applicationName>:0} .. {@code sw:<applicationName>:1023} using
 * {@code setIfAbsent} with a TTL of {@link #EXPIRE_TIME} seconds. The index of the
 * key that was created is split into a worker id (low 5 bits) and a data-center id
 * (next 5 bits), and a Hutool {@code Snowflake} built from that pair is installed
 * into {@link IdGeneratorUtil}.
 *
 * @author Rubble
 * Created on 2020/5/23
 */
@Slf4j
@Service
public class SnowflakeRegisterService {

    @Autowired
    private RedisTemplate redisTemplate;

    /** Used as part of the Redis key so that different applications do not share slots. */
    @Value("${spring.application.name}")
    private String applicationName;

    @Autowired
    private SnowflakeProperties snowflakeProperties;

    /** Number of slots that are probed: indexes 0 (inclusive) to 1024 (exclusive). */
    private static final int MAX_VALUE = 1024;

    /** Largest worker id; also the bit mask that extracts the worker id from a slot index. */
    private static final int MAX_WORKER_ID = 31;

    /** Number of low bits of a slot index that hold the worker id. */
    private static final int WORKER_ID_BITS = 5;

    /** Pause between two full scans when every slot is currently taken. */
    private static final long RETRY_DELAY_MILLIS = 200;

    /** TTL, in seconds, of the Redis key that marks a slot as taken. */
    public static final int EXPIRE_TIME = 100;


    /**
     * Registers a worker slot once at start-up and then starts the heartbeat
     * that keeps the slot's Redis key alive.
     */
    @PostConstruct
    public void postInit(){

        registerWorker();

        SnowflakeBeatHeart snowflakeBeatHeart = new SnowflakeBeatHeart(redisTemplate,snowflakeProperties,this);
        snowflakeBeatHeart.startSchedule();
    }


    /**
     * Forgets the current registration: blanks the stored uuid and removes the
     * generator from {@link IdGeneratorUtil}, so no ids can be produced until
     * {@link #registerWorker()} succeeds again.
     */
    public void resetWorker(){
        snowflakeProperties.setUuid("");
        IdGeneratorUtil.setSnowflake(null);
    }

    /**
     * Claims a free slot in Redis and installs a matching Snowflake generator.
     *
     * <p>Blocks until a slot could be claimed. Any previous registration is reset first.
     *
     * @throws IllegalStateException if the calling thread is interrupted while
     *         waiting for a slot to become free
     */
    public void registerWorker(){
        resetWorker();
        String uuid = IdUtil.simpleUUID();
        String prefix =  "sw:"+applicationName+":";

        int val = acquireSlot(prefix, uuid);
        String snowflakeKey = prefix + val;

        // The mask/shift is valid for every slot index, including those <= MAX_WORKER_ID,
        // where it yields workId == val and dataCenterId == 0.
        int workId = val & MAX_WORKER_ID;
        int dataCenterId = val >> WORKER_ID_BITS;
        log.info("val: {} workerId: {} dataCenterId: {} uuid: {}", val, workId, dataCenterId,uuid);

        snowflakeProperties.setUuid(uuid);
        snowflakeProperties.setSnowflakeKey(snowflakeKey);
        snowflakeProperties.setWorkerId(workId);
        snowflakeProperties.setDataCenterId(dataCenterId);
        IdGeneratorUtil.setSnowflake( IdUtil.createSnowflake(workId,dataCenterId));

    }

    /**
     * Scans the slots in ascending order until one key can be created, retrying the
     * whole scan after a short pause when all slots are taken.
     *
     * @param prefix key prefix; the slot index is appended to it
     * @param uuid   value stored under the claimed key
     * @return index of the claimed slot, in the range 0..MAX_VALUE-1
     */
    private int acquireSlot(String prefix, String uuid) {
        while (true) {
            for (int i = 0; i < MAX_VALUE; i++) {
                Boolean locked = redisTemplate.opsForValue()
                        .setIfAbsent(prefix + i, uuid, EXPIRE_TIME, TimeUnit.SECONDS);
                // setIfAbsent returns a boxed Boolean; compare null-safely instead of unboxing.
                if (Boolean.TRUE.equals(locked)) {
                    return i;
                }
            }
            try {
                TimeUnit.MILLISECONDS.sleep(RETRY_DELAY_MILLIS);
            } catch (InterruptedException e) {
                // Restore the flag and stop: continuing would make every further sleep fail at once.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for a free snowflake worker slot", e);
            }
        }
    }

}
package com.tg.demo.snowflake.service;

import cn.hutool.core.lang.Snowflake;

/**
 * Static access point to the Snowflake generator registered for this instance.
 *
 * <p>The generator is installed and removed through {@link #setSnowflake(Snowflake)};
 * while none is installed, the id methods fail with {@link IllegalStateException}.
 *
 * @author Rubble
 * Created on 2020/5/26
 */
public class IdGeneratorUtil {

    /** Current generator, or {@code null} while unregistered; volatile because it is replaced at runtime. */
    private static volatile Snowflake snowflake;

    /**
     * Replaces the generator used by {@link #nextId()} and {@link #nextIdStr()}.
     *
     * @param snowflake the new generator, or {@code null} to make id generation unavailable
     */
    static void setSnowflake(Snowflake snowflake) {
        IdGeneratorUtil.snowflake = snowflake;
    }

    /**
     * Returns the next id from the current generator.
     *
     * @return the next id as a {@code long}
     * @throws IllegalStateException if no generator is currently installed
     */
    public static long nextId(){
        return current().nextId();
    }

    /**
     * Returns the next id from the current generator as a string.
     *
     * @return the next id as a {@code String}
     * @throws IllegalStateException if no generator is currently installed
     */
    public static String nextIdStr(){
        return current().nextIdStr();
    }

    /** Reads the field once so the null check and the returned reference refer to the same generator. */
    private static Snowflake current() {
        Snowflake current = snowflake;
        if (current == null) {
            throw new IllegalStateException("Snowflake generator is not available: no workerId is currently registered");
        }
        return current;
    }
}
/**
 * Holds the Snowflake registration state of the current service instance.
 *
 * <p>Declared as configuration properties under the {@code snowflake} prefix;
 * accessors are generated by Lombok's {@code @Data}.
 */
@Data
@ConfigurationProperties(prefix = "snowflake")
public class SnowflakeProperties {

    /** Token stored as the value of the claimed Redis key; the heartbeat compares it with the value in Redis. */
    private String uuid;

    /** Full Redis key of the slot claimed by this instance. */
    private String snowflakeKey;

    /** Worker id passed to the Snowflake generator; defaults to 0. */
    private int workerId = 0;

    /** Data-center id passed to the Snowflake generator; defaults to 0. */
    private int dataCenterId = 0;

}

心跳 延长过期时间

 

package com.tg.demo.snowflake.beatheart;

import com.tg.demo.snowflake.config.SnowflakeProperties;
import com.tg.demo.snowflake.service.SnowflakeRegisterService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/**
 * Heartbeat that keeps this instance's worker-slot key alive in Redis.
 *
 * <p>Each beat reads the slot key; when it still holds this instance's uuid the
 * key's TTL is renewed, otherwise the instance registers a new slot. Each beat
 * schedules the next one itself, so the delay can differ between the normal
 * case ({@link #NEXT_TIME}) and the failure case ({@link #RECONNECT_TIME}).
 *
 * @author Rubble
 * Created on 2020/5/23
 */
@Slf4j
public class SnowflakeBeatHeart {

    /** Delay in seconds before the next beat after a beat that completed without an exception. */
    public static final int NEXT_TIME = 30;

    /** Delay in seconds before the next beat after a beat that failed with an exception. */
    public static final int RECONNECT_TIME = 3;

    private RedisTemplate redisTemplate;

    private SnowflakeProperties snowflakeProperties;

    private  int threadCount = 5;

    private ScheduledExecutorService executorService;

    private SnowflakeRegisterService snowflakeRegisterService;

    /**
     * Creates the heartbeat and its scheduler; nothing runs until {@link #startSchedule()} is called.
     *
     * @param redisTemplate            template used to read and renew the slot key
     * @param snowflakeProperties      holder of the current slot key and uuid
     * @param snowflakeRegisterService service used to reset or re-register the worker
     */
    public SnowflakeBeatHeart(RedisTemplate redisTemplate, SnowflakeProperties snowflakeProperties, SnowflakeRegisterService snowflakeRegisterService){
        this.redisTemplate = redisTemplate;
        this.snowflakeProperties = snowflakeProperties;
        this.snowflakeRegisterService = snowflakeRegisterService;
        executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                // Daemon threads, so the heartbeat never keeps the JVM alive on its own.
                thread.setDaemon(true);
                thread.setName("com.tg.demo.snowflake.beat.sender");
                return thread;
            }
        });
    }

    /** Schedules the first beat to run after 60 seconds. */
    public void startSchedule(){
        executorService.schedule(new BeatTask(), 60, TimeUnit.SECONDS);
    }

    /** One heartbeat: renew the slot key or re-register, then schedule the next beat. */
    class BeatTask implements Runnable {

        public BeatTask(){
        }

        @Override
        public void run() {
            log.debug("SnowflakeBeatHeart run ");
            String key = snowflakeProperties.getSnowflakeKey();
            String uuid = snowflakeProperties.getUuid();

            /*
             * "Service" here means different instances of the same application.
             *
             * While Redis is unreachable the slot key may expire and be claimed by another
             * instance. The key's TTL (SnowflakeRegisterService.EXPIRE_TIME) is more than
             * twice the beat interval (NEXT_TIME), so a single missed beat does not free the slot.
             *
             * On a Redis error the uuid and the generator are reset (id generation is
             * unavailable) and the next beat is scheduled sooner (RECONNECT_TIME) so a new
             * slot is claimed quickly once Redis is reachable again.
             *
             * If Redis loses its data, another instance may claim the same slot before this
             * instance's next beat; the uuid mismatch seen on that beat triggers a re-registration.
             */
            int nextTime = NEXT_TIME;
            try{
                String value = (String) redisTemplate.opsForValue().get(key);
                boolean renewed = false;
                if(value!=null && value.equals(uuid)){
                    // The key is still ours: extend its TTL. The key can expire between the
                    // read above and this call, so the result is checked rather than assumed.
                    renewed = Boolean.TRUE.equals(
                            redisTemplate.expire(key, SnowflakeRegisterService.EXPIRE_TIME, TimeUnit.SECONDS));
                }
                if(!renewed){
                    // Slot lost (missing key, foreign uuid, or failed renewal): claim a new one.
                    snowflakeRegisterService.registerWorker();
                }
            }catch (Exception ex){
                // Redis failure: drop the current worker so no ids are generated, and retry sooner.
                snowflakeRegisterService.resetWorker();
                nextTime = RECONNECT_TIME;
                log.error("SnowflakeBeatHeart error ", ex);
            }finally {
                // Keep the heartbeat going regardless of the outcome of this beat.
                executorService.schedule(new BeatTask(), nextTime, TimeUnit.SECONDS);
            }
        }
    }
}

 

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐