20200509 Curator入门
简介Curator是Netflix公司开源的一套zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。Patrixck Hunt(Zookeeper)以一句“Guava is to Java that Curator to Zookeeper” 给Curator予高度评价。Cur
简介
Curator是Netflix公司开源的一套zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。Patrixck Hunt(Zookeeper)以一句“Guava is to Java that Curator to Zookeeper” 给Curator予高度评价。
Curator的maven依赖: 一般直接使用curator-recipes就行了,如果需要自己封装一些底层些的功能的话,例如增加连接管理重试机制等,则可以引入curator-framework包
<!-- 对zookeeper的底层api的一些封装 -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<!-- 封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式Barrier -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
基本API
创建会话
使用静态工程方法创建
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.128.129:2181",
5000, 5000, retryPolicy);
其中RetryPolicy为重试策略,第一个参数为baseSleepTimeMs初始的sleep时间,用于计算之后的每次重试的sleep时间。第二个参数为maxRetries,最大重试次数。
使用Fluent风格api创建
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.128.129:2181")
.sessionTimeoutMs(5000) // 会话超时时间
.connectionTimeoutMs(5000) // 连接超时时间
.retryPolicy(retryPolicy)
.namespace("base") // 包含隔离名称
.build();
client.start();
创建数据节点
client.create().creatingParentContainersIfNeeded() // 递归创建所需父节点
.withMode(CreateMode.PERSISTENT) // 创建类型为持久节点
.forPath("/nodeA", "init".getBytes()); // 目录及内容
删除数据节点
client.delete()
.guaranteed() // 强制保证删除
.deletingChildrenIfNeeded() // 递归删除子节点
.withVersion(10086) // 指定删除的版本号
.forPath("/nodeA");
读取数据节点
读数据,返回值为byte[]
byte[] bytes = client.getData().forPath("/nodeA");
System.out.println(new String(bytes));
读stat
Stat stat = new Stat();
client.getData()
.storingStatIn(stat)
.forPath("/nodeA");
修改数据节点
client.setData()
.withVersion(10086) // 指定版本修改
.forPath("/nodeA", "data".getBytes());
事务
client.inTransaction().check().forPath("/nodeA")
.and()
.create().withMode(CreateMode.EPHEMERAL).forPath("/nodeB", "init".getBytes())
.and()
.create().withMode(CreateMode.EPHEMERAL).forPath("/nodeC", "init".getBytes())
.and()
.commit();
其他
client.checkExists() // 检查是否存在
.forPath("/nodeA");
client.getChildren().forPath("/nodeA"); // 获取子节点的路径
异步回调
异步创建节点
Executor executor = Executors.newFixedThreadPool(2);
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.inBackground((curatorFramework, curatorEvent) -> {
System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));
},executor)
.forPath("path");
监听器
Curator提供了三种Watcher(Cache)来监听结点的变化:
- Path Cache:监视一个路径下1)孩子结点的创建、2)删除,3)以及结点数据的更新。产生的事件会传递给注册的PathChildrenCacheListener。
- Node Cache:监视一个结点的创建、更新、删除,并将结点的数据缓存在本地。
- Tree Cache:Path Cache和Node Cache的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。
package com.cdai.codebase.bigdata.hadoop.zookeeper.curator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.retry.RetryNTimes;
/**
* Curator framework watch test.
*/
public class CuratorWatcherTest {
/** Zookeeper info */
private static final String ZK_ADDRESS = "192.168.1.100:2181";
private static final String ZK_PATH = "/zktest";
public static void main(String[] args) throws Exception {
// 1.Connect to zk
CuratorFramework client = CuratorFrameworkFactory.newClient(
ZK_ADDRESS,
new RetryNTimes(10, 5000)
);
client.start();
System.out.println("zk client start successfully!");
// 2.Register watcher
PathChildrenCache watcher = new PathChildrenCache(
client,
ZK_PATH,
true // if cache data
);
watcher.getListenable().addListener((client1, event) -> {
ChildData data = event.getData();
if (data == null) {
System.out.println("No data in event[" + event + "]");
} else {
System.out.println("Receive event: "
+ "type=[" + event.getType() + "]"
+ ", path=[" + data.getPath() + "]"
+ ", data=[" + new String(data.getData()) + "]"
+ ", stat=[" + data.getStat() + "]");
}
});
watcher.start(StartMode.BUILD_INITIAL_CACHE);
System.out.println("Register zk watcher successfully!");
Thread.sleep(Integer.MAX_VALUE);
}
}
Curator“菜谱”
既然Maven包叫做curator-recipes,那说明Curator有它独特的“菜谱”:
- 锁:包括共享锁、共享可重入锁、读写锁等。
- 选举:Leader选举算法。
- Barrier:阻止分布式计算直至某个条件被满足的“栅栏”,可以看做JDK Concurrent包中Barrier的分布式实现。
- 缓存:前面提到过的三种Cache及监听机制。
- 持久化结点:连接或Session终止后仍然在Zookeeper中存在的结点。
- 队列:分布式队列、分布式优先级队列等。
分布式锁
分布式编程时,比如最容易碰到的情况就是应用程序在线上多机部署,于是当多个应用同时访问某一资源时,就需要某种机制去协调它们。例如,现在一台应用正在rebuild缓存内容,要临时锁住某个区域暂时不让访问;又比如调度程序每次只想一个任务被一台应用执行等等。
下面的程序会启动两个线程t1和t2去争夺锁,拿到锁的线程会占用5秒。运行多次可以观察到,有时是t1先拿到锁而t2等待,有时又会反过来。Curator会用我们提供的lock路径的结点作为全局锁,这个结点的数据类似这种格式:[_c_64e0811f-9475-44ca-aa36-c1db65ae5350-lock-0000000005],每次获得锁时会生成这种串,释放锁时清空数据。
package com.cdai.codebase.bigdata.hadoop.zookeeper.curator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.RetryNTimes;
import java.util.concurrent.TimeUnit;
/**
* Curator framework's distributed lock test.
*/
public class CuratorDistrLockTest {
/** Zookeeper info */
private static final String ZK_ADDRESS = "192.168.1.100:2181";
private static final String ZK_LOCK_PATH = "/zktest";
public static void main(String[] args) throws InterruptedException {
// 1.Connect to zk
CuratorFramework client = CuratorFrameworkFactory.newClient(
ZK_ADDRESS,
new RetryNTimes(10, 5000)
);
client.start();
System.out.println("zk client start successfully!");
Thread t1 = new Thread(() -> {
doWithLock(client);
}, "t1");
Thread t2 = new Thread(() -> {
doWithLock(client);
}, "t2");
t1.start();
t2.start();
}
private static void doWithLock(CuratorFramework client) {
InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH);
try {
if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) {
System.out.println(Thread.currentThread().getName() + " hold lock");
Thread.sleep(5000L);
System.out.println(Thread.currentThread().getName() + " release lock");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
Leader选举
当集群里的某个服务down机时,我们可能要从slave结点里选出一个作为新的master,这时就需要一套能在分布式环境中自动协调的Leader选举方法。Curator提供了LeaderSelector监听器实现Leader选举功能。同一时刻,只有一个Listener会进入takeLeadership()方法,说明它是当前的Leader。注意:当Listener从takeLeadership()退出时就说明它放弃了“Leader身份”,这时Curator会利用Zookeeper再从剩余的Listener中选出一个新的Leader。autoRequeue()方法使放弃Leadership的Listener有机会重新获得Leadership,如果不设置的话放弃了的Listener是不会再变成Leader的。
package com.cdai.codebase.bigdata.hadoop.zookeeper.curator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.utils.EnsurePath;
/**
* Curator framework's leader election test.
* Output:
* LeaderSelector-2 take leadership!
* LeaderSelector-2 relinquish leadership!
* LeaderSelector-1 take leadership!
* LeaderSelector-1 relinquish leadership!
* LeaderSelector-0 take leadership!
* LeaderSelector-0 relinquish leadership!
* ...
*/
public class CuratorLeaderTest {
/** Zookeeper info */
private static final String ZK_ADDRESS = "192.168.1.100:2181";
private static final String ZK_PATH = "/zktest";
public static void main(String[] args) throws InterruptedException {
LeaderSelectorListener listener = new LeaderSelectorListener() {
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
System.out.println(Thread.currentThread().getName() + " take leadership!");
// takeLeadership() method should only return when leadership is being relinquished.
Thread.sleep(5000L);
System.out.println(Thread.currentThread().getName() + " relinquish leadership!");
}
@Override
public void stateChanged(CuratorFramework client, ConnectionState state) {
}
};
new Thread(() -> {
registerListener(listener);
}).start();
new Thread(() -> {
registerListener(listener);
}).start();
new Thread(() -> {
registerListener(listener);
}).start();
Thread.sleep(Integer.MAX_VALUE);
}
private static void registerListener(LeaderSelectorListener listener) {
// 1.Connect to zk
CuratorFramework client = CuratorFrameworkFactory.newClient(
ZK_ADDRESS,
new RetryNTimes(10, 5000)
);
client.start();
// 2.Ensure path
try {
new EnsurePath(ZK_PATH).ensure(client.getZookeeperClient());
} catch (Exception e) {
e.printStackTrace();
}
// 3.Register listener
LeaderSelector selector = new LeaderSelector(client, ZK_PATH, listener);
selector.autoRequeue();
selector.start();
}
}
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)