Appearance
Hello Zookeeper
数据模型
Zookeeper 是一个树形目录服务,其数据模型和 Unix 的文件系统目录树很类似,拥有一个层次化结构。这里面的每一个节点都被称为 ZNode,每个节点上都会保存自己的数据和节点信息。节点可以拥有子节点,同时也允许少量(1MB)数据存储在该节点之下。
节点可以分为四大类:
- PERSISTENT 持久化节点
- EPHEMERAL 临时节点:
-e - PERSISTENT_SEQUENTIAL 持久化顺序节点:
-s - EPHEMERAL_SEQUENTIAL 临时顺序节点:
-es
服务端常用命令
./zkServer.sh start:启动服务./zkServer.sh status:查看服务状态./zkServer.sh stop:停止服务./zkServer.sh restart:重启服务
客户端常用命令
基础命令:
./zkCli.sh -server ip:port:连接 Zookeeper 服务端quit:断开连接help:查看命令帮助ls [-s] /PATH:查看节点列表[-s]:查看额外的、更详细的元数据信息字段名 示例值 描述 cZxid0x0创建该 ZNode 的事务 ID ctimeThu Jan 01 00:00:00 UTC 1970创建时间 mZxid0x0最后一次修改该 ZNode 的事务 ID mtimeThu Jan 01 00:00:00 UTC 1970最后修改时间 pZxid0x0最后一次修改该 ZNode 子节点的事务 ID cversion-1ZNode 子节点的版本号 dataVersion0数据版本 aclVersion0权限版本 ephemeralOwner0x0创建该临时节点的会话 ID;非临时节点为 0 dataLength0ZNode 中存储数据的字节数 numChildren1ZNode 当前拥有的子节点数量
create [-e] [-s] /PATH [VALUE]:创建节点-e:创建临时节点-s:创建顺序节点
get /PATH:获取节点值set /PATH VALUE:设置节点值delete /PATH:删除单个节点deleteall /PATH:删除带有子节点的节点
Curator
简介
Curator 是 Apache Zookeeper 的 Java 客户端库。
常见的 Zookeeper Java API 有:
- 原生 Java API
- ZkClient
- Curator
Curator 项目的目标是简化 Zookeeper 客户端的使用。Curator 最初是 Netflix 研发的,后来捐献给了 Apache 基金会,目前是 Apache 的顶级项目。
版本兼容性
需要注意 Curator 客户端与 Zookeeper 服务端版本间的兼容性,具体各版本之间兼容性可以通过 Curator 官网确认。
基础示例
Maven 依赖
XML
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.9.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.9.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.9.4</version>
</dependency>建立连接
Java
CuratorFramework client;
client = CuratorFrameworkFactory.builder()
.connectString("192.168.80.31:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(new ExponentialBackoffRetry(300, 10))
.namespace("namespace")
.build();
client.start();添加节点
添加节点,并设置节点数据:
Java
// 如果创建节点没有指定数据,则默认将当前客户端的 IP 作为数据
String path = client.create().forPath("/app", "hello".getBytes());如果父节点不存在,则自动创建所有必要的父节点:
Java
String path = client.create().creatingParentsIfNeeded().forPath("/app1/child", "hi".getBytes());查询节点
查询数据:
Java
byte[] data = client.getData().forPath("/app");
System.out.println(new String(data));查询子节点:
Java
List<String> children = client.getChildren().forPath("/app1");
System.out.println(children);查询节点状态:
Java
Stat status = new Stat();
client.getData().storingStatIn(status).forPath("/app");
System.out.println(status);修改节点
直接修改:
Java
Stat status = client.setData().forPath("/app", "你好".getBytes());
System.out.println(status);带版本号校验的修改:
Java
Stat status = client.setData().withVersion(1).forPath("/app", "很好".getBytes());
System.out.println(status);删除节点
删除单个节点(当目标节点有子节点时报错):
Java
client.delete().forPath("/app1");删除带有子节点的节点:
Java
client.delete().deletingChildrenIfNeeded().forPath("/app1");边缘情况处理:操作在服务器上可能已经成功,但在响应成功返回给客户端之前发生了连接失败。
Java
client.delete().guaranteed().forPath("/app");在后台执行删除并执行回调:
Java
client.delete().inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println(event);
}
}).forPath("/app");Watch 事件监听
Watcher 是 Zookeeper 提供的一种事件监听机制,用于在 ZNode 状态发生变化时,向客户端发送通知。 其核心作用是实现 分布式系统中的事件驱动与状态感知,避免客户端频繁轮询。
典型应用场景包括:
- 配置中心(配置变更通知)
- 服务注册与发现
- 分布式锁(后续说明)
- Leader 选举
- 集群成员感知
Watcher 监听的是 ZNode 级别的变化,主要包括以下几类:
节点状态事件(EventType)
NodeCreated:节点被创建NodeDeleted:节点被删除NodeDataChanged:节点数据变化NodeChildrenChanged:子节点列表变化
会话事件(State)
SyncConnected:客户端成功连接Disconnected:网络中断Expired:会话过期
Watcher 通知是 异步的,不保证实时性(但通常延迟很低),不保证每一次变化都被 “逐一通知”(多次连续修改,可能只收到一次通知),因此 Watcher 不适合用于事件计数。
Watcher 事件中不包含最新数据,只告诉你 “发生了什么变化”。正确使用方式是,收到通知后主动再读一次数据。
Zookeeper 原生的 Water 是一次性的,也就是在事件触发一次后会自动移除该 Watcher。
Curator 中提供了更高层的监听器 Cache,它们的监听器对使用者来说是 “持续的”(在内部会自动重新注册 Watcher)。Curator 提供了如下 Cache:
NodeCache:单个节点PathChildrenCache:子节点变化TreeCache:整个子树CuratorCache(新):统一替代方案
NodeCache Deprecated
Java
NodeCache nodeCache = new NodeCache(client, "/app");
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
byte[] data = nodeCache.getCurrentData().getData();
System.out.println(new String(data));
}
});
nodeCache.start(true);PathChildrenCache Deprecated
Java
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app1", true);
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
PathChildrenCacheEvent.Type type = event.getType();
if (type.equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
byte[] data = event.getData().getData();
System.out.println(new String(data));
}
}
});
pathChildrenCache.start();TreeCache Deprecated
Java
TreeCache treeCache = new TreeCache(client, "/app1");
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
System.out.println(event);
}
});
treeCache.start();CuratorCache New
Java
CuratorCache curatorCache = CuratorCache.build(client, "/app1");
curatorCache.listenable().addListener(new CuratorCacheListener() {
@Override
public void event(Type type, ChildData oldData, ChildData data) {
System.out.println("节点变化:类型=" + type +
",路径=" + data.getPath() +
",新数据=" + (data.getData() != null ? new String(data.getData()) : "null") +
",旧数据=" + (oldData != null && oldData.getData() != null ? new String(oldData.getData()) : "null"));
}
});
curatorCache.start();分布式锁实现
实现原理
客户端尝试获取锁时,会在指定的 Lock 节点下创建一个临时顺序节点;
节点的临时特性保证了客户端会话失效时锁能够被自动释放。
节点创建成功后,客户端获取 Lock 节点下的所有子节点列表,并对这些子节点按照顺序号进行排序。如果发现自己创建的节点序号最小,则认为当前客户端成功获取到锁。锁使用完成后,客户端需要主动删除该临时节点以释放锁;
如果客户端发现自己创建的节点并不是所有子节点中序号最小的节点,则说明锁已被其他客户端持有。此时,客户端需要找到序号紧邻且小于自己节点的那个子节点,并对该节点注册 Watcher,监听其删除事件;
该策略可有效避免对锁根节点的集中监听,从而避免 “羊群效应”。
当被监听的前驱节点被删除时,客户端的 Watcher 会收到通知。此时,客户端需要重新获取 Lock 节点下的子节点列表,并再次判断自己创建的节点是否为序号最小的节点:
- 如果是,则成功获取锁;
- 如果仍然不是,则重复上述步骤,继续查找新的前驱节点并注册监听,直到获取到锁为止。
使用示例
Curator 中有五种锁方案:
InterProcessSemaphoreMutex:分布式排他锁(非可重入)InterProcessMutex:分布式可重入排它锁InterProcessReadWriteLock:分布式读写锁InterProcessMultiLock:将多个锁作为单个实体管理的容器InterProcessSemaphoreV2:共享信号量
Java
InterProcessMutex lock = new InterProcessMutex(client, "/lock");
lock.acquire();
try {
// do your job
} finally {
lock.release();
}