Skip to content

源码之ZK服务端启动

1. zk 数据存储机制

Alt text

  1. zk 的数据模型是一棵树(DataTree),树中的每个节点叫做 DataNode。
  2. zk集群中的 DataTree 时刻保持状态同步。
  3. ZooKeeper 集群中的每个节点,在内存和磁盘中都各保存着一份完整的数据。
    • 内存数据:DataTree
    • 磁盘数据:快照文件 + 编辑日志(即事务日志,txn log)

2. zk的服务端start()方法流程

Alt text

3. 冷启动:从快照文件恢复数据

  1. 启动集群
    QuorumPeerMain.java
java
/**
 * Builds a QuorumPeer from the parsed configuration, starts it, and blocks until it exits.
 * (Excerpt: lines shown as "......" were elided by the article.)
 *
 * <p>Creates the client connection factories for whichever client ports are configured,
 * copies the configuration values onto the peer, attaches a ZKDatabase backed by a
 * FileTxnSnapLog over the data-log and data directories, then calls
 * {@code quorumPeer.start()} and waits in {@code quorumPeer.join()}. The metrics provider
 * is stopped on the way out in every case.
 *
 * @param config the parsed quorum configuration
 * @throws IOException declared by the signature (presumably from factory / txn-log setup — confirm)
 * @throws AdminServerException declared by the signature
 */
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
    ......

    LOG.info("Starting quorum peer, myid=" + config.getServerId());
    ......
    try {
        ServerMetrics.metricsProviderInitialized(metricsProvider);
        ProviderRegistry.initialize();
        ServerCnxnFactory cnxnFactory = null;
        ServerCnxnFactory secureCnxnFactory = null;

        // Initialize the client communication components (the default implementation is NIO-based).
        // A factory is only created for a port that is configured: plain first, then secure.
        if (config.getClientPortAddress() != null) {
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
        }
        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
        }
        // Copy the parsed configuration onto this ZooKeeper peer.
        quorumPeer = getQuorumPeer();
        quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
        quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
        quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
        //quorumPeer.setQuorumPeers(config.getAllMembers());
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
        quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
        quorumPeer.setConfigFileName(config.getConfigFilename());
        quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
        // Data storage: the ZKDatabase wraps the FileTxnSnapLog (snapshots + txn log) set above.
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
        if (config.getLastSeenQuorumVerifier() != null) {
            quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
        }
        quorumPeer.initConfigInZKDatabase();
        // Communication: hand the client connection factories (plain and secure) to the peer.
        quorumPeer.setCnxnFactory(cnxnFactory);
        quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
        quorumPeer.setSslQuorum(config.isSslQuorum());
        quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
        ......
        quorumPeer.initialize();

        ......
        // Start the peer (data recovery, connection factories, leader election),
        // then block until the peer thread finishes.
        quorumPeer.start();
        ZKAuditProvider.addZKStartStopAuditLog();
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        // NOTE(review): the thread's interrupt status is not restored here.
        LOG.warn("Quorum Peer interrupted", e);
    } finally {
        // Always stop the metrics provider; failures here are only logged.
        try {
            metricsProvider.stop();
        } catch (Throwable error) {
            LOG.warn("Error while stopping metrics", error);
        }
    }
}
  2. 冷启动恢复数据
    QuorumPeer.java
java
/**
 * Starts this quorum peer.
 *
 * <p>Order matters: the on-disk data is recovered first (cold start), then the client
 * connection factories and the admin server are started, and finally the leader-election
 * machinery and the JVM pause monitor are prepared before the peer thread itself runs.
 *
 * @throws RuntimeException if this server's id is not in the current view
 */
public synchronized void start() {
    final boolean listedInView = getView().containsKey(myid);
    if (!listedInView) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }

    // Cold start: rebuild the in-memory data from the snapshot and txn log on disk.
    loadDataBase();

    // Start the client communication (connection factory) instances.
    startServerCnxnFactory();

    // A failing admin server is logged but does not prevent the peer from starting.
    try {
        adminServer.start();
    } catch (AdminServerException adminError) {
        LOG.warn("Problem starting AdminServer", adminError);
    }

    // Prepare the election environment.
    startLeaderElection();
    startJvmPauseMonitor();

    // Start this peer's thread; per the article, the election itself is run from there.
    super.start();
}
  3. 加载数据(QuorumPeer.loadDataBase())
java
/**
 * Cold-start recovery: loads the on-disk data into the in-memory ZKDatabase, then
 * reconciles the persisted epoch files with the recovered data.
 *
 * <p>currentEpoch and acceptedEpoch are read from their files. If a file does not exist, it
 * is created with the epoch of the last processed zxid. Loading fails in two cases:
 * the recovered data belongs to a newer epoch than currentEpoch and no currentEpoch temp
 * file supplies a value, or acceptedEpoch is less than currentEpoch.
 *
 * @throws RuntimeException wrapping any IOException raised while loading or validating
 */
private void loadDataBase() {
    try {
        // Load the on-disk data into memory, rebuilding the DataTree (snapshot + txn log replay).
        zkDb.loadDataBase();

        // load the epochs
        long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
        long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
        try {
            currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
        } catch (FileNotFoundException e) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a
            // new code version
            currentEpoch = epochOfZxid;
            LOG.info(
                "{} not found! Creating with a reasonable default of {}. "
                    + "This should only happen when you are upgrading your installation",
                CURRENT_EPOCH_FILENAME,
                currentEpoch);
            writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
        }
        // The recovered data is from a later epoch than the persisted currentEpoch.
        if (epochOfZxid > currentEpoch) {
            // currentEpoch temp file (CURRENT_EPOCH_FILENAME + TMP_EXTENSION) in the snapshot
            // directory; presumably left behind by an AtomicFileOutputStream write of the
            // current epoch that did not complete — confirm.
            File currentTmp = new File(getTxnFactory().getSnapDir(),
                CURRENT_EPOCH_FILENAME + AtomicFileOutputStream.TMP_EXTENSION);
            if (currentTmp.exists()) {
                // readLongFromFile takes a bare file name (presumably resolved against the snap dir — verify).
                long epochOfTmp = readLongFromFile(currentTmp.getName());
                LOG.info("{} found. Setting current epoch to {}.", currentTmp, epochOfTmp);
                setCurrentEpoch(epochOfTmp);
            } else {
                throw new IOException(
                    "The current epoch, " + ZxidUtils.zxidToString(currentEpoch)
                        + ", is older than the last zxid, " + lastProcessedZxid);
            }
        }
        try {
            acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
        } catch (FileNotFoundException e) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a
            // new code version
            acceptedEpoch = epochOfZxid;
            LOG.info(
                "{} not found! Creating with a reasonable default of {}. "
                    + "This should only happen when you are upgrading your installation",
                ACCEPTED_EPOCH_FILENAME,
                acceptedEpoch);
            writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
        }
        // Invariant: the accepted epoch can never lag behind the current epoch.
        if (acceptedEpoch < currentEpoch) {
            throw new IOException("The accepted epoch, "
                                    + ZxidUtils.zxidToString(acceptedEpoch)
                                    + " is less than the current epoch, "
                                    + ZxidUtils.zxidToString(currentEpoch));
        }
    } catch (IOException ie) {
        LOG.error("Unable to load database on disk", ie);
        throw new RuntimeException("Unable to run quorum server ", ie);
    }
}
  4. 加载磁盘数据到内存,恢复 DataTree
    数据恢复过程:
    (1)从快照文件中恢复大部分数据,并得到快照对应的 lastProcessedZxid。
    (2)再从编辑日志(事务日志)中从 lastProcessedZxid + 1 开始 replay,一直执行到最后一条日志,并不断更新 lastProcessedZxid。
    (3)最终得到完整的 DataTree 和 lastProcessedZxid,数据恢复完成。
java
/**
 * Cold-start recovery: loads the on-disk data into the in-memory ZKDatabase, then
 * reconciles the persisted epoch files with the recovered data.
 *
 * <p>currentEpoch and acceptedEpoch are read from their files. If a file does not exist, it
 * is created with the epoch of the last processed zxid. Loading fails in two cases:
 * the recovered data belongs to a newer epoch than currentEpoch and no currentEpoch temp
 * file supplies a value, or acceptedEpoch is less than currentEpoch.
 *
 * @throws RuntimeException wrapping any IOException raised while loading or validating
 */
private void loadDataBase() {
    try {
        // Load the on-disk data (snapshot + txn log) into memory, rebuilding the DataTree.
        zkDb.loadDataBase();
        // zk operations come in two kinds:
        // 1. Transactional (write) operations, e.g. zk.create(): each is assigned a globally
        //    unique 64-bit zxid. The high 32 bits are the epoch (the leader's term number)
        //    and the low 32 bits are the transaction counter.
        // 2. Non-transactional (read) operations, e.g. zk.getData().
        // load the epochs
        long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
        long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
        try {
            currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
        } catch (FileNotFoundException e) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a
            // new code version
            currentEpoch = epochOfZxid;
            LOG.info(
                "{} not found! Creating with a reasonable default of {}. "
                    + "This should only happen when you are upgrading your installation",
                CURRENT_EPOCH_FILENAME,
                currentEpoch);
            writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
        }
        // The recovered data is from a later epoch than the persisted currentEpoch.
        if (epochOfZxid > currentEpoch) {
            // currentEpoch temp file (CURRENT_EPOCH_FILENAME + TMP_EXTENSION) in the snapshot
            // directory; presumably left behind by an AtomicFileOutputStream write of the
            // current epoch that did not complete — confirm.
            File currentTmp = new File(getTxnFactory().getSnapDir(),
                CURRENT_EPOCH_FILENAME + AtomicFileOutputStream.TMP_EXTENSION);
            if (currentTmp.exists()) {
                // readLongFromFile takes a bare file name (presumably resolved against the snap dir — verify).
                long epochOfTmp = readLongFromFile(currentTmp.getName());
                LOG.info("{} found. Setting current epoch to {}.", currentTmp, epochOfTmp);
                setCurrentEpoch(epochOfTmp);
            } else {
                throw new IOException(
                    "The current epoch, " + ZxidUtils.zxidToString(currentEpoch)
                        + ", is older than the last zxid, " + lastProcessedZxid);
            }
        }
        try {
            acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
        } catch (FileNotFoundException e) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a
            // new code version
            acceptedEpoch = epochOfZxid;
            LOG.info(
                "{} not found! Creating with a reasonable default of {}. "
                    + "This should only happen when you are upgrading your installation",
                ACCEPTED_EPOCH_FILENAME,
                acceptedEpoch);
            writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
        }
        // Invariant: the accepted epoch can never lag behind the current epoch.
        if (acceptedEpoch < currentEpoch) {
            throw new IOException("The accepted epoch, "
                                    + ZxidUtils.zxidToString(acceptedEpoch)
                                    + " is less than the current epoch, "
                                    + ZxidUtils.zxidToString(currentEpoch));
        }
    } catch (IOException ie) {
        LOG.error("Unable to load database on disk", ie);
        throw new RuntimeException("Unable to run quorum server ", ie);
    }
}
  5. ZKDatabase.java 的 loadDataBase() 方法
java
/**
 * Restores this database from disk via {@code snapLog.restore(...)} and marks it initialized.
 *
 * <p>The restore time is recorded in the DB_INIT_TIME metric and logged together with the
 * resulting zxid and the tree digest.
 *
 * @return the zxid reported by {@code FileTxnSnapLog.restore()}
 * @throws IOException propagated from the restore
 */
public long loadDataBase() throws IOException {
    final long begin = Time.currentElapsedTime();
    final long restoredZxid =
            snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
    initialized = true;

    final long elapsed = Time.currentElapsedTime() - begin;
    ServerMetrics.getMetrics().DB_INIT_TIME.add(elapsed);
    LOG.info("Snapshot loaded in {} ms, highest zxid is 0x{}, digest is {}",
            elapsed, Long.toHexString(restoredZxid), dataTree.getTreeDigest());
    return restoredZxid;
}
  6. FileTxnSnapLog.java 的 restore() 方法
java
/**
 * Restores the DataTree and the session table from disk in two steps: first a snapshot,
 * then the transaction-log entries that follow it.
 *
 * <p>If a file named "initialize" exists in the parent of the data directory, it is deleted
 * and an empty database is trusted. Otherwise {@code autoCreateDB} decides.
 *
 * @param dt       tree to populate
 * @param sessions session id -> timeout map to populate
 * @param listener notified of each txn replayed from the log
 * @return the highest zxid reached by log replay. If no snapshot exists and the txn log is
 *         empty, returns 0 when an empty database is trusted, or -1 when data may be missing.
 * @throws IOException if no snapshot exists while the txn log is non-empty and empty
 *         snapshots are not trusted, or on any read/replay failure
 */
public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
    long snapLoadingStartTime = Time.currentElapsedTime();
    // Restore snapshot data into the DataTree; -1 means no snapshot file was found.
    long deserializeResult = snapLog.deserialize(dt, sessions);
    ServerMetrics.getMetrics().STARTUP_SNAP_LOAD_TIME.add(Time.currentElapsedTime() - snapLoadingStartTime);
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    boolean trustEmptyDB;
    File initFile = new File(dataDir.getParent(), "initialize");
    // The "initialize" marker file is consumed (deleted) on first use.
    if (Files.deleteIfExists(initFile.toPath())) {
        LOG.info("Initialize file found, an empty database will not block voting participation");
        trustEmptyDB = true;
    } else {
        trustEmptyDB = autoCreateDB;
    }
    // Deferred second step: replay txn-log entries on top of the snapshot, then warn if
    // the snapshot's digest zxid was never reached during replay.
    RestoreFinalizer finalizer = () -> {
        long highestZxid = fastForwardFromEdits(dt, sessions, listener);
        // The snapshotZxidDigest will reset after replaying the txn of the
        // zxid in the snapshotZxidDigest, if it's not reset to null after
        // restoring, it means either there are not enough txns to cover that
        // zxid or that txn is missing
        DataTree.ZxidDigest snapshotZxidDigest = dt.getDigestFromLoadedSnapshot();
        if (snapshotZxidDigest != null) {
            LOG.warn(
                    "Highest txn zxid 0x{} is not covering the snapshot digest zxid 0x{}, "
                            + "which might lead to inconsistent state",
                    Long.toHexString(highestZxid),
                    Long.toHexString(snapshotZxidDigest.getZxid()));
        }
        return highestZxid;
    };

    if (-1L == deserializeResult) {
        /* this means that we couldn't find any snapshot, so we need to
            * initialize an empty database (reported in ZOOKEEPER-2325) */
        if (txnLog.getLastLoggedZxid() != -1) {
            // ZOOKEEPER-3056: provides an escape hatch for users upgrading
            // from old versions of zookeeper (3.4.x, pre 3.5.3).
            if (!trustEmptySnapshot) {
                throw new IOException(EMPTY_SNAPSHOT_WARNING + "Something is broken!");
            } else {
                LOG.warn("{}This should only be allowed during upgrading.", EMPTY_SNAPSHOT_WARNING);
                return finalizer.run();
            }
        }

        if (trustEmptyDB) {
            /* TODO: (br33d) we should either put a ConcurrentHashMap on restore()
                *       or use Map on save() */
            // NOTE(review): this cast fails with ClassCastException if the caller passed
            // a Map that is not a ConcurrentHashMap.
            save(dt, (ConcurrentHashMap<Long, Integer>) sessions, false);

            /* return a zxid of 0, since we know the database is empty */
            return 0L;
        } else {
            /* return a zxid of -1, since we are possibly missing data */
            LOG.warn("Unexpected empty data tree, setting zxid to -1");
            dt.lastProcessedZxid = -1L;
            return -1L;
        }
    }

    return finalizer.run();
}
  7. 查找 deserialize() 的实现类
    FileSnap.java
java
/**
 * Loads one snapshot into {@code dt} and {@code sessions}.
 *
 * <p>Up to 100 candidates from {@code findNValidSnapshots(100)} are tried in list order
 * (presumably newest first — verify). The first one that deserializes and passes the
 * integrity checks wins. The tree's lastProcessedZxid is set to the zxid encoded in that
 * snapshot's file name.
 *
 * @return the snapshot's zxid, or -1 if there are no snapshot files at all
 * @throws IOException if none of the candidates could be read
 */
public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException {
    // we run through 100 snapshots (not all of them)
    // if we cannot get it running within 100 snapshots
    // we should  give up
    List<File> snapList = findNValidSnapshots(100);
    if (snapList.size() == 0) {
        return -1L;
    }
    File snap = null;
    long snapZxid = -1;
    boolean foundValid = false;
    // Try each candidate snapshot in turn; stop at the first one that loads cleanly.
    // NOTE(review): entries put into `sessions` by a failed attempt are not cleared before
    // the next attempt (the DataTree clears its own nodes on deserialize) — confirm intent.
    for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
        snap = snapList.get(i);
        LOG.info("Reading snapshot {}", snap);
        // Prepare for deserialization: the snapshot zxid is encoded in the file name.
        snapZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
        try (CheckedInputStream snapIS = SnapStream.getInputStream(snap)) {
            InputArchive ia = BinaryInputArchive.getArchive(snapIS);
            // Deserialize sessions and tree data into the DataTree.
            deserialize(dt, sessions, ia);
            SnapStream.checkSealIntegrity(snapIS, ia);

            // Digest feature was added after the CRC to make it backward
            // compatible, the older code can still read snapshots which
            // includes digest.
            //
            // To check the intact, after adding digest we added another
            // CRC check.
            if (dt.deserializeZxidDigest(ia, snapZxid)) {
                SnapStream.checkSealIntegrity(snapIS, ia);
            }

            foundValid = true;
            break;
        } catch (IOException e) {
            // A corrupt/unreadable snapshot is logged and the next candidate is tried.
            LOG.warn("problem reading snap file {}", snap, e);
        }
    }
    if (!foundValid) {
        throw new IOException("Not able to find valid snapshots in " + snapDir);
    }
    dt.lastProcessedZxid = snapZxid;
    lastSnapshotInfo = new SnapshotInfo(dt.lastProcessedZxid, snap.lastModified() / 1000);

    // compare the digest if this is not a fuzzy snapshot, we want to compare
    // and find inconsistent asap.
    if (dt.getDigestFromLoadedSnapshot() != null) {
        dt.compareSnapshotDigests(dt.lastProcessedZxid);
    }
    return dt.lastProcessedZxid;
}
  8. 校验快照文件头(魔数 magic)
java
/**
 * Validates the snapshot file header and, if its magic number matches, reads the
 * session table and then the DataTree from the archive.
 *
 * @throws IOException if the header magic is not SNAP_MAGIC, or on read failure
 */
public void deserialize(DataTree dt, Map<Long, Integer> sessions, InputArchive ia) throws IOException {
    final FileHeader fileHeader = new FileHeader();
    fileHeader.deserialize(ia, "fileheader");
    final int magic = fileHeader.getMagic();
    if (magic == SNAP_MAGIC) {
        // Header is valid: restore the snapshot contents into the DataTree.
        SerializeUtils.deserializeSnapshot(dt, ia, sessions);
        return;
    }
    throw new IOException("mismatching magic headers " + magic + " !=  " + FileSnap.SNAP_MAGIC);
}
  9. 反序列化快照 SerializeUtils.java
java
/**
 * Reads a snapshot body: a count-prefixed list of (session id, timeout) pairs, which is
 * added to {@code sessions}, followed by the serialized DataTree under the tag "tree".
 *
 * @throws IOException on read failure
 */
public static void deserializeSnapshot(DataTree dt, InputArchive ia, Map<Long, Integer> sessions) throws IOException {
    for (int remaining = ia.readInt("count"); remaining > 0; remaining--) {
        final long sessionId = ia.readLong("id");
        final int timeout = ia.readInt("timeout");
        sessions.put(sessionId, timeout);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(
                LOG,
                ZooTrace.SESSION_TRACE_MASK,
                "loadData --- session in archive: " + sessionId + " with timeout: " + timeout);
        }
    }
    dt.deserialize(ia, "tree");
}
  10. 从根节点开始,逐个节点恢复快照数据 DataTree.java
java
/**
 * Rebuilds this tree from a snapshot archive.
 *
 * <p>The ACL cache is read first and all existing nodes are cleared. The archive is then
 * consumed as a sequence of (path, node) records until the path "/" is read. The record
 * whose path has no '/' becomes the root, and it is finally re-registered under "/".
 * Quota nodes are set up afterwards and unused ACLs are purged.
 *
 * @param ia  archive positioned at the serialized tree
 * @param tag unused in the visible code
 * @throws IOException on read failure, or if a node's parent was not read before the node
 */
public void deserialize(InputArchive ia, String tag) throws IOException {
    aclCache.deserialize(ia);
    nodes.clear();
    pTrie.clear();
    nodeDataSize.set(0);
    String path = ia.readString("path");
    // Restore every DataNode from the snapshot; the path "/" terminates the sequence.
    while (!"/".equals(path)) {
        // Each record becomes a new DataNode.
        DataNode node = new DataNode();
        ia.readRecord(node, "node");
        // Register the node in the tree under its path.
        nodes.put(path, node);
        synchronized (node) {
            aclCache.addUsage(node.acl);
        }
        int lastSlash = path.lastIndexOf('/');
        if (lastSlash == -1) {
            root = node;
        } else {
            // Non-root node: its parent must already have been read.
            String parentPath = path.substring(0, lastSlash);
            DataNode parent = nodes.get(parentPath);
            if (parent == null) {
                throw new IOException("Invalid Datatree, unable to find "
                                        + "parent "
                                        + parentPath
                                        + " of path "
                                        + path);
            }
            // Link this node into its parent's children by its last path segment.
            parent.addChild(path.substring(lastSlash + 1));
            // Index special nodes by ephemeral owner: containers, TTL nodes, and
            // session-owned ephemerals (non-zero owner). Owner 0 means a persistent node.
            long eowner = node.stat.getEphemeralOwner();
            EphemeralType ephemeralType = EphemeralType.get(eowner);
            if (ephemeralType == EphemeralType.CONTAINER) {
                containers.add(path);
            } else if (ephemeralType == EphemeralType.TTL) {
                ttls.add(path);
            } else if (eowner != 0) {
                HashSet<String> list = ephemerals.get(eowner);
                if (list == null) {
                    list = new HashSet<String>();
                    ephemerals.put(eowner, list);
                }
                list.add(path);
            }
        }
        path = ia.readString("path");
    }
    // have counted digest for root node with "", ignore here to avoid
    // counting twice for root node
    nodes.putWithoutDigest("/", root);

    nodeDataSize.set(approximateDataSize());

    // we are done with deserializing the
    // the datatree
    // update the quotas - create path trie
    // and also update the stat nodes
    setupQuota();

    aclCache.purgeUnused();
}

4. 冷启动:从编辑日志(事务日志)恢复数据

  1. 回到FileTxnSnapLog.java类中的restore()方法
java
 public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
    // Excerpt of FileTxnSnapLog.restore(); "......" marks code elided by the article.
    long snapLoadingStartTime = Time.currentElapsedTime();
    // Restore snapshot data into the DataTree.
    long deserializeResult = snapLog.deserialize(dt, sessions);
    ......
    // Replay txn-log (edit log) data into the DataTree; deferred until finalizer.run().
    RestoreFinalizer finalizer = () -> {
        long highestZxid = fastForwardFromEdits(dt, sessions, listener);
        // The snapshotZxidDigest will reset after replaying the txn of the
        // zxid in the snapshotZxidDigest, if it's not reset to null after
        // restoring, it means either there are not enough txns to cover that
        // zxid or that txn is missing
        DataTree.ZxidDigest snapshotZxidDigest = dt.getDigestFromLoadedSnapshot();
        if (snapshotZxidDigest != null) {
            LOG.warn(
                    "Highest txn zxid 0x{} is not covering the snapshot digest zxid 0x{}, "
                            + "which might lead to inconsistent state",
                    Long.toHexString(highestZxid),
                    Long.toHexString(snapshotZxidDigest.getZxid()));
        }
        return highestZxid;
    };

    ......

    // Normal path: a snapshot was loaded, so replay the log on top of it.
    return finalizer.run();
}
  2. 在此之前,已经从快照文件中恢复了大部分数据,接下来只需通过 fastForwardFromEdits() 从编辑日志中继续恢复剩余的数据
java
/**
 * Replays the transaction-log entries that follow the snapshot onto {@code dt}.
 *
 * <p>Reading starts at {@code dt.lastProcessedZxid + 1}. Each txn is applied via
 * processTransaction, its digest is compared, and {@code listener} is notified. The log
 * iterator is always closed.
 *
 * @return the highest zxid observed in the log (out-of-order lower zxids are logged as
 *         errors and do not lower it), or {@code dt.lastProcessedZxid} if the log has no
 *         entries from that point
 * @throws IOException if applying a txn raises NoNodeException, or propagated from the log
 */
public long fastForwardFromEdits(
    DataTree dt,
    Map<Long, Integer> sessions,
    PlayBackListener listener) throws IOException {
    // Only the entries after the snapshot's zxid need replaying: start at zxid + 1.
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid + 1);
    // Start from the snapshot's zxid; it is updated as log entries are replayed.
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;
    int txnLoaded = 0;
    long startTime = Time.currentElapsedTime();
    try {
        // Walk the log from lastProcessedZxid onward, applying every remaining txn.
        while (true) {
            // iterator points to
            // the first valid txn when initialized
            // The txn header carries the zxid.
            hdr = itr.getHeader();
            if (hdr == null) {
                //empty logs
                return dt.lastProcessedZxid;
            }
            if (hdr.getZxid() < highestZxid && highestZxid != 0) {
                LOG.error("{}(highestZxid) > {}(next log) for type {}", highestZxid, hdr.getZxid(), hdr.getType());
            } else {
                highestZxid = hdr.getZxid();
            }
            try {
                // Apply this txn to the DataTree; DataTree.processTxn advances the tree's
                // lastProcessedZxid for top-level txns. highestZxid is taken from the header above.
                processTransaction(hdr, dt, sessions, itr.getTxn());
                dt.compareDigest(hdr, itr.getTxn(), itr.getDigest());
                txnLoaded++;
            } catch (KeeperException.NoNodeException e) {
                throw new IOException("Failed to process transaction type: "
                                        + hdr.getType()
                                        + " error: "
                                        + e.getMessage(),
                                        e);
            }
            listener.onTxnLoaded(hdr, itr.getTxn(), itr.getDigest());
            if (!itr.next()) {
                break;
            }
        }
    } finally {
        if (itr != null) {
            itr.close();
        }
    }

    long loadTime = Time.currentElapsedTime() - startTime;
    LOG.info("{} txns loaded in {} ms", txnLoaded, loadTime);
    ServerMetrics.getMetrics().STARTUP_TXNS_LOADED.add(txnLoaded);
    ServerMetrics.getMetrics().STARTUP_TXNS_LOAD_TIME.add(loadTime);

    return highestZxid;
}
  3. 处理编辑日志(processTransaction()),每个分支最终都调用 DataTree.processTxn() 方法
java
/**
 * Applies one transaction replayed from the log to the DataTree, keeping the session
 * table in sync for session create/close records.
 *
 * <p>Every record, including session records, is passed to {@code dt.processTxn} so the
 * tree can update its lastProcessedZxid. A non-OK result is logged at debug level and
 * otherwise ignored.
 */
public void processTransaction(
    TxnHeader hdr,
    DataTree dt,
    Map<Long, Integer> sessions,
    Record txn) throws KeeperException.NoNodeException {
    final int opType = hdr.getType();
    if (opType == OpCode.createSession) {
        final int timeOut = ((CreateSessionTxn) txn).getTimeOut();
        sessions.put(hdr.getClientId(), timeOut);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(
                LOG,
                ZooTrace.SESSION_TRACE_MASK,
                "playLog --- create session in log: 0x" + Long.toHexString(hdr.getClientId())
                + " with timeout: " + timeOut);
        }
    } else if (opType == OpCode.closeSession) {
        sessions.remove(hdr.getClientId());
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(
                LOG,
                ZooTrace.SESSION_TRACE_MASK,
                "playLog --- close session in log: 0x" + Long.toHexString(hdr.getClientId()));
        }
    }

    // Node creation/deletion and all other txn types (plus the session records above).
    final ProcessTxnResult result = dt.processTxn(hdr, txn);
    if (result.err != Code.OK.intValue()) {
        LOG.debug("Ignoring processTxn failure hdr: {}, error: {}, path: {}", hdr.getType(), result.err, result.path);
    }
}
  4. 实际处理编辑日志的方法 DataTree.processTxn(),按照事务操作类型恢复数据
java
/**
 * Applies a top-level (non-sub) transaction; equivalent to
 * {@code processTxn(header, txn, false)}.
 */
public ProcessTxnResult processTxn(TxnHeader header, Record txn) {
    final boolean topLevel = false;
    return processTxn(header, txn, topLevel);
}

/**
 * Applies a single transaction to this DataTree and reports the outcome.
 *
 * <p>Dispatches on the header's op type. KeeperExceptions raised by the tree operations
 * become {@code rc.err}. IOExceptions (e.g. an unknown sub-op type inside a multi) are
 * logged at debug level and otherwise ignored, so {@code rc.err} is left unchanged.
 *
 * @param header   txn header supplying client id, cxid, zxid, time and op type
 * @param txn      the txn record; cast to the concrete type implied by the op type
 * @param isSubTxn true when called for one sub-op of a multi; such calls neither advance
 *                 lastProcessedZxid nor compare/record digests
 * @return the result; {@code err == 0} means success
 */
public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
    ProcessTxnResult rc = new ProcessTxnResult();

    try {
        rc.clientId = header.getClientId();
        rc.cxid = header.getCxid();
        rc.zxid = header.getZxid();
        rc.type = header.getType();
        rc.err = 0;
        rc.multiResult = null;
        switch (header.getType()) {
        case OpCode.create:
            CreateTxn createTxn = (CreateTxn) txn;
            rc.path = createTxn.getPath();
            // Create the node; ephemeral nodes are owned by the header's client (session) id.
            createNode(
                createTxn.getPath(),
                createTxn.getData(),
                createTxn.getAcl(),
                createTxn.getEphemeral() ? header.getClientId() : 0,
                createTxn.getParentCVersion(),
                header.getZxid(),
                header.getTime(),
                null);
            break;
        case OpCode.create2:
            CreateTxn create2Txn = (CreateTxn) txn;
            rc.path = create2Txn.getPath();
            Stat stat = new Stat();
            createNode(
                create2Txn.getPath(),
                create2Txn.getData(),
                create2Txn.getAcl(),
                create2Txn.getEphemeral() ? header.getClientId() : 0,
                create2Txn.getParentCVersion(),
                header.getZxid(),
                header.getTime(),
                stat);
            rc.stat = stat;
            break;
        case OpCode.createTTL:
            CreateTTLTxn createTtlTxn = (CreateTTLTxn) txn;
            rc.path = createTtlTxn.getPath();
            stat = new Stat();
            createNode(
                createTtlTxn.getPath(),
                createTtlTxn.getData(),
                createTtlTxn.getAcl(),
                EphemeralType.TTL.toEphemeralOwner(createTtlTxn.getTtl()),
                createTtlTxn.getParentCVersion(),
                header.getZxid(),
                header.getTime(),
                stat);
            rc.stat = stat;
            break;
        case OpCode.createContainer:
            CreateContainerTxn createContainerTxn = (CreateContainerTxn) txn;
            rc.path = createContainerTxn.getPath();
            stat = new Stat();
            createNode(
                createContainerTxn.getPath(),
                createContainerTxn.getData(),
                createContainerTxn.getAcl(),
                EphemeralType.CONTAINER_EPHEMERAL_OWNER,
                createContainerTxn.getParentCVersion(),
                header.getZxid(),
                header.getTime(),
                stat);
            rc.stat = stat;
            break;
        case OpCode.delete:
        case OpCode.deleteContainer:
            DeleteTxn deleteTxn = (DeleteTxn) txn;
            rc.path = deleteTxn.getPath();
            deleteNode(deleteTxn.getPath(), header.getZxid());
            break;
        // reconfig falls through: its record is applied as a SetDataTxn.
        case OpCode.reconfig:
        case OpCode.setData:
            SetDataTxn setDataTxn = (SetDataTxn) txn;
            rc.path = setDataTxn.getPath();
            rc.stat = setData(
                setDataTxn.getPath(),
                setDataTxn.getData(),
                setDataTxn.getVersion(),
                header.getZxid(),
                header.getTime());
            break;
        case OpCode.setACL:
            SetACLTxn setACLTxn = (SetACLTxn) txn;
            rc.path = setACLTxn.getPath();
            rc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(), setACLTxn.getVersion());
            break;
        case OpCode.closeSession:
            long sessionId = header.getClientId();
            // With a CloseSessionTxn record, the paths to delete come from the record;
            // otherwise the two-argument killSession is used.
            if (txn != null) {
                killSession(sessionId, header.getZxid(),
                        ephemerals.remove(sessionId),
                        ((CloseSessionTxn) txn).getPaths2Delete());
            } else {
                killSession(sessionId, header.getZxid());
            }
            break;
        case OpCode.error:
            ErrorTxn errTxn = (ErrorTxn) txn;
            rc.err = errTxn.getErr();
            break;
        case OpCode.check:
            CheckVersionTxn checkTxn = (CheckVersionTxn) txn;
            rc.path = checkTxn.getPath();
            break;
        case OpCode.multi:
            MultiTxn multiTxn = (MultiTxn) txn;
            List<Txn> txns = multiTxn.getTxns();
            rc.multiResult = new ArrayList<ProcessTxnResult>();
            // First pass: if any sub-txn is an error, the whole multi is treated as failed.
            boolean failed = false;
            for (Txn subtxn : txns) {
                if (subtxn.getType() == OpCode.error) {
                    failed = true;
                    break;
                }
            }

            // Second pass: decode and apply each sub-txn as a sub-transaction.
            boolean post_failed = false;
            for (Txn subtxn : txns) {
                ByteBuffer bb = ByteBuffer.wrap(subtxn.getData());
                Record record = null;
                switch (subtxn.getType()) {
                case OpCode.create:
                case OpCode.create2:
                    record = new CreateTxn();
                    break;
                case OpCode.createTTL:
                    record = new CreateTTLTxn();
                    break;
                case OpCode.createContainer:
                    record = new CreateContainerTxn();
                    break;
                case OpCode.delete:
                case OpCode.deleteContainer:
                    record = new DeleteTxn();
                    break;
                case OpCode.setData:
                    record = new SetDataTxn();
                    break;
                case OpCode.error:
                    record = new ErrorTxn();
                    post_failed = true;
                    break;
                case OpCode.check:
                    record = new CheckVersionTxn();
                    break;
                default:
                    throw new IOException("Invalid type of op: " + subtxn.getType());
                }
                assert (record != null);

                ByteBufferInputStream.byteBuffer2Record(bb, record);

                // In a failed multi, every non-error sub-op is replaced with an ErrorTxn:
                // OK for sub-ops before the first error, RUNTIMEINCONSISTENCY after it.
                if (failed && subtxn.getType() != OpCode.error) {
                    int ec = post_failed ? Code.RUNTIMEINCONSISTENCY.intValue() : Code.OK.intValue();

                    subtxn.setType(OpCode.error);
                    record = new ErrorTxn(ec);
                }

                assert !failed || (subtxn.getType() == OpCode.error);

                TxnHeader subHdr = new TxnHeader(
                    header.getClientId(),
                    header.getCxid(),
                    header.getZxid(),
                    header.getTime(),
                    subtxn.getType());
                ProcessTxnResult subRc = processTxn(subHdr, record, true);
                rc.multiResult.add(subRc);
                // The multi's overall error is the first non-zero sub-result error.
                if (subRc.err != 0 && rc.err == 0) {
                    rc.err = subRc.err;
                }
            }
            break;
        }
    } catch (KeeperException e) {
        LOG.debug("Failed: {}:{}", header, txn, e);
        rc.err = e.code().intValue();
    } catch (IOException e) {
        LOG.debug("Failed: {}:{}", header, txn, e);
    }

    // A create that hit NODEEXISTS still updates the parent's cversion/pzxid
    // (presumably for replaying txns already reflected in a snapshot — confirm).
    if (header.getType() == OpCode.create && rc.err == Code.NODEEXISTS.intValue()) {
        LOG.debug("Adjusting parent cversion for Txn: {} path: {} err: {}", header.getType(), rc.path, rc.err);
        int lastSlash = rc.path.lastIndexOf('/');
        String parentName = rc.path.substring(0, lastSlash);
        CreateTxn cTxn = (CreateTxn) txn;
        try {
            setCversionPzxid(parentName, cTxn.getParentCVersion(), header.getZxid());
        } catch (KeeperException.NoNodeException e) {
            LOG.error("Failed to set parent cversion for: {}", parentName, e);
            rc.err = e.code().intValue();
        }
    } else if (rc.err != Code.OK.intValue()) {
        LOG.debug("Ignoring processTxn failure hdr: {} : error: {}", header.getType(), rc.err);
    }

    if (!isSubTxn) {
        // Only top-level txns advance lastProcessedZxid, so a multi's sub-ops (which share
        // the parent zxid) do not move it individually; it is updated after the change is applied.
        if (rc.zxid > lastProcessedZxid) {
            lastProcessedZxid = rc.zxid;
        }

        if (digestFromLoadedSnapshot != null) {
            compareSnapshotDigests(rc.zxid);
        } else {
            // only start recording digest when we're not in fuzzy state
            logZxidDigest(rc.zxid, getTreeDigest());
        }
    }

    return rc;
}