6

zookeeper 实现分布式锁安全用法

 3 years ago
source link: https://www.cnblogs.com/wangiqngpei557/p/10323149.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

zookeeper 实现分布式锁安全用法

标签: zookeeper sessionExpire connectionLoss 分布式锁


  • ConnectionLoss 链接丢失
  • SessionExpired 会话过期
  • 绕开 zookeeper broker 进行状态通知
  • leader 选举与zkNode 断开
  • 静态扩容、动态扩容

分布式锁现在用的越来越多,通常用来协调多个并发任务。在一般的应用场景中存在一定的不安全用法,不安全用法会带来多个master在并行执行,业务或数据可能存在重复计算带来的副作用,在没有拿到lock的情况下扮演者master等诸如此类。

要想准确的拿到分布式锁,并且准确的捕获在分布式情况下锁的动态转移状态,需要处理网络变化带来的连锁反应。比如常见的 session expire、connectionLoss,在设置lock状态的时候我们如何保证准确拿到lock。

在设计任务的时候我们需要具有 stop point 的策略,这个策略是用来在感知到lock丢失后能够交付执行权的机制。但是是否需要这么严肃的处理这个问题还取决于业务场景,比如下游的任务已经做好幂等也无所谓重复计算。 但是在有些情况下确实需要严肃精准控制。

ConnectionLoss 链接丢失

先说第一个场景,connectionLoss事件,此事件表示提交的commit有可能执行成功也有可能执行失败,成功是指在zookeeper broker 中执行成功但是返回的时候tcp断开了,导致未能拿到返回的状态。失败是指根本就没有提交到zookeper broker中链接就断开了。

所以在我们获取lock的时候需要做 connectionLoss 事件处理,我们看个例子。

protected void runForMaster() {

        logger.info("master:run for master.");

        AsyncCallback.StringCallback createCallback =
                (rc, path, ctx, name) -> {
                    switch (KeeperException.Code.get(rc)) {
                        case CONNECTIONLOSS:
                            checkMaster();//链接失效检查znode设置是否成功
                            return;
                        case OK:
                            isLeader = true;
                            logger.info("master:I'm the leader serverId:" + serverId);
                            addMasterWatcher();//监控 master znode
                            this.takeLeadership();//执行leader权利
                            break;
                        case NODEEXISTS:
                            isLeader = false;
                            String serverId = this.getMasterServerId();
                            this.takeBackup(serverId);
                            break;

                    }
                };

        zk.create(rootPath + "/master", serverId.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL, createCallback, null);//创建master节点
    }

    /**
     * check master 循环检查
     */
    private void checkMaster() {

        AsyncCallback.DataCallback masterCheckCallback =
                (rc, path, ctx, data[], stat) -> {
                    switch (KeeperException.Code.get(rc)) {
                        case CONNECTIONLOSS:
                            checkMaster();
                            return;
                        case NONODE:
                            runForMaster();
                            return;
                        default: {
                            String serverId = this.getMasterServerId();
                            isLeader = serverId.equals(this.serverId);
                            if (BooleanUtils.isNotTrue(isLeader)) {
                                this.takeBackup(serverId);
                            } else {
                                this.takeLeadership();
                            }
                        }

                        return;
                    }
                };

        zk.getData(masterZnode, false, masterCheckCallback, null);
    }

这里的master表示具有执行权,只有成功拿到master 角色才能履行master权利。

runForMaster 方法一旦发现有connectionLoss就发起checkMaster进行检查,同时checkMaster方法中也进行connectinLoss检查,直到拿到明确的状态为止。在此时有可能有另外的节点获取到了master角色,那么当前节点就做好backup等待机会。

我们需要捕获zookeeper所有的状态变化,要知道master什么时候失效做好申请准备,当自己是master时候会话失效需要释放master权利。

/**
     * 监控 master znode 做 master/slave 切换
     */
    private void addMasterWatcher() {

        AsyncCallback.StatCallback addMasterWatcher = (rc, path, ctx, stat) -> {
            switch (KeeperException.Code.get(rc)) {
                case CONNECTIONLOSS:
                    addMasterWatcher();
                    break;
                case OK:
                    if (stat == null) {
                        runForMaster();//master 已经不存在
                    } else {
                        logger.info("master:watcher master znode ok.");
                    }
                    break;
                case NONODE:
                    logger.info("master:master znode delete.");
                    runForMaster();
                    break;
            }
        };

        zk.exists(masterZnode, MasterExistsWatcher, addMasterWatcher, null);
    }

通过zookeeper watcher 机制来进行状态监听,保持与网络、zookeeper状态变化联动。

SessionExpired 会话过期

我们在来看第二个问题,第一个问题是获取lock的时候如何保证一定可以准确拿到状态,这里状态是指master角色或者backup角色。

当我们成功与zookeeper broker建立链接,成功获取到master角色并且正在履行master义务时突然zookeeper通知session过期,SessionExpired事件表示zookeeper将会删除所有当前会话创建的临时znode,也就意味这master znode将会被其他会话创建。

此时我们需要将自己的master 权利交出去,也就是我们必须放下目前手上执行的任务,这个停止的状态必须能够反应到全局。此时最容易出现到问题就是,我们已经不是master了但是还在偷偷到执行master权利,通过dashboard会看到很奇怪的问题,不是master的服务器还在执行。

case SESSIONEXPIRED:
    //执行 stop point 通知
    this.stopPoint();
    break;

所以这里需要我们在设计任务时有stop point 策略,类似jvm的safe point,随时响应全局停止。

绕开 zookeeper broker 进行状态通知

还有一种常见的使用方式是绕开zookeeper 来做状态通知。

我们都知道zookeeper cluster 是由多台实例组成,每个实例都在全国甚至全球的不同地方,leader到这些节点之间都有很大的同步延迟差异,zookeeper内部采用法定人数的两阶段提交的方式来完成一次commit。

比如有7个实例构成一套zookeeper cluster ,当一次client 写入 commit只需要集群中有超过半数完成写入就算这次commit提交成功了。但是cleint得到这个提交成功的响应之后立马执行接下来的任务,这个任务可能是读取某个znode下的所有状态数据,此时有可能无法读取到这个状态。

如果是分布式锁的话很有可能是锁在zk集群中的转移无法和client集群保持一直。所以只要是基于zookeeper做集群调度就要完全原来zookeeper来做状态通知,不可以绕开zookeeper来自行调度。

leader 选举与zkNode 断开

zookeeper leader 是所有状态变更的串行化器,add、update、delete都需要leader来处理,然后传播给所有follower、observer节点。

所有的session是保存在leader中的,所有的watcher是保存在client链接的zookeper node中的,这里两个场景都会导致状态迁移的通知不准时。

如果zookeeper是由多数据中心构成的一套集群,存在异地同步延迟的问题,leader是肯定会放在写入的数据中心中,同时zid应该是最大的,甚至是一组高zid的机器都在写入的数据中心中,这样保证leader宕机也不会轻易导致leader选举到其他数据中心。

但是follower、observer都会有client在使用,也会有在这些节点进行协调的分布式集群。

先说leader选举导致异地节点延迟感知问题,比如当前 zookeeper cluster 有7台机器构成:

dataCenter shanghai:zid=100、zid=80、zid=50
dataCenter beijing: zid=10、zid=20
dataCenter shenzhen:zid=30、zid=40

由于网络问题集群发生leader选举,zid=100暂时脱离集群,zid=80成为leader,这里不考虑日志新旧问题,优先使用zid进行选举。

由于集群中所有的session是保存在原来zid=100的机器中的,新leader没有任何session信息,所以将导致所有session丢失。

session的保持时间是取决于我们设置的sessinoTimeout时间来的,client通过ping来将心跳传播到所链接的zkNode,这个zkNode可能是任意角色的node,然后zkNode在与zkleaderNode进行心跳来保持会话,同时zkNode也会通过ping来保持会话超时时间。

此时当原有当client在重新链接上zkNode时会被告知sessionExpired。sessionExpired 是由zkNode通知出来的,当会话丢失或者过期,client在去尝试链接zkNode时候会被zkNode告知会话过期。

如果client只捕获了sessionExpired显然会出现多个master运行情况,因为当你与zkNode断开到时候,当时还没有收到sessionExpired事件时,已经有另外client成功创建master拿到权利。

这种情况在zkNode出现脱离集群当时候也会出现,当zkNode断开之后也会出现sessionExpired延迟通知问题。所有的watcher都是需要在新的zkNode上创建才会收到新的事件。

静态扩容、动态扩容

在极端情况下静态扩容可能会导致zookeeper集群出现严重的数据不一致问题,比如现有集群:A、B、C,现在需要进行静态扩容,停止ABC实例,拉入DE实例,此时如果C实例是ABC中最滞后的实例,如果AB启动的速度没有C快就会导致CDE组成新的集群,新的纪元号会覆盖原来的AB日志。当然现在基本上不会接受静态扩容,基本上都是动态扩容。

动态扩容在极端情况下也会出现类似问题,比如现在有三个机房,1、2、3,1机房方leader zid=200、100,2机房zid=80、50,3机房zid=40,假设上次的commit是在zid=200、100、50之间提交的,此时机房1出现断网,2机房zid=80、50与3机房zid=40开始组成新的集群,新的纪元在zid=50上产生。

在使用zookeeper来实现分布式锁或者集群调度的时候会出现很多分布式下的问题,为了保证这些问题的出现不会带来业务系统或者业务数据的不一致,我们还是在这些任务上做好幂等性考虑。

比如进行数据的计算,做个时间检查,版本检查之类的。如果本身是基于zookeeper实现的一套独立的分布式系统需要的工作会更多点。

作者:王清培 (沪江集团资深架构师)

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK