Zookeeper–配置服务
配置服务是分布式应用中重要的服务,作用是使集群中的机器可以共享配置信息中公共的部分。ZooKeeper可作为一个具有高可用,全局一致的配置服务器,允许客户端获取和更新配置文件。使用ZooKeeper中的观察机制,可以建立一个活跃的配置服务,客户端监控自己感兴趣的配置节点,在第一时间得到配置信息修改的通知。
下面是一个简单的例子:
配置服务类:
public class ZkConfigService implements Watcher{
private final Charset CHARSET = Charset.forName("UTF-8");
private final int SESSION = 5000;
public final String CONFIG_PATH = "/__config__";
private ZooKeeper zk;
private CountDownLatch latch = new CountDownLatch(1);
private static ZkConfigService ser = new ZkConfigService();
public static ZkConfigService getInstance(){
return ser;
}
private ZkConfigService() {
try {
zk = new ZooKeeper("localhost:2181", SESSION, this);
latch.await();
if (zk.exists(CONFIG_PATH, false) == null) {
zk.create(CONFIG_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
private void close() throws InterruptedException {
zk.close();
}
public void write(String path, String value) {
path = CONFIG_PATH + "/" + path;
Stat stat = null;
try {
stat = zk.exists(path, false);
if (stat == null) {
zk.create(path, value.getBytes(CHARSET), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} else {
zk.setData(path, value.getBytes(CHARSET), -1);
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public String read(String path, Watcher watcher) {
path = CONFIG_PATH + "/" + path;
byte[] data = new byte[0];
try {
data = zk.getData(path, watcher, null);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return new String(data, CHARSET);
}
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
latch.countDown();
}
}
}
配置更新类:
public class ConfigUpdater {
private String path;
private Random random = new Random();
private ZkConfigService configSer = ZkConfigService.getInstance();
public ConfigUpdater(String path) {
this.path = path;
}
public static void main(String[] args) throws KeeperException, InterruptedException, IOException {
ConfigUpdater up = new ConfigUpdater("db.url");
up.run();
}
public void run() throws KeeperException, InterruptedException {
int i=0;
while (true) {
String value = i++ + "";
configSer.write(path, value);
System.out.printf("Set %s to %s\n", path, value);
TimeUnit.SECONDS.sleep(random.nextInt(5));
}
}
}
配置获取类:
public class ConfigWatcher implements Watcher {
private String path;
private ZkConfigService configSer = ZkConfigService.getInstance();
public ConfigWatcher(String path) {
this.path = path;
}
public static void main(String[] args) throws InterruptedException, KeeperException, IOException {
ConfigWatcher w = new ConfigWatcher("db.url");
w.display();
Thread.sleep(Long.MAX_VALUE);
}
public void display() throws KeeperException, InterruptedException {
String value = configSer.read(path, this);
System.out.printf("Read %s as %s\n", path, value);
}
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDataChanged) {
try {
display();
} catch (InterruptedException e) {
System.err.println(e);
Thread.currentThread().interrupt();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
}
—每次获取时都设置一个监控
分别启动配置更新和获取类,控制台打印如下:
- 作者:luangeng
- 主页:https://wawazhua.cn
- 本文出处:https://wawazhua.cn/post/java/zookeeper/zookeeper-config/
- 版权声明:禁止转载-非商用-非衍生