5.1 案例1:监听服务器动态上下线
需求
某分布式系统中,节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到节点服务器的上下线
需求分析
这个案例涉及到 3 个角色:
- 分布式服务器
A
.(我们需要监视动态上下线的机器) - Zookeeper 服务器
B
- 客户端(需要实时的感受到分布式服务器上下线情况的)
C
分析:
分布式服务器 A
上下线的时候, 我们需要在客户端 C
上收到分布式服务器 A
上下线的通知.
很显眼, 分布式服务器 A
上下线的通知应该由 Zookeeper 服务器 B
来发出.
让客户端 C
监听 Zookeeper 服务器 B
中节点(znode
)的变化, 当节点发生变化的, C
可以得到通知.
znode
怎么变化呢? 当分布式服务器 A
上线的时候在 Zookeeper 服务器上注册暂时节点, 当分布式服务器下线的时候暂时节点会自动删除.
对 Zookeeper 服务器B
来说, 分布式服务器 A
和客户端 C
来说, A
和C
他们都是客户端.
创建一个节点/servers
, 临时节点作为它的子节点.
运行在分布式服务器 A
上的代码:
package com.atguigu.zkcase;
import org.apache.zookeeper.*;
import java.io.IOException;
public class DistributeServer {
private static final String connString = "hadoop201:2181,hadoop202:2181,hadoop203:2181";
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
/*1. 创建 zookeeper 客户端对象 通过参数来模拟哪个服务器上线了*/
ZooKeeper zk = new ZooKeeper(connString, 2000, new Watcher() {
@Override
public void process(WatchedEvent event) {
}
});
/*2. 在zookeeper服务器中创建临时节点, 包含这个服务器的信息*/
zk.create("/servers/" + args[0], args[0].getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
//进程休眠状态, 模拟当前客户端一直在连接到zookeeper服务器.
Thread.sleep(Long.MAX_VALUE);
}
}
运行中客户端 C
上的代码
package com.atguigu.zkcase;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.List;
public class DistributeClient {
private static final String connString = "hadoop201:2181,hadoop202:2181,hadoop203:2181";
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
/*1. 创建 zookeeper客户端对象*/
ZooKeeper zk = new ZooKeeper(connString, 2000, new Watcher() {
@Override
public void process(WatchedEvent event) {
}
});
/*2. 监听 /servers/hadoop201... 节点来感知分布式服务器的上线或下线, 需要循环监听*/
for (int i = 1; i < 204; i++) {
String znodePath = "/servers/hadoop20" + i;
zk.exists(znodePath, new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
if(event.getType() == Event.EventType.NodeCreated){
System.out.println(event.getPath() + " 上线了");
}else if(event.getType() == Event.EventType.NodeDeleted){
System.out.println(event.getPath() + " 下线了");
}
zk.exists(znodePath, this); // 循环监听
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
Thread.sleep(Long.MAX_VALUE);
}
}