利用Redis实现集群或开发环境下SnowFlake自动配置机器号

admin / 开发 / ... / Reads: 1074

前言:

SnowFlake 雪花ID 算法是推特公司推出的著名分布式ID生成算法。利用预先分配好的机器ID,工作区ID,机器时间可以生成全局唯一的随时间趋势递增的Long类型ID.长度在17-19位。随着时间的增长而递增,在MySQL数据库中,InnoDB存储引擎可以更快的插入递增的主键。而不像UUID那样因为写入是乱序的,InnoDB不得不频繁的做页分裂操作,耗时且容易产生碎片。

对于SnowFlake 的原理介绍,可以参考该文章:理解分布式id生成算法SnowFlake

理解了雪花的基本原理之后,我们试想:在分布式集群或者开发环境下,不同服务之间/相同服务的不同机器之间应该如何产生差异呢?有以下几种方案:

1, 通过在 yml 文件中配置不同的参数,启动 spring 容器时通过读取该参数来实现不同服务与不同机器的workerId不同。但是这里不方便新增机器/新同事的自动化配置

2, 向第三方应用如zookeeper、Redis中注册ID,以获得唯一的ID。

3, 对于开发环境,可以取机器的IP后三位。因为大家在一个办公室的话IP后三位肯定是0-255之前不重复。但是这样机器ID需要8个Bit,留给数据中心的位数就只有4个了。

本方案结合了以上方案的优点,按照业务的实际情况对雪花中的数据中心和机器ID所占的位数进行调整:数据中心占4Bit,范围从0-15。机器ID占6Bit,范围从0-63 。对不同的服务在yml中配置服务名称,以服务编号作为数据中心ID。如果按照开发+测试+生产环境区分的话,可以部署5个不同的服务。application.yml 中配置如下的参数

# 分布式雪花ID不同机器ID自动化配置
snowFlake:
  dataCenter: 1 # 数据中心的id
  appName: test # 业务类型名称

而机器ID采用以下的策略实现:

1, 获取当前机器的IP地址 localIp,模32,获得0-31的整数 machineId

2, 向Redis中注册,使用 appName + dataCenter + machineId 作为key ,以本机IP localIp 作为 value。

3, 注册成功后,设置键过期时间 24 h,并开启一个计时器,在 23h 后更新注册的 key 4, 如果注册失败,可能有以下两个原因:

  • 上次服务异常中断,没有来得及删除key。这里的解决方案是通过key获取value,如果value和localIp一致,则仍然视为注册成功

  • IP和别人的IP模32的结果一样,导致机器ID冲突。这是就遍历 0-31 获取其中为注册的数字作为本机的机器号

5, 如果不幸Redis连接失败,系统将从32-63之间随机获取ID,并使用 log.error() 打印醒目的提示消息这里建议IDEA + Grep Console 实现不同级别的日志不同前景色显示,方便及时获取错误信息

6, 当服务停止前,向Redis发送请求,删除该Key的占用。

具体的代码如下:

自动配置机器ID,并在容器启动时放入SnowFlake实例对象

package cn.keats.util;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;

@Configuration
@Slf4j
public class MachineIdConfig {
    @Resource
    private JedisPool jedisPool;

    @Value("${snowFlake.dataCenter}")
    private Integer dataCenterId;

    @Value("${snowFlake.appName}")
    private String APP_NAME;

    /**
     * 机器id
     */
    public static Integer machineId;
    /**
     * 本地ip地址
     */
    private static String localIp;

    /**
     * 获取ip地址
     *
     * @return
     * @throws UnknownHostException
     */
    private String getIPAddress() throws UnknownHostException {
        InetAddress address = InetAddress.getLocalHost();
        return address.getHostAddress();
    }

    /**
     * hash机器IP初始化一个机器ID
     */
    @Bean
    public SnowFlake initMachineId() throws Exception {
        localIp = getIPAddress(); // 192.168.0.233

        Long ip_ = Long.parseLong(localIp.replaceAll("\\.", ""));// 1921680233
        //
        machineId = ip_.hashCode() % 32;// 0-31
        // 创建一个机器ID
        createMachineId();

        log.info("初始化 machine_id :{}", machineId);
        return new SnowFlake(machineId, dataCenterId);
    }

    /**
     * 容器销毁前清除注册记录
     */
    @PreDestroy
    public void destroyMachineId() {
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.del(APP_NAME + dataCenterId + machineId);
        }
    }


    /**
     * 主方法:首先获取机器 IP 并 % 32 得到 0-31
     * 使用 业务名 + 组名 + IP 作为 Redis 的 key,机器IP作为 value,存储到Redis中
     *
     * @return
     */
    public Integer createMachineId() {
        try {
            // 向redis注册,并设置超时时间
            log.info("注册一个机器ID到Redis " + machineId + " IP:" + localIp);
            Boolean flag = registerMachine(machineId, localIp);
            // 注册成功
            if (flag) {
                // 启动一个线程更新超时时间
                updateExpTimeThread();
                // 返回机器Id
                log.info("Redis中端口没有冲突 " + machineId + " IP:" + localIp);
                return machineId;
            }
            // 注册失败,可能原因 Hash%32 的结果冲突
            if (!checkIfCanRegister()) {
                // 如果 0-31 已经用完,使用 32-64之间随机的ID
                getRandomMachineId();
                createMachineId();
            } else {
                // 如果存在剩余的ID
                log.warn("Redis中端口冲突了,使用 0-31 之间未占用的Id " + machineId + " IP:" + localIp);
                createMachineId();
            }
        } catch (Exception e) {
            // 获取 32 - 63 之间的随机Id
            // 返回机器Id
            log.error("Redis连接异常,不能正确注册雪花机器号 " + machineId + " IP:" + localIp, e);
            log.warn("使用临时方案,获取 32 - 63 之间的随机数作为机器号,请及时检查Redis连接");
            getRandomMachineId();
            return machineId;
        }
        return machineId;
    }

    /**
     * 检查是否被注册满了
     *
     * @return
     */
    private Boolean checkIfCanRegister() {
        // 判断0~31这个区间段的机器IP是否被占满
        try (Jedis jedis = jedisPool.getResource()) {
            Boolean flag = true;
            for (int i = 0; i < 32; i++) {
                flag = jedis.exists(APP_NAME + dataCenterId + i);
                // 如果不存在。设置机器Id为这个不存在的数字
                if (!flag) {
                    machineId = i;
                    break;
                }
            }
            return !flag;
        }
    }

    /**
     * 1.更新超時時間
     * 注意,更新前检查是否存在机器ip占用情况
     */
    private void updateExpTimeThread() {
        // 开启一个线程执行定时任务:
        // 每23小时更新一次超时时间
        new Timer(localIp).schedule(new TimerTask() {
            @Override
            public void run() {
                // 检查缓存中的ip与本机ip是否一致, 一致则更新时间,不一致则重新获取一个机器id
                Boolean b = checkIsLocalIp(String.valueOf(machineId));
                if (b) {
                    log.info("IP一致,更新超时时间 ip:{},machineId:{}, time:{}", localIp, machineId, new Date());
                    try (Jedis jedis = jedisPool.getResource()) {
                        jedis.expire(APP_NAME + dataCenterId + machineId, 60 * 60 * 24 );
                    }
                } else {
                    // IP冲突
                    log.info("重新生成机器ID ip:{},machineId:{}, time:{}", localIp, machineId, new Date());
                    // 重新生成机器ID,并且更改雪花中的机器ID
                    getRandomMachineId();
                    // 重新生成并注册机器id
                    createMachineId();
                    // 更改雪花中的机器ID
                    SnowFlake.setWorkerId(machineId);
                    // 结束当前任务
                    log.info("Timer->thread->name:{}", Thread.currentThread().getName());
                    this.cancel();
                }
            }
        }, 10 * 1000, 1000 * 60 * 60 * 23);
    }

    /**
     * 获取32-63随机数
     */
    public void getRandomMachineId() {
        machineId = (int) (Math.random() * 31) + 31;
    }


    /**
     * 检查Redis中对应Key的Value是否是本机IP
     *
     * @param mechineId
     * @return
     */
    private Boolean checkIsLocalIp(String mechineId) {
        try (Jedis jedis = jedisPool.getResource()) {
            String ip = jedis.get(APP_NAME + dataCenterId + mechineId);
            log.info("checkIsLocalIp->ip:{}", ip);
            return localIp.equals(ip);
        }
    }

    /**
     * 1.注册机器
     * 2.设置超时时间
     *
     * @param machineId 取值为0~31
     * @return
     */
    private Boolean registerMachine(Integer machineId, String localIp) throws Exception {
        // try with resources 写法,出异常会释放括号内的资源 Java7特性
        try (Jedis jedis = jedisPool.getResource()) {
            // key 业务号 + 数据中心ID + 机器ID value 机器IP
            Long result = jedis.setnx(APP_NAME + dataCenterId + machineId, localIp);
            if(result == 1){
                // 过期时间 1 天
                jedis.expire(APP_NAME + dataCenterId + machineId, 60 * 60 * 24);
                return true;
            } else {
                // 如果Key存在,判断Value和当前IP是否一致,一致则返回True
                String value = jedis.get(APP_NAME + dataCenterId + machineId);
                if(localIp.equals(value)){
                    // IP一致,注册机器ID成功
                    jedis.expire(APP_NAME + dataCenterId + machineId, 60 * 60 * 24);
                    return true;
                }
                return false;
            }
        }
    }
}

雪花ID:

import org.springframework.context.annotation.Configuration;

/**
 * 功能:分布式ID生成工具类
 *
 */
@Configuration
public class SnowFlake {
    /**
     * 开始时间截 (2019-09-08) 服务一旦运行过之后不能修改。会导致ID生成重复
     */
    private final long twepoch = 1567872000000L;

    /**
     * 机器Id所占的位数 0 - 64
     */
    private final long workerIdBits = 6L;

    /**
     * 工作组Id所占的位数 0 - 16
     */
    private final long dataCenterIdBits = 4L;

    /**
     * 支持的最大机器id,结果是63 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数)
     */
    private final long maxWorkerId = -1L ^ (-1L << workerIdBits);

    /**
     * 支持的最大数据标识id,结果是15
     */
    private final long maxDatacenterId = -1L ^ (-1L << dataCenterIdBits);

    /**
     * 序列在id中占的位数
     */
    private final long sequenceBits = 12L;

    /**
     * 机器ID向左移12位
     */
    private final long workerIdShift = sequenceBits;

    /**
     * 数据标识id向左移17位(12+5)
     */
    private final long datacenterIdShift = sequenceBits + workerIdBits;

    /**
     * 时间截向左移22位(5+5+12)
     */
    private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;

    /**
     * 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095)
     */
    private final long sequenceMask = -1L ^ (-1L << sequenceBits);

    /**
     * 工作机器ID(0~63)
     */
    private static long workerId;

    /**
     * 数据中心ID(0~16)
     */
    private long datacenterId;

    /**
     * 毫秒内序列(0~4095)
     */
    private long sequence = 0L;

    /**
     * 上次生成ID的时间截
     */
    private long lastTimestamp = -1L;

    //==============================Constructors=====================================

    /**
     * 构造函数
     *
     * @param workerId     工作ID (0~63)
     * @param datacenterId 数据中心ID (0~15)
     */
    public SnowFlake(long workerId, long datacenterId) {
        if (workerId > maxWorkerId || workerId < 0) {
            throw new IllegalArgumentException(String.format("机器ID必须小于 %d 且大于 0", maxWorkerId));
        }
        if (datacenterId > maxDatacenterId || datacenterId < 0) {
            throw new IllegalArgumentException(String.format("工作组ID必须小于 %d 且大于 0", maxDatacenterId));
        }
        this.workerId = workerId;
        this.datacenterId = datacenterId;
    }

    /**
     * 构造函数
     *
     */
    public SnowFlake() {
        this.workerId = 0;
        this.datacenterId = 0;
    }

    /**
     * 获得下一个ID (该方法是线程安全的)
     *
     * @return SnowFlakeId
     */
    public synchronized long nextId() {
        long timestamp = timeGen();

        //如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常
        if (timestamp < lastTimestamp) {
            throw new RuntimeException(
                    String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
        }

        // 如果是同一时间生成的,则进行毫秒内序列
        if (lastTimestamp == timestamp) {
            sequence = (sequence + 1) & sequenceMask;
            // 毫秒内序列溢出
            if (sequence == 0) {
                // 阻塞到下一个毫秒,获得新的时间戳
                timestamp = tilNextMillis(lastTimestamp);
            }
        }
        //时间戳改变,毫秒内序列重置
        else {
            sequence = 0L;
        }

        // 上次生成ID的时间截
        lastTimestamp = timestamp;

        // 移位并通过或运算拼到一起组成64位的ID
        return ((timestamp - twepoch) << timestampLeftShift) //
                | (datacenterId << datacenterIdShift) //
                | (workerId << workerIdShift) //
                | sequence;
    }

    /**
     * 阻塞到下一个毫秒,直到获得新的时间戳
     *
     * @param lastTimestamp 上次生成ID的时间截
     * @return 当前时间戳
     */
    protected long tilNextMillis(long lastTimestamp) {
        long timestamp = timeGen();
        while (timestamp <= lastTimestamp) {
            timestamp = timeGen();
        }
        return timestamp;
    }

    /**
     * 返回以毫秒为单位的当前时间
     *
     * @return 当前时间(毫秒)
     */
    protected long timeGen() {
        return System.currentTimeMillis();
    }

    public long getWorkerId() {
        return workerId;
    }

    public static void setWorkerId(long workerId) {
        SnowFlake.workerId = workerId;
    }

    public long getDatacenterId() {
        return datacenterId;
    }

    public void setDatacenterId(long datacenterId) {
        this.datacenterId = datacenterId;
    }
}

Redis 配置

public class RedisConfig {

    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.port:6379}")
    private Integer port;

    @Value("${spring.redis.password:-1}")
    private String password;

    @Bean
    public JedisPool jedisPool() {
        // 1.设置连接池的配置对象
        JedisPoolConfig config = new JedisPoolConfig();
        // 设置池中最大连接数
        config.setMaxTotal(50);
        // 设置空闲时池中保有的最大连接数
        config.setMaxIdle(10);
        config.setMaxWaitMillis(3000L);
        config.setTestOnBorrow(true);
        log.info(password);
        // 2.设置连接池对象
        if("-1".equals(password)){
            log.info("Redis不通过密码连接");
            return new JedisPool(config, host, port,0);
        } else {
            log.info("Redis通过密码连接" + password);
            return new JedisPool(config, host, port,0, password);
        }
    }
}

使用方法

1, 项目中引入 Redis 、 Jedis 依赖

2, 复制上面两个类到项目until包下

3, application.yml 配置服务名称,机器序号,Redis账号,密码

4, 配置Jedis,使得项目启动时池中有Redis连接对象

5, 启动项目

6, 在需要生成ID的类中注入

@Autowired
private SnowFlake snowFlake;
// 生产ID
snowFlake.nextId(); 方法生产ID

关于作者

王硕,十年软件开发经验,业余产品经理,精通Java/Python/Go等,喜欢研究技术,著有《PyQt 5 快速开发与实战》《Python 3.* 全栈开发》,多个业余开源项目托管在GitHub上,欢迎微博交流:

Comments

Make a comment

Author: admin

Publish at: ...

关于作者

王硕,网名信平,十多年软件开发经验,架构师,熟悉 Java/Python/Go 等,喜欢研究技术,读书,音乐和宅在家里。
专注于研究互联网产品和技术,提供中文精品教程。 本网站与其它任何公司及/或商标无任何形式关联或合作。
Email: xujieiata@163.com

www.ultrapower.com ,王硕的博客,专注于研究互联网产品和技术,提供中文精品教程。 本网站与其它任何公司及/或商标无任何形式关联或合作。