springboot整合zookeeper实现分布式锁


 

01 安装并允许zookeeper

  1. 安装jdk
  2. 官网下载zookeeper的压缩包,我这里下载的是3.4.10版本
  3. 解压后进入到zookeeper-3.4.10/conf,修改zoo_sample.cfg文件修改为zoo.cfg文件
mv zoo_sample.cfg zoo.cfg
  • 1
  1. 打开zoo.cfg文件,修改dataDir路径。修改后在/usr/local/zookeeper-3.4.10目录创建文件夹mkdir zkData
dataDir=/usr/local/zookeeper-3.4.10/zkData
  • 1
  1. 启动zookeeper
/usr/local/zookeeper-3.4.10/bin/zkServer.sh start
  • 1

02 springboot应用配置CuratorFramework

  1. 导入maven依赖
<!-- zookeeper 客户端 --> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.12.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.12.0</version> </dependency> 

 

  1. 配置CuratorFramework

zookeeper的默认端口是2181

import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryNTimes; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.data.jpa.repository.config.EnableJpaAuditing; import org.springframework.scheduling.annotation.EnableScheduling; @EnableScheduling @EnableJpaAuditing @SpringBootApplication public class MyDemoApplication { public static void main(String[] args) { SpringApplication.run(MyDemoApplication.class, args); } @Bean public CuratorFramework curatorFramework() { return CuratorFrameworkFactory.newClient("127.0.0.1:2181", new RetryNTimes(5, 1000)); } } 

 

  1. 启动CuratorFramework客户端
import org.apache.curator.framework.CuratorFramework; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Service; /** * 实现了ApplicationRunner接口后,当容器启动后,会执行实现的run方法 * * @author 594781919 */ @Service public class StartService implements ApplicationRunner { @Autowired private CuratorFramework curatorFramework; @Autowired private ListenerService listenerService; @Override public void run(ApplicationArguments applicationArguments) { // 非常重要!!!Start the client. Most mutator methods will not work until the client is started curatorFramework.start(); System.out.println("zookeeper client start"); // 初始化监听方法 listenerService.listener(); } } 

 

 

03 使用zookeeper实现集群只一个应用实例执行定时任务

当我们启动多个实例时,需要其中一个实例执行定时任务,其它实例不执行。

import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import java.io.IOException; import java.util.Date; /** * 实现多个应用实例只一个执行定时任务 * * @author 594781919@qq.com */ @Service public class TimerTaskService { @Autowired private CuratorFramework curatorFramework; @Value("${server.port}") private String port; @Scheduled(cron = "0/5 * * * * *") public void task() { LeaderLatch leaderLatch = new LeaderLatch(curatorFramework, "/timerTask"); try { leaderLatch.start(); // Leader选举需要一些时间,等待2秒 Thread.sleep(2000); // 判断是否为主节点 if (leaderLatch.hasLeadership()) { System.out.println(new Date() + " port=" + port + " 是领导"); // 定时任务的业务逻辑代码 } else { System.out.println(new Date() + " port=" + port + " 是从属"); } } catch (Exception e) { e.printStackTrace(); } finally { try { leaderLatch.close(); } catch (IOException e) { e.printStackTrace(); } } } } 

 

 

04 使用zookeeper实现分布式锁

import com.igola.domain.Employee; import com.igola.repository.EmployeeRepository; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; /** * @author 594781919@qq.com */ @RestController public class EmployeeController { @Autowired private EmployeeRepository employeeRepository; @Autowired private CuratorFramework curatorFramework; @GetMapping("/emp/save") public Employee save(String name) { // 获取锁 InterProcessSemaphoreMutex balanceLock = new InterProcessSemaphoreMutex(curatorFramework, "/zktest" + name); Employee employee = new Employee(); try { // 执行加锁操作 balanceLock.acquire(); System.out.println("已经加锁了, name=" + name); employee.setName(name); if ("abc".equals(name)) { Thread.sleep(30000); } employee.setAge((int) (Math.random() * 100)); employee.setSex(false); } catch (Exception e) { e.printStackTrace(); } finally { try { // 释放锁资源 balanceLock.release(); } catch (Exception e) { e.printStackTrace(); } } employeeRepository.save(employee); return employee; } } 
  • 1
  • 2

05 使用zookeeper实现调度任务

当我们在启动多个服务后,访问了其中一个服务,执行了一些方法。然后我们需要其它服务也要执行这些方法,就需要用到NodeCache。

比如我们把一些数据缓存到Map对象中,当需要更新这个Map对象的数据时,我们就可以用NodeCache将每个服务都更新自己的Map对象。

import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.curator.utils.CloseableUtils; import org.apache.zookeeper.data.Stat; import org.springframework.stereotype.Service; import javax.annotation.PreDestroy; import java.util.Date; /** * @author 594781919 */ @Service public class ListenerService { private final CuratorFramework curatorFramework; private NodeCache nodeCache; public static final String path = "/hello/world"; public ListenerService(CuratorFramework curatorFramework) { this.curatorFramework = curatorFramework; } public void listener() { try { // 创建路径 Stat stat = curatorFramework.checkExists().forPath(path); if (stat == null) { curatorFramework.create().creatingParentsIfNeeded().forPath(path); } } catch (Exception e) { e.printStackTrace(); } nodeCache = new NodeCache(curatorFramework, path); // 添加监听的路径改变后需要执行的任务 nodeCache.getListenable().addListener(this::run); try { nodeCache.start(); } catch (Exception e) { e.printStackTrace(); } System.out.println("开始监听......"); } @PreDestroy public void preDestroy() { CloseableUtils.closeQuietly(nodeCache); } public void notifyNodeCache() { try { curatorFramework.setData().forPath(path); } catch (Exception e) { e.printStackTrace(); } } // 需要执行的调度任务 private void run() { System.out.println(new Date().toLocaleString() + ", 开始执行监听任务"); } }


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM