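ZooKeeper Assignment
Implement a simple configuration center on top of ZooKeeper with the following features:
- Create a web project and let the ZooKeeper configuration center manage the database connection settings; that is, when the web project starts, it pulls the MySQL configuration parameters from ZooKeeper.
- The project must access MySQL through a database connection pool (you may pick any pool you are familiar with).
- When the configuration in ZooKeeper changes, the web project detects the change automatically, releases the old connection pool correctly, and creates a new one.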
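Approach
- Define an interface that publishes the database connection settings to ZooKeeper; it is used to modify the settings.
- At start-up, the project reads the database connection settings from ZooKeeper and creates the connection pool.
- The project watches the database connection settings in ZooKeeper for changes at all times.
- When settings are published to ZooKeeper and they differ from the current ones, the project reads them from ZooKeeper again, releases the old connection pool, and creates a new one.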
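The "only rebuild when something actually changed" step can be sketched with the JDK alone. This is a minimal illustration, assuming the payload is the key=value text used later in this post; the class name `ConfigChange` and its methods are hypothetical and not part of the project.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper for illustration; not part of the original project.
class ConfigChange {
    // Parse a key=value payload (one pair per line) into Properties.
    public static Properties parse(String payload) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(payload));
        } catch (IOException e) {
            // A StringReader performs no real I/O, so this is not expected.
            throw new IllegalStateException(e);
        }
        return props;
    }

    // True when the two payloads differ in any key or value (line order is ignored).
    public static boolean changed(String currentPayload, String newPayload) {
        return !parse(currentPayload).equals(parse(newPayload));
    }

    public static void main(String[] args) {
        String v1 = "url=jdbc:mysql://hadoop03:3306/test?useSSL=false\nusername=root\n";
        String v2 = "url=jdbc:mysql://hadoop03:3306/test?useSSL=false\nusername=hive\n";
        System.out.println(changed(v1, v1)); // prints false
        System.out.println(changed(v1, v2)); // prints true
    }
}
```

Comparing parsed properties rather than raw strings means that reordering lines or changing whitespace around a separator does not trigger a pool rebuild.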
Implementation steps
- Create a Spring web project and add the required dependencies to the pom file.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.lagou</groupId>
<artifactId>zookeeper-web</artifactId>
<version>1.0</version>
<packaging>war</packaging>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.2.8.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>5.2.8.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>5.2.8.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.46</version>
</dependency>
</dependencies>
<!-- Java compiler settings -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
Create the logging configuration file log4j.properties under the resources directory:
log4j.rootLogger=INFO, Console
# Console
log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
log4j.appender.Console.layout.ConversionPattern=%-5p - %m%n
- Define a test class for modifying the configuration: PublishTest.java
package com.lagou;
import org.junit.Test;
import java.io.IOException;
/**
* Tests publishing the configuration.
*/
public class PublishTest {
@Test
public void publish1() throws InterruptedException, IOException {
String cfg = "driverClassName=com.mysql.jdbc.Driver\n" +
"url=jdbc:mysql://hadoop03:3306/test?useSSL=false\n" +
"username=root\n" +
"password=12345678\n" +
"initCount=5\n" +
"maxCount=10\n" +
"currentCount=5";
ZkConfig.publish(cfg);
System.out.println(ZkConfig.zkClient.readData("/mysql/con_config", true).toString());
}
@Test
public void publish2() throws InterruptedException, IOException {
String cfg = "driverClassName=com.mysql.jdbc.Driver\n" +
"url=jdbc:mysql://hadoop03:3306/test?useSSL=false\n" +
"username=hive\n" +
"password=12345678\n" +
"initCount=5\n" +
"maxCount=10\n" +
"currentCount=5";
ZkConfig.publish(cfg);
System.out.println(ZkConfig.zkClient.readData("/mysql/con_config", true).toString());
}
}
- Define a listener that watches the ZooKeeper node holding the configuration (/mysql/con_config)
Listener.java
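package com.lagou;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

import java.io.StringReader;
import java.util.Properties;

/**
 * Registers a listener on the configuration node to detect configuration changes.
 */
public class Listener {
    // No spaces inside the connection string: the host list is split on commas only
    private static ZkClient zkClient = new ZkClient("hadoop01:2181,hadoop02:2181");
    private static String path = "/mysql/con_config";

    // Subscribe to data changes on the configuration node
    public static void monitor() {
        zkClient.subscribeDataChanges(path, new IZkDataListener() {
            public void handleDataChange(String s, Object o) throws Exception {
                System.out.println("The database configuration in ZooKeeper has changed");
                // Read the configuration again
                String config = zkClient.readData(path, true);
                if (config == null) {
                    return;
                }
                Properties properties = new Properties();
                properties.load(new StringReader(config));
                // Release the old connection pool, then create a new one from the new settings
                ConnManager.cleanPool();
                ConnManager.createPool(properties);
            }

            public void handleDataDeleted(String s) throws Exception {
                ZkConfig.logger.error("The database configuration in ZooKeeper has been deleted!");
            }
        });
    }
}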
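One way to handle a change notification is to build the replacement pool first, publish it, and only then close the old one, so that a bad configuration never leaves the application without a pool. The sketch below shows that ordering with JDK classes only; `ReloadableResource` and `FakePool` are hypothetical stand-ins for illustration and do not appear in the project.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

// Hypothetical stand-in for a connection pool; it only records whether it was closed.
class FakePool implements AutoCloseable {
    final String user;
    volatile boolean closed = false;

    FakePool(String user) {
        this.user = user;
    }

    @Override
    public void close() {
        closed = true;
    }
}

// Hypothetical holder that swaps a closeable resource when the configuration changes.
class ReloadableResource<C, R extends AutoCloseable> {
    private final AtomicReference<R> current = new AtomicReference<>();
    private final Function<C, R> factory;

    public ReloadableResource(Function<C, R> factory, C initialConfig) {
        this.factory = factory;
        current.set(factory.apply(initialConfig));
    }

    public R get() {
        return current.get();
    }

    public void reload(C newConfig) {
        // Build first: if the factory throws, the old resource stays in service.
        R replacement = factory.apply(newConfig);
        // Publish the new resource atomically, then retire the old one.
        R old = current.getAndSet(replacement);
        try {
            if (old != null) {
                old.close();
            }
        } catch (Exception e) {
            throw new IllegalStateException("Failed to close the old resource", e);
        }
    }

    public static void main(String[] args) {
        ReloadableResource<String, FakePool> holder = new ReloadableResource<>(FakePool::new, "root");
        FakePool first = holder.get();
        holder.reload("hive");
        System.out.println(first.closed + " " + holder.get().user);
    }
}
```

In a listener callback, `reload` would be called with the freshly parsed configuration; callers that fetch the pool through `get()` pick up the new one on their next call.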
- Database connection manager: ConnManager.java
package com.lagou;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;

/**
 * Database connection manager.
 */
public class ConnManager {
    // Idle connections; the list is also the lock object, so it is never reassigned
    public static final List<Connection> pool = new LinkedList<>();
    public static String url = "";
    public static String username = "";
    public static String password = "";
    public static String driver = "";
    // Number of connections created at start-up
    public static int initCount;
    // Upper bound on the number of open connections
    public static int maxCount;
    // Number of connections currently open
    public static int currentCount;
    private static volatile ConnManager instance = null;

    /**
     * Creates the configured number of connections when the object is constructed.
     */
    private ConnManager() {
        init();
    }

    public static void init() {
        addConnection();
    }

    public static void createPool(Properties pro) {
        // Parse the configuration and create the connection pool
        driver = pro.getProperty("driverClassName");
        url = pro.getProperty("url");
        username = pro.getProperty("username");
        password = pro.getProperty("password");
        initCount = Integer.parseInt(pro.getProperty("initCount"));
        maxCount = Integer.parseInt(pro.getProperty("maxCount"));
        currentCount = Integer.parseInt(pro.getProperty("currentCount"));
        init();
    }

    // Returns the single ConnManager instance
    public static ConnManager getInstance() {
        if (null == instance) {
            synchronized (ConnManager.class) {
                if (null == instance) {
                    // Assign the field; returning a fresh object here would create a new instance on every call
                    instance = new ConnManager();
                }
            }
        }
        return instance;
    }

    public static void addConnection() {
        synchronized (pool) {
            for (int i = 0; i < initCount; i++) {
                Connection connection = createConnection();
                // createConnection() returns null on failure; never pool a null
                if (connection != null) {
                    pool.add(connection);
                }
            }
        }
    }

    // Create a new connection; returns null if the driver or the connection fails
    public static Connection createConnection() {
        Connection connection = null;
        try {
            Class.forName(driver);
            connection = DriverManager.getConnection(url, username, password);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return connection;
    }

    // Borrow a connection from the pool
    public static Connection getConnection() throws ClassNotFoundException, SQLException {
        synchronized (pool) {
            if (pool.size() > 0) {
                System.out.println("Idle connections in the pool: " + pool.size());
                // Remove the connection so that two callers never share it
                return pool.remove(0);
            } else if (currentCount < maxCount) {
                Connection conn = createConnection();
                if (conn == null) {
                    throw new SQLException("Failed to create a new connection");
                }
                currentCount++;
                return conn;
            } else {
                throw new SQLException("No idle connections and maxCount has been reached");
            }
        }
    }

    // Close every idle connection and empty the pool
    public static void cleanPool() {
        synchronized (pool) {
            for (Connection connection : pool) {
                try {
                    if (connection != null) {
                        connection.close();
                    }
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            pool.clear();
        }
    }

    // Return a borrowed connection to the pool
    public static void release(Connection conn) {
        if (conn == null) {
            return;
        }
        synchronized (pool) {
            pool.add(conn);
        }
    }
}
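A pool normally hands each connection to one caller at a time and takes it back afterwards. The sketch below isolates that borrow/return contract with a generic stand-in resource, so it runs without a database; `SimplePool` and its method names are hypothetical, and only the `initCount`/`maxCount` parameter names are borrowed from the configuration above.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// Hypothetical bounded pool for illustration; T stands in for java.sql.Connection.
class SimplePool<T> {
    private final Deque<T> idle = new ArrayDeque<>();
    private final Supplier<T> factory;
    private final int maxCount;
    private int openCount = 0;

    public SimplePool(Supplier<T> factory, int initCount, int maxCount) {
        this.factory = factory;
        this.maxCount = maxCount;
        // Pre-create up to initCount resources, never exceeding maxCount.
        for (int i = 0; i < initCount && i < maxCount; i++) {
            idle.push(factory.get());
            openCount++;
        }
    }

    // Hand out an idle resource, or create one while below maxCount.
    public synchronized T borrow() {
        if (!idle.isEmpty()) {
            return idle.pop();
        }
        if (openCount < maxCount) {
            T created = factory.get();
            openCount++;
            return created;
        }
        throw new IllegalStateException("Pool exhausted: " + openCount + " resources in use");
    }

    // Put a borrowed resource back so that another caller can reuse it.
    public synchronized void giveBack(T resource) {
        idle.push(resource);
    }

    public synchronized int idleCount() {
        return idle.size();
    }

    public synchronized int openCount() {
        return openCount;
    }
}
```

A caller would pair every `borrow()` with a `giveBack(...)` in a `finally` block; a production pool would additionally validate connections and block or time out instead of throwing when exhausted.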
- Initialize the database connection pool at start-up and start listening: StartUp.java
package com.lagou;

import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.io.StringReader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;

/**
 * Initializes the connection pool at start-up and starts listening for changes.
 */
@Component
public class StartUp {
    // Spring calls a @PostConstruct method once, after the bean has been constructed and its dependencies injected
    @PostConstruct
    public void init() throws SQLException, ClassNotFoundException {
        String cfg = ZkConfig.zkClient.readData(ZkConfig.configPath, true);
        if (cfg == null) {
            // readData returns null when the node does not exist yet
            throw new IllegalStateException("No database configuration found at " + ZkConfig.configPath);
        }
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(cfg));
            // Create the database connection pool
            ConnManager.createPool(properties);
            // Watch the node for data changes
            Listener.monitor();
        } catch (IOException e) {
            e.printStackTrace();
        }
        // Test the connection pool
        Connection connection = ConnManager.getConnection();
        try (PreparedStatement preparedStatement = connection.prepareStatement("select * from config");
             ResultSet resultSet = preparedStatement.executeQuery()) {
            while (resultSet.next()) {
                System.out.println("username = " + resultSet.getString("username") +
                        ", password = " + resultSet.getString("password"));
            }
        } finally {
            // Release the connection once the query is done
            ConnManager.release(connection);
        }
    }
}
Component scanning and annotation support must be configured in applicationContext.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc.xsd">
<context:component-scan base-package="com.lagou"></context:component-scan>
<context:annotation-config></context:annotation-config>
<mvc:default-servlet-handler></mvc:default-servlet-handler>
</beans>
- Deploy the application to Tomcat.
- Run the publish1() method of the PublishTest test class to publish the database configuration to the /mysql/con_config node in ZooKeeper.
@Test
public void publish1() throws InterruptedException, IOException {
String cfg = "driverClassName=com.mysql.jdbc.Driver\n" +
"url=jdbc:mysql://hadoop03:3306/test?useSSL=false\n" +
"username=root\n" +
"password=12345678\n" +
"initCount=5\n" +
"maxCount=10\n" +
"currentCount=5";
ZkConfig.publish(cfg);
System.out.println(ZkConfig.zkClient.readData("/mysql/con_config", true).toString());
}
- Start the application and check its status

The application successfully obtained a database connection and read the rows of the database table.
Check the node content in ZooKeeper:

- Run the publish2() method of the PublishTest test class to modify the data of the /mysql/con_config node in ZooKeeper: username=root becomes username=hive
@Test
public void publish2() throws InterruptedException, IOException {
String cfg = "driverClassName=com.mysql.jdbc.Driver\n" +
"url=jdbc:mysql://hadoop03:3306/test?useSSL=false\n" +
"username=hive\n" +
"password=12345678\n" +
"initCount=5\n" +
"maxCount=10\n" +
"currentCount=5";
ZkConfig.publish(cfg);
System.out.println(ZkConfig.zkClient.readData("/mysql/con_config", true).toString());
}
Console output:

- Delete the node /mysql/con_config
rmr /mysql/con_config
Console output:

