Zookeeper客户端Curator使用详解

1、人生经验

本人的那半生经历能够简单回顾为:四岁上小学、10陆周岁进入某98中国共产党第五次全国代表大会学、20岁本科结束学业,然后进入体制内摸爬滚打1陆年(在这之中某段时间返校读了非正式的微处理器大学生),3105周岁主动辞职,从样式内出来,成为自由职业者。

人生中经历过的几段重点节点:

  • 没读中等专业高校,而是上了高级中学;
  • 考上了向往的大学,却并未有进来热爱的微型总括机专业,而是被调剂到汽车正式(原因不详,预计是…你懂的)。但极度时候吗都不懂,也没人指路,傻乎乎的就去了;
  • 0一年结业的时候,未有选拔去费城闯荡加入总计机行业,而是进入了体制内,从一个基层技干做起;
  • 07年脱生产和教学习总结机专业学士学位,为辞职做准备;
  • 可是因为各个缘由蹉跎了拾年,3伍岁才算是屏弃职业,从头发轫。

分布式锁

提醒:

一.推荐介绍使用ConnectionStateListener监察和控制连接的意况,因为当连接LOST时您不再具有锁

二.分布式的锁全局同步,
那代表任何三个时刻点不会有四个客户端都拥有相同的锁。

一. 百度求职

实际上本身最棒的干活主旋律是智能驾车、无人车辆,差不多是正统绝配啊,并且原来的做事中有从事过那块内容。于是拜托百度的校友帮忙内推,本人也在百度社招上面投了简历。可是,石沉大海,未有任何反馈。小编尤其纳闷,难道是简历写的不合适?仔细修改,再投,照旧不行。后来,同学说只怕是百度无人平台高层变动,暂停招聘了。好吧,我们无缘。

可重入共享锁—Shared Reentrant Lock

Shared意味着锁是全局可见的, 客户端都足以请求锁。
Reentrant和JDK的ReentrantLock类似,即可重入,
意味着同3个客户端在富有锁的还要,能够屡屡获得,不会被封堵。
它是由类InterProcessMutex来贯彻。 它的构造函数为:

public InterProcessMutex(CuratorFramework client, String path)

通过acquire()获得锁,并提供超时机制:

public void acquire()
Acquire the mutex - blocking until it's available. Note: the same thread can call acquire
re-entrantly. Each call to acquire must be balanced by a call to release()

public boolean acquire(long time,TimeUnit unit)
Acquire the mutex - blocks until it's available or the given time expires. Note: the same thread can call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call to release()

Parameters:
time - time to wait
unit - time unit
Returns:
true if the mutex was acquired, false if not

通过release()措施释放锁。 InterProcessMutex 实例能够引用。

Revoking ZooKeeper recipes wiki定义了可协商的废除机制。
为了打消mutex, 调用上边的章程:

public void makeRevocable(RevocationListener<T> listener)
将锁设为可撤销的. 当别的进程或线程想让你释放锁时Listener会被调用。
Parameters:
listener - the listener

一经你请求打消当前的锁,
调用attemptRevoke()方法,注意锁释放时RevocationListener将会回调。

public static void attemptRevoke(CuratorFramework client,String path) throws Exception
Utility to mark a lock for revocation. Assuming that the lock has been registered
with a RevocationListener, it will get called and the lock should be released. Note,
however, that revocation is cooperative.
Parameters:
client - the client
path - the path of the lock - usually from something like InterProcessMutex.getParticipantNodes()

3回提醒:错误处理
依然强烈推荐你使用ConnectionStateListener处理连接情状的更动。
当连接LOST时您不再抱有锁。

第3让大家成立贰个模拟的共享财富,
这一个能源期望只好单线程的造访,不然会有出现难点。

public class FakeLimitedResource {
    private final AtomicBoolean inUse = new AtomicBoolean(false);

    public void use() throws InterruptedException {
        // 真实环境中我们会在这里访问/维护一个共享的资源
        //这个例子在使用锁的情况下不会非法并发异常IllegalStateException
        //但是在无锁的情况由于sleep了一段时间,很容易抛出异常
        if (!inUse.compareAndSet(false, true)) {
            throw new IllegalStateException("Needs to be used by one client at a time");
        }
        try {
            Thread.sleep((long) (3 * Math.random()));
        } finally {
            inUse.set(false);
        }
    }
}

接下来成立多个InterProcessMutexDemo类, 它肩负请求锁,
使用能源,释放锁那样3个完整的访问进度。

public class InterProcessMutexDemo {

    private InterProcessMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the lock");
        }
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessMutexDemo example = new InterProcessMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

代码也很容易,生成十一个client, 各类client重复执行十一次请求锁–访问财富–释放锁的进度。各个client都在单身的线程中。
结果能够见到,锁是随便的被每一个实例排他性的行使。

既是是可选拔的,你可以在三个线程中往往调用acquire(),在线程拥有锁时它总是回到true。

您不应当在多少个线程中用同1个InterProcessMutex
你能够在每一个线程中都生成一个新的InterProcessMutex实例,它们的path都同一,那样它们能够共享同三个锁。

4.某VRAV四创业集团

合营社老董娘是自家学弟……年轻有才啊!

协作社地方不在市焦点,而是某处新开发的办公楼,不过对本身却是很有益于。驱车前往,先填表,然后老董初步面试笔者。这一面,不得了,极度投机,感觉自笔者的技艺力量和动向,就是他俩要找的老大人。然后CTO也来了,聊得也合情合理。最终又去跟本人的小业主学弟聊,发现我们也有在圈内的同台熟人,感觉相比较认同。这一面就是从晌午到夜晚,最终总老董小叔子,拉着自家,指着二个工位说:这便是你的工位了,现在你正是我们的类别牵头了,你前些天就过来入职。作者立时很难堪,咱们还没谈薪金待遇等难点呢,怎么能那样快吧?于是找了个借口,赶紧出来了。其实,那一年,小编还有其余面试…..

检查节点是还是不是留存

client.checkExists().forPath("path");

留神:该方法再次回到三个Stat实例,用于检查ZNode是不是留存的操作.
能够调用额外的章程(监察和控制也许后台处理)并在终极调用forPath(
)内定要操作的ZNode

五、总结

人生如此多年,特别近日1两年,总括出了1些经验,分享给大家。

  • 男怕入错行!入了行就绝不再换了,换行的代价太高,错就错下去;
  • 假若未有背景,请一定肯定肯定毫无去体制内;
  • 早就进了体制内的,如故坚定不移下去吧;
  • 电脑技术比比皆是,不变的唯有学习;
  • 人无远虑,必有近忧。不要埋头做技术,要抬眼看世界,规划好团结的职业生涯和技巧途径。

不知不觉一年过去,有成功也有战败,但不论是什么样都要向前看,万1梦想完结了啊?^-^!

好了,胡言乱语到此甘休。祝大家也祝自个儿二〇一八年拿走成功和愉悦!

读取数据节点数据

读取2个节点的多少内容

client.getData().forPath("path");

瞩目,此办法返的重回值是byte[ ];

读取二个节点的数目内容,同时取获得该节点的stat

Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("path");

三、今年经历了哪些

201七新年,办完离职手续,要初步找工作了。因为学习和做事的经验比较复杂,并且有所小车和处理器双行业内部,经理过大型项目,对项目管理、设计开发、产品测试,以及课题申报、招投标、专利申请、预算结算等都比较精通,算是个多面手,所以刚开头照旧英姿勃勃、信心满满的。结果…..受打击了。

多少节点操作

三. 某正式行业公司

这家不是自身要好投的。是自个儿在招聘网址下边随手仍了个简历,对方搜索到的。对大专营商有了心情阴视后,想了想,不可能自视太高,应该放下身段。但自个儿留了个心眼,网上寻找了一下该公司,评价不太好,管理者比技术职员多。想了想,权当刷一波经历,去探视啊。集团地方在天体焦点,办公环境大、宽敞,但没看出几人,美貌的前台兼文书在嗑瓜子。一番交换、填表后,来了个女老董,准备面试。这些女CEO看起来比笔者还紧张,于是我就问了句,你们特邀本人面试的那么些“董事长助理”是个什么样地点?对方懵了,说仍然让技术理事来吗。于是作者也懵了。

过一会,来了个分管技术的副主管,人挺和蔼,说话不紧不慢,跟作者介绍了贰个钟头公司的情况,笔者的头都快点木了。终于进入实质性阶段,原来自家那几个“董事长助理”岗位指的是直接对董事长负责的技艺帮扶COO。其实今年,我已经舍弃了该商户,于是就开了个笔者自以为的‘天价’,对方并未有当即回绝,说等董事长回来后决定,然后笔者就走了。本以为基本就这么了,不会再有挂钩。没悟出一个星期后,对方竟然还电话布告说,他们董事长想约作者谈谈,那是允许笔者的价位了?不然还谈什么?然则自身大概没去,不仅是不想去,也是因为那时候已经得到offer了。

三.创设包括隔绝命名空间的对话

为了贯彻不相同的Zookeeper业务之间的割裂,要求为各样业务分配3个单身的命名空间(NameSpace),即钦赐1个Zookeeper的根路径(官方术语:为Zookeeper添加“Chroot”特性)。例如(下边的例子)当客户端钦点了独自命名空间为“/base”,那么该客户端对Zookeeper上的多少节点的操作都是根据该目录实行的。通过安装Chroot能够将客户端应用与Zookeeper服务端的1课子树相对应,在七个使用共用一个Zookeeper集群的景色下,那对于贯彻区别采用之间的交互隔绝十二分有意义。

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
        CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .namespace("base")
                .build();

二、为何要出去

体制内的图景不便宜细谈,总括正是自作者智力商数够,可是情商不够,而且收入其实是低,在京城生存困难,于今没买房子(后悔死了)。跟很四个人想的差别,体制内并未有铁饭碗的,真混不下去,壹样有希望被解雇。思考了家庭、生活、理想、幸福指数(笔者是还是不是想多了?),终于依旧决定主动离职,尝试换一条路。出来的时候,笔者早就是高档工程师职称,处级干部待遇,得到过五次省部级一等奖,五次二等奖,也算多少底子。

分布式int计数器—SharedCount

本条类使用int类型来计数。 首要涉及八个类。

  • SharedCount
  • SharedCountReader
  • SharedCountListener

SharedCount表示计数器,
能够为它扩充二个SharedCountListener,当计数器改变时此Listener能够监听到改变的风浪,而SharedCountReader能够读取到最新的值,
包涵字面值和带版本消息的值VersionedValue。

public class SharedCounterDemo implements SharedCountListener {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        final Random rand = new Random();
        SharedCounterDemo example = new SharedCounterDemo();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            SharedCount baseCount = new SharedCount(client, PATH, 0);
            baseCount.addListener(example);
            baseCount.start();

            List<SharedCount> examples = Lists.newArrayList();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final SharedCount count = new SharedCount(client, PATH, 0);
                examples.add(count);
                Callable<Void> task = () -> {
                    count.start();
                    Thread.sleep(rand.nextInt(10000));
                    System.out.println("Increment:" + count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10)));
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            for (int i = 0; i < QTY; ++i) {
                examples.get(i).close();
            }
            baseCount.close();
        }
        Thread.sleep(Integer.MAX_VALUE);
    }

    @Override
    public void stateChanged(CuratorFramework arg0, ConnectionState arg1) {
        System.out.println("State changed: " + arg1.toString());
    }

    @Override
    public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
        System.out.println("Counter's value is changed to " + newCount);
    }
}

在那么些事例中,大家运用baseCount来监听计数值(addListener措施来添加SharedCountListener
)。 任意的SharedCount, 只要利用同样的path,都足以拿走这些计数值。
然后我们使用几个线程为计数值扩张2个十以内的随机数。相同的path的SharedCount对计数值举行改动,将会回调给baseCount的SharedCountListener。

count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10))

此处大家运用trySetCount去设置计数器。
率先个参数提供当前的VersionedValue,要是时期别的client更新了此计数值,
你的换代恐怕不成功,
但是此时你的client更新了新型的值,所以失利了您可以品味再更新2回。
setCount是挟持更新计数器的值

在意计数器必须start,使用完事后必须调用close关闭它。

强烈推荐使用ConnectionStateListener
在本例中SharedCountListener扩展ConnectionStateListener

二. 京东求职

一如既往是无人车方向,也是托朋友关系,简历投递很顺畅,1个星期前边试。进程不详说,技术面试和牵头面试都没难点,万分满足,甚至双方在少数地点已经还有过接触,感觉此次稳了。再等了1个星期,H本田UR-V电话公告去谈待遇,看来基本没难点了。不料,约定小时的头天又说老总要再面一下,可以吗,准备充裕,又去了。京东对无人平台很珍视啊,作者面试的也不是管制岗,居然出动了主管级别,具体是什么人小编就隐瞒了,大家能够百度。当然,笔者那样长年累月样式内摸爬滚打,高级别的经营管理者也没少见,同桌就餐,聊聊天也是有过的,肯定不会怯场。聊了有一个钟头,进程也不细说,但感觉不太好,只怕是自己还并未有从体制内完全走出去,没有get到对方兴趣点,那事看来悬了。果不其然,七日后,告知落选。看来,笔者和京东也没缘分。

分布式队列

动用Curator也足以简化Ephemeral Node
(一时节点)的操作。Curator也提供ZK Recipe的分布式队列落成。 利用ZK的
PE陆风X8SISTENTS_EQUENTIAL节点,
能够保险放入到行列中的项目是依据顺序排队的。
倘若纯粹的买主从队列中取数据, 那么它是先入先出的,那也是队列的风味。
假如你严苛须要顺序,你就的施用单一的顾客,能够行使Leader选举只让Leader作为唯一的买主。

可是, 依照Netflix的Curator作者所说,
ZooKeeper真心不合乎做Queue,可能说ZK未有落到实处一个好的Queue,详细内容能够看
Tech Note
4

原因有5:

  1. ZK有1MB 的传导限制。
    实践中ZNode必须相对较小,而队列包蕴众多的新闻,非常大。
  2. 假使有成都百货上千节点,ZK运营时一点也很快。 而使用queue会导致不可枚举ZNode.
    你须求肯定增大 initLimit 和 syncLimit.
  3. ZNode不小的时候很难清理。Netflix不得不创设了三个专程的顺序做那事。
  4. 当十分的大方的蕴藏众多的子节点的ZNode时, ZK的属性别变化得不佳
  5. ZK的数据库完全放在内部存款和储蓄器中。 多量的Queue意味着会占用很多的内存空间。

就算, Curator依旧创立了种种Queue的落到实处。
假如Queue的数据量不太多,数据量不太大的状态下,酌情思虑,依旧得以应用的。

四、未来在做怎么着

按理说说,作者今天应有在大数量公司,做着自个儿的项目主任,不会在那边写博客。可事实是自家从大数目公司积极离职了,为何?

  • 信用社尽管没明着说9九陆,但周末主导要突击;
  • 节奏太快,上午的活无法拖到早晨,更别说过夜;
  • 商家架构复杂,部门涉及理不清,存在斗争,上面包车型客车人不佳干;
  • 每一天下班前开会,开着开着就九10点了;
  • 壹位当五个人使,刚进来没多长时间,就压上海重机厂担;
  • 开端本人是当甲方的,未来当乙方…..

实际,小编心中很清楚,上边的原因都不是事,大多数供销合作社都如此。真正的缘故是在样式内呆了1陆年,别的没学会,臭毛病倒是一群。要双休,要有时间照顾家里,要给男女引导作业,要定期训练身体,要有出行假期,要办事时间随便,要做甲方….由此可知笔者实在不相符在信用合作社公司里。

也是沸腾了少数个夜晚,也曾逼迫自个儿适应过,但结尾依旧发现到祥和一度老了,心态不正确了,真的无法在职场里努力了。正好当时,被一句鸡汤迷了–人总要做点本人喜好的事体,于是毅然辞职。小编当成个loser啊,太任性了。

辞职干什么呢?当然是钻探协调最喜爱的微处理器技术。作者自个儿对Python和Django还算比较精晓,于是就从无到有,在阿里云上买了个主机和域名,用Django写了个个人网站,又花了6个月时光,原创了Python教程和Django教程,并免费挂在网上供大家学习。梦想是像前辈那样,靠点击和流量就能生活就行。近日网址开通才六个月,每一天已经有几千的PV和几百的UV了,我们的评论和介绍还不易。有趣味的能够点击访问,刘江(Liu Jiang)的博客和科目,多提宝贵意见,多谢!

带Id的分布式队列—DistributedIdQueue

DistributedIdQueue和下边包车型大巴队列类似,只是足以为队列中的每贰个要素设置一个ID
能够透过ID把队列中随机的成分移除。 它事关多少个类:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedQueue

经过上边方法创造:

builder.buildIdQueue()

放入成分时:

queue.put(aMessage, messageId);

移除元素时:

int numberRemoved = queue.remove(messageId);

在那么些例子中,
有个别成分还尚未被消费者消费前就移除了,那样消费者不会收到删除的音信。

public class DistributedIdQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedIdQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildIdQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put(" test-" + i, "Id" + i);
                Thread.sleep((long) (15 * Math.random()));
                queue.remove("Id" + i);
            }

            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("consume one message: " + message);
            }

        };
    }
}

5. 某大数量公司

那是一家很NB的信用合作社,纵然不有名,但体量十分的大、背景很深,属于潜水的‘巨鳄’,详细无法说。

在上家V宝马X5公司面试后飞速,就来这家小编投了简历的店堂面试,职位是售前项目COO。对方赏识笔者的样式背景、项目管理经验、招投标经验。面试相当的粗略,间接牵头的副老总看了看简历,问了问情形,觉得不错,就带小编去找H科雷傲首席执行官。HWrangler组长挺好,给自身须要的薪饷砍了点….,幸好,小编曾经往上抬了点(窃笑),所以最后感觉还算满意。约好明日入职,马上上岗。也就那天,作者接过VTucson集团的对讲机,对方把薪给提升了,还思虑以后能够给点股份,不过须要二日五日的行事。

用作一个刚从体制内出来的人,笔者对9玖陆是很顶牛的。综合思虑半天,觉得照旧这家大数据集团更好点,报酬更高、发展前途更大,而且周末双休(后来才精通被套路了)。VCR-V在当时,日子不太好过,于是只好对不起学弟了,祝你们事业如日方升。

除了上边介绍的这几家,小编还面过部分协作社,有胜有负,就不单独介绍了。

不可重入共享锁—Shared Lock

以此锁和地点的InterProcessMutex相比较之下,正是少了Reentrant的意义,也就代表它无法在同三个线程中重入。这几个类是InterProcessSemaphoreMutex,使用格局和InterProcessMutex类似

public class InterProcessSemaphoreMutexDemo {

    private InterProcessSemaphoreMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessSemaphoreMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessSemaphoreMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit))
        {
            throw new IllegalStateException(clientName + " 不能得到互斥锁");
        }
        System.out.println(clientName + " 已获取到互斥锁");
        if (!lock.acquire(time, unit))
        {
            throw new IllegalStateException(clientName + " 不能得到互斥锁");
        }
        System.out.println(clientName + " 再次获取到互斥锁");
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
            lock.release(); // 获取锁几次 释放锁也要几次
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessSemaphoreMutexDemo example = new InterProcessSemaphoreMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
        Thread.sleep(Integer.MAX_VALUE);
    }
}

运作后意识,有且唯有3个client成功得到第多个锁(第贰个acquire()主意重返true),然后它自身过不去在其次个acquire()方法,获取第三个锁超时;别的兼具的客户端都阻塞在率先个acquire()措施超时并且抛出十分。

那般也就印证了InterProcessSemaphoreMutex福寿齐天的锁是不行重入的。

Leader选举

在分布式总计中, leader elections是很要紧的二个功力,
这一个公投进度是这样子的: 指派一个经过作为组织者,将职务分发给各节点。
在职务开始前,
哪个节点都不亮堂哪个人是leader(领导者)或然coordinator(协调者).
当公投算法初始实践后, 每一种节点最终会收获一个唯壹的节点作为义务leader.
除此而外,
大选还四天五头会时有爆发在leader意外宕机的情形下,新的leader要被选举出来。

在zookeeper集群中,leader负责写操作,然后经过Zab协和落实follower的联合,leader也许follower都得以拍卖读操作。

Curator
有两种leader选举的recipe,分别是LeaderSelectorLeaderLatch

前端是兼备存活的客户端不间断的交替做Leader,南平社会。后者是如若大选出Leader,除非有客户端挂掉重新触发大选,不然不会交出领导权。某党?

分布式long计数器—DistributedAtomicLong

再看叁个Long类型的计数器。 除了计数的限量比SharedCount大了之外,
它首先尝试使用乐观锁的方法设置计数器,
即使不成事(比如时期计数器已经被其余client更新了),
它利用InterProcessMutex格局来更新计数值。

能够从它的内部贯彻DistributedAtomicValue.trySet()中看出:

   AtomicValue<byte[]>   trySet(MakeValue makeValue) throws Exception
    {
        MutableAtomicValue<byte[]>  result = new MutableAtomicValue<byte[]>(null, null, false);

        tryOptimistic(result, makeValue);
        if ( !result.succeeded() && (mutex != null) )
        {
            tryWithMutex(result, makeValue);
        }

        return result;
    }

此计数器有一连串的操作:

  • get(): 获取当前值
  • increment(): 加一
  • decrement(): 减一
  • add(): 增添一定的值
  • subtract(): 减去特定的值
  • trySet(): 尝试设置计数值
  • forceSet(): 强制设置计数值

必须检查重临结果的succeeded(), 它意味着此操作是还是不是成功。
若是操作成功, preValue()意味着操作前的值,
postValue()表示操作后的值。

public class DistributedAtomicLongDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        List<DistributedAtomicLong> examples = Lists.newArrayList();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedAtomicLong count = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10));

                examples.add(count);
                Callable<Void> task = () -> {
                    try {
                        AtomicValue<Long> value = count.increment();
                        System.out.println("succeed: " + value.succeeded());
                        if (value.succeeded())
                            System.out.println("Increment: from " + value.preValue() + " to " + value.postValue());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
}

Node Cache

Node Cache与Path Cache类似,Node
Cache只是监听某叁个一定的节点。它关系到上面包车型客车多个类:

  • NodeCache – Node Cache实现类
  • NodeCacheListener – 节点监听器
  • ChildData – 节点数据

注意:应用cache,仍旧要调用它的start()措施,使用完后调用close()方法。

getCurrentData()将取得节点当前的情景,通过它的场合能够拿走当前的值。

public class NodeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        final NodeCache cache = new NodeCache(client, PATH);
        NodeCacheListener listener = () -> {
            ChildData data = cache.getCurrentData();
            if (null != data) {
                System.out.println("节点数据:" + new String(cache.getCurrentData().getData()));
            } else {
                System.out.println("节点被删除!");
            }
        };
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

注意:以身作则中的Thread.sleep(十)能够注释,可是注释后事件监听的触发次数会不全,那或许与NodeCache的兑现原理有关,不可能太过频仍的触及事件!

注意:NodeCache只好监听三个节点的情状变化。

Curator的基本Api

更新数据节点数据

履新三个节点的数码内容

client.setData().forPath("path","data".getBytes());

只顾:该接口会回到2个Stat实例

立异一个节点的数量内容,强制钦命版本进行立异

client.setData().withVersion(10086).forPath("path","data".getBytes());

简介

Curator是Netflix集团开源的一套zookeeper客户端框架,消除了过多Zookeeper客户端万分底层的细节开发工作,包涵连日来重连、反复注册沃特cher和NodeExistsException很是等等。Patrixck
Hunt(Zookeeper)以一句“Guava is to Java that Curator to
Zookeeper”给Curator予中度评价。
引子和趣闻:
Zookeeper名字的由来是相比较有意思的,上面包车型大巴部分摘抄自《从PAXOS到ZOOKEEPESportage分布式一致性原理与执行》一书:
Zookeeper最早起点于雅虎的商量院的三个商量小组。在及时,商量人口发现,在雅虎内部很多重型的系统必要正视多个近乎的体系开始展览分布式协调,但是这个系统往往存在分布式单点难题。所以雅虎的开发人员就打算开发3个通用的无单点难题的分布式协调框架。在立项初期,思索到不少类型都以用动物的名字来命名的(例如盛名的Pig项目),雅虎的工程师希望给那些项目也取二个动物的名字。时任商量院的首席物教育学家Raghu
Ramakrishnan开玩笑说:再这么下去,大家那儿就变成动物园了。此话一出,我们纷纭表示就叫动物园管理员吧——因为各种以动物命名的分布式组件放在壹块儿,雅虎的一体分布式系统看上去仿佛二个大型的动物园了,而Zookeeper正好用来拓展分布式环境的调和——于是,Zookeeper的名字由此诞生了。

Curator无疑是Zookeeper客户端中的瑞士联邦军刀,它译作”馆长”可能”管理者”,不清楚是或不是付出小组有意而为之,小编估量有望那样命名的因由是验证Curator便是Zookeeper的馆长(脑洞有点大:Curator正是动物园的园长)。
Curator包涵了多少个包:
curator-framework:对zookeeper的底部api的部分装进
curator-client:提供一些客户端的操作,例如重试策略等
curator-recipes:装进了一些高档特性,如:Cache事件监听、公投、分布式锁、分布式计数器、分布式巴里r等
Maven依赖(使用curator的版本:二.12.0,对应Zookeeper的本子为:三.四.x,即使跨版本会有包容性难题,很有十分大希望导致节点操作退步):

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

SimpleDistributedQueue

日前固然完结了各样队列,不过你放在心上到没有,那几个队列并未达成类似JDK一样的接口。
SimpleDistributedQueue提供了和JDK基本1致的接口(然则未有兑现Queue接口)。
创制很简短:

public SimpleDistributedQueue(CuratorFramework client,String path)

追日币素:

public boolean offer(byte[] data) throws Exception

剔除元素:

public byte[] take() throws Exception

别的还提供了其余措施:

public byte[] peek() throws Exception
public byte[] poll(long timeout, TimeUnit unit) throws Exception
public byte[] poll() throws Exception
public byte[] remove() throws Exception
public byte[] element() throws Exception

没有add方法, 多了take方法。

take方法在成功重回此前会被堵塞。
poll主目的在于队列为空时直接回到null。

public class SimpleDistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        SimpleDistributedQueue queue;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));
            client.start();
            queue = new SimpleDistributedQueue(client, PATH);
            Producer producer = new Producer(queue);
            Consumer consumer = new Consumer(queue);
            new Thread(producer, "producer").start();
            new Thread(consumer, "consumer").start();
            Thread.sleep(20000);
        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    public static class Producer implements Runnable {

        private SimpleDistributedQueue queue;

        public Producer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                try {
                    boolean flag = queue.offer(("zjc-" + i).getBytes());
                    if (flag) {
                        System.out.println("发送一条消息成功:" + "zjc-" + i);
                    } else {
                        System.out.println("发送一条消息失败:" + "zjc-" + i);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static class Consumer implements Runnable {

        private SimpleDistributedQueue queue;

        public Consumer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                byte[] datas = queue.take();
                System.out.println("消费一条消息成功:" + new String(datas, "UTF-8"));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


}

然而事实上发送了十0条信息,消费完第三条之后,前边的音讯无法消费,近日没找到原因。查看一下合法文书档案推荐的demo使用上边多少个Api:

Creating a SimpleDistributedQueue

public SimpleDistributedQueue(CuratorFramework client,
                              String path)
Parameters:
client - the client
path - path to store queue nodes
Add to the queue

public boolean offer(byte[] data)
             throws Exception
Inserts data into queue.
Parameters:
data - the data
Returns:
true if data was successfully added
Take from the queue

public byte[] take()
           throws Exception
Removes the head of the queue and returns it, blocks until it succeeds.
Returns:
The former head of the queue
NOTE: see the Javadoc for additional methods

只是其实应用发现仍旧存在消费阻塞难题。

分布式队列—DistributedQueue

DistributedQueue是最家常的壹种队列。 它布署以下四个类:

  • QueueBuilder – 创立队列使用QueueBuilder,它也是别的队列的开创类
  • QueueConsumer – 队列中的消息消费者接口
  • QueueSerializer –
    队列音信类别化和反系列化接口,提供了对队列中的对象的体系化和反系列化
  • DistributedQueue – 队列达成类

QueueConsumer是消费者,它能够接收队列的数量。处理队列中的数据的代码逻辑能够放在QueueConsumer.consumeMessage()中。

常规情形下先将音讯从队列中移除,再付出消费者消费。但那是四个步骤,不是原子的。能够调用Builder的lockPath()消费者加锁,当消费者消费数据时享有锁,那样任何消费者不可能消费此音信。假诺消费退步也许经过死掉,音信能够付出其余进度。那会拉动或多或少品质的损失。最棒依旧单消费者形式应用队列。

public class DistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework clientA = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientA.start();
        CuratorFramework clientB = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientB.start();
        DistributedQueue<String> queueA;
        QueueBuilder<String> builderA = QueueBuilder.builder(clientA, createQueueConsumer("A"), createQueueSerializer(), PATH);
        queueA = builderA.buildQueue();
        queueA.start();

        DistributedQueue<String> queueB;
        QueueBuilder<String> builderB = QueueBuilder.builder(clientB, createQueueConsumer("B"), createQueueSerializer(), PATH);
        queueB = builderB.buildQueue();
        queueB.start();
        for (int i = 0; i < 100; i++) {
            queueA.put(" test-A-" + i);
            Thread.sleep(10);
            queueB.put(" test-B-" + i);
        }
        Thread.sleep(1000 * 10);// 等待消息消费完成
        queueB.close();
        queueA.close();
        clientB.close();
        clientA.close();
        System.out.println("OK!");
    }

    /**
     * 队列消息序列化实现类
     */
    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {
            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }
        };
    }

    /**
     * 定义队列消费者
     */
    private static QueueConsumer<String> createQueueConsumer(final String name) {
        return new QueueConsumer<String>() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("连接状态改变: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("消费消息(" + name + "): " + message);
            }
        };
    }
}

事例中定义了七个分布式队列和七个买主,因为PATH是一致的,会存在消费者抢占消费音讯的情景。

可重入读写锁—Shared Reentrant Read Write Lock

类似JDK的ReentrantReadWriteLock。2个读写锁管理一对相关的锁。一个承担读操作,其余贰个承受写操作。读操作在写锁没被运用时可同时由三个进度使用,而写锁在应用时区别意读(阻塞)。

此锁是可重入的。3个具备写锁的线程可重入读锁,然而读锁却不能够跻身写锁。那也代表写锁能够降级成读锁,
比如请求写锁 —>请求读锁—>释放读锁
—->释放写锁
。从读锁升级成写锁是可怜的。

可重入读写锁主要由四个类达成:InterProcessReadWriteLockInterProcessMutex。使用时首先创造1个InterProcessReadWriteLock实例,然后再依照你的急需获得读锁或许写锁,读写锁的门类是InterProcessMutex

public class ReentrantReadWriteLockDemo {

    private final InterProcessReadWriteLock lock;
    private final InterProcessMutex readLock;
    private final InterProcessMutex writeLock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public ReentrantReadWriteLockDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        lock = new InterProcessReadWriteLock(client, lockPath);
        readLock = lock.readLock();
        writeLock = lock.writeLock();
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        // 注意只能先得到写锁再得到读锁,不能反过来!!!
        if (!writeLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " 不能得到写锁");
        }
        System.out.println(clientName + " 已得到写锁");
        if (!readLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " 不能得到读锁");
        }
        System.out.println(clientName + " 已得到读锁");
        try {
            resource.use(); // 使用资源
            Thread.sleep(1000);
        } finally {
            System.out.println(clientName + " 释放读写锁");
            readLock.release();
            writeLock.release();
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY ;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final ReentrantReadWriteLockDemo example = new ReentrantReadWriteLockDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

分布式计数器

顾名思义,计数器是用来计数的,
利用ZooKeeper能够达成一个集群共享的计数器。
只要选择同1的path就足以拿走最新的计数器值,
那是由ZooKeeper的1致性有限协助的。Curator有五个计数器,
3个是用int来计数(SharedCount),一个用long来计数(DistributedAtomicLong)。

Curator食谱(高级性子)

提醒:首先你不能够不添加curator-recipes信赖,下文仅仅对recipes一些天性的应用举行分解和举例,不打算展开源码级别的探索

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

关键提示:强烈推荐使用ConnectionStateListener监察和控制连接的场所,当连接情状为LOST,curator-recipes下的具有Api将会失效或许逾期,固然前面全体的例证都尚未应用到ConnectionStateListener。

信号量—Shared Semaphore

叁个计数的时限信号量类似JDK的Semaphore。
JDK中Semaphore维护的1组认同(permits),而Curator中称之为租约(Lease)。
有二种方式得以决定semaphore的最大租约数。第三种方法是用户给定path并且钦定最大LeaseSize。第三种办法用户给定path并且使用SharedCountReader类。壹旦不选取SharedCountReader,
必须确定保障全部实例在多进度中运用相同的(最大)租约数量,不然有相当大可能率出现A进程中的实例持有最大租约数量为10,然则在B进度中负有的最大租约数量为20,此时租约的意思就失效了。

此次调用acquire()会回到一个租约对象。
客户端必须在finally中close这么些租约对象,不然这一个租约会丢失掉。 可是,
可是,假若客户端session由于某种原因比如crash丢掉,
那么那个客户端持有的租约会自动close,
那样任何客户端能够继承运用那个租约。 租约还是能经过上边的措施返还:

public void returnAll(Collection<Lease> leases)
public void returnLease(Lease lease)

留意你能够一遍性请求五个租约,假若Semaphore当前的租约不够,则呼吁线程会被卡住。
同时还提供了晚点的重载方法。

public Lease acquire()
public Collection<Lease> acquire(int qty)
public Lease acquire(long time, TimeUnit unit)
public Collection<Lease> acquire(int qty, long time, TimeUnit unit)

Shared Semaphore使用的机要类包罗下边多少个:

  • InterProcessSemaphoreV2
  • Lease
  • SharedCountReader

public class InterProcessSemaphoreDemo {

    private static final int MAX_LEASE = 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {

            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
            Collection<Lease> leases = semaphore.acquire(5);
            System.out.println("get " + leases.size() + " leases");
            Lease lease = semaphore.acquire();
            System.out.println("get another lease");

            resource.use();

            Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
            System.out.println("Should timeout and acquire return " + leases2);

            System.out.println("return one lease");
            semaphore.returnLease(lease);
            System.out.println("return another 5 leases");
            semaphore.returnAll(leases);
        }
    }
}

先是大家先取得了四个租约, 最终大家把它还给了semaphore。
接着请求了一个租约,因为semaphore还有陆个租约,所以恳请能够满意,再次来到贰个租约,还剩陆个租约。
然后再请求三个租约,因为租约不够,堵塞到过期,仍旧没能满意,再次来到结果为null(租约不足会阻塞到过期,然后回来null,不会积极性抛出11分;要是不安装超时时间,会一如既往阻塞)。

上面说讲的锁都是正义锁(fair)。 总ZooKeeper的角度看,
每种客户端都根据请求的各样得到锁,不设有非公平的并吞的气象。

二.接纳Fluent风格的Api创造会话

主干参数变为流式设置,三个列子如下:

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
        CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();

创立会话

DistributedBarrier

DistributedBarrier类达成了栅栏的功效。 它的构造函数如下:

public DistributedBarrier(CuratorFramework client, String barrierPath)
Parameters:
client - client
barrierPath - path to use as the barrier

先是你须求安装栅栏,它将封堵在它上面等待的线程:

setBarrier();

下一场需求阻塞的线程调用方法等待放行条件:

public void waitOnBarrier()

当规则知足时,移除栅栏,全数等待的线程将继续执行:

removeBarrier();

卓殊处理 DistributedBarrier
会监察和控制连接景况,当连接断掉时waitOnBarrier()方法会抛出至极。

public class DistributedBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH);
            controlBarrier.setBarrier();

            for (int i = 0; i < QTY; ++i) {
                final DistributedBarrier barrier = new DistributedBarrier(client, PATH);
                final int index = i;
                Callable<Void> task = () -> {
                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " waits on Barrier");
                    barrier.waitOnBarrier();
                    System.out.println("Client #" + index + " begins");
                    return null;
                };
                service.submit(task);
            }
            Thread.sleep(10000);
            System.out.println("all Barrier instances should wait the condition");
            controlBarrier.removeBarrier();
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            Thread.sleep(20000);
        }
    }
}

以此事例创造了controlBarrier来设置栅栏和移除栅栏。
大家创制了两个线程,在此Barrier上等待。
最后移除栅栏后有着的线程才继续执行。

若果你从头不安装栅栏,全数的线程就不会阻塞住。

异步接口

地点提到的创办、删除、更新、读取等情势都以一块的,Curator提供异步接口,引入了BackgroundCallback接口用于拍卖异步接口调用之后服务端再次回到的结果消息。BackgroundCallback接口中2个第3的回调值为Curator伊芙nt,里面含有事件类型、响应吗和节点的详细音讯。

CuratorEventType

事件类型 对应CuratorFramework实例的方法
CREATE #create()
DELETE #delete()
EXISTS #checkExists()
GET_DATA #getData()
SET_DATA #setData()
CHILDREN #getChildren()
SYNC #sync(String,Object)
GET_ACL #getACL()
SET_ACL #setACL()
WATCHED #Watcher(Watcher)
CLOSING #close()

响应码(#getResultCode())

响应码 意义
0 OK,即调用成功
-4 ConnectionLoss,即客户端与服务端断开连接
-110 NodeExists,即节点已经存在
-112 SessionExpired,即会话过期

3个异步成立节点的例子如下:

Executor executor = Executors.newFixedThreadPool(2);
client.create()
      .creatingParentsIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .inBackground((curatorFramework, curatorEvent) -> {      System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));
      },executor)
      .forPath("path");

注意:如果#inBackground()方法不钦命executor,那么会暗许使用Curator的伊夫ntThread去开始展览异步处理。

Zookeeper客户端Curator使用详解

双栅栏—DistributedDoubleBarrier

双栅栏允许客户端在盘算的开头和得了时协同。当丰硕的进度进入到双栅栏时,进程伊始计算,
当计算完结时,离开栅栏。 双栅栏类是DistributedDoubleBarrier
构造函数为:

public DistributedDoubleBarrier(CuratorFramework client,
                                String barrierPath,
                                int memberQty)
Creates the barrier abstraction. memberQty is the number of members in the barrier. When enter() is called, it blocks until
all members have entered. When leave() is called, it blocks until all members have left.

Parameters:
client - the client
barrierPath - path to use
memberQty - the number of members in the barrier

memberQty是成员数量,当enter()办法被调用时,成员被封堵,直到全数的积极分子都调用了enter()
leave()格局被调用时,它也短路调用线程,直到全数的分子都调用了leave()
就像百米赛跑竞赛, 发令枪响,
全体的健儿早先跑,等具备的健儿跑过极端线,竞赛才甘休。

DistributedDoubleBarrier会监察和控制连接境况,当连接断掉时enter()leave()方法会抛出格外。

public class DistributedDoubleBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY);
                final int index = i;
                Callable<Void> task = () -> {

                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " enters");
                    barrier.enter();
                    System.out.println("Client #" + index + " begins");
                    Thread.sleep((long) (3000 * Math.random()));
                    barrier.leave();
                    System.out.println("Client #" + index + " left");
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
            Thread.sleep(Integer.MAX_VALUE);
        }
    }

}

参考资料:
《从PAXOS到ZOOKEEPE奥德赛分布式一致性原理与履行》
《 跟着实例学习ZooKeeper的用法》博客连串

品类仓库:
https://github.com/zjcscut/curator-seed
(其实本文的MD是带导航目录[toc]的,比较便利导航到各样章节,只是简书不辅助,本文的MD原来的小说放在项目标/resources/md目录下,有爱自取,小说用Typora编写,提议用Typora打开)

End on 2017-5-13 13:10.
Help yourselves!
本人是throwable,在苏黎世奋斗,白天上班,上午和双休不定时加班,早晨空余坚持写下博客。
期望自个儿的小说能够给你带来收获,共勉。

[TOC]

LeaderLatch

LeaderLatch有八个构造函数:

public LeaderLatch(CuratorFramework client, String latchPath)
public LeaderLatch(CuratorFramework client, String latchPath,  String id)

LeaderLatch的启动:

leaderLatch.start( );

万1运营,LeaderLatch会和其余使用相同latch
path的别的LeaderLatch交涉,然后里面多个聊到底会被推举为leader,能够透过hasLeadership格局查看LeaderLatch实例是还是不是leader:

leaderLatch.hasLeadership( ); //重临true表明当前实例是leader

好像JDK的CountDownLatch,
LeaderLatch在呼吁成为leadership会block(阻塞),壹旦不采用LeaderLatch了,必须调用close主意。
假设它是leader,会释放leadership, 其余的出席者将会大选1个leader。

public void await() throws InterruptedException,EOFException
/*Causes the current thread to wait until this instance acquires leadership
unless the thread is interrupted or closed.*/
public boolean await(long timeout,TimeUnit unit)throws InterruptedException

这多少个处理:
LeaderLatch实例可以追加ConnectionStateListener来监听互连网连接难题。 当
SUSPENDED 或 LOST 时,
leader不再认为本身如故leader。当LOST后总是重连后RECONNECTED,LeaderLatch会删除先前的ZNode然后再也创建2个。LeaderLatch用户必须思考导致leadership丢失的总是难点。
强烈推荐你选择ConnectionStateListener。

叁个LeaderLatch的应用例子:

public class LeaderLatchDemo extends BaseConnectionInfo {
    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderLatch> examples = Lists.newArrayList();
        TestingServer server=new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderLatch latch = new LeaderLatch(client, PATH, "Client #" + i);
                latch.addListener(new LeaderLatchListener() {

                    @Override
                    public void isLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am Leader");
                    }

                    @Override
                    public void notLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am not Leader");
                    }
                });
                examples.add(latch);
                client.start();
                latch.start();
            }
            Thread.sleep(10000);
            LeaderLatch currentLeader = null;
            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
            currentLeader.close();

            Thread.sleep(5000);

            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
        } finally {
            for (LeaderLatch latch : examples) {
                if (null != latch.getState())
                CloseableUtils.closeQuietly(latch);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
        }
    }
}

可以添加test module的信赖方便进行测试,不须求运维真实的zookeeper服务端:

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-test</artifactId>
            <version>2.12.0</version>
        </dependency>

首先我们创设了13个LeaderLatch,运转后它们中的四个会被推选为leader。
因为公投会开支壹些时日,start后并无法及时就拿走leader。
通过hasLeadership翻开自身是或不是是leader, 假设是的话重返true。
可以经过.getLeader().getId()能够得到当前的leader的ID。
只好通过close放活当前的政权。
await是二个围堵方法, 尝试获取leader地位,不过未必能上位。

预先级分布式队列—DistributedPriorityQueue

先行级队列对队列中的成分遵照事先级进行排序。 Priority越小,
元素越靠前, 越先被消费掉
。 它关系上边多少个类:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedPriorityQueue

通过builder.buildPriorityQueue(minItemsBeforeRefresh)方法创造。
当优先级队列获得元素增加和删除音讯时,它会中断处理当下的要素队列,然后刷新队列。minItemsBeforeRefresh钦定刷新前当前活动的种类的细微数量。
首要安装你的次序可以容忍的不排序的纤维值。

放入队列时要求钦点优先级:

queue.put(aMessage, priority);

例子:

public class DistributedPriorityQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedPriorityQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildPriorityQueue(0);
            queue.start();

            for (int i = 0; i < 10; i++) {
                int priority = (int) (Math.random() * 100);
                System.out.println("test-" + i + " priority:" + priority);
                queue.put("test-" + i, priority);
                Thread.sleep((long) (50 * Math.random()));
            }

            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                Thread.sleep(1000);
                System.out.println("consume one message: " + message);
            }

        };
    }

}

有时你只怕会有错觉,优先级设置并不曾起效。那是因为事先级是对于队列积压的成分而言,如果消费速度过快有十分大或者现身在后一个因素入队操作此前前四个要素已经被消费,那种景况下DistributedPriorityQueue会退化为DistributedQueue。

运营客户端

当创立会话成功,获得client的实例然后能够直接调用其start( )方法:

client.start();

获取某些节点的全体子节点路径

client.getChildren().forPath("path");

专注:该措施的再次回到值为List<String>,获得ZNode的子节点Path列表。
能够调用额外的法门(监察和控制、后台处理依然取得状态watch, background or get
stat) 并在末了调用forPath()钦赐要操作的父ZNode

缓存

Zookeeper原生援助通过注册沃特cher来展开事件监听,可是开发者供给反复注册(Watcher只可以单次注册单次使用)。Cache是Curator中对事件监听的卷入,能够看成是对事件监听的地点缓存视图,能够活动为开发者处理反复注册监听。Curator提供了三种沃特cher(Cache)来监听结点的变通。

1.用到静态工程措施创立客户端

三个例证如下:

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.newClient(
                        connectionInfo,
                        5000,
                        3000,
                        retryPolicy);

newClient静态工厂方法包罗多个重要参数:

参数名 说明
connectionString 服务器列表,格式host1:port1,host2:port2,…
retryPolicy 重试策略,内建有四种重试策略,也可以自行实现RetryPolicy接口
sessionTimeoutMs 会话超时时间,单位毫秒,默认60000ms
connectionTimeoutMs 连接创建超时时间,单位毫秒,默认60000ms

除去数据节点

删去一个节点

client.delete().forPath("path");

在意,此措施只可以去除叶子节点,不然会抛出尤其。

去除三个节点,并且递归删除其持有的子节点

client.delete().deletingChildrenIfNeeded().forPath("path");

删去三个节点,强制钦命版本实行删减

client.delete().withVersion(10086).forPath("path");

去除3个节点,强制有限支撑删除

client.delete().guaranteed().forPath("path");

guaranteed()接口是八个保持办法,只要客户端会话有效,那么Curator会在后台持续开始展览删除操作,直到删除节点成功。

注意:地方的多少个流式接口是能够自由组合的,例如:

client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(10086).forPath("path");

分布式屏障—Barrier

分布式巴里r是如此二个类:
它会阻塞全体节点上的等待历程,直到某一个被满意,
然后具有的节点继续开始展览。

例如赛马竞技前, 等赛马陆续驶来起跑线前。
一声令下,全数的赛马都飞奔而出。

Tree Cache

Tree
Cache能够监察和控制整个树上的富有节点,类似于PathCache和NodeCache的结合,重要涉嫌到上面多少个类:

  • TreeCache – Tree Cache实现类
  • TreeCacheListener – 监听器类
  • TreeCache伊夫nt – 触发的事件类
  • ChildData – 节点数据

public class TreeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        TreeCache cache = new TreeCache(client, PATH);
        TreeCacheListener listener = (client1, event) ->
                System.out.println("事件类型:" + event.getType() +
                        " | 路径:" + (null != event.getData() ? event.getData().getPath() : null));
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

注意:在此示例中绝非使用Thread.sleep(10),可是事件触发次数也是常规的。

注意:TreeCache在开端化(调用start()措施)的时候会回调TreeCacheListener实例二个事TreeCache伊芙nt,而回调的TreeCache伊夫nt对象的Type为INITIALIZED,ChildData为null,此时event.getData().getPath()很有一点都不小希望导致空指针相当,那里应该积极处理并防止这种情景。

多共享锁对象 —Multi Shared Lock

Multi Shared Lock是五个锁的器皿。 当调用acquire()
全数的锁都会被acquire(),如果请求失利,全部的锁都会被release。
同样调用release时享有的锁都被release(战败被忽视)。
基本上,它正是组锁的代表,在它上边的请求释放操作都会传送给它含有的有着的锁。

驷不如舌涉嫌三个类:

  • InterProcessMultiLock
  • InterProcessLock

它的构造函数供给包括的锁的聚合,恐怕1组ZooKeeper的path。

public InterProcessMultiLock(List<InterProcessLock> locks)
public InterProcessMultiLock(CuratorFramework client, List<String> paths)

用法和Shared Lock相同。

public class MultiSharedLockDemo {

    private static final String PATH1 = "/examples/locks1";
    private static final String PATH2 = "/examples/locks2";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessLock lock1 = new InterProcessMutex(client, PATH1);
            InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, PATH2);

            InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));

            if (!lock.acquire(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("could not acquire the lock");
            }
            System.out.println("has got all lock");

            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());

            try {
                resource.use(); //access resource exclusively
            } finally {
                System.out.println("releasing the lock");
                lock.release(); // always release the lock in a finally block
            }
            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());
        }
    }
}

新建3个InterProcessMultiLock, 包括三个重入锁和贰个非重入锁。
调用acquire()后能够看看线程同时持有了那多少个锁。
调用release()探望这四个锁都被释放了。

终极再重复一次,
强烈推荐使用ConnectionStateListener监察和控制连接的景象,当连接情状为LOST,锁将会丢掉。

Path Cache

Path Cache用来监督三个ZNode的子节点. 当3个子节点增添, 更新,删除时,
Path Cache会改变它的事态, 会包含最新的子节点,
子节点的多寡和景观,而事态的更变将通过PathChildrenCacheListener公告。

实际选择时会涉及到多少个类:

  • PathChildrenCache
  • PathChildrenCacheEvent
  • PathChildrenCacheListener
  • ChildData

经过上面的构造函数创立Path Cache:

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)

想采纳cache,必须调用它的start措施,使用完后调用close格局。
能够安装StartMode来达成运转的形式,

StartMode有下边两种:

  1. NO景逸SUVMAL:符合规律初阶化。
  2. BUILD_INITIAL_CACHE:在调用start()事先会调用rebuild()
  3. POST_INITIALIZED_EVENT:
    当Cache开首化数据后发送1个帕特hChildrenCache伊夫nt.Type#INITIALIZED事件

public void addListener(PathChildrenCacheListener listener)能够追加listener监听缓存的变化。

getCurrentData()办法再次来到三个List<ChildData>对象,能够遍历全体的子节点。

设置/更新、移除其实是应用client (CuratorFramework)来操作,
不经过帕特hChildrenCache操作:

public class PathCacheDemo {

