階段一模塊四(zookeeper)


zookeeper作業

基於Zookeeper實現簡易版配置中心要求實現以下功能

  1. 創建一個Web項目,將數據庫連接信息交給Zookeeper配置中心管理,即:當項目Web項目啟動時,從Zookeeper進行MySQL配置參數的拉取
  2. 要求項目通過數據庫連接池訪問MySQL(連接池可以自由選擇熟悉的)
  3. 當Zookeeper配置信息變化后Web項目自動感知,正確釋放之前連接池,創建新的連接池

思路分析

  1. 定義一個用於發布數據庫連接信息到zookeeper的接口,用來修改數據庫連接信息

  2. 項目啟動時從zookeeper獲取數據庫連接信息,創建數據庫連接池

  3. 項目要時刻監聽zookeeper中數據庫連接信息的變化

  4. 當發布數據庫連接信息到zookeeper中時,如果連接信息有變化,項目會重新從zookeeper中獲取數據庫連接信息,釋放之前的連接池,並創建新的數據庫連接池

實現步驟

  1. 創建一個spring web項目,添加需要的依賴到pom文件。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.lagou</groupId>
    <artifactId>zookeeper-web</artifactId>
    <version>1.0</version>
    <packaging>war</packaging>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.2.8.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>5.2.8.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>5.2.8.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.14</version>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.2</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.46</version>
        </dependency>
    </dependencies>

    <!-- JVM 運行環境 -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

resources目錄下新建日志配置文件:log4j.properties

log4j.rootLogger=INFO, Console
# Console
log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
log4j.appender.Console.layout.ConversionPattern=%-5p - %m%n
  1. 定義一個用於修改配置信息的測試類:PublishTest.java
package com.lagou;

import org.junit.Test;

import java.io.IOException;

/**
 * 測試發布信息
 */
public class PublishTest {

    @Test
    public void publish1() throws InterruptedException, IOException {
        String cfg = "driverClassName=com.mysql.jdbc.Driver\n" +
                "url=jdbc:mysql://hadoop03:3306/test?useSSL=false\n" +
                "username=root\n" +
                "password=12345678\n" +
                "initCount=5\n" +
                "maxCount=10\n" +
                "currentCount=5";
        ZkConfig.publish(cfg);
        System.out.println(ZkConfig.zkClient.readData("/mysql/con_config", true).toString());
    }

    @Test
    public void publish2() throws InterruptedException, IOException {
        String cfg = "driverClassName=com.mysql.jdbc.Driver\n" +
                "url=jdbc:mysql://hadoop03:3306/test?useSSL=false\n" +
                "username=hive\n" +
                "password=12345678\n" +
                "initCount=5\n" +
                "maxCount=10\n" +
                "currentCount=5";
        ZkConfig.publish(cfg);
        System.out.println(ZkConfig.zkClient.readData("/mysql/con_config", true).toString());
    }
}
  1. 定義一個監聽器,用於監聽zk中保存配置信息的節點(mysql/con_config)

Listener.java

package com.lagou;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

/**
 * 對配置文件節點添加監聽器,用於監聽配置文件的變化
 */
public class Listener {

    private static ZkClient zkClient = new ZkClient("hadoop01:2181, hadoop02:2181");
    private static String path = "/mysql/con_config";

    // 監聽方法
    public static void monitor() {
        zkClient.subscribeDataChanges(path, new IZkDataListener() {
            public void handleDataChange(String s, Object o) throws Exception {
                System.out.println("zookeeper中數據庫配置信息發生變化");
                // 嘗試重新獲取配置文件
                String config = zkClient.readData(path, true);
            }

            public void handleDataDeleted(String s) throws Exception {
                ZkConfig.logger.error("zk中的數據庫配置信息已被刪除!");
            }
        });
    }
}
  1. 數據庫連接管理器:ConnManager.java
package com.lagou;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;

/**
 * 數據庫連接管理類
 */
public class ConnManager {

    // 數據庫連接池
    public static List<Connection> pool = new LinkedList<>();
    public static String url = "";
    public static String username = "";
    public static String password = "";
    public static String driver = "";
    // 初始化的數量
    public static int initCount;
    public static int maxCount;
    public static int currentCount;
    private static volatile ConnManager instance = null;

    /**
     * 在構造對象的時候就創建好指定數量的連接
     */
    private ConnManager() {
        init();
    }

    public static void init() {
        addConnection();
    }

    public static void createPool(Properties pro) {
        // 解析配置信息,創建數據庫連接池
        driver = pro.getProperty("driverClassName");
        url = pro.getProperty("url");
        username = pro.getProperty("username");
        password = pro.getProperty("password");
        initCount = Integer.parseInt(pro.getProperty("initCount"));
        maxCount = Integer.parseInt(pro.getProperty("maxCount"));
        currentCount = Integer.parseInt(pro.getProperty("currentCount"));
        init();
    }

    //獲取ConnManager唯一的實例對象
    public static ConnManager getInstance() {
        if (null == instance){
            synchronized (ConnManager.class){
                if (null == instance){
                    return new ConnManager();
                }
            }

        }
        return instance;
    }

    public static void addConnection() {
        for (int i = 0; i < initCount; i++) {
            pool.add(createConnection());
        }
    }

    // 新建連接
    public static Connection createConnection() {
        Connection connection = null;

        try {
            Class.forName(driver);
            connection  = DriverManager.getConnection(url, username, password);

        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (SQLException e) {
            e.printStackTrace();
        }

        return connection;

    }

    // 從連接池中獲取連接
    public static Connection getConnection() throws ClassNotFoundException, SQLException {
        synchronized (pool) {
            if (pool.size() > 0) {
                System.out.println("當前連接的數量是:" + pool.size());
                return pool.get(0);
            }
            else if (currentCount < maxCount) {
                Class.forName(driver);
                Connection conn = createConnection();
                pool.add(conn);
                currentCount++;
                return conn;
                }
            else {
                throw new SQLException("當前連接數是0");
            }
        }
    }

    // 清空連接池
    public static void cleanPool() {
        for (Connection connection : pool) {
            try {
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        pool = new LinkedList<Connection>();
    }

    // 釋放資源
    public static void release(Connection conn) {
        pool.remove(conn);
    }
}
  1. 啟動時初始化數據庫連接池並接聽:StartUp.java
package com.lagou;

import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.io.StringReader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;

/**
 * 啟動時初始化連接池,並監聽
 */

@Component
public class StartUp {

    //@PostConstruct修飾的方法會在服務器加載Servlet的時候運行,並且只會被服務器執行一次。PostConstruct在構造函數之后執行
    @PostConstruct
    public void init() throws SQLException, ClassNotFoundException {
        String cfg = ZkConfig.zkClient.readData(ZkConfig.configPath, true);
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(cfg));
            // 創建數據庫連接池
            ConnManager.createPool(properties);
            // 監聽節點數據變化
            Listener.monitor();
        }catch (IOException e) {
            e.printStackTrace();
        }

        // 測試連接池
        Connection connection = ConnManager.getConnection();
        PreparedStatement preparedStatement = connection.prepareStatement("select * from config");
        ResultSet resultSet = preparedStatement.executeQuery();
        while (resultSet.next()) {
            System.out.println("username = " + resultSet.getString("username") +
                    ", password = " + resultSet.getString("password"));
        }
    }
}

需要在applicationContext.xml中配置自動掃描和注解驅動

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
		http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/context
		http://www.springframework.org/schema/context/spring-context.xsd
		http://www.springframework.org/schema/mvc
		http://www.springframework.org/schema/mvc/spring-mvc.xsd">

    <context:component-scan base-package="com.lagou"></context:component-scan>
    <context:annotation-config></context:annotation-config>
    <mvc:default-servlet-handler></mvc:default-servlet-handler>
</beans>
  1. 將應用部署到Tomcat

  2. 執行PublishTest測試類中的publish1()方法,往zk中/mysql/con_config節點的發布數據庫配置信息

@Test
    public void publish1() throws InterruptedException, IOException {
        String cfg = "driverClassName=com.mysql.jdbc.Driver\n" +
                "url=jdbc:mysql://hadoop03:3306/test?useSSL=false\n" +
                "username=root\n" +
                "password=12345678\n" +
                "initCount=5\n" +
                "maxCount=10\n" +
                "currentCount=5";
        ZkConfig.publish(cfg);
        System.out.println(ZkConfig.zkClient.readData("/mysql/con_config", true).toString());
    }
  1. 啟動應用,查看狀態

可以看到,成功獲取到了數據庫連接並查詢到了數據庫表中的信息。

去zookeeper查看節點內容

  1. 執行PublishTest測試類中的publish2()方法,修改zk中/webapp/dblinkcfg節點的數據:username=hive改為username=root
@Test
public void publish2() throws InterruptedException, IOException {
    String cfg = "driverClassName=com.mysql.jdbc.Driver\n" +
        "url=jdbc:mysql://hadoop03:3306/test?useSSL=false\n" +
        "username=hive\n" +
        "password=12345678\n" +
        "initCount=5\n" +
        "maxCount=10\n" +
        "currentCount=5";
    ZkConfig.publish(cfg);
    System.out.println(ZkConfig.zkClient.readData("/mysql/con_config", true).toString());
}

控制台輸出

  1. 刪除節點/mysql/con_config
rmr /mysql/con_config

控制台輸出


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM