Skip to content
Sev7e0

基于zookeeper活动规则变更事件监听

zk1 min read

Watcher 事件处理器

在 zookeeper 中,Watcher 接口用于定义一个标准的事件处理器,其内部定义了方法 process 和内部类 Event 包含了 KeeperState 和 EventType 两个枚举类,分别代表了通知状态和事件类型。

1@Public
2public interface Watcher {
3 void process(WatchedEvent var1);
4
5 @Public
6 public interface Event {
7 @Public
8 public static enum EventType {
9 None(-1),
10 NodeCreated(1),
11 NodeDeleted(2),
12 NodeDataChanged(3),
13 NodeChildrenChanged(4);
14
15 private final int intValue;
16
17 private EventType(int intValue) {
18 this.intValue = intValue;
19 }
20
21 public int getIntValue() {
22 return this.intValue;
23 }
24
25 public static Watcher.Event.EventType fromInt(int intValue) {
26 switch(intValue) {
27 case -1:
28 return None;
29 case 0:
30 default:
31 throw new RuntimeException("Invalid integer value for conversion to EventType");
32 case 1:
33 return NodeCreated;
34 case 2:
35 return NodeDeleted;
36 case 3:
37 return NodeDataChanged;
38 case 4:
39 return NodeChildrenChanged;
40 }
41 }
42 }
43
44 @Public
45 public static enum KeeperState {
46 /** @deprecated */
47 @Deprecated
48 Unknown(-1),
49 Disconnected(0),
50 /** @deprecated */
51 @Deprecated
52 NoSyncConnected(1),
53 SyncConnected(3),
54 AuthFailed(4),
55 ConnectedReadOnly(5),
56 SaslAuthenticated(6),
57 Expired(-112);
58
59 private final int intValue;
60
61 private KeeperState(int intValue) {
62 this.intValue = intValue;
63 }
64
65 public int getIntValue() {
66 return this.intValue;
67 }
68
69 public static Watcher.Event.KeeperState fromInt(int intValue) {
70 switch(intValue) {
71 case -112:
72 return Expired;
73 case -1:
74 return Unknown;
75 case 0:
76 return Disconnected;
77 case 1:
78 return NoSyncConnected;
79 case 3:
80 return SyncConnected;
81 case 4:
82 return AuthFailed;
83 case 5:
84 return ConnectedReadOnly;
85 case 6:
86 return SaslAuthenticated;
87 default:
88 throw new RuntimeException("Invalid integer value for conversion to KeeperState");
89 }
90 }
91 }
92 }
93}

使用 zookeeper 原生的 Watcher 监听器的方式也很简单,代码如下:

1zk = new ZooKeeper(connectString, sessionTimeout, event -> {
2 log.info("变更状态----{}", event.getState());
3 log.info("变更类型----{}", event.getType());
4});

当一个 Watcher 完成连接并注册后,服务端的事件触发 Watcher,那么就会向指定客户端发送一个事件通知,来实现分布式的通知功能。客户收到服务器的通知后,执行 process 方法。

但此监听操作只能监听一次,也就是说每次时间触发后,你都需要再次注册。

Curator 事件监听

curator 是由 Netflix 基于原生 zk 开发的客户端工具,基于更多的场景进行了封装(分布式锁、分布式选举),以及相比于原生客户端更加容易理解的接口。简化了原生的事件和方法。

在 curator 中使用事件监听有两种方式,第一种基于观察者模式(Watcher 监听器),另一种是自己缓存监听模式。标准的监听模式是使用 Watcher 监听器。第二种缓存监听模式引入了一种本地缓存视图的 Cache 机制,来实现对 Zookeeper 服务端事件监听。

Cache 事件监听可以理解为一个本地缓存视图与远程 Zookeeper 视图的对比过程。Cache 提供了反复注册的功能。Cache 是一种缓存机制,可以借助 Cache 实现监听。简单来说,Cache 在客户端缓存了 znode 的各种状态,当感知到 zk 集群的 znode 状态变化,会触发 event 事件,注册的监听器会处理这些事件。

Watcher 监听器比较简单,只有一种。Cache 事件监听的种类有 3 种 Path Cache,Node Cache,Tree Cache。

Curator-Watcher 监听器使用

1client.getData()
2 // 每次选举失败,重新注册节点监听事件
3 .usingWatcher((CuratorWatcher) event -> {
4 log.info("leader node was changed, will start election");
5 // 递归调用,解决只能生效一次的问题
6 electionMaster(data);
7 })
8 .forPath(LOCKNODE);

Curator 中同样也定义了 CuratorWatcher-process 方法,且参数是 zk 客户端封装的 WatcherEvent 对象,需要注意的是该对象并不是 zk 服务端传递的对象,而是 zk 客户端自己封装的事件。zk 服务端传递过来的仅仅是一个 WatcherEvent 的类型。

Watcher 监听器是一次性的,如果要反复使用,就需要反复的使用 usingWatcher 提前注册。

所以,Watcher 监听器不能应用于节点的数据变动或者节点变动这样的一般业务场景。而是适用于一些特殊的,比如会话超时、授权失败等这样的特殊场景。

既然 Watcher 监听器是一次性的,在开发过程中需要反复注册 Watcher,比较繁琐。Curator 引入了 Cache 来监听 ZooKeeper 服务端的事件。Cache 对 ZooKeeper 事件监听进行了封装,能够自动处理反复注册监听。

NodeCache 节点缓存的监听

Curator 引入的 Cache 缓存实现,是一个系列,包括了 Node Cache 、Path Cache、Tree Cache 三组类。其中 Node Cache 节点缓存可以用于 ZNode 节点的监听,Path Cache 子节点缓存用于 ZNode 的子节点的监听,而 Tree Cache 树缓存是 Path Cache 的增强,不光能监听子节点,也能监听 ZNode 节点自身。

Node Cache,可以用于监控本节点的新增,删除,更新。

Node Cache 使用的第一步,就是构造一个 NodeCache 缓存实例。

有两个构造方法,具体如下:

1NodeCache(CuratorFramework client, String path)
2
3NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)

第一个参数就是传入创建的 Curator 的框架客户端,第二个参数就是监听节点的路径,第三个重载参数 dataIsCompressed 表示是否对数据进行压缩。具体使用如下:

1try {
2 maCache = new NodeCache(client, maZkPath);
3 maCache.getListenable().addListener(() -> {
4 // 接收到事件后进行相应的操作
5 });
6 maCache.start(true);
7} catch (Exception e) {
8 LOGGER.error("callbackMAPublish exception", e);
9}

需要注意的是,在 NodeCache 的 start 方法中有两种:

1void start()//Start the cache.
2void start(boolean buildInitial) //true 代表缓存当前节点

唯一的一个参数 buildInitial 代表着是否将该节点的数据立即进行缓存。如果设置为 true 的话,在 start 启动时立即调用 NodeCache 的 getCurrentData 方法就能够得到对应节点的信息 ChildData 类,如果设置为 false 的就得不到对应的信息。

此种方式即可以解决 Watcher 监听器带来的相关问题,同时 curator 还提供了其他两种类似的 Cache,Tree Cache 节点树监听、PathChildrenCache 子节点监听, 使用方式大同小异,只不过监听节点的范围不同,可以在使用时根据自己的需求进行调整,在目前项目中只需要借助其一个单 node 监听能力,及时更新下游规则引擎中的活动 规则即可,所以是用的 NodeCache。