31

zookeeper的一篇概述

 5 years ago
source link: https://www.tuicool.com/articles/Mzyymin
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

之前在公司由于业务需要,对zookeeper进行了一些知识点的梳理进行分享,对一些刚刚接触zookeeper的小伙伴来说,或许可以借鉴一下

一、ZOOKEEPER介绍

简介

Zookeeper致力于提供一个 高性能高可用 ,且具备 严格的顺序访问 控制能力的分布式协调服务。

设计目标

  • 简单的数据结构: 共享的树形结构,类似文件系统,存储于内存;
  • 可以构建集群: 避免单点故障,3-5台机器就可以组成集群,超过半数,正常工作就能对外提供服务;
  • 顺序访问: 对于每个写请求,zk会分配一个全局唯一的递增编号,利用 这个特性可以实现高级协调服务;
  • 高性能: 基于内存操作,服务于非事务请求,适用于读操作为主的业务 场景。3台zk集群能达到13w QPS;

应用场景

  1. 数据发布订阅
  2. 负载均衡
  3. 命名服务
  4. Master选举
  5. 集群管理
  6. 配置管理
  7. 分布式队列
  8. 分布式锁

二、zookeeper特性

会话(session):客户端与服务端的一次会话连接,本质是TCP长连接,通过会话可以进行心跳检测和数据传输;

数据节点(znode)

  1. 持久节点(PERSISTENT)
  2. 持久顺序节点(PERSISTENT_SEQUENTIAL)
  3. 临时节点(EPHEMERAL)
  4. 临时顺序节点(EPHEMERAL_SEQUENTIAL)
    对于持久节点和临时节点,同一个znode下,节点的名称是唯一的:[center red 20px]
    ZnIZR36.png!web

Watcher 事件监听器:客户端可以在节点上注册监听器,当特定的事件发生后,zk会通知到感兴趣的客户端。

EventType: NodeCreated、NodeDeleted、NodeDataChanged、NodeChildrenChange

ACL:Zk采用ACL(access control lists)策略来控制权限

权限类型:create,read,write,delete,admin

三、zookeeper常用命令

  1. 启动ZK服务: bin/zkServer.sh start
  2. 查看ZK服务状态:bin/zkServer.sh status
  3. 停止ZK服务: bin/zkServer.sh stop
  4. 重启ZK服务: bin/zkServer.sh restart
  5. 客户端连接:zkCli.sh -server 127.0.0.1:2181
  6. 显示目录:ls /
  7. 创建:create /zk “test”
  8. 获得值:get /zk
  9. 修改值:set /zk “test”
  10. 删除:delete /zk
  11. ACL:
    • getAcl / setAcl
    • addauth

四、zookeeper的java客户端

<dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
</dependency>
<dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
 </dependency>
public class App {
    public static void main(String[] args) throws Exception {
        String connectString = "211.159.174.226:2181";

        RetryPolicy retryPolicy = getRetryPolicy();
        CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, 5000, 5000, retryPolicy);
        client.start();

        //增删改查
        client.create().withMode(CreateMode.PERSISTENT).forPath("/test-Curator-PERSISTENT-nodata");
        client.create().withMode(CreateMode.PERSISTENT).forPath("/test-Curator-PERSISTENT-data", "test-Curator-PERSISTENT-data".getBytes());
        client.create().withMode(CreateMode.EPHEMERAL).forPath("/test-Curator-EPHEMERAL-nodata");
        client.create().withMode(CreateMode.EPHEMERAL).forPath("/test-Curator-EPHEMERAL-data", "/test-Curator-EPHEMERAL-data".getBytes());

        for (int i = 0; i < 5; i++) {
            client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/test-Curator-PERSISTENT_SEQUENTIAL-nodata");
        }

        byte[] bytes = client.getData().forPath("/test-Curator-PERSISTENT-data");
        System.out.println("----------zk节点数据:" + new String(bytes) + "------------");

        client.create().withMode(CreateMode.PERSISTENT).forPath("/test-listener", "test-listener".getBytes());
        final NodeCache nodeCache = new NodeCache(client, "/test-listener");
        nodeCache.start();
        NodeCacheListener listener = new NodeCacheListener() {

            @Override
            public void nodeChanged() throws Exception {
                System.out.println("node changed : " + nodeCache.getCurrentData());
            }
        };
        nodeCache.getListenable().addListener(listener);

        client.setData().forPath("/test-listener", "/test-listener-change".getBytes());

    }
    /**
     * RetryOneTime: 只重连一次.
     * RetryNTime: 指定重连的次数N.
     * RetryUtilElapsed: 指定最大重连超时时间和重连时间间隔,间歇性重连直到超时或者链接成功.
     * ExponentialBackoffRetry: 基于"backoff"方式重连,和RetryUtilElapsed的区别是重连的时间间隔是动态的
     * BoundedExponentialBackoffRetry: 同ExponentialBackoffRetry,增加了最大重试次数的控制.
     */
    public static RetryPolicy getRetryPolicy() {
        return new ExponentialBackoffRetry(1000, 3);
    }
}

五、分布式锁

public class ZookeeperLock {

    private final String lockPath = "/distributed-lock";
    private String connectString;
    private RetryPolicy retry;
    private CuratorFramework client;
    private InterProcessLock interProcessMutex;


    public void init() throws Exception {
        connectString = "211.159.174.226:2181";
        retry = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.newClient(connectString, 60000, 15000, retry);
        client.start();

        //共享可重入锁
        interProcessMutex = new InterProcessMutex(client,lockPath);
    }

    public void lock(){
        try {
            interProcessMutex.acquire();
        } catch (Exception e) {
            System.out.println("锁失败了,真惨");
        }
    }

    public void unlock(){
        try {
            interProcessMutex.release();
        } catch (Exception e) {
            System.out.println("释放失败了,更惨");
        }
    }

    public static void main(String[] args) throws Exception {
        final ZookeeperLock zookeeperLock = new ZookeeperLock();
        zookeeperLock.init();

        Executor executor = Executors.newFixedThreadPool(5);
        for (int i = 0;i<50;i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    zookeeperLock.lock();
                    Long time = System.nanoTime();
                    System.out.println(time);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(time);
                    zookeeperLock.unlock();
                }
            });
        }

        while (true){

        }
    }
}

六、zab协议

  1. ZAB协议所定义的三种节点状态
    • Looking :选举状态。
    • Following :Follower节点(从节点)所处的状态。
    • Leading :Leader节点(主节点)所处状态。
  2. Zxid(64位的数据结构)
    前32位:Leader 周期编号 myid
    低32位:事务的自增序列(单调递增的序列)只要客户端有请求,就+1
    当产生新Leader的时候,就从这个Leader服务器上取出本地log中最大事务zxid,从里面读出epoch+1,作为一个新epoch,并将低32位置0(保证id绝对自增)。
  3. 崩溃恢复
    • 每个server都有一张选票<myid,zxid>,选票投自己。
    • 搜集各个服务器的投票。
    • 比较投票,比较逻辑:优先比较zxid,然后才比较myid。
    • 改变服务器状态(崩溃恢复=》数据同步,或者崩溃恢复=》消息广播)
      ru2mMbN.png!web
  4. 消息广播(类似2P提交):
    • Leader接受请求后,讲这个请求赋予全局的唯一64位自增Id(zxid)。
    • 将zxid作为议案发给所有follower。
    • 所有的follower接受到议案后,想将议案写入硬盘后,马上回复Leader一个ACK(OK)。
    • 当Leader接受到合法数量Acks,Leader给所有follower发送commit命令。
    • follower执行commit命令。
    • PS: :到了这个阶段,ZK集群才正式对外提供服务,并且Leader可以进行消息广播,如果有新节点加入,还需要进行同步。

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK