Zookeeper
Zookeeper是一个分布式开源的协调服务(感觉有点像nacos)
目录
(55条消息) Zookeeper安装与配置_Murmure__的博客-CSDN博客
1.在bin目录在执行这个工具(连接server服务端口)开始客户端
我们对于Zookeeper的服务端事件监听也是用的Curator来实现的——>引入了Cache
zk安装
(55条消息) Zookeeper安装与配置_Murmure__的博客-CSDN博客
介绍
简介:Zookeeper是一个分布式开源的协调服务(感觉有点像nacos)
功能:配置管理,分布式锁,集群管理
配置管理:比如三个服务节点都用同一个配置文件,一个个去配置就很麻烦,我们直接在配置中心配置,abc节点连接即可(Nacos一站式解决)
(51条消息) Nacos注册中心_Fairy要carry的博客-CSDN博客_nacos注册中心
(51条消息) nacos实战项目中的配置_Fairy要carry的博客-CSDN博客_nacos配置有哪些
(51条消息) 面试-SpringCloud常见组件和注册表结构+nacos_Fairy要carry的博客-CSDN博客
分布式锁:
主要是解决数据一致性,比如我们的seata就是解决分布式情况下数据一致性问题
(51条消息) 缓存同步-Canal_Fairy要carry的博客-CSDN博客_canal同步缓存(51条消息) Seata的几种事务模式_Fairy要carry的博客-CSDN博客_seata 模式(51条消息) 缓存同步-Canal_Fairy要carry的博客-CSDN博客_canal同步缓存
当多个节点对数据进行更改,你加synchornized这种锁只是在这个当前节点修改,比较JVM不一样了,这时候就要引入我们的分布式锁去保证数据的一致性,话说seata还跟这个有点不一样,它主要是应用在数据库,分布式下,各个服务数据库不一样,要保证数据一致性(引入了XA,AT,TCC等模式)
集群管理
就是类似于注册中心,拉取服务用的
Zookeeper的数据模型
节点也可以有子节点,允许少量的数据(1MB)存储在该节点下
节点四大类:
1.PERSISENT: 持久化节点——>就是宕机客户端与服务端断开再连时,数据还是保持一致性的
2.EPHEMERAL:临时节点——>断开之后,数据节点就没了,相当于一次会话
服务端常见命令操作
#在bin目录下
./zkServer.sh restart
客户端命令操作
客户端工具zkCli.sh
1.在bin目录在执行这个工具(连接server服务端口)开始客户端
./zkCli.sh -server localhost:2181
2.常见增删改查命令
3.对节点的CRUD
创建临时节点
create -e /test2
创建顺序节点
create -s /test2
创建临时顺序节点
create -es /test2
发现临时节点消失了
4.查看节点信息
ls -s /Path
Java API的操作
1.建立连接
/**
* 1.建立连接
*/
@Before
public void testConnect() throws Exception {
//1.1第一种方式
// CuratorFrameworkFactory client = CuratorFrameworkFactory.class.getConstructor().newInstance();
/**
* 1.2创建一个新的客户端
* 参数: connectString - 连接到服务器的列表
* sessionTimeoutMs - 会话超时 ,多久没连就挂
* connectionTimeoutMs - 连接超时 ,连了多久
* retryPolicy -使用重试策略返回:客户端
*/
RetryPolicy retry = new ExponentialBackoffRetry(3000, 10);//重试策略:3s重试1次,最多10次
// client = CuratorFrameworkFactory.newClient(
// "192.168.184.129:2181",
// 60 * 1000,
// 15 * 1000, retry);
开启连接
// client.start();
/**
* 1.3第二种方式
* namespace(“xxx”)相当于是一个命名空间,方便区分,在根目录前加前缀
*/
client = CuratorFrameworkFactory.builder().connectString("192.168.184.129:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retry).namespace("itheima").build();
client.start();
}
2.创建节点并且赋值
/**
* 2.创建节点:持久 临时 顺序 数据
* 2.1基本创建
* 2.2创建带有数据的
* 2.3多级节点+设置节点类型
*/
@Test
public void testCreate() throws Exception {
//1.基本创建持久化
String path = client.create()
.withMode(CreateMode.EPHEMERAL)
.forPath("/app3", "hehe".getBytes(StandardCharsets.UTF_8));
System.out.println(path);
//2.让他不结束
while (true) {
}
}
3.创建子节点
/**
* 创建子节点
*
* @throws Exception
*/
@Test
public void testMulti() throws Exception {
//creatingParentsIfNeeded():如果父节点不存在就进行创建
String path = client.create()
.creatingParentsIfNeeded()
.forPath("/app4/p1");
System.out.println(path);
}
4.查询节点数据
/**
* 查询节点数据
*/
@Test
public void testGet() throws Exception {
byte[] bytes = client.getData().forPath("/app3");
System.out.println(new String(bytes));
}
5.查询当前路径下所有子节点
/**
* 查询子节点
* getChildren().forPath():得到目录下所有子节点
* 当前命名空间下的子目录节点
*/
@Test
public void testChildren() throws Exception {
List<String> path = client.getChildren().forPath("/");
System.out.println(path);
}
6.查询节点状态
/**
* 查询节点状态信息
* new Stat()
* client.getData():查询数据
* .storingStateIn(stat).forPath():查询节点状态值,将状态值给到Stat
*/
@Test
public void testState() throws Exception {
Stat stat = new Stat();
System.out.println(stat);
//查询节点状态信息ls -s
client.getData().storingStatIn(stat).forPath("/app3");
System.out.println(stat);
}
7.利用版本version修改节点数据
/**
* 根据版本判断修改节点状态值
* setData().withVersion().forPath()
* 如果版本不一致就不允许更改,保证了数据一致性,操作的原子性
*/
@Test
public void testSetForVersion() throws Exception {
Stat stat = new Stat();
//1.查询节点状态
client.getData().storingStatIn(stat).forPath("/app3");
//2.得到查询出来节点状态中的版本version
int version = stat.getVersion();
System.out.println("该节点数据的版本:"+version);
client.setData().withVersion(version).forPath("/app3","newLife".getBytes(StandardCharsets.UTF_8));
}
8.删除节点
/**
* 4.删除节点
* 1.删除单个节点 delete().forPath()
* 2.删除带有子节点的节点 delete().deleteChildrenIfNeeded().forPath
* 3.必须成功的删除 delete().guaranteed().forPath()
* 4.回调 inBackground(new BackgroundCallBack()).forPath()
*
*/
@Test
public void testDelete() throws Exception {
//1.删除单个节点
client.delete().forPath("/app3");
}
@Test
public void testDeleteNeeded() throws Exception {
//1.删除带有子节点的节点
client.delete().deletingChildrenIfNeeded().forPath("/app4");
}
/**
* 可能删除的时候(客户端和Server需要建立连接),可能出现耗时较长导致删除失败的情况
* delete().guaranteed().forPath()
*/
@Test
public void testMustDelete() throws Exception {
//1.必须成功删除
client.delete().guaranteed().forPath("app2");
}
9.回调机制
/**
* 回调机制——>这里是删除后的回调
* delete().guaranteed().inBackground(new BackgroundCallBack())
* @throws Exception
*/
@Test
public void testBack() throws Exception {
//1.调用回调方法inBackground->并且实现参数接口的回调机制processResult
client.delete().guaranteed().inBackground((client, event) -> {
System.out.println("当前节点被删除~");
System.out.println(event);//2.打印节点信息——>被CuratorEvent捕捉
}).forPath("/app2");
}
Watch事件的监听
主要用于发布订阅,比如三个子节点订阅了APP1,当其中一个节点修改了APP1的数据时,会发布信息给到其他两个节点告诉数据的变化
我们对比一下Redis的发布订阅
(53条消息) Redis配置文件+发布订阅+新数据类型_Fairy要carry的博客-CSDN博客
我们对于Zookeeper的服务端事件监听也是用的Curator来实现的——>引入了Cache
NodeCache:监听特定节点
PathChildrenCache:监听某节点的子节点们
TreeCache:监听整个树节点=NodeCache+PathChildrenCache
package com.wyh.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
* @author diao 2022/9/4
*/
public class CuratorWatchTest {
private CuratorFramework client;
/**
* 1.建立连接
*/
@Before
public void testConnect() throws Exception {
RetryPolicy retry = new ExponentialBackoffRetry(3000, 10);//重试策略:3s重试1次,最多10次
/**
* 1.3第二种方式
* namespace(“xxx”)相当于是一个命名空间,方便区分,在根目录前加前缀
*/
client = CuratorFrameworkFactory.builder().connectString("192.168.184.129:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retry).namespace("itheima").build();
client.start();
}
/**
* 1.NodeCache监听当前节点
* @throws Exception
*/
@Test
public void testNodeCache() throws Exception {
NodeCache nodeCache = new NodeCache(client, "/app2");//1、监听的节点
//2、注册监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("节点发生变化");
//2.1变化的结果
byte[] data = nodeCache.getCurrentData().getData();
System.out.println(new String(data));
}
});
//3.开启监听
nodeCache.start(true);
//4.需要持续监听——>线程则需要一直存活
while (true) {
}
}
/**
* PathNodeCache监听子节点
* @throws Exception
*/
@Test
public void testPathChildrenCache() throws Exception {
//1.创建监听对象
PathChildrenCache childrenCache = new PathChildrenCache(client, "/app2", true);
//2.绑定监听器
childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
System.out.println("子节点发生变化...");
//2.1打印监听事件
System.out.println(pathChildrenCacheEvent);
//2.2判断监听事件类型判断哪些是不能监听的
PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
//2.3当监听到的事件是update时
if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
System.out.println("数据变了!!!");
byte[] data = pathChildrenCacheEvent.getData().getData();
System.out.println(new String(data));
}
}
});
//3.开启
childrenCache.start();
//4.保证持续监听
while(true){
}
}
/**
* TreeNodeCache完成监听
* 1.new TreeCache封装连接以及节点
* 2.getListenable().addListener(new TreeCacheListener{
* childEvent();
* })
* @throws Exception
*/
@Test
public void testTreeCache() throws Exception {
//1.创建监听器
TreeCache treeCache = new TreeCache(client, "/app2");
//2.注册监听
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
System.out.println("节点发生变化");
System.out.println(treeCacheEvent);
byte[] data = treeCacheEvent.getData().getData();
System.out.println("改变后数据为:"+data);
}
});
//3.启动
treeCache.start();
//4.持续监听
while(true){
}
}
/**
* 3.释放资源
*/
@After
public void testClose() {
if (client != null) client.close();
}
}
分布式锁
Zookeeper分布式锁原理
1.首先先创建节点,比如三个client都要获取,那就在lock节点下创建三个节点(临时节点,保证宕机的时候删除)——>2.获取lock下的所有子节点,客户端getChildren返回所有子节点后比较大小,最小的先获取到锁,如果说用完之后,会将节点删除(类似于之前在finally中将锁释放一样),这里是client.close() ——>3.其他节点不是最小的那么就会创建一个事件监听器,监听比自己小的节点,监听删除事件——>4.当发现监听的节点被删除后,也就是锁释放了,那么此时再次判断是不是最小的,如果是,则获取锁,不是则重复以上操作
Curator实现分布锁API
分布式锁12306售票
1.将锁加到12306上面
1.任务类
package com.wyh.curator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.concurrent.TimeUnit;
/**
* @author diao 2022/9/4
*/
public class Ticket12306 implements Runnable {
private volatile int tickets = 10;//数据库票数
private InterProcessMutex lock;
/**
* 1.通过构造方法初始化lock
*/
public Ticket12306() {
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(3000, 10);
CuratorFramework client= CuratorFrameworkFactory.builder()
.connectString("192.168.184.129:2181")
.sessionTimeoutMs(60*1000)
.connectionTimeoutMs(15*1000)
.retryPolicy(retry).build();
client.start();
this.lock=new InterProcessMutex(client,"/lock");
}
/**
* 2.线程执行方法
*/
@Override
public void run() {
while (true) {
//1.加锁
try {
lock.acquire(3, TimeUnit.SECONDS);//本质其实就是cas
if (tickets > 0) {
System.out.println(Thread.currentThread() + ":" + tickets);
tickets--;
}
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
//2.释放锁
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
2.主方法
package com.wyh.curator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.Before;
/**
* @author diao 2022/9/4
*/
public class LockTest {
public static void main(String[] args) {
Ticket12306 ticket12306 = new Ticket12306();
//1.创建客户端
Thread t1 = new Thread(ticket12306, "携程");
Thread t2 = new Thread(ticket12306, "飞猪");
t1.start();
t2.start();
}
}
Zookeeper集群搭建
类似redis的分片集群,master互票
Redis分片集群_Fairy要carry的博客-CSDN博客_redis分片和集群
这里弄个假的,多个端口,而不是多个ip
1.创建三个单节点的zk
(55条消息) Zookeeper安装与配置_Murmure__的博客-CSDN博客
2.然后修改配置文件的端口,data,log路径
3.分别记录id值(对于每个节点的id文件)
4.让他们互相知道彼此(在配置文件中配置)
server.服务器id=服务器ip:服务器通信端口:服务器之间投票选举端口
此时已经构建集群环境
5.启动即可
三个都启动
usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh start
然后看彼此节点状态发现2节点是leader
[root@localhost ~]# /usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-cluster/zookeeper-2/bin/../conf/zoo.cfg
Client port found: 2182. Client address: localhost. Client SSL: false.
Mode: leader
异常测试
当你把主节点stop之后,再次start之后,主节点还是主节点
更多推荐
所有评论(0)