目录

zk安装

(55条消息) Zookeeper安装与配置_Murmure__的博客-CSDN博客 

介绍

 Zookeeper的数据模型

节点四大类:

服务端常见命令操作 

 客户端命令操作

 1.在bin目录在执行这个工具(连接server服务端口)开始客户端

 2.常见增删改查命令

3.对节点的CRUD

4.查看节点信息

  Java API的操作

1.建立连接

Watch事件的监听

 我们对于Zookeeper的服务端事件监听也是用的Curator来实现的——>引入了Cache

分布式锁

 Zookeeper分布式锁原理

 Curator实现分布锁API

 分布式锁12306售票

Zookeeper集群搭建


zk安装

(55条消息) Zookeeper安装与配置_Murmure__的博客-CSDN博客 

介绍

 简介:Zookeeper是一个分布式开源的协调服务(感觉有点像nacos)

 功能:配置管理,分布式锁,集群管理

配置管理:比如三个服务节点都用同一个配置文件,一个个去配置就很麻烦,我们直接在配置中心配置,abc节点连接即可(Nacos一站式解决)

(51条消息) Nacos注册中心_Fairy要carry的博客-CSDN博客_nacos注册中心

(51条消息) nacos实战项目中的配置_Fairy要carry的博客-CSDN博客_nacos配置有哪些

(51条消息) 面试-SpringCloud常见组件和注册表结构+nacos_Fairy要carry的博客-CSDN博客

分布式锁:

主要是解决数据一致性,比如我们的seata就是解决分布式情况下数据一致性问题

(51条消息) 缓存同步-Canal_Fairy要carry的博客-CSDN博客_canal同步缓存(51条消息) Seata的几种事务模式_Fairy要carry的博客-CSDN博客_seata 模式(51条消息) 缓存同步-Canal_Fairy要carry的博客-CSDN博客_canal同步缓存

当多个节点对数据进行更改,你加synchornized这种锁只是在这个当前节点修改,比较JVM不一样了,这时候就要引入我们的分布式锁去保证数据的一致性,话说seata还跟这个有点不一样,它主要是应用在数据库,分布式下,各个服务数据库不一样,要保证数据一致性(引入了XA,AT,TCC等模式)

 集群管理

就是类似于注册中心,拉取服务用的

 Zookeeper的数据模型

 节点也可以有子节点,允许少量的数据(1MB)存储在该节点下

节点四大类:

1.PERSISENT: 持久化节点——>就是宕机客户端与服务端断开再连时,数据还是保持一致性的

2.EPHEMERAL:临时节点——>断开之后,数据节点就没了,相当于一次会话

服务端常见命令操作 

#在bin目录下
./zkServer.sh restart

 

 客户端命令操作

客户端工具zkCli.sh

 1.在bin目录在执行这个工具(连接server服务端口)开始客户端

./zkCli.sh -server localhost:2181

 2.常见增删改查命令

 

 

3.对节点的CRUD

创建临时节点

create -e /test2

 创建顺序节点

create -s /test2

创建临时顺序节点

create -es /test2

 

 发现临时节点消失了

4.查看节点信息

ls -s /Path

  Java API的操作

1.建立连接

 /**
     * 1.建立连接
     */
    @Before
    public void testConnect() throws Exception {
        //1.1第一种方式
//        CuratorFrameworkFactory client = CuratorFrameworkFactory.class.getConstructor().newInstance();

        /**
         * 1.2创建一个新的客户端
         * 参数: connectString - 连接到服务器的列表
         * sessionTimeoutMs - 会话超时 ,多久没连就挂
         * connectionTimeoutMs - 连接超时 ,连了多久
         * retryPolicy -使用重试策略返回:客户端
         */
        RetryPolicy retry = new ExponentialBackoffRetry(3000, 10);//重试策略:3s重试1次,最多10次
//        client = CuratorFrameworkFactory.newClient(
//                "192.168.184.129:2181",
//                60 * 1000,
//                15 * 1000, retry);
        开启连接
//        client.start();

        /**
         * 1.3第二种方式
         * namespace(“xxx”)相当于是一个命名空间,方便区分,在根目录前加前缀
         */
        client = CuratorFrameworkFactory.builder().connectString("192.168.184.129:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retry).namespace("itheima").build();
        client.start();
    }

 2.创建节点并且赋值

  /**
     * 2.创建节点:持久 临时 顺序 数据
     * 2.1基本创建
     * 2.2创建带有数据的
     * 2.3多级节点+设置节点类型
     */
    @Test
    public void testCreate() throws Exception {
        //1.基本创建持久化
        String path = client.create()
                .withMode(CreateMode.EPHEMERAL)
                .forPath("/app3", "hehe".getBytes(StandardCharsets.UTF_8));
        System.out.println(path);

        //2.让他不结束
        while (true) {

        }
    }

3.创建子节点

/**
     * 创建子节点
     *
     * @throws Exception
     */
    @Test
    public void testMulti() throws Exception {
        //creatingParentsIfNeeded():如果父节点不存在就进行创建
        String path = client.create()
                .creatingParentsIfNeeded()
                .forPath("/app4/p1");
        System.out.println(path);

    }

4.查询节点数据

  /**
     * 查询节点数据
     */
    @Test
    public void testGet() throws Exception {
        byte[] bytes = client.getData().forPath("/app3");
        System.out.println(new String(bytes));
    }

5.查询当前路径下所有子节点

  /**
     * 查询子节点
     * getChildren().forPath():得到目录下所有子节点
     * 当前命名空间下的子目录节点
     */
    @Test
    public void testChildren() throws Exception {
        List<String> path = client.getChildren().forPath("/");
        System.out.println(path);
    }

 6.查询节点状态

 /**
     * 查询节点状态信息
     * new Stat()
     * client.getData():查询数据
     * .storingStateIn(stat).forPath():查询节点状态值,将状态值给到Stat
     */
    @Test
    public void testState() throws Exception {
        Stat stat = new Stat();
        System.out.println(stat);

        //查询节点状态信息ls -s
        client.getData().storingStatIn(stat).forPath("/app3");
        System.out.println(stat);
    }

7.利用版本version修改节点数据

 /**
     * 根据版本判断修改节点状态值
     * setData().withVersion().forPath()
     * 如果版本不一致就不允许更改,保证了数据一致性,操作的原子性
     */
    @Test
    public void testSetForVersion() throws Exception {
        Stat stat = new Stat();
        //1.查询节点状态
        client.getData().storingStatIn(stat).forPath("/app3");
        //2.得到查询出来节点状态中的版本version
        int version = stat.getVersion();
        System.out.println("该节点数据的版本:"+version);

        client.setData().withVersion(version).forPath("/app3","newLife".getBytes(StandardCharsets.UTF_8));
    }

8.删除节点

  /**
     * 4.删除节点
     * 1.删除单个节点 delete().forPath()
     * 2.删除带有子节点的节点 delete().deleteChildrenIfNeeded().forPath
     * 3.必须成功的删除 delete().guaranteed().forPath()
     * 4.回调 inBackground(new BackgroundCallBack()).forPath()
     *
     */
    @Test
    public void testDelete() throws Exception {
        //1.删除单个节点
        client.delete().forPath("/app3");

    }

    @Test
    public void testDeleteNeeded() throws Exception {
        //1.删除带有子节点的节点
        client.delete().deletingChildrenIfNeeded().forPath("/app4");
    }

    /**
     * 可能删除的时候(客户端和Server需要建立连接),可能出现耗时较长导致删除失败的情况
     * delete().guaranteed().forPath()
     */
    @Test
    public void testMustDelete() throws Exception {
        //1.必须成功删除
        client.delete().guaranteed().forPath("app2");
    }

9.回调机制

  /**
     * 回调机制——>这里是删除后的回调
     * delete().guaranteed().inBackground(new BackgroundCallBack())
     * @throws Exception
     */
    @Test
    public void testBack() throws Exception {
        //1.调用回调方法inBackground->并且实现参数接口的回调机制processResult
        client.delete().guaranteed().inBackground((client, event) -> {
            System.out.println("当前节点被删除~");
            System.out.println(event);//2.打印节点信息——>被CuratorEvent捕捉
        }).forPath("/app2");
    }

Watch事件的监听

主要用于发布订阅,比如三个子节点订阅了APP1,当其中一个节点修改了APP1的数据时,会发布信息给到其他两个节点告诉数据的变化 

我们对比一下Redis的发布订阅

(53条消息) Redis配置文件+发布订阅+新数据类型_Fairy要carry的博客-CSDN博客

 我们对于Zookeeper的服务端事件监听也是用的Curator来实现的——>引入了Cache

NodeCache:监听特定节点

PathChildrenCache:监听某节点的子节点们

TreeCache:监听整个树节点=NodeCache+PathChildrenCache

package com.wyh.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * @author diao 2022/9/4
 */

public class CuratorWatchTest {

    private CuratorFramework client;

    /**
     * 1.建立连接
     */
    @Before
    public void testConnect() throws Exception {

        RetryPolicy retry = new ExponentialBackoffRetry(3000, 10);//重试策略:3s重试1次,最多10次

        /**
         * 1.3第二种方式
         * namespace(“xxx”)相当于是一个命名空间,方便区分,在根目录前加前缀
         */
        client = CuratorFrameworkFactory.builder().connectString("192.168.184.129:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retry).namespace("itheima").build();
        client.start();
    }

    /**
     * 1.NodeCache监听当前节点
     * @throws Exception
     */
    @Test
    public void testNodeCache() throws Exception {
        NodeCache nodeCache = new NodeCache(client, "/app2");//1、监听的节点
        //2、注册监听
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("节点发生变化");
                //2.1变化的结果
                byte[] data = nodeCache.getCurrentData().getData();
                System.out.println(new String(data));
            }
        });

        //3.开启监听
        nodeCache.start(true);

        //4.需要持续监听——>线程则需要一直存活
        while (true) {

        }
    }

    /**
     * PathNodeCache监听子节点
     * @throws Exception
     */
    @Test
    public void testPathChildrenCache() throws Exception {
        //1.创建监听对象
        PathChildrenCache childrenCache = new PathChildrenCache(client, "/app2", true);

        //2.绑定监听器
        childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                System.out.println("子节点发生变化...");
                //2.1打印监听事件
                System.out.println(pathChildrenCacheEvent);
                //2.2判断监听事件类型判断哪些是不能监听的
                PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
                //2.3当监听到的事件是update时
                if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
                    System.out.println("数据变了!!!");
                    byte[] data = pathChildrenCacheEvent.getData().getData();
                    System.out.println(new String(data));
                }
            }
        });

        //3.开启
        childrenCache.start();

        //4.保证持续监听
        while(true){

        }
    }

    /**
     * TreeNodeCache完成监听
     * 1.new TreeCache封装连接以及节点
     * 2.getListenable().addListener(new TreeCacheListener{
     *     childEvent();
     * })
     * @throws Exception
     */
    @Test
    public void testTreeCache() throws Exception {
        //1.创建监听器
        TreeCache treeCache = new TreeCache(client, "/app2");

        //2.注册监听
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
                System.out.println("节点发生变化");
                System.out.println(treeCacheEvent);
                byte[] data = treeCacheEvent.getData().getData();
                System.out.println("改变后数据为:"+data);
            }
        });
        //3.启动
        treeCache.start();

        //4.持续监听
        while(true){

        }
    }


    /**
     * 3.释放资源
     */
    @After
    public void testClose() {
        if (client != null) client.close();
    }

}

分布式锁

 Zookeeper分布式锁原理

1.首先先创建节点,比如三个client都要获取,那就在lock节点下创建三个节点(临时节点,保证宕机的时候删除)——>2.获取lock下的所有子节点,客户端getChildren返回所有子节点后比较大小,最小的先获取到锁,如果说用完之后,会将节点删除(类似于之前在finally中将锁释放一样),这里是client.close() ——>3.其他节点不是最小的那么就会创建一个事件监听器,监听比自己小的节点,监听删除事件——>4.当发现监听的节点被删除后,也就是锁释放了,那么此时再次判断是不是最小的,如果是,则获取锁,不是则重复以上操作

 Curator实现分布锁API

 分布式锁12306售票

1.将锁加到12306上面 

 1.任务类

package com.wyh.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.TimeUnit;

/**
 * @author diao 2022/9/4
 */
public class Ticket12306 implements Runnable {

    private volatile int tickets = 10;//数据库票数

    private InterProcessMutex lock;

    /**
     * 1.通过构造方法初始化lock
     */
    public Ticket12306() {
        ExponentialBackoffRetry retry = new ExponentialBackoffRetry(3000, 10);
        CuratorFramework client= CuratorFrameworkFactory.builder()
                .connectString("192.168.184.129:2181")
                .sessionTimeoutMs(60*1000)
                .connectionTimeoutMs(15*1000)
                .retryPolicy(retry).build();
        client.start();

        this.lock=new InterProcessMutex(client,"/lock");
    }

    /**
     * 2.线程执行方法
     */
    @Override
    public void run() {
        while (true) {
            //1.加锁
            try {
                lock.acquire(3, TimeUnit.SECONDS);//本质其实就是cas
                if (tickets > 0) {
                    System.out.println(Thread.currentThread() + ":" + tickets);
                    tickets--;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                try {
                    //2.释放锁
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }


        }
    }
}

 2.主方法

package com.wyh.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.Before;

/**
 * @author diao 2022/9/4
 */
public class LockTest {

    public static void main(String[] args) {
        Ticket12306 ticket12306 = new Ticket12306();

        //1.创建客户端
        Thread t1 = new Thread(ticket12306, "携程");
        Thread t2 = new Thread(ticket12306, "飞猪");

        t1.start();
        t2.start();
    }

}

 

Zookeeper集群搭建

类似redis的分片集群,master互票 

Redis分片集群_Fairy要carry的博客-CSDN博客_redis分片和集群

 这里弄个假的,多个端口,而不是多个ip

1.创建三个单节点的zk

(55条消息) Zookeeper安装与配置_Murmure__的博客-CSDN博客

 2.然后修改配置文件的端口,data,log路径

3.分别记录id值(对于每个节点的id文件)

4.让他们互相知道彼此(在配置文件中配置)

server.服务器id=服务器ip:服务器通信端口:服务器之间投票选举端口

 此时已经构建集群环境

5.启动即可

三个都启动

usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh start

 然后看彼此节点状态发现2节点是leader

[root@localhost ~]# /usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-cluster/zookeeper-2/bin/../conf/zoo.cfg
Client port found: 2182. Client address: localhost. Client SSL: false.
Mode: leader

异常测试

 当你把主节点stop之后,再次start之后,主节点还是主节点

 

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