本文最后更新于:2025年2月11日 晚上
zookeeper 常用命令
1.查询
2.创建节点
1 2 3 4
| create [-s] [-e] znode名称 znode数据
# -s: sequence:有序节点 # -e: ephemeral:临时节点
|
3.修改节点数据
4.删除节点数据
1 2 3 4
| delete znode名称
deleteall znode名称
|
zookeeper 集群
1.zookeeper 投票策略
1.每一个 zookeeper 服务都会被分配一个全局唯一分配的 myid
2.zookeeper 在执行写数据时,每一个节点都有一个自己的 FIFO 队列。保证写每一个数据的时候,顺序是不会乱的,zookeeper 还会给每一个数据分配一个全局唯一的 zxid,数据越新,zxid 就越大
选举 Leader:
1.选举出 zxid 最大的节点作为 Leader
2.在 zxid 相同的节点中,选举出一个 myid 最大的节点,作为 Leader
2.zooker 集群
docker-compose.yaml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| version: "3.1" services: zk1: image: zookeeper restart: always container_name: zk1 ports: - 2181:2181 environment: ZOO_MY_ID: 1 ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181 zk2: image: zookeeper restart: always container_name: zk2 ports: - 2182:2181 environment: ZOO_MY_ID: 2 ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181 zk3: image: zookeeper restart: always container_name: zk3 ports: - 2183:2181 environment: ZOO_MY_ID: 3 ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
|
java 操作 zookeeper
1.java 连接 zookeeper
1.创建 Maven 工程
2.导入依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| <dependencies> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.7.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>5.1.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> </dependencies>
|
3.编写连接 zookeeper 集群的工具类
1 2 3 4 5 6 7 8 9 10 11 12 13
| public class ZkUtil { public static CuratorFramework cf(){
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,2); CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString("192.168.3.6:2181,192.168.3.6:2182,192.168.3.6:2183") .retryPolicy(retryPolicy) .build(); cf.start(); return cf; } }
|
4.测试
1 2 3 4 5 6
| public class Demo1 { @Test public void connect(){ CuratorFramework cf = ZkUtil.cf(); } }
|
2.java 操作 Znode 节点
1.查询
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class Demo2 { CuratorFramework cf = ZkUtil.cf();
@Test public void getChildren() throws Exception{ List<String> strings=cf.getChildren().forPath("/"); for(String string : strings){ System.out.println(string); } }
@Test public void getData() throws Exception { byte[] bytes = cf.getData().forPath("/test"); System.out.println(new String(bytes,"UTF-8")); } }
|
2.添加
1 2 3
| @Test public void create() throws Exception { cf.create().withMode(CreateMode.PERSISTENT).forPath("/test2","uuu".getBytes()); }
|
3.修改
1 2 3 4
| @Test public void update() throws Exception { cf.setData().forPath("/test2","oooo".getBytes()); }
|
4.删除
1 2 3 4
| @Test public void delete() throws Exception { cf.delete().deletingChildrenIfNeeded().forPath("/test2"); }
|
5.查看 znode 的状态
1 2 3 4 5
| @Test public void stat() throws Exception { Stat stat = cf.checkExists().forPath("/test"); System.out.println(stat); }
|
3.监听通知机制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class Demo3 { CuratorFramework cf = ZkUtil.cf();
@Test public void listen() throws Exception{ NodeCache nodeCache=new NodeCache(cf,"/test"); nodeCache.start(); nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { byte[] data = nodeCache.getCurrentData().getData(); Stat stat = nodeCache.getCurrentData().getStat(); String path = nodeCache.getCurrentData().getPath(); System.out.println("监听的节点是:"+path); System.out.println("节点现在的数据是:"+new String(data,"UTF-8")); System.out.println("节点状态是:"+stat); } }); System.out.println("开始监听"); System.in.read(); } }
|