Warning : Trying to access array offset on value of type bool in
/www/wwwroot/wql_luoqin_ltd/wp-content/themes/Sakura/inc/theme_plus.php on line
286
常见的Zookeeper javaAPI有三种:
原生 Java API:官方提供的API(本身不好用)
ZkClient:在原生的API上面进行了简化和封装(用起来较原生更为简单)
Curator:Apache的顶级项目目标就是为了简化Zookeeper客户端操作(常用)
一,原生Zookeeper API操作
一,POM依赖
pom依赖:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.7</version>
</dependency>
二,获取zk客户端对象
ZK的原生API的主要提供Zookeeper类获取客户端对象
一, Zookeeper类的构造方法
参数:
connectString:单机或者集群的IP地址和端口(多个IP和端口用逗号分隔)
sessionTimeout:会话的超时时间(单位:毫秒)
watcher:监听
三,创建节点
一,通过Create方法创建各种类型节点
参数:
path:创建节点的路径
data:节点数据(需要是byte数组类型)
acl:主要做权限控制
createMode:创建节点的类型(常量)
二,createMode枚举类
createMode类主要通过枚举声明创建节点的类型
① 枚举对象(分别代表不同的节点类型)
注:主要使用上面四种,其他的在原基础上使用了不同的策略
② createMode的方法
isEphemeral:是否为短暂节点目录
isSequential:是否为带序号节点目录
values:返回枚举类型的常量
四,获取节点信息
一,获取节点的值
通过getData方法获取节点的值
参数:
path:节点路径
boolean watch:是否开启监听
stat:节点的统计
Watch watch:可以指定一个轮询监听器
二,获取子节点的信息
通过getChildren方法获取节点默认只获取节点名
返回值是一个String类型的list数组
参数:
path:节点的路径
boolean watch:是否开启监听
Stat:统计节点数
Watch watch:可以指定一个轮询监听器
五,节点的修改删除判断是否存在
一,节点的修改
通过setData方法修改节点的数据
参数:
path:节点路径
data:要重新设置的节点数据
version:设置版本号
二,节点的删除
通过delete方法删除节点
三,判断节点是否操作
通过exists方法判断节点是否存在
参数:
path:节点的路径
boolean watch:是否开启监听
Watch watch:可以指定一个轮询监听器
注:其实有一些参数比如stat我看不到它的具体类,不知道它到底是什么东西,原生的API有很多类,但客户端操作就这么几个
原生API存在的问题:
会话连接是异步的,需要手动处理,比如使用CountDownLatch
Watch需要重复注册,不然不能生效
开发的复杂度比较高
不支持多节点删除和创建,需要递归
案例:节点的增删改查和监听
public class Zk_Protogenesis {
//连接的集群IP和端口
private static String connectString = "192.168.68.151:2181,192.168.68.152:2181,192.168.68.153:2181";
private static int sessiontimeout = 2000;
private static ZooKeeper zkcli;
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
//ZK客户端分别有三个参数connectString,sessionTimeout,watcher
zkcli= new ZooKeeper(connectString, sessiontimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//将监听方法放入process中,实现持续监听(zk默认监听只有一次)
try {
GetStatic();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
System.out.println("连接成功!");
DeleteNode();
CreateNode();
GetStatic();
}
//创建节点目录
public static void CreateNode() throws KeeperException, InterruptedException {
//建一个持久节点
String node = zkcli.create("/WQL","FQ_ZK".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(node);
}
//删除节点
public static void DeleteNode() throws KeeperException, InterruptedException {
zkcli.delete("/WQL",0);
}
//获取节点信息
public static void GetStatic() throws KeeperException, InterruptedException {
//获取所有节点并开启监听
List<String> child = zkcli.getChildren("/",true);
//打印节点
for(String node:child){
System.out.println(node);
}
//延时
Thread.sleep(Integer.MAX_VALUE);
}
}
二,zkclient API操作
一,POM依赖
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
二,获取zk客户端对象
通过ZkClient类获取客户端对象
构造方法:
参数:
String zkServers:服务器IP地址和端口号
int sessionTimeout:会话的超时时间
int connectionTimeout:连接超时时间
ZkSerializer zkSerializer:序列化方式(一般使用java默认序列化方式SerializableSerializer)
三,创建节点
Zkclient创建节点的方式比原生的Zookeeper API更加细化,它提供了单独创建持久和短暂节点的方法,而不需要指定CreateMolde的类型
一,指定CreateMolde创建
通过指定Create创建(和原生API差不多)
参数:
path:节点路径
data:节点数据
mode:节点类型(枚举类型)
acl:权限
二,创建持久和持久序号节点
主要是在原生API上做了一层封装
方法:
1,持久节点:createPersisent()
2,持久序号节点:createPersistentSeqential()
参数:
path:节点路径
data:节点数据
acl:权限
三,创建短暂和短暂序号节点
方法:
1,短暂节点:createEphemeral()
2,短暂序号节点:createEphemeralSequential()
参数:
path:节点路径
data:节点数据
acl:权限
四,查询节点
一,获取节点下的所有子节点
获取子节点方法:getChildren()
统计子节点个数:countChildren()
参数:
path:节点路径
watch:是否开启监听
二,获取某个节点的数据
原生API使用的是getData,zkClient使用的是readData
注:和原生不同,序列化的读写必须是一样的,加入写入是java序列化的对象,读取时也必须是序列化对象,假如是字符串会报错
方法:readData()
参数:
path:节点路径
watch:是否开启监听
stat:查看节点的状态信息(选择性)
五,修改和删除节点
1,修改节点:
① 修改节点数据:writeData()
② 修改节点属性信息:writeDataReturnStat()
2,普通删除:delete()
3,递归删除:deleteRecursive()
六,节点的监听
zkClient的监听方法内部封装了原生API中的Watch的process方法,它是持久监听,而不需要像原生API一样要持久监听必须将方法放入process()中
一,IZkDataListener类
节点监听后的假如发生改变,会触发的事件:
1,handleDataChange():节点发生改变触发
2,handleDataDeleted():节点删除触发
二,节点监听
1,监听节点数据变化
方法:subscribeDataChanges()
2,监听节点目录变化
方法:subscribeChildChanges()
3,监听节点属性的变化
方法:subscribeStateChanges()
参数:
path:监听的节点路径
IZkDataListener:触发器
案例:节点的增删改查和监听
public class Zk_Client {
private static String connectString = "192.168.68.151:2181,192.168.68.152:2181,192.168.68.153:2181";//服务器IP地址和端口号
private static int sessiontimeout = 20000;//会话的超时时间
private static int connectionTimeout =20000;//连接超时时间
private static ZkClient zkClient;//客户端对象(成员静态变量)
public static void main(String[] args) {
zkClient = new ZkClient(connectString,sessiontimeout,connectionTimeout,new SerializableSerializer());
System.out.println(zkClient);
}
//创建节点(创建成功会返回节点名称的字符串)
public static void CreateNode(){
//创建持久节点
zkClient.create("/FQ","6666", CreateMode.PERSISTENT);
//创建持久序号节点
zkClient.createPersistentSequential("/FQ/WQL","9999");
//创建短暂节点
zkClient.createEphemeral("/FQ/LingShi","1111");
//创建短暂序号节点
zkClient.createEphemeralSequential("/FQ/LingShi_Seq","2222");
}
//删除节点
public static void DeleteNode(){
zkClient.deleteRecursive("/FQ");
}
//查询节点下的子节点并获取数据
public static void SelectStat(){
//获取子节点列表
List<String> children = zkClient.getChildren("/FQ");
//获取节点数据
for(String data:children){
String NodeData = zkClient.readData("/FQ/" + data);
System.out.println(NodeData);
}
}
//修改节点
public static void AlterNode(){
zkClient.writeData("/FQ","LOVE");
}
//节点的监听
public static void WatchNode(){
zkClient.subscribeDataChanges("/FQ", new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
System.out.println("数据被修改!");
}
@Override
public void handleDataDeleted(String s) throws Exception {
System.out.println("数据被删除!");
}
});
}
}
三,Curator框架操作Zookeeper
Curator是Apache Zookeeper的java客户端库,它相比与原生API和ZkClient更加灵活,更加易于操作,解决了原生API开发分布式的难点,是现在主流操作Zookeeper的框架
Curator和Zookeeper的兼容问题:
pom依赖:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
一,获取客户端对象
Curator的客户端对象为CuratorFramework
三种API分别的客户端名:
Curator获取客户端CuratorFramework对象有两种方式:
通过工厂CurtaorFrameworkFactory直接创建
通过工厂CurtaorFrameworkFactory链式Builder创建
一,工厂CurtaorFrameworkFactory直接创建客户端对象
方法:
直接创建客户端对象方法:newClient()
参数:
二,工厂CurtaorFrameworkFactory链式Builder创建客户端对象
通过builder()得到链式对象Builder
Builder类的方法:
connectString:连接Zk的IP地址和端口号
sessionTimeoutMs:会话超时时间
connectionTimeoutMs:连接超时时间
retryPolicy:重试策略
namespace:设置命名空间(就是一个父节点目录)
getNameSpace:获取命名空间
getRetryPolicy:获取重试策略的类型
……
注:名称空间就是在创建客户端的时候创建一个节点目录,之后使用的节点操作都在这个目录下完成
三,重试的策略retryPolicy
顶级接口retryPolicy
实现类:
ExponentialBackoffRetry: 在重试之间增加(最多限定)睡眠时间,重试一定次数
RetryForever: 总是允许重新尝试,直到连接成功
RetryNTimes: 重试最多次数的重试策略
RetryOneTime: 只重试一次的重试策略
RetryUntilElapsed: 在给定时间过去之前完成重试,否则抛异常
具体方法不详细写,看API
二,创建节点
Curtaor通过客户端Create创建节点时对节点做了二次封装并不会直接创建,底层通过CreateBuilder实现节点创建
① CuratorFramework客户端对象create()返回CreateBuilder对象
② 进入CreateBuilder对象看实现类(责任链模式)
只有一个实现CreateBuilderImpl
③ 进入CreateBuilderImpl看他的方法
这个类是真正实现创建的类
CreateBuilderImpl的常用API:
1,forPath():创建节点
参数:不像之前的API创建,它的参数中没有类型选项
path:节点路径
data:节点数据(必须转化为byte数组)
注:默认data为IP地址
2,withMold():声明节点类型
参数:
3,creatingParentsIfNeeded():创建多级节点,需要就加上即可(这是Curtaor框架独有多级创建)
4,withAcl:设置权限
三,获取节点信息
一,获取节点数据
查询节点数据和创建节点一样,也是进行了二次封装,实际底层通过GetDataBuilder实现
① CuratorFramework客户端对象getData()返回GetDataBuilder对象
② 进入GetDataBuilder对象查看实现类(责任链模式)
③ 进入GetDataBuilderImpl看他的方法
GetDataBuilderImpl的常用API:
二,获取子节点
和之前一样的设计
① CuratorFramework客户端对象getChildren()返回GetChildrenBuilder对象
② 进入GetChildrenBuilder对象查看实现类(责任链模式)
③ 进入GetChildrenBuilderImpl看他的方法
GetChildrenBuilderImpl的常用API:
三,获取节点属性信息
也是一样,不详说
① 客户端getState()方法
② CurtaorFrameworkState接口具体实现
四,修改删除节点
一,修改节点
① CuratorFramework客户端对象setData()返回SetDataBuilder对象
② 进入SetDataBuilder对象查看实现类(责任链模式)
③ 进入SetDataBuilderImpl看他的方法
二,节点的删除
① CuratorFramework客户端对象delete()返回DeleteBuilder对象
② 进入DeleteBuilder对象查看实现类(责任链模式)
③ 进入DeleteBuilderImpl看他的方法
五,Watch事件监听
Zookeeper允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,Zookeeper服务器端会将事件通知到感兴趣的客户端上,该机制是Zookeeper实现分布式协调服务的重要特性
原生API中Watch存在的问题:Zookeeper原生API支持通过注册Watch来进行事件监听,但是一次只能监听一次,要循环监听需要将方法放入到peocess()中进行反复注册Watch,比较浪费性能,使用也很麻烦
Curator引入了Cache缓存来实现对Zookeeper服务端事件的监听
Curator提供了三种Watch:
NodeCache:只是监听某一个特定的节点
PathChilrenCache:监控一个ZNode的子节点
TreeCache:可以监控整个树上的所有节点,类似于NodeCache和PathChildrenCache的组合
cache在org.apache.curator.framework.recipes.cache包下
一,NodeCache监听
一,NodeCache类
NodeCache的构造:
传入参数:
client:客户端对象
path:监听的节点路径
dataIsCompressed:是否对传输的数据进行压缩,默认是不压缩
NodeCache的方法:
start():启动方法,内有一个boolean,true时启动时加载缓冲区开启缓存
getClient():反向获取客户端
getCurrentData():返回当前节点数据
getListenable():返回一个监听列表,可以add添加监听事件
二,Listenable监听列表
在监听列表中可以增加监听事件,当Node目录被修改,监听事件被触发
二,PathChildrenCache监听
PathChildrenCache的构造方法:
参数:
PathChildrenCache的方法:
三,TreeCache监听
TreeCache的构造只有一个:
参数:
client:客户端对象
path:监听的节点路径
TreeCache的方法:
方法:
size:返回结点数
iterator:返回节点迭代器
案例:节点的增删改查和监听
public class Zk_Curator {
//创建一个重试策略类型
static RetryPolicy retryPolicy =new ExponentialBackoffRetry(3000,10);
//连接的IP和端口
static String connectString = "192.168.68.151:2181,192.168.68.152:2181,192.168.68.153:2181";
//会话超时时间(连接有时需要时间,不能把时间设置太短)
static int sessionTimeoutMs = 20000;
//连接超时时间
static int connectTimeoutMs = 20000;
//客户端对象
static CuratorFramework curatorFramework;
public static void main(String[] args) {
//获取客户端对象
curatorFramework = CuratorFrameworkFactory.newClient(connectString,sessionTimeoutMs,connectTimeoutMs,retryPolicy);
//开启客户端(和其他API有些不一样,它是同步的)
curatorFramework.start();
}
//创建节点
public static void CreateNode() throws Exception {
//创建临时节点
curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath("/WQL","FQ_6".getBytes());
}
//修改节点
public static void AlterNode() throws Exception {
curatorFramework.setData().forPath("/WQL","Alter".getBytes());
}
//删除节点
public static void DeleteNode() throws Exception {
curatorFramework.delete().inBackground(new BackgroundCallback(){//添加删除回调方法
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
System.out.println("删除/WQL节点成功!");
}
}).forPath("/WQL");
}
//NodeCache监听节点
public static void WatchNode() throws Exception {
//创建NodeCache
NodeCache nodeCache = new NodeCache(curatorFramework,"/WQL",false);
//获取监听事件列表
ListenerContainer<NodeCacheListener> list = nodeCache.getListenable();
//注册监听事件
list.addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("节点被修改!");
}
});
//开启监听
nodeCache.start(false);
}
//关闭客户端
public static void ColseCurator(){
curatorFramework.close();
}
}
Comments | NOTHING
Warning: Undefined variable $return_smiles in /www/wwwroot/wql_luoqin_ltd/wp-content/themes/Sakura/functions.php on line 1109