利用Redis實現集群或開發環境下SnowFlake自動配置機器號

前言:

SnowFlake 雪花ID 算法是推特公司推出的著名分布式ID生成算法。利用預先分配好的機器ID,工作區ID,機器時間可以生成全局唯一的隨時間趨勢遞增的Long類型ID.長度在17-19位。隨著時間的增長而遞增,在MySQL數據庫中,InnoDB存儲引擎可以更快的插入遞增的主鍵。而不像UUID那樣因為寫入是亂序的,InnoDB不得不頻繁的做頁分裂操作,耗時且容易產生碎片。

對于SnowFlake 的原理介紹,可以參考該文章:理解分布式id生成算法SnowFlake

理解了雪花的基本原理之后,我們試想:在分布式集群或者開發環境下,不同服務之間/相同服務的不同機器之間應該如何產生差異呢?有以下幾種方案:

  1. 通過在 yml 文件中配置不同的參數,啟動 spring 容器時通過讀取該參數來實現不同服務與不同機器的workerId不同。但是這里不方便新增機器/新同事的自動化配置
  2. 向第三方應用如zookeeper、Redis中注冊ID,以獲得唯一的ID。
  3. 對于開發環境,可以取機器的IP后三位。因為大家在一個辦公室的話IP后三位肯定是0-255之前不重復。但是這樣機器ID需要8個Bit,留給數據中心的位數就只有4個了。

本方案結合了以上方案的優點,按照業務的實際情況對雪花中的數據中心和機器ID所占的位數進行調整:數據中心占4Bit,范圍從0-15?;鱅D占6Bit,范圍從0-63
。對不同的服務在yml中配置服務名稱,以服務編號作為數據中心ID。如果按照開發+測試+生產環境區分的話,可以部署5個不同的服務。application.yml 中配置如下的參數

# 分布式雪花ID不同機器ID自動化配置
snowFlake:
  dataCenter: 1 # 數據中心的id
  appName: test # 業務類型名稱

而機器ID采用以下的策略實現:

  1. 獲取當前機器的IP地址 localIp,模32,獲得0-31的整數 machineId
  2. 向Redis中注冊,使用 appName + dataCenter + machineId 作為key ,以本機IP localIp 作為 value。
  3. 注冊成功后,設置鍵過期時間 24 h,并開啟一個計時器,在 23h 后更新注冊的 key
  4. 如果注冊失敗,可能有以下兩個原因:
    1. 上次服務異常中斷,沒有來得及刪除key。這里的解決方案是通過key獲取value,如果value和localIp一致,則仍然視為注冊成功
    2. IP和別人的IP模32的結果一樣,導致機器ID沖突。這是就遍歷 0-31 獲取其中為注冊的數字作為本機的機器號
  5. 如果不幸Redis連接失敗,系統將從32-63之間隨機獲取ID,并使用 log.error() 打印醒目的提示消息這里建議IDEA + Grep Console 實現不同級別的日志不同前景色顯示,方便及時獲取錯誤信息
  6. 當服務停止前,向Redis發送請求,刪除該Key的占用。

具體的代碼如下:

自動配置機器ID,并在容器啟動時放入SnowFlake實例對象

package cn.keats.util;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;

@Configuration
@Slf4j
public class MachineIdConfig {
    @Resource
    private JedisPool jedisPool;

    @Value("${snowFlake.dataCenter}")
    private Integer dataCenterId;

    @Value("${snowFlake.appName}")
    private String APP_NAME;

    /**
     * 機器id
     */
    public static Integer machineId;
    /**
     * 本地ip地址
     */
    private static String localIp;

    /**
     * 獲取ip地址
     *
     * @return
     * @throws UnknownHostException
     */
    private String getIPAddress() throws UnknownHostException {
        InetAddress address = InetAddress.getLocalHost();
        return address.getHostAddress();
    }

    /**
     * hash機器IP初始化一個機器ID
     */
    @Bean
    public SnowFlake initMachineId() throws Exception {
        localIp = getIPAddress(); // 192.168.0.233

        Long ip_ = Long.parseLong(localIp.replaceAll("\\.", ""));// 1921680233
        //
        machineId = ip_.hashCode() % 32;// 0-31
        // 創建一個機器ID
        createMachineId();

        log.info("初始化 machine_id :{}", machineId);
        return new SnowFlake(machineId, dataCenterId);
    }

    /**
     * 容器銷毀前清除注冊記錄
     */
    @PreDestroy
    public void destroyMachineId() {
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.del(APP_NAME + dataCenterId + machineId);
        }
    }


    /**
     * 主方法:首先獲取機器 IP 并 % 32 得到 0-31
     * 使用 業務名 + 組名 + IP 作為 Redis 的 key,機器IP作為 value,存儲到Redis中
     *
     * @return
     */
    public Integer createMachineId() {
        try {
            // 向redis注冊,并設置超時時間
            log.info("注冊一個機器ID到Redis " + machineId + " IP:" + localIp);
            Boolean flag = registerMachine(machineId, localIp);
            // 注冊成功
            if (flag) {
                // 啟動一個線程更新超時時間
                updateExpTimeThread();
                // 返回機器Id
                log.info("Redis中端口沒有沖突 " + machineId + " IP:" + localIp);
                return machineId;
            }
            // 注冊失敗,可能原因 Hash%32 的結果沖突
            if (!checkIfCanRegister()) {
                // 如果 0-31 已經用完,使用 32-64之間隨機的ID
                getRandomMachineId();
                createMachineId();
            } else {
                // 如果存在剩余的ID
                log.warn("Redis中端口沖突了,使用 0-31 之間未占用的Id " + machineId + " IP:" + localIp);
                createMachineId();
            }
        } catch (Exception e) {
            // 獲取 32 - 63 之間的隨機Id
            // 返回機器Id
            log.error("Redis連接異常,不能正確注冊雪花機器號 " + machineId + " IP:" + localIp, e);
            log.warn("使用臨時方案,獲取 32 - 63 之間的隨機數作為機器號,請及時檢查Redis連接");
            getRandomMachineId();
            return machineId;
        }
        return machineId;
    }

    /**
     * 檢查是否被注冊滿了
     *
     * @return
     */
    private Boolean checkIfCanRegister() {
        // 判斷0~31這個區間段的機器IP是否被占滿
        try (Jedis jedis = jedisPool.getResource()) {
            Boolean flag = true;
            for (int i = 0; i < 32; i++) {
                flag = jedis.exists(APP_NAME + dataCenterId + i);
                // 如果不存在。設置機器Id為這個不存在的數字
                if (!flag) {
                    machineId = i;
                    break;
                }
            }
            return !flag;
        }
    }

    /**
     * 1.更新超時時間
     * 注意,更新前檢查是否存在機器ip占用情況
     */
    private void updateExpTimeThread() {
        // 開啟一個線程執行定時任務:
        // 每23小時更新一次超時時間
        new Timer(localIp).schedule(new TimerTask() {
            @Override
            public void run() {
                // 檢查緩存中的ip與本機ip是否一致, 一致則更新時間,不一致則重新獲取一個機器id
                Boolean b = checkIsLocalIp(String.valueOf(machineId));
                if (b) {
                    log.info("IP一致,更新超時時間 ip:{},machineId:{}, time:{}", localIp, machineId, new Date());
                    try (Jedis jedis = jedisPool.getResource()) {
                        jedis.expire(APP_NAME + dataCenterId + machineId, 60 * 60 * 24 );
                    }
                } else {
                    // IP沖突
                    log.info("重新生成機器ID ip:{},machineId:{}, time:{}", localIp, machineId, new Date());
                    // 重新生成機器ID,并且更改雪花中的機器ID
                    getRandomMachineId();
                    // 重新生成并注冊機器id
                    createMachineId();
                    // 更改雪花中的機器ID
                    SnowFlake.setWorkerId(machineId);
                    // 結束當前任務
                    log.info("Timer->thread->name:{}", Thread.currentThread().getName());
                    this.cancel();
                }
            }
        }, 10 * 1000, 1000 * 60 * 60 * 23);
    }

    /**
     * 獲取32-63隨機數
     */
    public void getRandomMachineId() {
        machineId = (int) (Math.random() * 31) + 31;
    }


    /**
     * 檢查Redis中對應Key的Value是否是本機IP
     *
     * @param mechineId
     * @return
     */
    private Boolean checkIsLocalIp(String mechineId) {
        try (Jedis jedis = jedisPool.getResource()) {
            String ip = jedis.get(APP_NAME + dataCenterId + mechineId);
            log.info("checkIsLocalIp->ip:{}", ip);
            return localIp.equals(ip);
        }
    }

    /**
     * 1.注冊機器
     * 2.設置超時時間
     *
     * @param machineId 取值為0~31
     * @return
     */
    private Boolean registerMachine(Integer machineId, String localIp) throws Exception {
        // try with resources 寫法,出異?;崾頭爬ê拍詰淖試?Java7特性
        try (Jedis jedis = jedisPool.getResource()) {
            // key 業務號 + 數據中心ID + 機器ID value 機器IP
            Long result = jedis.setnx(APP_NAME + dataCenterId + machineId, localIp);
            if(result == 1){
                // 過期時間 1 天
                jedis.expire(APP_NAME + dataCenterId + machineId, 60 * 60 * 24);
                return true;
            } else {
                // 如果Key存在,判斷Value和當前IP是否一致,一致則返回True
                String value = jedis.get(APP_NAME + dataCenterId + machineId);
                if(localIp.equals(value)){
                    // IP一致,注冊機器ID成功
                    jedis.expire(APP_NAME + dataCenterId + machineId, 60 * 60 * 24);
                    return true;
                }
                return false;
            }
        }
    }
}

雪花ID:

import org.springframework.context.annotation.Configuration;

/**
 * 功能:分布式ID生成工具類
 *
 */
@Configuration
public class SnowFlake {
    /**
     * 開始時間截 (2019-09-08) 服務一旦運行過之后不能修改?;岬賈翴D生成重復
     */
    private final long twepoch = 1567872000000L;

    /**
     * 機器Id所占的位數 0 - 64
     */
    private final long workerIdBits = 6L;

    /**
     * 工作組Id所占的位數 0 - 16
     */
    private final long dataCenterIdBits = 4L;

    /**
     * 支持的最大機器id,結果是63 (這個移位算法可以很快的計算出幾位二進制數所能表示的最大十進制數)
     */
    private final long maxWorkerId = -1L ^ (-1L << workerIdBits);

    /**
     * 支持的最大數據標識id,結果是15
     */
    private final long maxDatacenterId = -1L ^ (-1L << dataCenterIdBits);

    /**
     * 序列在id中占的位數
     */
    private final long sequenceBits = 12L;

    /**
     * 機器ID向左移12位
     */
    private final long workerIdShift = sequenceBits;

    /**
     * 數據標識id向左移17位(12+5)
     */
    private final long datacenterIdShift = sequenceBits + workerIdBits;

    /**
     * 時間截向左移22位(5+5+12)
     */
    private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;

    /**
     * 生成序列的掩碼,這里為4095 (0b111111111111=0xfff=4095)
     */
    private final long sequenceMask = -1L ^ (-1L << sequenceBits);

    /**
     * 工作機器ID(0~63)
     */
    private static long workerId;

    /**
     * 數據中心ID(0~16)
     */
    private long datacenterId;

    /**
     * 毫秒內序列(0~4095)
     */
    private long sequence = 0L;

    /**
     * 上次生成ID的時間截
     */
    private long lastTimestamp = -1L;

    //==============================Constructors=====================================

    /**
     * 構造函數
     *
     * @param workerId     工作ID (0~63)
     * @param datacenterId 數據中心ID (0~15)
     */
    public SnowFlake(long workerId, long datacenterId) {
        if (workerId > maxWorkerId || workerId < 0) {
            throw new IllegalArgumentException(String.format("機器ID必須小于 %d 且大于 0", maxWorkerId));
        }
        if (datacenterId > maxDatacenterId || datacenterId < 0) {
            throw new IllegalArgumentException(String.format("工作組ID必須小于 %d 且大于 0", maxDatacenterId));
        }
        this.workerId = workerId;
        this.datacenterId = datacenterId;
    }

    /**
     * 構造函數
     *
     */
    public SnowFlake() {
        this.workerId = 0;
        this.datacenterId = 0;
    }

    /**
     * 獲得下一個ID (該方法是線程安全的)
     *
     * @return SnowFlakeId
     */
    public synchronized long nextId() {
        long timestamp = timeGen();

        //如果當前時間小于上一次ID生成的時間戳,說明系統時鐘回退過這個時候應當拋出異常
        if (timestamp < lastTimestamp) {
            throw new RuntimeException(
                    String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
        }

        // 如果是同一時間生成的,則進行毫秒內序列
        if (lastTimestamp == timestamp) {
            sequence = (sequence + 1) & sequenceMask;
            // 毫秒內序列溢出
            if (sequence == 0) {
                // 阻塞到下一個毫秒,獲得新的時間戳
                timestamp = tilNextMillis(lastTimestamp);
            }
        }
        //時間戳改變,毫秒內序列重置
        else {
            sequence = 0L;
        }

        // 上次生成ID的時間截
        lastTimestamp = timestamp;

        // 移位并通過或運算拼到一起組成64位的ID
        return ((timestamp - twepoch) << timestampLeftShift) //
                | (datacenterId << datacenterIdShift) //
                | (workerId << workerIdShift) //
                | sequence;
    }

    /**
     * 阻塞到下一個毫秒,直到獲得新的時間戳
     *
     * @param lastTimestamp 上次生成ID的時間截
     * @return 當前時間戳
     */
    protected long tilNextMillis(long lastTimestamp) {
        long timestamp = timeGen();
        while (timestamp <= lastTimestamp) {
            timestamp = timeGen();
        }
        return timestamp;
    }

    /**
     * 返回以毫秒為單位的當前時間
     *
     * @return 當前時間(毫秒)
     */
    protected long timeGen() {
        return System.currentTimeMillis();
    }

    public long getWorkerId() {
        return workerId;
    }

    public static void setWorkerId(long workerId) {
        SnowFlake.workerId = workerId;
    }

    public long getDatacenterId() {
        return datacenterId;
    }

    public void setDatacenterId(long datacenterId) {
        this.datacenterId = datacenterId;
    }
}

Redis 配置

public class RedisConfig {

    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.port:6379}")
    private Integer port;

    @Value("${spring.redis.password:-1}")
    private String password;

    @Bean
    public JedisPool jedisPool() {
        // 1.設置連接池的配置對象
        JedisPoolConfig config = new JedisPoolConfig();
        // 設置池中最大連接數
        config.setMaxTotal(50);
        // 設置空閑時池中保有的最大連接數
        config.setMaxIdle(10);
        config.setMaxWaitMillis(3000L);
        config.setTestOnBorrow(true);
        log.info(password);
        // 2.設置連接池對象
        if("-1".equals(password)){
            log.info("Redis不通過密碼連接");
            return new JedisPool(config, host, port,0);
        } else {
            log.info("Redis通過密碼連接" + password);
            return new JedisPool(config, host, port,0, password);
        }
    }
}

使用方法

  1. 項目中引入 Redis 、 Jedis 依賴
  2. 復制上面兩個類到項目until包下
  3. application.yml 配置服務名稱,機器序號,Redis賬號,密碼
  4. 配置Jedis,使得項目啟動時池中有Redis連接對象
  5. 啟動項目
  6. 在需要生成ID的類中注入
    @Autowired
    private SnowFlake snowFlake;
    // 生產ID
    snowFlake.nextId(); 方法生產ID
posted @ 2020-01-01 17:07  后青春期的Keats  閱讀(...)  評論(...編輯  收藏