文章目录

  • 问题背景
  • 前言
  • 实现
    • 搭建Zookeeper容器
    • 引入依赖
    • ZK客户端的配置类
    • ZK客户端的工厂类
    • 注入bean
    • 构建测试类

问题背景

研究分布式锁,基于ZK实现,需要整合到SpringBoot使用

前言

  1. 参考自SpringBoot集成Curator实现Zookeeper基本操作,Zookeeper入门
  2. 本篇的代码笔者有自己运行过,需要注意组件的版本号是否兼容,否则会有比较多的坑

实现

搭建Zookeeper容器

采用Docker compose快速搭建ZK容器,很快,几分钟就好了,而且是集群方式搭建。详情见笔者的Docker搭建zookeeper

引入依赖

需要注意的点:Curator 2.x.x-兼容两个zk 3.4.xzk 3.5.xCurator 3.x.x-兼容兼容zk 3.5,根据搭建的zk的版本使用对应的curator依赖。引入的zk依赖,如果项目中有使用logback日志 ,需要排除zk中的log4j12依赖,详情见下面笔者给出的依赖:

<dependencies><dependency><groupId>org.apache.curator</groupId><artifactId>curator-client</artifactId><version>2.12.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>2.12.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>2.12.0</version></dependency><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.7</version><exclusions><exclusion><artifactId>slf4j-log4j12</artifactId><groupId>org.slf4j</groupId></exclusion><exclusion><artifactId>slf4j-api</artifactId><groupId>org.slf4j</groupId></exclusion></exclusions></dependency>

ZK客户端的配置类

配置ZK的参数,使用@ConfigurationProperties可以令配置热更新,比如搭配Apollo、Nacos,如果使用@Valid则无法热更新,必须重启项目才能生效

@Component@ConfigurationProperties(prefix = "curator")@Datapublic class ZKClientProps {private String connectString;private int retryCount;private int elapsedTimeMs;private int sessionTimeoutMs;private int connectionTimeoutMs;}

对应yml如下:

#curator配置curator:connectString: 192.168.163.128:2181,192.168.163.128:2182,192.168.163.128:2183 # zookeeper 地址retryCount: 1 # 重试次数elapsedTimeMs: 2000 # 重试间隔时间sessionTimeoutMs: 60000 # session超时时间connectionTimeoutMs: 10000 # 连接超时时间

ZK客户端的工厂类

定制ZK客户端:

@Componentpublic class ZKClientFactory {@Resourceprivate ZKClientProps zkClientProps;public CuratorFramework createSimple() {//重试策略:第一次重试等待1S,第二次重试等待2S,第三次重试等待4s//第一个参数:等待时间的基础单位,单位为毫秒//第二个参数:最大重试次数ExponentialBackoffRetry retry = new ExponentialBackoffRetry(zkClientProps.getElapsedTimeMs(), zkClientProps.getRetryCount());//获取CuratorFramework示例的最简单方式//第一个参数:zk的连接地址//第二个参数:重试策略return CuratorFrameworkFactory.newClient(zkClientProps.getConnectString(), retry);}public static CuratorFramework createWithOptions(String connectionString, RetryPolicy retryPolicy, int connectionTimeoutMs, int sessionTimeoutMs) {return CuratorFrameworkFactory.builder().connectString(connectionString).retryPolicy(retryPolicy).connectionTimeoutMs(connectionTimeoutMs).sessionTimeoutMs(sessionTimeoutMs).build();}}

注入bean

创建ZK的客户端,详情如下:

@Component@Slf4jpublic class ZKClient {@Resourceprivate ZKClientFactory zkClientFactory;public static final ZKClient INSTANCE = new ZKClient();private ZKClient() {}public CuratorFramework getClient() {return zkClientFactory.createSimple();}public boolean isNodeExist(String path) {CuratorFramework client = getClient();try {client.start();Stat stat = client.checkExists().forPath(path);return stat != null;} catch (Exception e) {e.printStackTrace();} finally {CloseableUtils.closeQuietly(client);}return false;}public void createNode(String path, byte[] bytes) {CuratorFramework client = getClient();try {// 必须start,否则报错client.start();client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, bytes);} catch (Exception e) {e.printStackTrace();} finally {CloseableUtils.closeQuietly(client);}}public void deleteNode(String path) {CuratorFramework client = getClient();try {client.start();client.delete().forPath(path);} catch (Exception e) {e.printStackTrace();} finally {CloseableUtils.closeQuietly(client);}}public List<String> getChildren(String path) {List<String> result = new LinkedList<>();CuratorFramework client = getClient();try {client.start();result = client.getChildren().forPath(path);} catch (Exception e) {log.error("ZKClient getChildren error.");}return result;}}

构建测试类

测试基类,设置激活环境

@Slf4j@ActiveProfiles("test")@RunWith(SpringRunner.class)@SpringBootTest(classes = GmallZookeeperApplication.class)@ContextConfigurationpublic class BaseTest {}

创建节点、删除节点、获取节点信息、分布式锁的方法如下:@ActiveProfiles("company")是激活笔者一个application-company.yml文件

application.yml如下:

server:port: 8022spring:profiles:active: home

application-compay.yml如下:

#curator配置curator:connectString: 192.168.163.128:2181,192.168.163.128:2182,192.168.163.128:2183 # zookeeper 地址retryCount: 1 # 重试次数elapsedTimeMs: 2000 # 重试间隔时间sessionTimeoutMs: 60000 # session超时时间connectionTimeoutMs: 10000 # 连接超时时间

创建节点、删除节点、获取节点信息、分布式锁的方法如下:

@Slf4j@ActiveProfiles("company")public class ZKClientTest extends BaseTest{@Resourceprivate ZKClient zkClient;public static final int THREAD_NUM = 10;@Testpublic void distributedLock() throws InterruptedException, BrokenBarrierException {String lockPath = "/test/distributed2/lock";CuratorFramework client = zkClient.getClient();client.start();InterProcessMutex lock = new InterProcessMutex(client, lockPath);// 阻塞主线程,等待全部子线程执行完CyclicBarrier cyclicBarrier = new CyclicBarrier(THREAD_NUM);for (int i = 0; i < THREAD_NUM; i++) {new Thread(() -> {log.info("{}->尝试竞争锁", Thread.currentThread().getName());try {lock.acquire(); // 阻塞竞争锁log.info("{}->成功获得锁", Thread.currentThread().getName());Thread.sleep(2000);cyclicBarrier.await();} catch (Exception e) {e.printStackTrace();} finally {try {lock.release(); //释放锁} catch (Exception e) {e.printStackTrace();}}}, "Thread-" + i).start();}// 目的是为了等子线程抢完锁再结束子线程,否则无法看到日志效果cyclicBarrier.await();log.info("全部子线程已执行完毕");}@Testpublic void createNode() {// 创建一个ZNode节点String data = "hello";byte[] payload = data.getBytes(StandardCharsets.UTF_8);String zkPath = "/test/CRUD/node-1";zkClient.createNode(zkPath, payload);log.info("createNode succeeded!");}@Testpublic void getChildren() {String zkPath = "/test/CRUD";List<String> children = zkClient.getChildren(zkPath);printList(children);}@Testpublic void deleteNode() {String parentPath = "/test";log.info("======================Before delete===================");List<String> before = zkClient.getChildren(parentPath);printList(before);String zkPath = "/test/CRUD/node-1";zkClient.deleteNode(zkPath);log.info("delete node secceeded!");log.info("======================After delete===================");List<String> after = zkClient.getChildren(parentPath);printList(after);}private void printList(List<String> data) {if (!CollectionUtils.isEmpty(data)) {for (String datum : data) {log.info("datum:{}", data);}}}}