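I. What ZooKeeper is and what it is used for
ZooKeeper is a distributed, open-source coordination service for distributed applications, and an open-source implementation of Google's Chubby. It acts as the manager of a cluster: it monitors the state of each node in the cluster and decides on the next action based on the feedback the nodes report. In the end it gives users a simple, easy-to-use interface on top of an efficient and stable system (source: Baidu Baike).
Its main functions are:
1. Naming service 2. Configuration management 3. Cluster management 4. Distributed locks 5. Queue management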
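II. Standalone deployment of ZooKeeper
1. Download and unpack zookeeper-3.4.10.tar.gz.
2. In the conf directory, rename the sample configuration file zoo_sample.cfg to zoo.cfg (the file name ZooKeeper looks for by default).
3. Edit zoo.cfg.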
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
# Where the data is stored (choose your own path)
dataDir=/tmp/zookeeper/data
# Where the transaction log is stored (a setting added here; without it the
# transaction log is written to dataDir). This is not the console log
# zookeeper.out, which by default ends up in the directory zkServer.sh is
# started from (the bin directory in this setup).
dataLogDir=/tmp/zookeeper/logs
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
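The tick-based settings in this file are easier to reason about in milliseconds. The sketch below does that conversion and also estimates the session timeout a client would be granted. It assumes ZooKeeper's default session-timeout bounds (2 × tickTime minimum, 20 × tickTime maximum, which apply when minSessionTimeout and maxSessionTimeout are not set); TickMath and its methods are names made up for this illustration, not ZooKeeper APIs.

```java
// Hypothetical helper, not part of ZooKeeper: converts tick-based settings to milliseconds.
final class TickMath {
    private TickMath() {}

    /** Milliseconds covered by a limit that is expressed in ticks (initLimit, syncLimit). */
    static long ticksToMillis(int tickTimeMs, int ticks) {
        return (long) tickTimeMs * ticks;
    }

    /**
     * Session timeout the server would grant for a requested timeout, ASSUMING the
     * default bounds of 2 * tickTime (minimum) and 20 * tickTime (maximum).
     */
    static long negotiatedSessionTimeout(int tickTimeMs, long requestedMs) {
        long min = 2L * tickTimeMs;
        long max = 20L * tickTimeMs;
        return Math.max(min, Math.min(max, requestedMs));
    }

    public static void main(String[] args) {
        int tickTime = 2000; // tickTime from the config above
        System.out.println("initLimit=10 -> " + ticksToMillis(tickTime, 10) + " ms");
        System.out.println("syncLimit=5  -> " + ticksToMillis(tickTime, 5) + " ms");
        System.out.println("requested 2000 ms -> granted "
                + negotiatedSessionTimeout(tickTime, 2000) + " ms");
    }
}
```

With tickTime=2000, a client that asks for a 2000 ms session would, under these assumptions, be given 4000 ms, because the request is below the minimum.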
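III. Setting up a ZooKeeper cluster (pseudo-cluster)
1. Unpack three copies of ZooKeeper on the server.
2. In each copy, rename conf/zoo_sample.cfg to zoo.cfg and edit it.
Because all three instances run on a single server for testing, the ports are changed to avoid conflicts: each instance gets its own clientPort, and each server.N entry gets its own pair of ports (the first for quorum traffic between servers, the second for leader election).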
zoo.cfg of the first instance:
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/tmp/zookeeper0/data
# the port at which the clients will connect
clientPort=2180
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.0=localhost:2287:3387
server.1=localhost:2288:3388
server.2=localhost:2289:3389
zoo.cfg of the second instance:
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/tmp/zookeeper1/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.0=localhost:2287:3387
server.1=localhost:2288:3388
server.2=localhost:2289:3389
zoo.cfg of the third instance:
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/tmp/zookeeper2/data
# the port at which the clients will connect
clientPort=2182
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.0=localhost:2287:3387
server.1=localhost:2288:3388
server.2=localhost:2289:3389
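3. In each instance's dataDir, create a file named myid that contains only the N of that instance's server.N entry (for example, 0 in /tmp/zookeeper0/data/myid). ZooKeeper reads this file at startup to find out which server.N line describes it; without it a clustered instance will not start.
4. Start each ZooKeeper instance. The cluster is then up.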
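The per-instance files of a pseudo-cluster can be generated instead of edited by hand. The following is a minimal sketch, assuming the same port scheme as the three configs above (clientPort 2180+N and server.N=localhost:(2287+N):(3387+N)). It writes a trimmed zoo.cfg plus a myid file (the file in dataDir that tells an instance which server.N entry it is). PseudoClusterLayout, zooCfg and write are hypothetical names, and main writes into a temporary directory rather than /tmp/zookeeperN so it is harmless to run.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not part of ZooKeeper: lays out files for an n-instance pseudo-cluster.
final class PseudoClusterLayout {
    private PseudoClusterLayout() {}

    /** Builds the zoo.cfg text for instance `id` of an n-instance pseudo-cluster. */
    static String zooCfg(int id, int n, String dataDir) {
        StringBuilder sb = new StringBuilder();
        sb.append("tickTime=2000\n");
        sb.append("initLimit=10\n");
        sb.append("syncLimit=5\n");
        sb.append("dataDir=").append(dataDir).append('\n');
        // Each instance needs its own client port on a shared host.
        sb.append("clientPort=").append(2180 + id).append('\n');
        // Every instance lists all servers, each with its own quorum and election port.
        for (int i = 0; i < n; i++) {
            sb.append("server.").append(i).append("=localhost:")
              .append(2287 + i).append(':').append(3387 + i).append('\n');
        }
        return sb.toString();
    }

    /** Writes base/zookeeperN/conf/zoo.cfg and base/zookeeperN/data/myid for each instance. */
    static void write(Path base, int n) throws IOException {
        for (int id = 0; id < n; id++) {
            Path home = base.resolve("zookeeper" + id);
            Path dataDir = home.resolve("data");
            Path confDir = home.resolve("conf");
            Files.createDirectories(dataDir);
            Files.createDirectories(confDir);
            // myid holds nothing but the server number.
            Files.write(dataDir.resolve("myid"),
                    String.valueOf(id).getBytes(StandardCharsets.UTF_8));
            Files.write(confDir.resolve("zoo.cfg"),
                    zooCfg(id, n, dataDir.toString()).getBytes(StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("zk-pseudo");
        write(base, 3);
        System.out.println("Wrote pseudo-cluster layout under " + base);
    }
}
```

Generating all three files from one function keeps the server.N lines identical across instances, which is the part that is easiest to get wrong when copying configs by hand.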
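IV. Operating ZooKeeper from the command line
1. Start ZooKeeper: ./zkServer.sh start
2. Stop ZooKeeper: ./zkServer.sh stop
3. Connect a client: ./zkCli.sh -server localhost:2181
4. Check the current status of ZooKeeper: ./zkServer.sh status
5. Once the client is connected, the help command lists the available commands.
6. Close the connection to the server: close
7. Connect to a server: connect 127.0.0.1:2181
8. Create a node: create /name value
9. Get a node's data and metadata: get /name
10. List a node's children: ls /name
11. Delete a node: delete /name (the node must have no children)
12. List a node's children together with its stat: ls2 /name, an enhanced version of ls
13. List previously executed commands: history
14. Re-run a command by its number in the history output: redo 20
15. Force a sync: sync /name
16. View a node's status: stat /name
17. Show a quota: listquota /name
18. Set a quota: setquota -n|-b val /name (-n limits the number of nodes, -b the number of bytes)
19. Delete a quota: delquota [-n|-b] /name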
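20. The addauth command adds authentication information to the current session, for example: addauth digest username:password
21. The setAcl command sets the ACL of a node: setAcl /name acl
An ACL has three parts: 1. the scheme, 2. the id (the user), 3. the permissions. It is normally written as scheme:id:permissions.
22. Get a node's ACL, for example: getAcl /node1
23. Quit the client: quit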
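The scheme:id:permissions form used by setAcl can be taken apart programmatically. The sketch below splits such a string, turns the permission letters (c, d, r, w, a) into the bit mask ZooKeeper uses, and shows how an id for the digest scheme is derived from the clear-text username:password that addauth takes. The bit values mirror the constants in org.apache.zookeeper.ZooDefs.Perms and the digest mirrors what DigestAuthenticationProvider.generateDigest computes, as far as I know; AclSketch itself is a hypothetical stand-alone class that uses only the JDK.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Hypothetical stand-alone helper; it does not call the ZooKeeper client library.
final class AclSketch {
    // Same bit values as org.apache.zookeeper.ZooDefs.Perms.
    static final int READ = 1, WRITE = 2, CREATE = 4, DELETE = 8, ADMIN = 16;

    private AclSketch() {}

    /** Splits "scheme:id:permissions"; the id may itself contain ':' (digest scheme). */
    static String[] parseAcl(String acl) {
        int first = acl.indexOf(':');
        int last = acl.lastIndexOf(':');
        if (first < 0 || first == last) {
            throw new IllegalArgumentException("Expected scheme:id:permissions, got " + acl);
        }
        return new String[] {
            acl.substring(0, first), acl.substring(first + 1, last), acl.substring(last + 1)
        };
    }

    /** Converts permission letters such as "cdrwa" into the combined bit mask. */
    static int parsePerms(String letters) {
        int perms = 0;
        for (char c : letters.toCharArray()) {
            switch (c) {
                case 'r': perms |= READ; break;
                case 'w': perms |= WRITE; break;
                case 'c': perms |= CREATE; break;
                case 'd': perms |= DELETE; break;
                case 'a': perms |= ADMIN; break;
                default: throw new IllegalArgumentException("Unknown permission: " + c);
            }
        }
        return perms;
    }

    /** Builds a digest-scheme id: user + ":" + base64(sha1("user:password")). */
    static String digestId(String userColonPassword) {
        String user = userColonPassword.substring(0, userColonPassword.indexOf(':'));
        try {
            byte[] sha1 = MessageDigest.getInstance("SHA-1")
                    .digest(userColonPassword.getBytes(StandardCharsets.UTF_8));
            return user + ":" + Base64.getEncoder().encodeToString(sha1);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 not available", e);
        }
    }

    public static void main(String[] args) {
        String[] parts = parseAcl("world:anyone:cdrwa");
        System.out.println(parts[0] + " / " + parts[1] + " / " + parsePerms(parts[2]));
        System.out.println("digest:" + digestId("username:password") + ":rw");
    }
}
```

The practical point is that addauth takes the password in clear text, while an ACL set with the digest scheme stores the hashed form, so the two strings look different even though they refer to the same user.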
V. Using ZooKeeper
package testzookeeper;

import java.lang.management.ManagementFactory;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

/**
 * Simulates a cluster server connecting to ZooKeeper.
 */
public class ClusterClient implements Watcher, Runnable {
    private static final String membershipRoot = "/Members";
    final ZooKeeper zk;

    public ClusterClient(String hostPort, String processId) throws Exception {
        zk = new ZooKeeper(hostPort, 2000, this);
        // Register this process as an ephemeral child of /Members; the node is
        // removed automatically when the session ends. /Members must already
        // exist (ClusterMonitor creates it), otherwise create() fails.
        zk.create(membershipRoot + '/' + processId, processId.getBytes(),
                Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    }

    public synchronized void close() {
        try {
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void process(WatchedEvent event) {
        // Connection events are ignored in this example.
    }

    @Override
    public void run() {
        try {
            // Block forever so the session, and with it the ephemeral node, stays alive.
            synchronized (this) {
                while (true) {
                    wait();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            this.close();
        }
    }

    public static void main(String[] args) throws Exception {
        String hostPort = "111.230.239.152:2180,111.230.239.152:2181,111.230.239.152:2182";
        // The runtime name has the form pid@hostname; the pid is used as the member name.
        String name = ManagementFactory.getRuntimeMXBean().getName();
        String processId = name.substring(0, name.indexOf('@'));
        new ClusterClient(hostPort, processId).run();
    }
}
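Because every client registers an ephemeral node under /Members, the children of /Members at any moment are the live members of the cluster. Anything that watches that list can compare two successive snapshots to work out who joined and who left. The sketch below shows that comparison on plain lists. MembershipDiff is a hypothetical helper that is not part of the original code and uses only the JDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.TreeSet;

// Hypothetical helper: compares two snapshots of the children of /Members.
final class MembershipDiff {
    private MembershipDiff() {}

    /** Members present in `after` but not in `before`, in sorted order. */
    static List<String> joined(Collection<String> before, Collection<String> after) {
        TreeSet<String> result = new TreeSet<>(after);
        result.removeAll(before);
        return new ArrayList<>(result);
    }

    /** Members present in `before` but not in `after`, in sorted order. */
    static List<String> left(Collection<String> before, Collection<String> after) {
        TreeSet<String> result = new TreeSet<>(before);
        result.removeAll(after);
        return new ArrayList<>(result);
    }

    public static void main(String[] args) {
        // Example process ids, made up for the demonstration.
        List<String> before = Arrays.asList("4101", "4102");
        List<String> after = Arrays.asList("4102", "4103");
        System.out.println("Joined: " + joined(before, after));
        System.out.println("Left:   " + left(before, after));
    }
}
```

A watcher on the children of /Members only says that the list changed, not how, so keeping the previous list around and diffing it is one simple way to report individual arrivals and departures.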
package testzookeeper;

import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

/**
 * Simulates a cluster monitor.
 */
public class ClusterMonitor {
    private static final String membershipRoot = "/Members";
    private final Watcher connectionWatcher;
    private final Watcher childrenWatcher;
    private ZooKeeper zk;
    boolean alive = true;

    public ClusterMonitor(String hostPort) throws Exception {
        // Reports when the session to the ensemble has been established.
        connectionWatcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getType() == Watcher.Event.EventType.None
                        && event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                    System.out.println("Client connect success !!!");
                }
            }
        };

        // Fires when a child of /Members is added or removed.
        childrenWatcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.NodeChildrenChanged) {
                    List<String> children;
                    try {
                        // Watches are one-shot, so pass this watcher again to re-register it.
                        children = zk.getChildren(membershipRoot, this);
                        System.out.println("Cluster Membership Change!!!");
                        System.out.println("Members: " + children);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        };

        zk = new ZooKeeper(hostPort, 2000, connectionWatcher);

        // Create the persistent parent node on first use.
        if (zk.exists(membershipRoot, false) == null) {
            zk.create(membershipRoot, "ClusterMonitorRoot".getBytes(),
                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }

        // Read the current members and set the first watch.
        List<String> children = zk.getChildren(membershipRoot, childrenWatcher);
        System.err.println("Members: " + children);
    }

    public synchronized void close() {
        try {
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void run() {
        try {
            // Block so the watcher callbacks keep arriving.
            synchronized (this) {
                while (alive) {
                    wait();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            this.close();
        }
    }

    public static void main(String[] args) throws Exception {
        String hostPort = "111.230.239.152:2180,111.230.239.152:2181,111.230.239.152:2182";
        new ClusterMonitor(hostPort).run();
    }
}