curator-recipes模块提供了zookeeper的一些典型应用场景的使用参考。在curator中,事件监听的支持由curator-recipes模块提供。下面将对这些监听器进行介绍。
监听客户端连接状态
Curator客户端与zookeeper连接的过程其实是一个异步过程,Curator为我们提供了一个监听器监听连接的状态,根据连接的状态做相应的处理。
private CountDownLatch countDownLatch = new CountDownLatch(1); @Test public void testClientConnStateListener() throws InterruptedException { int retryIntervalMs = 1000; RetryPolicy retryPolicy = new RetryForever(retryIntervalMs); CuratorFramework testConnStateListenerClient = CuratorFrameworkFactory.builder() .connectString(ZookeeperHelper.zkAddress) .sessionTimeoutMs(ZookeeperHelper.sessionTimeout) .retryPolicy(retryPolicy) .build(); //添加监听器 testConnStateListenerClient.getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { if (newState == ConnectionState.CONNECTED) { try { System.out.println("connected established"); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } countDownLatch.countDown(); //释放锁 } else { System.out.println("connection state : " + newState.name()); } } }); testConnStateListenerClient.start(); //加锁,暂不往下执行 countDownLatch.await(); testConnStateListenerClient.close(); }
NodeCache
可以监到当前节点数据的变化
@Test public void testNodeDataListener() throws Exception { String node_to_listen = "/listened_node"; client.create() .creatingParentContainersIfNeeded() //自动递归创建父节点 .withMode(CreateMode.PERSISTENT) .forPath(node_to_listen); NodeCache nodeCache = new NodeCache(client, node_to_listen, false); nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("Node data is changed, new data: " + new String(nodeCache.getCurrentData().getData())); } }); nodeCache.start(); Thread.sleep(1000); client.setData() .forPath(node_to_listen, "new data".getBytes());//更新节点的数据 Thread.sleep(1000); nodeCache.close(); client.delete().deletingChildrenIfNeeded().forPath(node_to_listen); }
PathChildrenCache
(1)可以监听到当前节点下的孩子节点的变化,但是孩子节点下面的孩子节点的事情不能监听。 (2)可以监听到的事件:节点创建、节点数据的变化、节点删除等
@Test public void testChildrenNodeListener() throws Exception { String parent_node = "/parent_node"; String child_node = parent_node + "/child"; PathChildrenCache pathChildrenCache = new PathChildrenCache(client, parent_node, false); PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { switch (event.getType()) { case CHILD_ADDED: //子节点被添加 System.out.println("CHILD_ADDED: " + event.getData().getPath()); break; case CHILD_REMOVED: //子节点被删除 System.out.println("CHILD_REMOVED: " + event.getData().getPath()); break; case CHILD_UPDATED: //子节点数据变化 System.out.println("CHILD_UPDATED: " + event.getData().getPath()); break; default: break; } } }; pathChildrenCache.getListenable().addListener(pathChildrenCacheListener); pathChildrenCache.start(); client.create() .creatingParentContainersIfNeeded() .withMode(CreateMode.PERSISTENT) .forPath(child_node); Thread.sleep(1000); client.setData() .forPath(child_node, "new data".getBytes());//更新节点的数据 Thread.sleep(1000); pathChildrenCache.close(); client.delete().deletingChildrenIfNeeded().forPath(parent_node); }
TreeCache
(1)可以监听到指定节点下所有节点的变化。比如当前节点是”/node”,添加了TreeCacheListener后,不仅可以监听节点 "/node/child" 节点的变化,还能监听孙子节点 "/node/child/grandson"的变化。
(2)可以监听到的事件:节点创建、节点数据的变化、节点删除等@Test public void testTreeListener() throws Exception { String parent_path = "/tree_node_parent"; client.create() .creatingParentContainersIfNeeded() .withMode(CreateMode.PERSISTENT) .forPath(parent_path); Thread.sleep(1000); TreeCache treeCache = new TreeCache(client, parent_path); treeCache.start(); treeCache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent event) throws Exception { switch (event.getType()) { case NODE_ADDED: System.out .println("TreeNode added: " + event.getData() .getPath() + " , data: " + new String(event.getData().getData())); break; case NODE_UPDATED: System.out .println("TreeNode updated: " + event.getData() .getPath() + " , data: " + new String(event.getData().getData())); break; case NODE_REMOVED: System.out .println("TreeNode removed: " + event.getData() .getPath()); break; default: break; } } }); //创建孩子节点 String child_path = parent_path + "/child"; client.create() .creatingParentContainersIfNeeded() .withMode(CreateMode.PERSISTENT) .forPath(child_path); Thread.sleep(1000); //创建孙子节点 String grandson_path = child_path + "/grandson"; client.create() .creatingParentContainersIfNeeded() .withMode(CreateMode.PERSISTENT) .forPath(grandson_path); Thread.sleep(1000); //更新孙子节点数据 client.setData().forPath(grandson_path, "new_data".getBytes()); Thread.sleep(1000); //删除孙子节点 client.delete().deletingChildrenIfNeeded().forPath(grandson_path); Thread.sleep(1000); treeCache.close(); client.delete().deletingChildrenIfNeeded().forPath(parent_path); }