目录
1.客户端API
2.服务器动态上下线
3.分布式锁
1.客户端API
1.1导入依赖
junitjunit4.13.2testorg.apache.logging.log4jlog4j-api2.14.1org.apache.zookeeperzookeeper3.5.7
1.2代码实现
public class zkClient {//一定不要有空格private String connectString = "192.168.20.129:2181,192.168.20.130:2181,192.168.20.131:2181";private int sessionTimeOut = 2000;private ZooKeeper zkClient;/** * 初始话zookeeper * 参数1:连接地址 * 参数2:超时时间 * 参数3:监听器 */@Beforepublic void init() throws IOException {zkClient = new ZooKeeper(connectString, sessionTimeOut, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {List children = null;try {children = zkClient.getChildren("/", true);} catch (Exception e) {e.printStackTrace();}System.out.println("========================");for (String child : children) {System.out.println(child);}System.out.println("========================");}});}/** * 创建子节点 * 参数1:创建节点的路径 * 参数2:节点的数据(转化为字节) * 参数3:节点的权限 * 参数4:节点的类型(临时/永久) **/@Testpublic void create() throws InterruptedException, KeeperException {String nodeCreate = zkClient.create("/class", "s1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}/** * 监控子节点变化 * 参数1:要监控的节点目录 * 参数2:监听器(true:使用初始化是的监听器) */@Testpublic void getChildren() throws InterruptedException, KeeperException {List children = zkClient.getChildren("/", true);for (String child : children) {System.out.println(child);}//延时Thread.sleep(Long.MAX_VALUE);}/** * 判断节点是否存在 * 参数1:判断的节点路径 * 参数:是否使用监听器 */@Testpublic void isExist() throws InterruptedException, KeeperException {Stat stat = zkClient.exists("/class", false);System.out.println(stat == null " />
1.3写数据原理
1.写入请求直接发送给Leader
- 1.客户端发请求给Leader
- 2.leader执行请求并应答,然后把请求分发给下一个follower
- 3.follower会执行请求并应答。
- 4.当应答数超过半数,Leader就会回复客户端,完成了写请求
- 5.leader会继续发送写请求给剩下的follower
2.写入请求发送给Follower
- 1.客户端发请求给follower,follower没有写权限,立即把写请求发给leader
- 2.leader执行写请求并应答,然后把写请求分发给follower
- 3.follower会执行请求并应答。
- 4.当应答数超过半数,Leader回复follower,由follower回复客户端,完成了写请求
- 5.leader会继续发送写请求给剩下的follower
2.服务器动态上下线
2.1客户端
- 1.获取zookeeper连接
- 2.监听节点的变化
- 3.业务逻辑(睡眠)
public class DisClient {private String connectString = "192.168.20.129:2181,192.168.20.130:2181,192.168.20.131:2181";private int sessionTimeOut = 2000;private ZooKeeper zooKeeper;public static void main(String[] args) throws IOException, InterruptedException, KeeperException {DisClient client = new DisClient();//1.获取zk连接client.getConnect();//2.监听/servers下面的节点变化client.getServerList();//3.业务逻辑client.business();}private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}/** * 监听服务端(获取节点信息) * @throws InterruptedException * @throws KeeperException */private void getServerList() throws InterruptedException, KeeperException {List children = zooKeeper.getChildren("/servers", true);//服务器地址存放到集合中ArrayList list = new ArrayList();for (String child : children) {byte[] data = zooKeeper.getData("/servers/" + child, false, null);list.add(new String(data));}System.out.println(list);}/** * 初始话zookeeper * @throws IOException */private void getConnect() throws IOException {zooKeeper = new ZooKeeper(connectString, sessionTimeOut, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {try {getServerList();} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}});}}
2.2服务端
- 1.获取zookeeper连接
- 2.创建节点(服务端注册到zookeeper)
- 3.业务逻辑(睡眠)
/** * 服务端注册zookeeper */public class DisServer {private String connectString = "192.168.20.129:2181,192.168.20.130:2181,192.168.20.131:2181";private int sessionTimeOut = 2000;private ZooKeeper zooKeeper;public static void main(String[] args) throws Exception {DisServer dIsServer = new DisServer();//1.获取zk连接dIsServer.getConnect();//2.注册服务器到zk节点(创建节点)dIsServer.register(args[0]);//3.启动业务逻辑dIsServer.business();}private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}/** * 注册服务器(创建节点) * @param hostname * @throws InterruptedException * @throws KeeperException */private void register(String hostname) throws InterruptedException, KeeperException {String create = zooKeeper.create("/servers/"+hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);System.out.println(hostname + "已经上线");}/** * 初始化zookeeper * @throws IOException */private void getConnect() throws IOException {zooKeeper = new ZooKeeper(connectString, sessionTimeOut, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {}});}}
3.分布式锁
Zookeeper的分布式锁实现基于其znode(zk节点)功能。每个znode都可以有数据和子节点,并且每个znode都有一个版本号。Zookeeper的分布式锁利用了znode的版本号特性,同时使用watcher机制实现分布式锁的互斥
3.1执行流程
- 当一个客户端需要获取锁时,它会在Zookeeper上创建一个临时且有序的znode节点。
- 客户端通过获取Zookeeper上的znode列表,并判断自己创建的节点是否是所有节点中最小的那个,如果是,则表示客户端获得了锁。
- 如果客户端没有获得锁,则监听它前面(比它序号小的)的节点,等待锁的释放。
- 当客户端释放锁时,它会删除自己创建的znode节点,此时,Zookeeper会通知正在等待前面的节点上的watcher机制,让等待锁的客户端尝试重新获取锁
3.2代码实现
/** * 分布式锁 */public class ZkLock {private final String connectString = "192.168.20.129:2181,192.168.20.130:2181,192.168.20.131:2181";private final int sessionTimeOut = 2000;private final ZooKeeper zooKeeper;private String path;private CountDownLatch countDownLatch = new CountDownLatch(1);private CountDownLatch countPathLatch = new CountDownLatch(1);private String currentNode;//构造器初始化public ZkLock() throws IOException, InterruptedException, KeeperException {//1.获取链接zooKeeper = new ZooKeeper(connectString, sessionTimeOut, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {//countDownLatch 连接上zookeeper,释放if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {countDownLatch.countDown();}//countPathLatch 释放if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(path)) {countPathLatch.countDown();}}});//等待zookeeper正常连接后,往下执行程序countDownLatch.await();//2.判断根节点locks是否存在Stat stat = zooKeeper.exists("/locks", false);if (stat == null) {//说明不存在,创建根节点zooKeeper.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}}//3.加锁public void zkLock() {//创建对应的临时带序号的节点try {currentNode = zooKeeper.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);//判断创建的节点是否是最小的序号节点,如果是,获取到锁,如果不是,监听他前一个结点List children = zooKeeper.getChildren("/locks", false);//如果只有一个值,直接获取锁,如果不是,则判断if (children.size() == 1) {//直接枷锁return;} else {//对节点进行排序Collections.sort(children);//获取节点名称String thisNode = currentNode.substring("/locks/".length());//通过界节点名称,获取在集合中的下标int index = children.indexOf(thisNode);//判断下标if (index == -1) {System.out.println("数据异常");} else if (index == 0) {//第一个数据//直接枷锁return;} else {//说明多个节点,进行监听前一个节点path = "/locks/" + children.get(index - 1);zooKeeper.getData(path, true, new Stat());//等待监听countPathLatch.await();return;}}} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}//4.解锁public void unZkLock() {//删除节点try {zooKeeper.delete(currentNode, -1);} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}}
3.3线程测试
public class ZkLockTest {public static void main(String[] args) throws IOException, InterruptedException, KeeperException {ZkLock zkLock1 = new ZkLock();ZkLock zkLock2 = new ZkLock();ZkLock zkLock3 = new ZkLock();new Thread(new Runnable() {@Overridepublic void run() {try {zkLock1.zkLock();System.out.println("线程1,获取到锁");Thread.sleep(3 * 1000);zkLock1.unZkLock();System.out.println("线程1,释放锁");} catch (InterruptedException e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {try {zkLock2.zkLock();System.out.println("线程2,获取到锁");Thread.sleep(3 * 1000);zkLock2.unZkLock();System.out.println("线程2,释放锁");} catch (InterruptedException e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {try {zkLock3.zkLock();System.out.println("线程3,获取到锁");Thread.sleep(3 * 1000);zkLock3.unZkLock();System.out.println("线程3,释放锁");} catch (InterruptedException e) {e.printStackTrace();}}}).start();}}
4.Curator框架
Curator是Apache ZooKeeper的一个高级客户端库,旨在使开发人员更容易编写可靠的分布式系统。它为ZooKeeper提供了许多有用的功能,包括连接管理,分布式锁和选举,缓存和观察。Curator还提供了一组易于使用的API,可以轻松管理ZooKeeper的节点和数据。
4.1添加依赖
org.apache.curatorcurator-framework5.5.0org.apache.curatorcurator-client5.5.0org.apache.curatorcurator-recipes5.5.0
4.2代码实现
/** * 客户端连接 * @return */private static CuratorFramework getCuratorFramework() {//创建zookeeper的客户端:重试策略,初始化每次重试之间需要等待的时间,基准等待时间为3秒ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.20.129:2181,192.168.20.131:2181,192.168.20.130:2181").connectionTimeoutMs(2000).sessionTimeoutMs(2000).retryPolicy(policy).build();client.start();System.out.println("zookeeper启动~");return client;}
4.3创建线程测试
InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");new Thread(new Runnable() {@Overridepublic void run() {try {lock1.acquire();System.out.println("线程一获取到锁");Thread.sleep(3000);lock1.release();System.out.println("线程一释放锁");} catch (Exception e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {try {lock2.acquire();System.out.println("线程二获取到锁");Thread.sleep(3000);lock2.release();System.out.println("线程二释放锁");} catch (Exception e) {e.printStackTrace();}}}).start();}