Zookeeper:实现“分布式锁”的 Demo
迪丽瓦拉
2024-02-08 08:54:37
0

Zookeeper 能保证数据的强一致性,用户任何时候都可以相信集群中每个节点的数据都是相同的。一个用户创建一个节点作为锁,另一个用户检测该节点,如果存在,代表别的用户已经锁住,如果不存在,则可以创建一个节点,代表拥有一个锁。

本篇内容包括:Demo 概述、代码实现、测试结果


文章目录

    • 一、Demo 概述
        • 1、关于 zookeeper “命名服务协调”
        • 2、Demo 设计
        • 3、Demo 前提
    • 二、代码实现
        • 1、引用 Maven 依赖
        • 2、ConnectionWatcher 类创建 Zookeeper 连接
        • 3、ActiveKeyValueStore 类读写 Zookeeper 数据
        • 4、ZkLock 类实现分布式锁
    • 三、测试结果


一、Demo 概述

1、关于 zookeeper “命名服务协调”

Zookeeper 能保证数据的强一致性,用户任何时候都可以相信集群中每个节点的数据都是相同的。一个用户创建一个节点作为锁,另一个用户检测该节点,如果存在,代表别的用户已经锁住,如果不存在,则可以创建一个节点,代表拥有一个锁。

2、Demo 设计

分布式锁本质,就是多个资源竞争者对一份资源的排他占有

  • 我们设置多个线程,分别在同一 path 下创建节点
  • 没个线程获取当前 path 下子节点,看最小子节点是否为自身,是则加锁成功(更好的方式是用 Watcher 对前一个地址监控,这里图方便用子节点排序取最小的方式 )
  • 线程加锁成功后,执行任务,执行完毕后解锁

3、Demo 前提

参考:Mac通过Docker安装Zookeeper集群


二、代码实现

1、引用 Maven 依赖

        org.apache.zookeeperzookeeper3.7.0

2、ConnectionWatcher 类创建 Zookeeper 连接

import java.io.IOException;
import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;public class ConnectionWatcher implements Watcher {private final CountDownLatch connectedSignal = new CountDownLatch(1);private static final int SESSION_TIMEOUT = 5000;protected ZooKeeper zk;public void connect(String hosts) throws IOException, InterruptedException {zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);connectedSignal.await();}@Overridepublic void process(WatchedEvent event) {if (event.getState() == Event.KeeperState.SyncConnected) {connectedSignal.countDown();}}public void close() throws InterruptedException {zk.close();}}

3、ActiveKeyValueStore 类读写 Zookeeper 数据

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;public class ActiveKeyValueStore extends ConnectionWatcher {private static final Charset CHARSET = StandardCharsets.UTF_8;int state = 0;/*** 写入节点数据** @param path  节点地址* @param value 数据值* @throws InterruptedException 中断异常* @throws KeeperException      ZooKeeper异常*/public void write(String path, String value) throws InterruptedException, KeeperException {Stat stat = zk.exists(path, false);if (stat == null) {if (value == null) {zk.create(path, null,ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);} else {zk.create(path, value.getBytes(CHARSET),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}} else {if (value == null) {zk.setData(path, null, -1);} else {zk.setData(path, value.getBytes(CHARSET), -1);}}}public boolean lock(String path, String name) throws InterruptedException, KeeperException {boolean flag = tryLock(path, name);if (flag) {state++;}return flag;}public boolean tryLock(String path, String name) throws InterruptedException, KeeperException {String lockPath = path + "/" + name;zk.create(lockPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);List waits = readChildren(path, null);Collections.sort(waits);if (waits.get(0).equals(name)) {return true;}CountDownLatch latch = new CountDownLatch(1);for (int i = 0; i < waits.size(); i++) {String cur = waits.get(i);if (!cur.equalsIgnoreCase(name)) {continue;}String prePath = path + "/" + waits.get(i - 1);zk.exists(prePath, new Watcher() {@Overridepublic void process(WatchedEvent event) {latch.countDown();}});break;}latch.await();return true;}public boolean unlock(String path, String name) {if (state > 1) {state--;return true;}String lockPath = path + "/" + name;try {Stat stat = zk.exists(lockPath, false);int version = stat.getVersion();zk.delete(lockPath, version);state--;return true;} catch (Exception e) {System.out.println("unlock:" + lockPath + " ,exception,");}return false;}/*** 获取所有子节点** @param path    节点地址* @param watcher watcher* @return 所有子节点* @throws InterruptedException 中断异常* @throws KeeperException      ZooKeeper异常*/public List readChildren(String path, Watcher watcher) throws InterruptedException, KeeperException {List childrens = null;if (watcher == null) {childrens = zk.getChildren(path, false);} else {childrens = zk.getChildren(path, watcher, null);}return childrens;}
}

4、ZkLock 类实现分布式锁

import lombok.SneakyThrows;
import org.apache.zookeeper.KeeperException;import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;public class ZkLock {/*** 开启的线程数,模拟多客户端操作*/private static final int CLIENTS_NUM = 3;private final ActiveKeyValueStore store;public ZkLock(String hosts) throws IOException, InterruptedException {//定义一个类store = new ActiveKeyValueStore();//连接Zookeeperstore.connect(hosts);}public static void testLock() {//线程计数器控制业务的执行final CountDownLatch countDownLatch = new CountDownLatch(CLIENTS_NUM);for (int i = 0; i < CLIENTS_NUM; i++) {new Thread() {@Overridepublic void run() {}}.start();}try {// 堵塞线程,任务执行完后释放countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) throws IOException, InterruptedException, KeeperException {String hosts = "localhost:2181";ZkLock zkLock = new ZkLock(hosts);// 创建父节点zkLock.store.write("/lock4", "父亲节点");//CountDownLatch latch = new CountDownLatch(CLIENTS_NUM);for (int i = 0; i < CLIENTS_NUM; i++) {int finalI = i;new Thread() {@SneakyThrows@Overridepublic void run() {String name = "Thread-" + String.valueOf(finalI);zkLock.store.lock("/lock4", name);TimeUnit.SECONDS.sleep(2);System.out.println("线程-" + name + "执行完毕");latch.countDown();zkLock.store.unlock("/lock4", name);}}.start();}latch.await();System.out.println("end ...");}}

三、测试结果

ZkLock 代码测试结果如下:

线程-Thread-0执行完毕
线程-Thread-1执行完毕
线程-Thread-2执行完毕
end ...

通过 ZkLock 打印的信息可以看出,已经成功模拟实现分布式锁

相关内容