博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
zookeeper - curator的监听事件
阅读量:5780 次
发布时间:2019-06-18

本文共 6930 字,大约阅读时间需要 23 分钟。

hot3.png

curator-recipes模块提供了zookeeper的一些典型应用场景的使用参考。在curator中,事件监听的支持由curator-recipes模块提供。下面将对这些监听器进行介绍。

监听客户端连接状态

Curator客户端与zookeeper连接的过程其实是一个异步过程,Curator为我们提供了一个监听器监听连接的状态,根据连接的状态做相应的处理。

private CountDownLatch countDownLatch = new CountDownLatch(1);    @Test    public void testClientConnStateListener() throws InterruptedException {        int retryIntervalMs = 1000;        RetryPolicy retryPolicy = new RetryForever(retryIntervalMs);        CuratorFramework testConnStateListenerClient = CuratorFrameworkFactory.builder()                .connectString(ZookeeperHelper.zkAddress)                .sessionTimeoutMs(ZookeeperHelper.sessionTimeout)                .retryPolicy(retryPolicy)                .build();        //添加监听器        testConnStateListenerClient.getConnectionStateListenable().addListener(new ConnectionStateListener() {            @Override            public void stateChanged(CuratorFramework client, ConnectionState newState) {                if (newState == ConnectionState.CONNECTED) {                    try {                        System.out.println("connected established");                        Thread.sleep(1000);                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                    countDownLatch.countDown(); //释放锁                } else {                    System.out.println("connection state : " + newState.name());                }            }        });        testConnStateListenerClient.start();        //加锁,暂不往下执行        countDownLatch.await();        testConnStateListenerClient.close();    }

NodeCache

可以监到当前节点数据的变化

@Test    public void testNodeDataListener() throws Exception {        String node_to_listen = "/listened_node";        client.create()                .creatingParentContainersIfNeeded() //自动递归创建父节点                .withMode(CreateMode.PERSISTENT)                .forPath(node_to_listen);        NodeCache nodeCache = new NodeCache(client, node_to_listen, false);        nodeCache.getListenable().addListener(new NodeCacheListener() {            @Override            public void nodeChanged() throws Exception {                System.out.println("Node data is changed, new data: " +                        new String(nodeCache.getCurrentData().getData()));            }        });        nodeCache.start();        Thread.sleep(1000);        client.setData()                .forPath(node_to_listen, "new data".getBytes());//更新节点的数据        Thread.sleep(1000);        nodeCache.close();        client.delete().deletingChildrenIfNeeded().forPath(node_to_listen);    }

PathChildrenCache

(1)可以监听到当前节点下的孩子节点的变化,但是孩子节点下面的孩子节点的事情不能监听。 (2)可以监听到的事件:节点创建、节点数据的变化、节点删除等

@Test    public void testChildrenNodeListener() throws Exception {        String parent_node = "/parent_node";        String child_node = parent_node + "/child";        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, parent_node, false);        PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() {            @Override            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {                switch (event.getType()) {                    case CHILD_ADDED: //子节点被添加                        System.out.println("CHILD_ADDED: " + event.getData().getPath());                        break;                    case CHILD_REMOVED: //子节点被删除                        System.out.println("CHILD_REMOVED: " + event.getData().getPath());                        break;                    case CHILD_UPDATED: //子节点数据变化                        System.out.println("CHILD_UPDATED: " + event.getData().getPath());                        break;                    default:                        break;                }            }        };        pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);        pathChildrenCache.start();        client.create()                .creatingParentContainersIfNeeded()                .withMode(CreateMode.PERSISTENT)                .forPath(child_node);        Thread.sleep(1000);        client.setData()                .forPath(child_node, "new data".getBytes());//更新节点的数据        Thread.sleep(1000);        pathChildrenCache.close();        client.delete().deletingChildrenIfNeeded().forPath(parent_node);    }

TreeCache

(1)可以监听到指定节点下所有节点的变化。比如当前节点是”/node”,添加了TreeCacheListener后,不仅可以监听节点 "/node/child" 节点的变化,还能监听孙子节点 "/node/child/grandson"的变化。

(2)可以监听到的事件:节点创建、节点数据的变化、节点删除等

@Test    public void testTreeListener() throws Exception {        String parent_path = "/tree_node_parent";        client.create()                .creatingParentContainersIfNeeded()                .withMode(CreateMode.PERSISTENT)                .forPath(parent_path);        Thread.sleep(1000);        TreeCache treeCache = new TreeCache(client, parent_path);        treeCache.start();        treeCache.getListenable().addListener(new TreeCacheListener() {            @Override            public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent event) throws Exception {                switch (event.getType()) {                    case NODE_ADDED:                        System.out                                .println("TreeNode added: " + event.getData()                                        .getPath() + " , data: " + new String(event.getData().getData()));                        break;                    case NODE_UPDATED:                        System.out                                .println("TreeNode updated: " + event.getData()                                        .getPath() + " , data: " + new String(event.getData().getData()));                        break;                    case NODE_REMOVED:                        System.out                                .println("TreeNode removed: " + event.getData()                                        .getPath());                        break;                    default:                        break;                }            }        });        //创建孩子节点        String child_path = parent_path + "/child";        client.create()                .creatingParentContainersIfNeeded()                .withMode(CreateMode.PERSISTENT)                .forPath(child_path);        Thread.sleep(1000);        //创建孙子节点        String grandson_path = child_path + "/grandson";        client.create()                .creatingParentContainersIfNeeded()                .withMode(CreateMode.PERSISTENT)                .forPath(grandson_path);        Thread.sleep(1000);        //更新孙子节点数据        client.setData().forPath(grandson_path, "new_data".getBytes());        Thread.sleep(1000);        //删除孙子节点        client.delete().deletingChildrenIfNeeded().forPath(grandson_path);        Thread.sleep(1000);        treeCache.close();        client.delete().deletingChildrenIfNeeded().forPath(parent_path);    }

转载于:https://my.oschina.net/thinwonton/blog/1036321

你可能感兴趣的文章
NIS MAP
查看>>
差异分析定位Ring 3保护模块
查看>>
2013年7月12日“修复 Migration 测试发现的 Bug”
查看>>
vim文本编辑器详解
查看>>
学习vue中遇到的报错,特此记录下来
查看>>
CentOS7 编译安装 Mariadb
查看>>
32位系统和64位系统的选择
查看>>
01配置管理过程指南
查看>>
python 搭配 及目录结构
查看>>
设计模式总结
查看>>
os/exec
查看>>
iOS后台任务,争取一段时间处理后事
查看>>
如何在linux下修改组权限
查看>>
把jpg转换成pdf软件
查看>>
RestTemplate
查看>>
build
查看>>
nutch2.1+mysql报错及解决
查看>>
spring boot打jar包发布
查看>>
《JavaScript高级程序设计》节点层次和DOM操作技术
查看>>
form 提交多个对象及springMVC接收
查看>>