    private static final String PATH = "/example/pathCache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
        cache.start();
        PathChildrenCacheListener cacheListener = (client1, event) -> {
            System.out.println("事件类型:" + event.getType());
            if (null != event.getData()) {
                System.out.println("节点数据:" + event.getData().getPath() + " = " + new String(event.getData().getData()));
            }
        };
        cache.getListenable().addListener(cacheListener);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test01", "01".getBytes());
        Thread.sleep(10);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test02", "02".getBytes());
        Thread.sleep(10);
        client.setData().forPath("/example/pathCache/test01", "01_V2".getBytes());
        Thread.sleep(10);
        for (ChildData data : cache.getCurrentData()) {
            System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
        }
        client.delete().forPath("/example/pathCache/test01");
        Thread.sleep(10);
        client.delete().forPath("/example/pathCache/test02");
        Thread.sleep(1000 * 5);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

注意:假如new PathChildrenCache(client, PATH,
true)中的参数cacheData值设置为false,则示例中的event.getData().getData()、data.getData()将再次来到null,cache将不会缓存节点数据。

注意:演示中的Thread.sleep(拾)能够注释掉,但是注释后事件监听的触发次数会不全,这大概与PathCache的落到实处原理有关,不可能太过数次的触发事件!

分布式延迟队列—DistributedDelayQueue

JDK中也有DelayQueue,不掌握你是不是熟稔。
DistributedDelayQueue也提供了就如的功能, 成分有个delay值,
消费者隔一段时间才能选择元素。 涉及到上面八个类。

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedDelayQueue

因此上边包车型大巴语句创设:

QueueBuilder<MessageType>    builder = QueueBuilder.builder(client, consumer, serializer, path);
... more builder method calls as needed ...
DistributedDelayQueue<MessageType> queue = builder.buildDelayQueue();

放入成分时能够钦命delayUntilEpoch

queue.put(aMessage, delayUntilEpoch);

注意delayUntilEpoch不是离今后的三个时刻间隔,
比如20微秒,而是现在的多少个时日戳,如 System.currentTimeMillis() + 十秒。
假如delayUntilEpoch的岁月已经死亡,音信会立时被消费者收到。

public class DistributedDelayQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedDelayQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildDelayQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put("test-" + i, System.currentTimeMillis() + 10000);
            }
            System.out.println(new Date().getTime() + ": already put all items");


            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println(new Date().getTime() + ": consume one message: " + message);
            }

        };
    }
}

创办数量节点

Zookeeper的节点创造方式:

  • PERSISTENT:持久化
  • PERSISTENT_SEQUENTIAL:持久化并且带连串号
  • EPHEMERAL:临时
  • EPHEMERAL_SEQUENTIAL:目前并且带连串号

**创办3个节点,开端内容为空 **

client.create().forPath("path");

瞩目:假设未有安装节点属性,节点创设方式暗中同意为持久化节点,内容暗中同意为空

创造一个节点,附带伊始化内容

client.create().forPath("path","init".getBytes());

制造多少个节点,钦点成立形式(最近节点),内容为空

client.create().withMode(CreateMode.EPHEMERAL).forPath("path");

开创3个节点,钦定创制方式(临时节点),附带起首化内容

client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes());

成立3个节点,钦赐创立情势(暂时节点),附带初阶化内容,并且自动递归创建父节点

client.create()
      .creatingParentContainersIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .forPath("path","init".getBytes());

本条creatingParentContainersIfNeeded()接口10分有用,因为相似处境开发人士在创制二个子节点必须认清它的父节点是不是存在,假设不存在直接创制会抛出NoNodeException,使用creatingParentContainersIfNeeded()之后Curator能够自动递归成立全部所需的父节点。

LeaderSelector

LeaderSelector使用的时候根本涉嫌下边多少个类:

  • LeaderSelector
  • LeaderSelectorListener
  • LeaderSelectorListenerAdapter
  • CancelLeadershipException

主题类是LeaderSelector,它的构造函数如下:

public LeaderSelector(CuratorFramework client, String mutexPath,LeaderSelectorListener listener)
public LeaderSelector(CuratorFramework client, String mutexPath, ThreadFactory threadFactory, Executor executor, LeaderSelectorListener listener)

类似LeaderLatch,LeaderSelector必须start: leaderSelector.start();
一旦运营,当实例取得领导权时您的listener的takeLeadership()艺术被调用。而takeLeadership()方法唯有领导权被放飞时才回来。
当你不再使用LeaderSelector实例时,应该调用它的close方法。

拾分处理
LeaderSelectorListener类继承ConnectionStateListener。LeaderSelector必须小心连接情况的变更。假若实例成为leader,
它应有响应SUSPENDED 或 LOST。 当 SUSPENDED 状态出现时,
实例必须假定在重复连接成功在此以前它大概不再是leader了。 假使LOST状态现身,
实例不再是leader, takeLeadership方法重返。

重要: 推荐处理方式是当接过SUSPENDED 或
LOST时抛出CancelLeadershipException非常.。那会导致LeaderSelector实例中断并撤除执行takeLeadership方法的相当.。那可怜主要,
你不能够不思索增添LeaderSelectorListenerAdapter.
LeaderSelectorListenerAdapter提供了推荐介绍的拍卖逻辑。

上边包车型地铁二个事例摘抄自官方:

public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter implements Closeable {
    private final String name;
    private final LeaderSelector leaderSelector;
    private final AtomicInteger leaderCount = new AtomicInteger();

    public LeaderSelectorAdapter(CuratorFramework client, String path, String name) {
        this.name = name;
        leaderSelector = new LeaderSelector(client, path, this);
        leaderSelector.autoRequeue();
    }

    public void start() throws IOException {
        leaderSelector.start();
    }

    @Override
    public void close() throws IOException {
        leaderSelector.close();
    }

    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        final int waitSeconds = (int) (5 * Math.random()) + 1;
        System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
        System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
        } catch (InterruptedException e) {
            System.err.println(name + " was interrupted.");
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(name + " relinquishing leadership.\n");
        }
    }
}

您能够在takeLeadership实行任务的分配等等,并且永不回来,即使你想要要此实例一直是leader的话可以加一个死循环。调用
leaderSelector.autoRequeue();管教在此实例释放领导权之后还可能获得领导权。
在此地我们选取AtomicInteger来记录此client获得领导权的次数, 它是”fair”,
每一个client有平等的火候得到领导权。

public class LeaderSelectorDemo {

    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderSelectorAdapter> examples = Lists.newArrayList();
        TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderSelectorAdapter selectorAdapter = new LeaderSelectorAdapter(client, PATH, "Client #" + i);
                examples.add(selectorAdapter);
                client.start();
                selectorAdapter.start();
            }
            System.out.println("Press enter/return to quit\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();
        } finally {
            System.out.println("Shutting down...");
            for (LeaderSelectorAdapter exampleClient : examples) {
                CloseableUtils.closeQuietly(exampleClient);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
            CloseableUtils.closeQuietly(server);
        }
    }
}

相比较能够,LeaderLatch必须调用close()方法才会释放领导权,而对于LeaderSelector,通过LeaderSelectorListener能够对领导权举行支配,
在适用的时候释放领导权,那样种种节点都有望取得领导权。从而,LeaderSelector具有更好的一帆风顺和可控性,建议有LeaderElection应用场景下优先接纳LeaderSelector。

事务

CuratorFramework的实例包涵inTransaction(
)接口方法,调用此办法开启一个ZooKeeper事务. 能够复合create, setData,
check, and/or delete
等操作然后调用commit()作为三个原子操作提交。二个事例如下:

client.inTransaction().check().forPath("path")
      .and()
      .create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
      .and()
      .setData().withVersion(10086).forPath("path","data2".getBytes())
      .and()
      .commit();

Post Author: admin

发表评论

电子邮件地址不会被公开。 必填项已用*标注