ZooKeeper Source Code (08): Request Processing and Data Read/Write Flow - 用户不存在!

 7 months ago
source link: https://www.cnblogs.com/xugf/p/18020590

ServerCnxnFactory

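Accepts client connections, manages client sessions, and handles client requests.

The ServerCnxn abstract class

Represents a single client connection. It:

  • reads and writes data over the network
  • encodes and decodes data
  • forwards requests to the upper-layer components and receives responses from them
  • manages connection state, such as enableRecv, sessionTimeout, stale, and invalid
  • keeps the current packetsReceived, packetsSent, lastCxid, lastZxid, and similar counters
  • implements the Watcher interface, so it can also act as a watcher

It has two implementations:

  • NIOServerCnxn - based on NIO
  • NettyServerCnxn - based on Netty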

NIOServerCnxnFactory

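A non-blocking, multi-threaded ServerCnxnFactory implementation based on NIO. Its threads communicate through queues:

  • 1 accept thread, which accepts client connections and hands them to the selector threads
  • 1-N selector threads, each of which selects on 1/N of the connections. Several selector threads are used because, with a large number of connections, select() itself can become the performance bottleneck
  • 0-M socket IO worker threads, which do the socket reads and writes. If this is configured as 0, the selector threads do the IO themselves
  • 1 cleanup thread, which closes idle connections

Example thread allocation: on a 32-core machine, 1 accept thread, 1 cleanup thread, 4 selector threads, and 64 worker threads.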
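The 32-core figures follow from the default sizing formulas that configure uses. A minimal sketch of that arithmetic, assuming no system property overrides the defaults (the class and method names here are illustrative, not ZooKeeper's):

```java
final class NioThreadSizing {
    // Default selector thread count: max(floor(sqrt(cores / 2)), 1).
    static int selectorThreads(int numCores) {
        return Math.max((int) Math.sqrt((float) numCores / 2), 1);
    }

    // Default worker thread count: 2 * cores.
    static int workerThreads(int numCores) {
        return 2 * numCores;
    }

    public static void main(String[] args) {
        for (int cores : new int[] {1, 8, 32, 64}) {
            System.out.printf("cores=%d selectors=%d workers=%d%n",
                    cores, selectorThreads(cores), workerThreads(cores));
        }
    }
}
```

Because of the square root, the selector count grows slowly with the core count, while the worker count grows linearly.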

The configure method

  • SSL is not supported

  • Creates the ConnectionExpirerThread thread

  • Determines the number of each kind of thread from the number of CPU cores

    int numCores = Runtime.getRuntime().availableProcessors();
    // 32 cores sweet spot seems to be 4 selector threads
    numSelectorThreads = Integer.getInteger(
        ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
        Math.max((int) Math.sqrt((float) numCores / 2), 1));
    
    // 64
    numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
    
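  • Creates the SelectorThread threads

  • Creates the ServerSocketChannel, starts listening, and sets it to non-blocking

  • Creates the AcceptThread thread

The start method

Starts the acceptThread, selectorThreads, workerPool, and expirerThread threads.

The acceptThread thread

A single accept thread accepts client connections and hands them to the selector threads:

  1. select finds the acceptable keys

  2. doAccept accepts the connection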

    if (key.isAcceptable()) {
        if (!doAccept()) {
            pauseAccept(10);
        }
    }
    
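  3. Sets sc (the SocketChannel) to non-blocking, verifies that the number of connections from the remote IP does not exceed maxClientCnxns (60), and picks a SelectorThread, which then selects read/write events for the connection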

    // Round-robin assign this connection to a selector thread
    if (!selectorIterator.hasNext()) {
        selectorIterator = selectorThreads.iterator();
    }
    SelectorThread selectorThread = selectorIterator.next();
    // the SocketChannel is buffered in the selector thread's queue
    if (!selectorThread.addAcceptedConnection(sc)) {
        throw new IOException("Unable to add connection to selector queue");
    }
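The round-robin hand-off in step 3 can be sketched in isolation. This is a simplified model, not ZooKeeper's code: SelectorStub stands in for SelectorThread, and connections are plain strings instead of SocketChannel objects.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

final class RoundRobinAssigner {
    // Hypothetical stand-in for SelectorThread: it only owns a queue of accepted connections.
    static final class SelectorStub {
        final Queue<String> acceptedQueue = new LinkedBlockingQueue<>();

        boolean addAcceptedConnection(String conn) {
            return acceptedQueue.offer(conn);
        }
    }

    private final List<SelectorStub> selectors = new ArrayList<>();
    private Iterator<SelectorStub> iterator;

    RoundRobinAssigner(int numSelectors) {
        for (int i = 0; i < numSelectors; i++) {
            selectors.add(new SelectorStub());
        }
        iterator = selectors.iterator();
    }

    // Restart the iterator when it is exhausted, then queue the connection.
    void assign(String conn) {
        if (!iterator.hasNext()) {
            iterator = selectors.iterator();
        }
        if (!iterator.next().addAcceptedConnection(conn)) {
            throw new IllegalStateException("Unable to add connection to selector queue");
        }
    }

    int queued(int selectorIndex) {
        return selectors.get(selectorIndex).acceptedQueue.size();
    }

    public static void main(String[] args) {
        RoundRobinAssigner rr = new RoundRobinAssigner(3);
        for (int i = 0; i < 7; i++) {
            rr.assign("conn-" + i);
        }
        System.out.println(rr.queued(0) + " " + rr.queued(1) + " " + rr.queued(2));
    }
}
```

The queue decouples the accept thread from the selector thread: the accept thread never touches the selector directly, it only enqueues.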
    

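The selectorThread thread

The run method selects read/write events, takes over accepted client connections, and updates the interest ops registered for each key:

  • The run method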

    public void run() {
        try {
            while (!stopped) {
                try {
                    select(); // select read/write events
                    processAcceptedConnections(); // take over accepted client connections
                    processInterestOpsUpdateRequests();
                } catch (RuntimeException e) {
                } catch (Exception e) {
                }
            }
        }
        // ...
    }
    
  • Taking over an accepted connection registers OP_READ, creates an NIOServerCnxn, and attaches it to the key

    private void processAcceptedConnections() {
        SocketChannel accepted;
        while (!stopped && (accepted = acceptedQueue.poll()) != null) {
            SelectionKey key = null;
            try {
                key = accepted.register(selector, SelectionKey.OP_READ);
                NIOServerCnxn cnxn = createConnection(accepted, key, this);
                key.attach(cnxn); // attach the connection to the key
                addCnxn(cnxn); // track the connection at the connection layer
            } catch (IOException e) {
                // omitted
            }
        }
    }
    
  • Selected read/write events are handed to the handleIO method

    private void handleIO(SelectionKey key) {
        IOWorkRequest workRequest = new IOWorkRequest(this, key);
        NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
    
        // Stop selecting this key while processing on its connection
        cnxn.disableSelectable();
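        key.interestOps(0); // clear the interest ops; read/write interest is re-registered once the IO completes
        touchCnxn(cnxn); // connection-layer bookkeeping: refresh the expiry time
        workerPool.schedule(workRequest); // workRequest.doWork does the read/write asynchronously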
    }
    
  • Updating the interest ops registered for a key

    private void processInterestOpsUpdateRequests() {
        SelectionKey key;
        while (!stopped && (key = updateQueue.poll()) != null) {
            NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
            if (cnxn.isSelectable()) {
                key.interestOps(cnxn.getInterestOps());
            }
        }
    }
    

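The workRequest.doWork method

workRequest is an IOWorkRequest. Its doWork method reads the data and passes it to the upper-layer components:

public void doWork() throws InterruptedException {
    // omitted

    if (key.isReadable() || key.isWritable()) {
        cnxn.doIO(key); // runs on a workerPool thread

        // omitted
        touchCnxn(cnxn); // connection-layer bookkeeping: refresh the expiry time
    }

    // omitted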
}

Packets are sent as a length prefix followed by the body (len + body). The read loop itself is not covered here. Once cnxn has read a complete packet, it calls readConnectRequest or readRequest to pass the data to the upper-layer components:

// application-level connection setup
private void readConnectRequest() throws IOException, ClientCnxnLimitException {
    BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
    // ConnectRequest carries:
    // protocolVersion(0), lastZxidSeen(0), timeOut(3s), sessionId(0), passwd(16 bytes), readOnly(F)
    ConnectRequest request = protocolManager.deserializeConnectRequest(bia);
    zkServer.processConnectRequest(this, request);
    initialized = true;
}

protected void readRequest() throws IOException {
    RequestHeader h = new RequestHeader();
    // request header: carries the xid and type sent by the client
    ByteBufferInputStream.byteBuffer2Record(incomingBuffer, h);
    // wrap the request bytes in a ByteBufferRequestRecord;
    // its readRecord method later deserializes the bytes into the requested Record implementation
    RequestRecord request = RequestRecord.fromBytes(incomingBuffer.slice());
    zkServer.processPacket(this, h, request);
}
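The len + body framing mentioned above can be illustrated with a small decoder. This is a sketch that assumes a 4-byte big-endian length prefix; it is not the read loop from NIOServerCnxn, and the class name is made up for the example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class LenBodyFraming {
    // Encode one frame: a 4-byte length followed by the body bytes.
    static byte[] encode(byte[] body) {
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }

    // Decode every complete frame in buf; a trailing partial frame is left unread.
    static List<byte[]> decode(ByteBuffer buf) {
        List<byte[]> frames = new ArrayList<>();
        while (buf.remaining() >= 4) {
            buf.mark();
            int len = buf.getInt();
            if (len < 0) {
                throw new IllegalArgumentException("negative length: " + len);
            }
            if (buf.remaining() < len) {
                buf.reset(); // body not fully received yet, rewind to the length prefix
                break;
            }
            byte[] body = new byte[len];
            buf.get(body);
            frames.add(body);
        }
        return frames;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap(encode("hello".getBytes(StandardCharsets.UTF_8)));
        for (byte[] frame : decode(buf)) {
            System.out.println(new String(frame, StandardCharsets.UTF_8));
        }
    }
}
```

A real server would also reject lengths above a configured maximum before allocating the body buffer.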

NettyServerCnxnFactory

A ServerCnxnFactory implementation based on Netty.

The CnxnChannelHandler class

The core network-layer handler. Only the important code is shown here:

class CnxnChannelHandler extends ChannelDuplexHandler {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final Channel channel = ctx.channel();
        // omitted
        // create a NettyServerCnxn and bind it to the channel
        NettyServerCnxn cnxn = new NettyServerCnxn(channel, zkServer, NettyServerCnxnFactory.this);
        ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);

        // omitted
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            try {
                NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
                if (cnxn == null) {
                    LOG.error("channelRead() on a closed or closing NettyServerCnxn");
                } else {
                    // read the request data
                    cnxn.processMessage((ByteBuf) msg);
                }
            } catch (Exception ex) {
                throw ex;
            }
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }
}

How cnxn reads request data

void processMessage(ByteBuf buf) {
    if (throttled.get()) {
        // omitted
    } else {
        if (queuedBuffer != null) {
            appendToQueuedBuffer(buf.retainedDuplicate());
            processQueuedBuffer();
        } else {
            receiveMessage(buf); // the decoding logic lives in this method
            // Have to check !closingChannel, because an error in
            // receiveMessage() could have led to close() being called.
            if (!closingChannel && buf.isReadable()) {
                if (queuedBuffer == null) {
                    queuedBuffer = channel.alloc().compositeBuffer();
                }
                appendToQueuedBuffer(buf.retainedSlice(buf.readerIndex(), buf.readableBytes()));
            }
        }
    }
}

Once a complete packet has been read, the data is passed to the upper-layer components:

if (initialized) {
    RequestHeader h = new RequestHeader();
    ByteBufferInputStream.byteBuffer2Record(bb, h);
    RequestRecord request = RequestRecord.fromBytes(bb.slice());
    zks.processPacket(this, h, request);
} else {
    // application-level connection setup
    BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(bb));
    ConnectRequest request = protocolManager.deserializeConnectRequest(bia);
    zks.processConnectRequest(this, request);
    initialized = true;
}

ZooKeeperServer handler methods

processConnectRequest: handling connect requests

ZooKeeperServer's processConnectRequest method handles connect requests:

public void processConnectRequest(
        ServerCnxn cnxn, ConnectRequest request) throws IOException, ClientCnxnLimitException {

    long sessionId = request.getSessionId(); // 0 by default
    int tokensNeeded = 1;
    // omitted

    // read-only check
    if (!request.getReadOnly() && this instanceof ReadOnlyZooKeeperServer) {
        String msg = "Refusing session request for not-read-only client";
        throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);
    }
    // client zxid check
    if (request.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
        String msg = "Refusing session request for client "
                     + cnxn.getRemoteSocketAddress()
                     + " as it has seen zxid 0x"
                     + Long.toHexString(request.getLastZxidSeen())
                     + " our last zxid is 0x"
                     + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
                     + " client must try another server";
        throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
    }
    int sessionTimeout = request.getTimeOut(); // the client default is 30000
    byte[] passwd = request.getPasswd();
    int minSessionTimeout = getMinSessionTimeout(); // defaults to tickTime * 2
    if (sessionTimeout < minSessionTimeout) {
        sessionTimeout = minSessionTimeout;
    }
    int maxSessionTimeout = getMaxSessionTimeout(); // defaults to tickTime * 20
    if (sessionTimeout > maxSessionTimeout) {
        sessionTimeout = maxSessionTimeout;
    }
    cnxn.setSessionTimeout(sessionTimeout); // set the negotiated timeout

    // We don't want to receive any packets until we are sure that the session is setup
    cnxn.disableRecv();

    if (sessionId == 0) {
        // create a new session
        long id = createSession(cnxn, passwd, sessionTimeout);
    } else {
        validateSession(cnxn, sessionId);
        // close the old connection that held this session
        if (serverCnxnFactory != null) {
            serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
        }
        if (secureServerCnxnFactory != null) {
            secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
        }
        // attach the session to the new connection
        cnxn.setSessionId(sessionId);
        // send the connect response
        reopenSession(cnxn, sessionId, passwd, sessionTimeout);
        ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1);
    }
}

// A closer look at creating a new session
long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
    if (passwd == null) {
        passwd = new byte[0];
    }
    long sessionId = sessionTracker.createSession(timeout); // creates the session and returns its sessionId
    Random r = new Random(sessionId ^ superSecret);
    r.nextBytes(passwd); // fills the array in place; it is the same array as request.passwd
    CreateSessionTxn txn = new CreateSessionTxn(timeout);
    cnxn.setSessionId(sessionId);
    // submit a createSession request to the request processor chain
    // RequestRecord.fromRecord(txn) returns a SimpleRequestRecord that wraps the Record
    Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null);
    submitRequest(si);
    return sessionId;
}
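Two details of the connect path are easy to check in isolation: the timeout is clamped into [tickTime * 2, tickTime * 20], and the password is derived deterministically from the session id and a server-side secret, so it can be recomputed later. A minimal sketch, in which the tickTime of 2000 and the secret value are illustrative assumptions rather than values taken from the article:

```java
import java.util.Arrays;
import java.util.Random;

final class SessionNegotiation {
    // Clamp the requested timeout into [2 * tickTime, 20 * tickTime].
    static int negotiateTimeout(int requested, int tickTime) {
        int min = tickTime * 2;
        int max = tickTime * 20;
        return Math.max(min, Math.min(max, requested));
    }

    // Derive a 16-byte password from the session id and a secret (hypothetical value in main).
    static byte[] derivePasswd(long sessionId, long secret) {
        byte[] passwd = new byte[16];
        new Random(sessionId ^ secret).nextBytes(passwd);
        return passwd;
    }

    public static void main(String[] args) {
        System.out.println(negotiateTimeout(1000, 2000));   // below the minimum
        System.out.println(negotiateTimeout(30000, 2000));  // inside the range
        System.out.println(negotiateTimeout(100000, 2000)); // above the maximum
        byte[] first = derivePasswd(42L, 0x1234L);
        byte[] second = derivePasswd(42L, 0x1234L);
        System.out.println(Arrays.equals(first, second));
    }
}
```

Seeding Random with the same value always yields the same byte sequence, which is what makes the derived password reproducible.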

processPacket: handling operation requests

ZooKeeperServer's processPacket method handles operation requests:

public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException {

    cnxn.incrOutstandingAndCheckThrottle(h);

    if (h.getType() == OpCode.auth) {
        // omitted
        return;
    } else if (h.getType() == OpCode.sasl) {
        // omitted
    } else {
        if (!authHelper.enforceAuthentication(cnxn, h.getXid())) {
            return;
        } else {
            Request si = new Request(
                cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo());
            int length = request.limit();
            // large request check
            if (isLargeRequest(length)) { // returns false by default
                checkRequestSizeWhenMessageReceived(length);
                si.setLargeRequestSize(length);
            }
            si.setOwner(ServerCnxn.me);
            // submit to the request processor chain
            submitRequest(si);
        }
    }
}

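submitRequest flow

  1. The request is first submitted to the requestThrottler component
  2. requestThrottler is a throttling component (disabled by default). It buffers requests in an internal queue, and an asynchronous thread consumes the queue and submits each request to the request processors
  3. It is only at submitRequest that request handling leaves the workerPool thread

The throttler's consumer thread handles each dequeued request as follows:
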
if (request != null) {
    if (request.isStale()) {
        ServerMetrics.getMetrics().STALE_REQUESTS.add(1);
    }
    final long elapsedTime = Time.currentElapsedTime() - request.requestThrottleQueueTime;
    ServerMetrics.getMetrics().REQUEST_THROTTLE_QUEUE_TIME.add(elapsedTime);
    // no throttling by default
    if (shouldThrottleOp(request, elapsedTime)) {
      request.setIsThrottled(true);
      ServerMetrics.getMetrics().THROTTLED_OPS.add(1);
    }
    // submit
    zks.submitRequestNow(request);
}

The submitRequestNow method submits the request to the request processor chain:

public void submitRequestNow(Request si) {
    // omitted
    try {
        touch(si.cnxn); // refresh the session expiry time
        boolean validpacket = Request.isValid(si.type);
        if (validpacket) {
            setLocalSessionFlag(si);
            firstProcessor.processRequest(si); // submit to the request processor chain
            if (si.cnxn != null) {
                incInProcess();
            }
        } else {
            // Update request accounting/throttling limits
            requestFinished(si);
            new UnimplementedRequestProcessor().processRequest(si);
        }
    } catch (MissingSessionException e) {
        requestFinished(si);
    } catch (RequestProcessorException e) {
        requestFinished(si);
    }
}

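Leader request processor chain for client requests

As covered in an earlier article, the leader uses LeaderZooKeeperServer as its server implementation.

This section walks through how the leader processes a client request.

// build the processor chain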
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor toBeAppliedProcessor =
        new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
    commitProcessor = new CommitProcessor(
        toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
    commitProcessor.start();
    ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
    proposalProcessor.initialize();
    prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
    prepRequestProcessor.start();
    firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);

    setupContainerManager();
}
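
  • FinalRequestProcessor - applies the transaction associated with a request, serves queries, and sends the response to the client; it sits at the end of the RequestProcessor chain

  • ToBeAppliedRequestProcessor - maintains the toBeApplied list; its next processor must be a FinalRequestProcessor, whose processRequest must handle requests synchronously

  • CommitProcessor - waits for the commit to complete, then calls the downstream RequestProcessor

  • ProposalRequestProcessor - issues the proposal and forwards the Request to its internal SyncRequestProcessor and AckRequestProcessor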

    public ProposalRequestProcessor(LeaderZooKeeperServer zks, RequestProcessor nextProcessor) {
        this.zks = zks;
        this.nextProcessor = nextProcessor;
        // internally maintains a SyncRequestProcessor and an AckRequestProcessor
        AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
        syncProcessor = new SyncRequestProcessor(zks, ackProcessor);
    
        forwardLearnerRequestsToCommitProcessorDisabled = Boolean.getBoolean(
                FORWARD_LEARNER_REQUESTS_TO_COMMIT_PROCESSOR_DISABLED);
    }
    
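  • PrepRequestProcessor - usually at the start of the RequestProcessor chain; sets up the transaction associated with an update request

  • LeaderRequestProcessor - performs local session upgrades; only Requests submitted directly to the leader go through this processor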
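The chain is built back to front: each processor receives its successor in the constructor. A minimal synchronous sketch of that wiring follows. The Stage class and the string requests are illustrative only; the real processors are partly asynchronous, with their own queues and threads.

```java
import java.util.ArrayList;
import java.util.List;

final class ProcessorChainSketch {
    interface Processor {
        void process(String request);
    }

    // A stage records its name, then forwards the request to the next stage, if any.
    static final class Stage implements Processor {
        private final String name;
        private final Processor next;
        private final List<String> trace;

        Stage(String name, Processor next, List<String> trace) {
            this.name = name;
            this.next = next;
            this.trace = trace;
        }

        @Override
        public void process(String request) {
            trace.add(name);
            if (next != null) {
                next.process(request);
            }
        }
    }

    // Build the chain from the last processor to the first one.
    static Processor build(List<String> trace) {
        Processor fin = new Stage("Final", null, trace);
        Processor toBeApplied = new Stage("ToBeApplied", fin, trace);
        Processor commit = new Stage("Commit", toBeApplied, trace);
        Processor proposal = new Stage("Proposal", commit, trace);
        Processor prep = new Stage("Prep", proposal, trace);
        return new Stage("Leader", prep, trace);
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        build(trace).process("create /a");
        System.out.println(trace);
    }
}
```

Constructing the last stage first means every stage can hold a final reference to its successor, so no stage needs a setter for the next processor.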

LeaderRequestProcessor

public void processRequest(Request request) throws RequestProcessorException {
    // omitted

    // local sessions are disabled by default
    Request upgradeRequest = null;
    try {
        upgradeRequest = lzks.checkUpgradeSession(request);
    } catch (KeeperException ke) {
        // omitted
    } catch (IOException ie) {
        // omitted
    }
    // upgradeRequest is null here, because local sessions are disabled
    if (upgradeRequest != null) {
        nextProcessor.processRequest(upgradeRequest);
    }
    // call the downstream processor
    nextProcessor.processRequest(request);
}

PrepRequestProcessor

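Sets up transactions:

  • Requests are buffered in a queue
  • A consumer thread takes requests from the queue and sets up their transactions

The run method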

public void run() {
    try {
        while (true) {
            ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_SIZE.add(submittedRequests.size());
            Request request = submittedRequests.take();
            ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_TIME
                .add(Time.currentElapsedTime() - request.prepQueueStartTime);
            // omitted
            if (Request.requestOfDeath == request) {
                break;
            }

            request.prepStartTime = Time.currentElapsedTime();
            pRequest(request);
        }
    } catch (Exception e) {
        handleException(this.getName(), e);
    }
}

protected void pRequest(Request request) throws RequestProcessorException {
    request.setHdr(null);
    request.setTxn(null);

    if (!request.isThrottled()) {
      pRequestHelper(request);
    }

    request.zxid = zks.getZxid(); // zxid
    long timeFinishedPrepare = Time.currentElapsedTime();
    ServerMetrics.getMetrics().PREP_PROCESS_TIME.add(timeFinishedPrepare - request.prepStartTime);
    nextProcessor.processRequest(request); // call the downstream processor
    ServerMetrics.getMetrics().PROPOSAL_PROCESS_TIME.add(Time.currentElapsedTime() - timeFinishedPrepare);
}
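The run loop above is a blocking-queue consumer that stops when it dequeues a sentinel. A minimal sketch of that pattern, using strings as requests and a hypothetical REQUEST_OF_DEATH constant in the role of Request.requestOfDeath:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

final class PrepQueueSketch extends Thread {
    // Sentinel compared by identity, so no ordinary request can be mistaken for it.
    static final String REQUEST_OF_DEATH = new String("requestOfDeath");

    private final LinkedBlockingQueue<String> submittedRequests = new LinkedBlockingQueue<>();
    final List<String> handled = Collections.synchronizedList(new ArrayList<>());

    void processRequest(String request) {
        submittedRequests.add(request);
    }

    void shutdown() {
        submittedRequests.add(REQUEST_OF_DEATH);
    }

    @Override
    public void run() {
        try {
            while (true) {
                String request = submittedRequests.take(); // blocks until a request arrives
                if (request == REQUEST_OF_DEATH) {
                    break;
                }
                handled.add(request); // stands in for pRequest(request)
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Submit the given requests, shut down, and return what the consumer handled.
    static List<String> runDemo(String... requests) {
        PrepQueueSketch p = new PrepQueueSketch();
        p.start();
        for (String r : requests) {
            p.processRequest(r);
        }
        p.shutdown();
        try {
            p.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(p.handled);
    }

    public static void main(String[] args) {
        System.out.println(runDemo("create", "setData", "delete"));
    }
}
```

Because the sentinel travels through the same queue as the requests, everything submitted before shutdown is still processed in order.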

The pRequestHelper method

private void pRequestHelper(Request request) {
    try {
        switch (request.type) {
        case OpCode.createContainer:
        case OpCode.create:
        case OpCode.create2:
            // a create request carries path, data, acl, and flags
            CreateRequest create2Request = request.readRequestRecord(CreateRequest::new);
            // zks.getNextZxid() returns the next, monotonically increasing zxid
            pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request);
            break;
        case OpCode.createTTL:
            // a createTTL request carries path, data, acl, flags, and ttl
            CreateTTLRequest createTtlRequest = request.readRequestRecord(CreateTTLRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest);
            break;
        case OpCode.deleteContainer:
            // carries path
            DeleteContainerRequest deleteContainerRequest =
                request.readRequestRecord(DeleteContainerRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, deleteContainerRequest);
            break;
        case OpCode.delete:
            // a delete request carries path and version
            DeleteRequest deleteRequest = request.readRequestRecord(DeleteRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest);
            break;
        case OpCode.setData:
            // a setData request carries path, data, and version
            SetDataRequest setDataRequest = request.readRequestRecord(SetDataRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest);
            break;
        case OpCode.reconfig:
            // a reconfig request carries joiningServers, leavingServers, newMembers, and curConfigId
            ReconfigRequest reconfigRequest = request.readRequestRecord(ReconfigRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest);
            break;
        case OpCode.setACL:
            // a setACL request carries path, acl, and version
            SetACLRequest setAclRequest = request.readRequestRecord(SetACLRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest);
            break;
        case OpCode.check:
            // a check request carries path and version
            CheckVersionRequest checkRequest = request.readRequestRecord(CheckVersionRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest);
            break;
        case OpCode.multi:
            // iterate over the sub-operations and call pRequest2Txn for each one:
            // pRequest2Txn(op.getType(), zxid, request, subrequest)
            // then wrap the results in a MultiTxn
            break;

        // create/close session don't require request record
        case OpCode.createSession:
        case OpCode.closeSession:
            if (!request.isLocalSession()) { // not a local session
                pRequest2Txn(request.type, zks.getNextZxid(), request, null);
            }
            break;

        // All the rest don't need to create a Txn - just verify session
        case OpCode.sync:
        // sync,exists,getData,getACL,getChildren,getAllChildrenNumber,getChildren2,ping
        // setWatches,setWatches2,checkWatches,removeWatches,getEphemerals,multiRead,addWatch
        case OpCode.whoAmI:
            zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
            break;
        default:
            break;
        }
    } catch (KeeperException e) {
        // omitted
    } catch (Exception e) {
        // omitted
    }
}

pRequest2Txn method flow

The method is long, so only the main operation types are analyzed briefly here.

The method first sets the TxnHeader on the request:

if (request.getHdr() == null) {
    request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type));
}

TxnHeader holds the transaction header:

public class TxnHeader implements Record {
  private long clientId; // session id
  private int cxid; // client-side xid
  private long zxid; // server-side transaction id
  private long time;
  private int type; // operation type
}

pRequest2Txn - create operations

The create/create2/createTTL/createContainer operations:

  1. Builds the createMode from flags and validates the ttl and ephemeral settings

  2. Checks the ACL

    zks.checkACL(request.cnxn, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo, path, listACL);
    
  3. Generates the path for sequential nodes

    int parentCVersion = parentRecord.stat.getCversion();
    if (createMode.isSequential()) {
        // for example /users/admin0000000001
        path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
    }
    validatePath(path, request.sessionId);
    
  4. request.setTxn

    int newCversion = parentRecord.stat.getCversion() + 1;
    if (type == OpCode.createContainer) {
        request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion));
    } else if (type == OpCode.createTTL) {
        request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl));
    } else {
        request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(), newCversion));
    }
    
  5. Determines the ephemeralOwner

    TxnHeader hdr = request.getHdr();
    long ephemeralOwner = 0;
    if (createMode.isContainer()) {
        ephemeralOwner = EphemeralType.CONTAINER_EPHEMERAL_OWNER;
    } else if (createMode.isTTL()) {
        ephemeralOwner = EphemeralType.TTL.toEphemeralOwner(ttl);
    } else if (createMode.isEphemeral()) {
        ephemeralOwner = request.sessionId;
    }
    
  6. addChangeRecord

    StatPersisted s = DataTree.createStat(hdr.getZxid(), hdr.getTime(), ephemeralOwner);
    parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
    parentRecord.childCount++;
    parentRecord.stat.setCversion(newCversion);
    parentRecord.stat.setPzxid(request.getHdr().getZxid());
    parentRecord.precalculatedDigest = precalculateDigest(
            DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
    addChangeRecord(parentRecord); // adds to the outstandingChanges collection
    ChangeRecord nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL);
    nodeRecord.data = data;
    nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.ADD, path, nodeRecord.data, s);
    setTxnDigest(request, nodeRecord.precalculatedDigest);
    addChangeRecord(nodeRecord); // adds to the outstandingChanges collection
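The sequential suffix from step 3 is the parent's cversion formatted as a zero-padded, 10-digit decimal. A small sketch of that formatting (the class and method names are illustrative):

```java
import java.util.Locale;

final class SequentialPath {
    // Append the parent's cversion as a zero-padded 10-digit suffix.
    static String sequential(String path, int parentCVersion) {
        return path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
    }

    public static void main(String[] args) {
        System.out.println(sequential("/users/admin", 1));
        System.out.println(sequential("/locks/lock-", 42));
    }
}
```

Since step 4 stores cversion + 1 for the parent, the next sequential child created under the same parent gets a larger suffix.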
    

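pRequest2Txn - delete operation

  1. Checks the ACL and the version

    checkAndIncVersion(nodeRecord.stat.getVersion(), deleteRequest.getVersion(), path);
    
    // stat.version and delete.version must match, unless delete.version is -1, which matches any version
    
  2. Verifies that the node has no children; a node with children cannot be deleted

  3. Creates the DeleteTxn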

    request.setTxn(new DeleteTxn(path));
    
  4. addChangeRecord

    parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
    parentRecord.childCount--;
    parentRecord.stat.setPzxid(request.getHdr().getZxid());
    parentRecord.precalculatedDigest = precalculateDigest(
            DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
    addChangeRecord(parentRecord); // adds to the outstandingChanges collection
    
    nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path, null, -1, null);
    nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.REMOVE, path);
    setTxnDigest(request, nodeRecord.precalculatedDigest);
    addChangeRecord(nodeRecord); // adds to the outstandingChanges collection
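The version check in step 1 of the delete flow can be sketched as follows. This is a simplified stand-in written from the behavior described here, assuming that an expected version of -1 means "match any version"; the exception type is a local placeholder rather than KeeperException.BadVersionException.

```java
final class VersionCheck {
    // Local placeholder for the bad-version error raised on a mismatch.
    static final class BadVersionException extends RuntimeException {
        BadVersionException(String path) {
            super("bad version for " + path);
        }
    }

    // Returns the next version if the expected version is acceptable.
    static int checkAndIncVersion(int currentVersion, int expectedVersion, String path) {
        if (expectedVersion != -1 && expectedVersion != currentVersion) {
            throw new BadVersionException(path);
        }
        return currentVersion + 1;
    }

    // Convenience wrapper: true if the check passes, false if it is rejected.
    static boolean accepts(int currentVersion, int expectedVersion) {
        try {
            checkAndIncVersion(currentVersion, expectedVersion, "/demo");
            return true;
        } catch (BadVersionException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(accepts(3, 3));
        System.out.println(accepts(3, -1));
        System.out.println(accepts(3, 2));
    }
}
```

This is optimistic concurrency control: a client that read version 3 and deletes with version 3 fails if another client changed the node in between.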
    

pRequest2Txn - setData operation

  1. Checks the ACL and computes newVersion

  2. Creates the SetDataTxn

    request.setTxn(new SetDataTxn(path, setDataRequest.getData(), newVersion));
    
  3. addChangeRecord

    nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
    nodeRecord.stat.setVersion(newVersion);
    nodeRecord.stat.setMtime(request.getHdr().getTime());
    nodeRecord.stat.setMzxid(zxid);
    nodeRecord.data = setDataRequest.getData();
    nodeRecord.precalculatedDigest = precalculateDigest(
            DigestOpCode.UPDATE, path, nodeRecord.data, nodeRecord.stat);
    setTxnDigest(request, nodeRecord.precalculatedDigest);
    addChangeRecord(nodeRecord);
    

pRequest2Txn - setACL operation

  1. Checks the ACL and computes newVersion

  2. Creates the SetACLTxn

    request.setTxn(new SetACLTxn(path, listACL, newVersion));
    
  3. addChangeRecord

pRequest2Txn - createSession operation

CreateSessionTxn createSessionTxn = request.readRequestRecord(CreateSessionTxn::new);
request.setTxn(createSessionTxn);
// only add the global session tracker but not to ZKDb
zks.sessionTracker.trackSession(request.sessionId, createSessionTxn.getTimeOut());
zks.setOwner(request.sessionId, request.getOwner());

pRequest2Txn - closeSession operation

long startTime = Time.currentElapsedTime();
synchronized (zks.outstandingChanges) {
    // collect all ephemeral nodes owned by the session
    Set<String> es = zks.getZKDatabase().getEphemerals(request.sessionId);
    for (ChangeRecord c : zks.outstandingChanges) {
        if (c.stat == null) {
            // Doing a delete
            es.remove(c.path);
        } else if (c.stat.getEphemeralOwner() == request.sessionId) {
            es.add(c.path);
        }
    }
    for (String path2Delete : es) {
        if (digestEnabled) {
            parentPath = getParentPathAndValidate(path2Delete);
            parentRecord = getRecordForPath(parentPath);
            parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
            parentRecord.stat.setPzxid(request.getHdr().getZxid());
            parentRecord.precalculatedDigest = precalculateDigest(
                    DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
            addChangeRecord(parentRecord);
        }
        nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path2Delete, null, 0, null);
        nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.REMOVE, path2Delete);
        addChangeRecord(nodeRecord);
    }
    if (ZooKeeperServer.isCloseSessionTxnEnabled()) {
        request.setTxn(new CloseSessionTxn(new ArrayList<String>(es)));
    }
    zks.sessionTracker.setSessionClosing(request.sessionId);
}

ProposalRequestProcessor

The processRequest method

public void processRequest(Request request) throws RequestProcessorException {
    if (request instanceof LearnerSyncRequest) { // handles the sync command; its analysis will be added later
        zks.getLeader().processSync((LearnerSyncRequest) request);
    } else {
        if (shouldForwardToNextProcessor(request)) {
            nextProcessor.processRequest(request); // call the downstream processor (CommitProcessor)
        }
        if (request.getHdr() != null) { // transactional requests need a proposal and a disk write
            // We need to sync and get consensus on any transactions
            try {
                zks.getLeader().propose(request); // send the proposal to the followers
            } catch (XidRolloverException e) {
                throw new RequestProcessorException(e.getMessage(), e);
            }
            // the nextProcessor of this object is the AckRequestProcessor
            syncProcessor.processRequest(request);
        }
    }
}

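Sending the proposal

Creates a proposal and sends it to all members:

public Proposal propose(Request request) throws XidRolloverException {
    // the low 32 bits of the zxid are exhausted: force a re-election, which starts a new epoch and a new zxid range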
    if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
        String msg =
            "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
        shutdown(msg);
        throw new XidRolloverException(msg);
    }

    // serialize
    byte[] data = request.getSerializeData();
    proposalStats.setLastBufferSize(data.length);
    // build the packet
    QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);

    // build the Proposal object
    Proposal p = new Proposal();
    p.packet = pp;
    p.request = request;

    synchronized (this) {
        p.addQuorumVerifier(self.getQuorumVerifier());

        if (request.getHdr().getType() == OpCode.reconfig) {
            // this writes lastSeenQuorumVerifier to the zoo.cfg.dynamic.next file
            self.setLastSeenQuorumVerifier(request.qv, true);
        }

        if (self.getQuorumVerifier().getVersion() < self.getLastSeenQuorumVerifier().getVersion()) {
            p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
        }

        lastProposed = p.packet.getZxid();
        // cached in outstandingProposals; processAck later decides whether to commit based on the quorum state
        outstandingProposals.put(lastProposed, p);
        // send the packet to the followers
        sendPacket(pp);
    }
    ServerMetrics.getMetrics().PROPOSAL_COUNT.add(1);
    return p;
}
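The rollover check in propose relies on the zxid layout: the high 32 bits hold the epoch and the low 32 bits hold a per-epoch counter. A minimal sketch of that bit arithmetic, with helper names chosen for this example:

```java
final class ZxidSketch {
    // Combine an epoch (high 32 bits) and a counter (low 32 bits) into one zxid.
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid) {
        return zxid >> 32;
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    // Same test as in propose(): every bit of the counter is set.
    static boolean counterExhausted(long zxid) {
        return (zxid & 0xffffffffL) == 0xffffffffL;
    }

    public static void main(String[] args) {
        long zxid = makeZxid(2, 5);
        System.out.println(Long.toHexString(zxid));
        System.out.println(epochOf(zxid) + " " + counterOf(zxid));
        System.out.println(counterExhausted(makeZxid(2, 0xffffffffL)));
    }
}
```

When the counter is exhausted, the only way to get a fresh range of zxids is a new epoch, which is why propose forces a re-election.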

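The syncProcessor.processRequest method

processRequest puts the request on the queuedRequests queue, and an asynchronous thread consumes the queue and does the work:

  1. Takes a request from queuedRequests

  2. Writes the txnlog

    zks.getZKDatabase().append(si);
    
  3. Rolls the txnlog file and generates a snapshot file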

    // by default, triggered once logCount exceeds 50000 or logSize exceeds 2GB (each threshold also gets a random offset)
    if (shouldSnapshot()) {
        resetSnapshotStats();
        // roll the txnlog file
        zks.getZKDatabase().rollLog();
        // generate the snapshot file
        if (!snapThreadMutex.tryAcquire()) {
            LOG.warn("Too busy to snap, skipping");
        } else {
            // a separate thread generates the snapshot file
            new ZooKeeperThread("Snapshot Thread") {
                public void run() {
                    try {
                        zks.takeSnapshot();
                    } catch (Exception e) {
                    } finally {
                        snapThreadMutex.release();
                    }
                }
            }.start();
        }
    }
    
  4. Afterwards, passes the request to nextProcessor (the AckRequestProcessor)

AckRequestProcessor

public void processRequest(Request request) {
    QuorumPeer self = leader.self;
    if (self != null) {
        request.logLatency(ServerMetrics.getMetrics().PROPOSAL_ACK_CREATION_LATENCY);
        leader.processAck(self.getMyId(), request.zxid, null);
    }
}

processAck

Keep a count of acks that are received by the leader for a particular proposal.

public synchronized void processAck(long sid, long zxid, SocketAddress followerAddr) {

    // omitted

    if ((zxid & 0xffffffffL) == 0) {
        // We no longer process NEWLEADER ack with this method. However,
        // the learner sends an ack back to the leader after it gets
        // UPTODATE, so we just ignore the message.
        return;
    }

    if (outstandingProposals.size() == 0) {
        return;
    }
    // the data for this zxid has already been committed
    if (lastCommitted >= zxid) {
        // The proposal has already been committed
        return;
    }
    Proposal p = outstandingProposals.get(zxid);
    if (p == null) {
        return;
    }

    // omitted

    p.addAck(sid); // record the ack

    boolean hasCommitted = tryToCommit(p, zxid, followerAddr);

    // special handling for reconfig commands, omitted
}

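The tryToCommit method checks the quorum state, that is, whether more than half of the members have acked. Once quorum is reached, it:

  1. Removes the proposal from the outstandingProposals collection

  2. Adds it to the toBeApplied collection

  3. Sends COMMIT to the followers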

    public void commit(long zxid) {
        synchronized (this) {
            lastCommitted = zxid;
        }
        QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
        sendPacket(qp); // send to the followers
    }
    
  4. Calls commitProcessor.commit

    zk.commitProcessor.commit(p.request);
    
    // the request enters the commitProcessor's committedRequests queue
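The ack counting behind processAck and tryToCommit can be modeled with a set of server ids per proposal. This sketch assumes a simple majority rule and fixed ensemble size; the real code delegates the decision to QuorumVerifier objects and also commits proposals strictly in zxid order, both of which are left out here.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class AckTrackerSketch {
    private final int ensembleSize;
    private final Map<Long, Set<Long>> outstandingProposals = new HashMap<>();
    private long lastCommitted = 0;

    AckTrackerSketch(int ensembleSize) {
        this.ensembleSize = ensembleSize;
    }

    void propose(long zxid) {
        outstandingProposals.put(zxid, new HashSet<>());
    }

    // Returns true when this ack is the one that commits the proposal.
    boolean processAck(long sid, long zxid) {
        if (lastCommitted >= zxid) {
            return false; // already committed
        }
        Set<Long> acks = outstandingProposals.get(zxid);
        if (acks == null) {
            return false; // unknown proposal
        }
        acks.add(sid); // a set, so a repeated ack from one server counts once
        if (acks.size() * 2 <= ensembleSize) {
            return false; // no majority yet
        }
        outstandingProposals.remove(zxid);
        lastCommitted = zxid;
        return true;
    }

    long lastCommitted() {
        return lastCommitted;
    }

    public static void main(String[] args) {
        AckTrackerSketch tracker = new AckTrackerSketch(3);
        tracker.propose(1);
        System.out.println(tracker.processAck(1, 1)); // leader's own ack
        System.out.println(tracker.processAck(2, 1)); // first follower ack
        System.out.println(tracker.processAck(3, 1)); // late ack, ignored
    }
}
```

Note that the leader's own ack arrives through AckRequestProcessor, so in a three-node ensemble one follower ack is enough to commit.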
    

CommitProcessor

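The processRequest method

ProposalRequestProcessor calls this method first, before it sends the proposal and before the request is queued for the local disk write:

  1. Adds the request to the queuedRequests queue
  2. Adds write requests to the queuedWriteRequests queue as well
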
public void processRequest(Request request) {
    request.commitProcQueueStartTime = Time.currentElapsedTime();
    queuedRequests.add(request); // all requests
    // If the request will block, add it to the queue of blocking requests
    if (needCommit(request)) { // write request
        queuedWriteRequests.add(request);
        numWriteQueuedRequests.incrementAndGet();
    } else {
        numReadQueuedRequests.incrementAndGet();
    }
    wakeup();
}

commit method

Once a proposal has reached quorum among the followers, this method is used to commit the transaction, which is then written to the ZKDatabase.

public void commit(Request request) {
    request.commitRecvTime = Time.currentElapsedTime();
    ServerMetrics.getMetrics().COMMITS_QUEUED.add(1);
    committedRequests.add(request); // enqueue into committedRequests
    wakeup();
}

run method

Compares the queuedRequests, queuedWriteRequests and committedRequests queues, and forwards read requests and successfully committed requests to the downstream ToBeAppliedRequestProcessor.
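
The gating idea behind run can be sketched with a deliberately simplified, single-threaded model: reads pass straight through, while a write at the head of the queue holds back everything behind it until its commit arrives. This sketch assumes every commit belongs to a locally queued write and that commits arrive in queue order; the real CommitProcessor also tracks sessions and uses worker threads, none of which is modelled here. `CommitGateSketch` and `Req` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical single-threaded model of the commit gate; not the real CommitProcessor.
final class CommitGateSketch {
    static final class Req {
        final long id;
        final boolean write;

        Req(long id, boolean write) {
            this.id = id;
            this.write = write;
        }
    }

    private final Queue<Req> queuedRequests = new ArrayDeque<>();
    private final Queue<Long> committedRequests = new ArrayDeque<>();
    private final List<Long> forwarded = new ArrayList<>();

    // A new request arrives from the upstream processor.
    void processRequest(Req r) {
        queuedRequests.add(r);
        drain();
    }

    // A commit for the write with the given id arrives.
    void commit(long id) {
        committedRequests.add(id);
        drain();
    }

    // Forward requests in order until an uncommitted write is at the head.
    private void drain() {
        while (!queuedRequests.isEmpty()) {
            Req head = queuedRequests.peek();
            if (head.write) {
                Long committed = committedRequests.peek();
                if (committed == null || committed != head.id) {
                    return; // the head write is not committed yet, so everything waits
                }
                committedRequests.poll();
            }
            forwarded.add(queuedRequests.poll().id);
        }
    }

    // Ids handed to the downstream processor so far, in order.
    List<Long> forwarded() {
        return forwarded;
    }
}
```

Queueing a read (1), a write (2) and another read (3) forwards only request 1 at first; calling `commit(2)` then releases 2 and 3 in order.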

ToBeAppliedRequestProcessor

Maintains the toBeApplied list: removes the entries for requests that have already been committed.

FinalRequestProcessor

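Applies the transaction associated with a request, serves queries, and sends the response to the client. It sits at the end of the RequestProcessor chain.

  1. Runs the logic that matches the request's OpCode and builds the response
  2. Uses cnxn to return the response to the client
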
if (!request.isThrottled()) {
  rc = applyRequest(request);
}

// ProcessTxnResult rc = zks.processTxn(request);

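createSession operation

zks.finishSessionInit(request.cnxn, true);

closeSession operation

Sends a closeConn packet to the client:

cnxn.sendCloseSession();

create-related operations

The create, create2, createTTL and createContainer operations each create the corresponding Response object.

delete-related operations

setData operation

Returns a SetDataResponse.

setACL operation

Returns a SetACLResponse.

getData operation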

private Record handleGetDataRequest(
        Record request, ServerCnxn cnxn, List<Id> authInfo) throws KeeperException, IOException {
    GetDataRequest getDataRequest = (GetDataRequest) request;
    String path = getDataRequest.getPath();
    DataNode n = zks.getZKDatabase().getNode(path);
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    // check permissions
    zks.checkACL(cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, authInfo, path, null);
    Stat stat = new Stat();
    // read the data and register the watcher
    byte[] b = zks.getZKDatabase().getData(path, stat, getDataRequest.getWatch() ? cnxn : null);
    return new GetDataResponse(b, stat);
}
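
In the code above, the connection is passed as the watcher only when the request asks for a watch. The bookkeeping this implies can be sketched as a table from path to interested watchers, cleared when the watch fires. The sketch assumes standard one-shot data watches and identifies watchers by a plain string; `WatchTableSketch` is a hypothetical name and does not reflect how ZooKeeper's own watch manager is implemented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical one-shot watch table; watchers are identified by a string id.
final class WatchTableSketch {
    private final Map<String, Set<String>> watchers = new HashMap<>();

    // Called on a read with watch=true: remember who wants to hear about this path.
    void addWatch(String path, String watcherId) {
        watchers.computeIfAbsent(path, p -> new LinkedHashSet<>()).add(watcherId);
    }

    // Called on a change: return the watchers to notify and forget them (one-shot).
    List<String> trigger(String path) {
        Set<String> fired = watchers.remove(path);
        return fired == null ? new ArrayList<>() : new ArrayList<>(fired);
    }
}
```

After `addWatch("/a", "c1")` and `addWatch("/a", "c2")`, the first `trigger("/a")` returns both watchers and a second call returns an empty list, because a one-shot watch has to be registered again by the next read.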

setWatches-related operations

The setWatches and setWatches2 operations:

SetWatches2 setWatches = request.readRequestRecord(SetWatches2::new);
long relativeZxid = setWatches.getRelativeZxid();
zks.getZKDatabase().setWatches(relativeZxid,
        setWatches.getDataWatches(),
        setWatches.getExistWatches(),
        setWatches.getChildWatches(),
        setWatches.getPersistentWatches(),
        setWatches.getPersistentRecursiveWatches(),
        cnxn);

addWatch operation

AddWatchRequest addWatcherRequest = request.readRequestRecord(AddWatchRequest::new);
zks.getZKDatabase().addWatch(addWatcherRequest.getPath(), cnxn, addWatcherRequest.getMode());

removeWatches operation

RemoveWatchesRequest removeWatches = request.readRequestRecord(RemoveWatchesRequest::new);
WatcherType type = WatcherType.fromInt(removeWatches.getType());
path = removeWatches.getPath();
boolean removed = zks.getZKDatabase().removeWatch(path, type, cnxn);

getChildren-related operations

The getChildren and getChildren2 operations:

GetChildren2Request getChildren2Request = request.readRequestRecord(GetChildren2Request::new);
Stat stat = new Stat();
path = getChildren2Request.getPath();
DataNode n = zks.getZKDatabase().getNode(path);

zks.checkACL(request.cnxn, zks.getZKDatabase().aclForNode(n),
             ZooDefs.Perms.READ, request.authInfo, path, null);
List<String> children = zks.getZKDatabase()
                           .getChildren(path, stat, getChildren2Request.getWatch() ? cnxn : null);
rsp = new GetChildren2Response(children, stat);

Follower handling of data from the Leader

protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(
        finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
    commitProcessor.start();
    firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
    ((FollowerRequestProcessor) firstProcessor).start();

    syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor(getFollower()));
    syncProcessor.start();
}

Processor chain:

FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor

commitProcessor and syncProcessor handle the proposal and commit requests from the leader.
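
The wiring in setupRequestProcessors follows a simple pattern: each processor holds a reference to its successor, so the chain is built back to front. A minimal sketch of that pattern is shown here; the types `StageSketch`, `NamedStage` and `ChainSketch` are hypothetical, requests are reduced to strings, and the real processors additionally run their own threads and queues.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of the processor chaining pattern.
interface StageSketch {
    void processRequest(String request);
}

final class NamedStage implements StageSketch {
    private final String name;
    private final StageSketch next; // null marks the end of the chain
    private final List<String> trace;

    NamedStage(String name, StageSketch next, List<String> trace) {
        this.name = name;
        this.next = next;
        this.trace = trace;
    }

    @Override
    public void processRequest(String request) {
        trace.add(name + ":" + request); // record that this stage saw the request
        if (next != null) {
            next.processRequest(request); // hand over to the successor
        }
    }
}

final class ChainSketch {
    // Builds follower -> commit -> final and pushes one request through it.
    static List<String> run(String request) {
        List<String> trace = new ArrayList<>();
        StageSketch finalStage = new NamedStage("final", null, trace);
        StageSketch commitStage = new NamedStage("commit", finalStage, trace);
        StageSketch firstStage = new NamedStage("follower", commitStage, trace);
        firstStage.processRequest(request);
        return trace;
    }
}
```

`ChainSketch.run("getData")` records the request passing through the follower, commit and final stages in that order.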

processPacket method

As already described in "zookeeper源码(08)leader、follower和observer", the Follower uses the processPacket method to handle packets from the leader:

protected void processPacket(QuorumPacket qp) throws Exception {
    switch (qp.getType()) {
    case Leader.PING:
        ping(qp);
        break;
    case Leader.PROPOSAL: // proposal
        ServerMetrics.getMetrics().LEARNER_PROPOSAL_RECEIVED_COUNT.add(1);
        TxnLogEntry logEntry = SerializeUtils.deserializeTxn(qp.getData());
        TxnHeader hdr = logEntry.getHeader();
        Record txn = logEntry.getTxn();
        TxnDigest digest = logEntry.getDigest();
        // omitted
        lastQueued = hdr.getZxid();

        // omitted

        // log the transaction:
        // syncProcessor persists the log entry, then an ack is sent to the leader
        fzk.logRequest(hdr, txn, digest);
        // omitted
        if (om != null) {
            // omitted
        }
        break;
    case Leader.COMMIT: // commit
        ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
        fzk.commit(qp.getZxid()); // commit the logged transaction via commitProcessor
        if (om != null) {
            // omitted
        }
        break;
    case Leader.COMMITANDACTIVATE: // Similar to COMMIT, only for a reconfig operation
        // get the new configuration from the request
        Request request = fzk.pendingTxns.element();
        SetDataTxn setDataTxn = (SetDataTxn) request.getTxn();
        QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8));

        // get new designated leader from (current) leader's message
        ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
        long suggestedLeaderId = buffer.getLong();
        final long zxid = qp.getZxid();
        boolean majorChange = self.processReconfig(qv, suggestedLeaderId, zxid, true);
        // commit (writes the new config to ZK tree (/zookeeper/config)
        fzk.commit(zxid);

        // omitted
        break;
    case Leader.UPTODATE:
        // the leader tells the follower that it is up to date and can start serving clients
        // this packet type should not show up again during normal operation
        break;
    case Leader.REVALIDATE:
        if (om == null || !om.revalidateLearnerSession(qp)) {
            revalidate(qp);
        }
        break;
    case Leader.SYNC:
        fzk.sync(); // sync command
        break;
    default:
        LOG.warn("Unknown packet type");
        break;
    }
}

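Handling PROPOSAL

syncProcessor.processRequest method

processRequest puts the request into the queuedRequests queue; a background thread consumes the queue and does the actual work.

After the request has been persisted locally, the downstream processor (a SendAckRequestProcessor object) is invoked.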
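
The ordering that matters here is persist first, acknowledge second. The sketch below models it synchronously: zxids accumulate in a batch, the whole batch is made durable, and only then is each zxid handed to the downstream ack sender. It assumes a fixed batch size as the only flush trigger and replaces the disk with an in-memory list; `SyncAckSketch` and its members are hypothetical names, and the background thread of the real processor is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.LongConsumer;

// Hypothetical synchronous model of "persist, then ack"; not the real SyncRequestProcessor.
final class SyncAckSketch {
    private final Queue<Long> toFlush = new ArrayDeque<>();
    private final List<Long> durableLog = new ArrayList<>(); // stands in for the on-disk txn log
    private final LongConsumer ackSender;                    // stands in for the downstream processor
    private final int batchSize;

    SyncAckSketch(int batchSize, LongConsumer ackSender) {
        this.batchSize = batchSize;
        this.ackSender = ackSender;
    }

    // Buffer a transaction; flush once the batch is full.
    void append(long zxid) {
        toFlush.add(zxid);
        if (toFlush.size() >= batchSize) {
            flush();
        }
    }

    // Make the whole batch durable first, then acknowledge each entry in order.
    void flush() {
        durableLog.addAll(toFlush); // real code would force the log to disk here
        while (!toFlush.isEmpty()) {
            ackSender.accept(toFlush.poll());
        }
    }

    List<Long> durableLog() {
        return durableLog;
    }
}
```

With a batch size of 2, appending zxid 1 sends nothing; appending zxid 2 persists both and then acknowledges 1 and 2 in order. Batching trades a little ack latency for fewer disk flushes.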

SendAckRequestProcessor

public void processRequest(Request si) {
    if (si.type != OpCode.sync) {
        // acknowledge that this zxid has been persisted
        QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null, null);
        try {
            si.logLatency(ServerMetrics.getMetrics().PROPOSAL_ACK_CREATION_LATENCY);
            learner.writePacket(qp, false);
        } catch (IOException e) {
            // learner.sock.close();
        }
    }
}

public void flush() throws IOException {
    try {
        learner.writePacket(null, true);
    } catch (IOException e) {
        // learner.sock.close();
    }
}

Handling COMMIT

The commit is handed to commitProcessor, which passes the request on to its downstream processor (FinalRequestProcessor).

FinalRequestProcessor

Already covered above, so omitted here.

Observer handling of data from the Leader

protected void setupRequestProcessors() {
    // We might consider changing the processor behaviour of
    // Observers to, for example, remove the disk sync requirements.
    // Currently, they behave almost exactly the same as followers.
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(
        finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
    commitProcessor.start();
    firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
    ((ObserverRequestProcessor) firstProcessor).start();

    if (syncRequestProcessorEnabled) {
        syncProcessor = new SyncRequestProcessor(this, null);
        syncProcessor.start();
    }
}

processPacket method

protected void processPacket(QuorumPacket qp) throws Exception {
    TxnLogEntry logEntry;
    TxnHeader hdr;
    TxnDigest digest;
    Record txn;
    switch (qp.getType()) {
    case Leader.PING:
        ping(qp);
        break;
    case Leader.PROPOSAL:
        LOG.warn("Ignoring proposal");
        break;
    case Leader.COMMIT:
        LOG.warn("Ignoring commit");
        break;
    case Leader.UPTODATE:
        LOG.error("Received an UPTODATE message after Observer started");
        break;
    case Leader.REVALIDATE:
        revalidate(qp);
        break;
    case Leader.SYNC:
        ((ObserverZooKeeperServer) zk).sync();
        break;
    case Leader.INFORM:
        ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
        logEntry = SerializeUtils.deserializeTxn(qp.getData());
        hdr = logEntry.getHeader();
        txn = logEntry.getTxn();
        digest = logEntry.getDigest();
        Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0);
        request.logLatency(ServerMetrics.getMetrics().COMMIT_PROPAGATION_LATENCY);
        request.setTxnDigest(digest);
        ObserverZooKeeperServer obs = (ObserverZooKeeperServer) zk;
        obs.commitRequest(request);
        break;
    case Leader.INFORMANDACTIVATE: // handles reconfig requests
        // get new designated leader from (current) leader's message
        ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
        long suggestedLeaderId = buffer.getLong();

        byte[] remainingdata = new byte[buffer.remaining()];
        buffer.get(remainingdata);
        logEntry = SerializeUtils.deserializeTxn(remainingdata);
        hdr = logEntry.getHeader();
        txn = logEntry.getTxn();
        digest = logEntry.getDigest();
        QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) txn).getData(), UTF_8));

        request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0);
        request.setTxnDigest(digest);
        obs = (ObserverZooKeeperServer) zk;

        boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);

        obs.commitRequest(request);

        if (majorChange) {
            throw new Exception("changes proposed in reconfig");
        }
        break;
    default:
        LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
        break;
    }
}
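
The INFORMANDACTIVATE branch above reads an 8-byte leader id from the front of the packet data and treats the rest as the serialized transaction. That split can be tried out in isolation with the sketch below; `InformPayloadSketch` and its `encode` helper are hypothetical and only exist to produce a payload in the same shape, with an arbitrary byte array standing in for the serialized transaction.

```java
import java.nio.ByteBuffer;

// Hypothetical helper showing the "long prefix + remaining bytes" payload split.
final class InformPayloadSketch {
    // Build a payload: 8-byte leader id followed by the transaction bytes.
    static byte[] encode(long suggestedLeaderId, byte[] txnBytes) {
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + txnBytes.length);
        buf.putLong(suggestedLeaderId);
        buf.put(txnBytes);
        return buf.array();
    }

    // Read the leading long, as the observer does with buffer.getLong().
    static long leaderId(byte[] payload) {
        return ByteBuffer.wrap(payload).getLong();
    }

    // Skip the leading long and copy out whatever is left.
    static byte[] remaining(byte[] payload) {
        ByteBuffer buf = ByteBuffer.wrap(payload);
        buf.getLong();
        byte[] rest = new byte[buf.remaining()];
        buf.get(rest);
        return rest;
    }
}
```

Encoding leader id 3 with the bytes of "cfg" and decoding again gives back 3 and "cfg"; `ByteBuffer` uses big-endian byte order by default, so both sides agree as long as neither changes it.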
