zookeeper生成分布式自增ID


1. 環境

zookeeper: 3.6.0 windows
springboot 2.2.6
jdk 11

2. 依賴引入

<properties>
    <curator.version>4.2.0</curator.version>
</properties>
<!-- curator ZK 客戶端 -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>${curator.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>${curator.version}</version>
</dependency>

完整的pom.xml文件如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>zookeeper</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>zookeeper</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <curator.version>4.2.0</curator.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- curator ZK 客戶端 -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>${curator.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>${curator.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.10</version>
            <scope>provided</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

2. 配置文件

配置文件為\src\main\resources\zookeeper.properties,存儲內容如下:

# zk host地址
zk.host=127.0.0.1:2181
# zk自增存儲node
zk.sequence-path=/news/sequence/

3. 枚舉封裝

創建com.example.zookeeper.sequence.ZkSequenceEnum文件,用於定義通過Zk生成自增ID的枚舉,在項目中規范要求與物理表名項目,使用與當前項目階段的枚舉如下:

public enum ZkSequenceEnum {
    AP_LIKES, AP_READ_BEHAVIOR, AP_COLLECTION, AP_USER_FOLLOW, AP_USER_FAN
}

4. 序列封裝

創建com.example.zookeeper.sequence.ZkSequence文件,用於封裝程序在運行時每個表對應的自增器,這里主要通過分布式原子自增類(DistributedAtomicLong)實現,注意每500毫秒重試3次后仍然生成失敗則返回null,由上層處理,相關實現代碼如下:

public class ZkSequence {

    RetryPolicy retryPolicy = new ExponentialBackoffRetry(500, 3);

    DistributedAtomicLong distAtomicLong;

    public ZkSequence(String sequenceName, CuratorFramework client) {
        distAtomicLong = new DistributedAtomicLong(client, sequenceName, retryPolicy);
    }

    /**
     * 生成序列
     *
     * @return
     * @throws Exception
     */
    public Long sequence() throws Exception {
        AtomicValue<Long> sequence = this.distAtomicLong.increment();
        if (sequence.succeeded()) {
            return sequence.postValue();
        } else {
            return null;
        }
    }

}

5. Client封裝

創建com.example.zookeeper.client.ZookeeperClient類,通過PostConstruct注解在內構器之后調用init方法初始化客戶端連接,並調用initZkSequence方法初始項目所定義的ZkSequence,並存儲在zkSequenceMap集合中,最終提供sequence方法來查詢對應zkSequence獲取自增ID,相關實現代碼如下:

@Data
public class ZookeeperClient {
    private static Logger logger = LoggerFactory.getLogger(ZookeeperClient.class);
    private String host;
    private String sequencePath;

    // 重試休眠時間
    private final int SLEEP_TIME_MS = 1000;
    // 最大重試1000次
    private final int MAX_RETRIES = 1000;
    //會話超時時間
    private final int SESSION_TIMEOUT = 30 * 1000;
    //連接超時時間
    private final int CONNECTION_TIMEOUT = 3 * 1000;

    //創建連接實例
    private CuratorFramework client = null;
    // 序列化集合
    private Map<String, ZkSequence> zkSequence = Maps.newConcurrentMap();

    public ZookeeperClient(String host, String sequencePath) {
        this.host = host;
        this.sequencePath = sequencePath;
    }

    @PostConstruct
    public void init() throws Exception {
        this.client = CuratorFrameworkFactory.builder()
                .connectString(this.getHost())
                .connectionTimeoutMs(CONNECTION_TIMEOUT)
                .sessionTimeoutMs(SESSION_TIMEOUT)
                .retryPolicy(new ExponentialBackoffRetry(SLEEP_TIME_MS, MAX_RETRIES)).build();
        this.client.start();
        this.initZkSequence();
    }

    public void initZkSequence() {
        ZkSequenceEnum[] list = ZkSequenceEnum.values();
        for (int i = 0; i < list.length; i++) {
            String name = list[i].name();
            String path = this.sequencePath + name;
            ZkSequence seq = new ZkSequence(path, this.client);
            zkSequence.put(name, seq);
        }
    }

    /**
     * 生成SEQ
     *
     * @param name
     * @return
     * @throws Exception
     */
    public Long sequence(ZkSequenceEnum name) {
        try {
            ZkSequence seq = zkSequence.get(name.name());
            if (seq != null) {
                return seq.sequence();
            }
        } catch (Exception e) {
            logger.error("獲取[{}]Sequence錯誤:{}", name, e);
        }
        return null;
    }
}

注:在這里ZookeeperClient是一個BeanFactoryZkSequence是一個FactoryBean

6. Config封裝

創建com.example.zookeeper.config.ZkConfig類,用於自動化配置環境文件的導入,和zkClient定義Bean定義,其相關的實現代碼如下:

/**
 * 自動化配置核心數據庫的連接配置
 */
@Data
@Configuration
@ConfigurationProperties(prefix="zk")
@PropertySource("classpath:zookeeper.properties")
public class ZkConfig {

    String host;
    String sequencePath;

    /**
     * 這是最快的數據庫連接池
     * @return
     */
    @Bean
    public ZookeeperClient zookeeperClient(){
        return new ZookeeperClient(this.host,this.sequencePath);
    }

}

7. Sequences封裝

為便於程序中調用,以及對自增生成失敗的統一處理,項目中規范通過com.example.zookeeper.sequence.Sequences類統一暴露生成自增主鍵的功能,相關代碼如下:

@Component
public class Sequences {

    @Autowired
    private ZookeeperClient client;

    public Long sequenceApLikes() {
        return this.client.sequence(ZkSequenceEnum.AP_LIKES);
    }

    public Long sequenceApReadBehavior() {
        return this.client.sequence(ZkSequenceEnum.AP_READ_BEHAVIOR);
    }

    public Long sequenceApCollection() {
        return this.client.sequence(ZkSequenceEnum.AP_COLLECTION);
    }

    public Long sequenceApUserFollow() {
        return this.client.sequence(ZkSequenceEnum.AP_USER_FOLLOW);
    }

    public Long sequenceApUserFan() {
        return this.client.sequence(ZkSequenceEnum.AP_USER_FAN);
    }

}

8. 測試

@SpringBootTest
class ZookeeperApplicationTests {

    // 第一步,注入Sequences
    @Autowired
    private Sequences sequences;

    @Test
    void contextLoads() {
        for (int i = 0; i < 10; i++) {
            System.out.println("sequenceApCollection生成的自增id為:" + sequences.sequenceApCollection());
        }
    }

}

9. 擴展

如后期需要新增ZkSequence自增表,可參考以下操作步驟,快速實現:

  • 在`ZkSequenceEnum中定義對應的枚舉項,規范要求枚舉項與物理表名一致且大寫
  • Sequences中定義對應的調用方法,規范要求方法由sequence前綴+駝峰表名組成

10. 代碼

微雲下載


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM