源码之ZK服务端启动
1.zk数据存储机制
- zk 的数据模型是一棵树(DataTree),树中的每个节点称为 DataNode。
- zk集群中的 DataTree 时刻保持状态同步。
- ZooKeeper 集群中的每个 zk 节点,都在内存和磁盘中各保存一份完整的数据。
- 内存数据:DataTree
- 磁盘数据:快照文件 + 编辑日志
2. zk的服务端start()方法流程
3. 冷启动数据恢复快照数据
- 启动集群
QuorumPeerMain.java
java
/**
 * Builds a QuorumPeer from the parsed configuration, starts it and joins it.
 * The "......" lines mark code that these notes elide from the original method.
 */
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
......
LOG.info("Starting quorum peer, myid=" + config.getServerId());
......
try {
ServerMetrics.metricsProviderInitialized(metricsProvider);
ProviderRegistry.initialize();
// Client connection factories; each one is created only when its address is configured.
// NOTE(review): the notes say the default implementation is NIO - not visible in this excerpt.
ServerCnxnFactory cnxnFactory = null;
ServerCnxnFactory secureCnxnFactory = null;
if (config.getClientPortAddress() != null) {
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
}
// Secure client port: same setup, with the last configure() argument set to true.
if (config.getSecureClientPortAddress() != null) {
secureCnxnFactory = ServerCnxnFactory.createFactory();
secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
}
// Copy the parsed configuration values onto this quorum peer.
quorumPeer = getQuorumPeer();
quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
//quorumPeer.setQuorumPeers(config.getAllMembers());
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
quorumPeer.setConfigFileName(config.getConfigFilename());
quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
// Data storage: build the ZKDatabase on top of the peer's txn factory.
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
if (config.getLastSeenQuorumVerifier() != null) {
quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
}
quorumPeer.initConfigInZKDatabase();
// Networking: hand the client connection factories (possibly null) to the peer.
quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
quorumPeer.setSslQuorum(config.isSslQuorum());
quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.setSyncEnabled(config.getSyncEnabled());
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
......
quorumPeer.initialize();
......
// Start the quorum peer, record the audit entry, then join it.
quorumPeer.start();
ZKAuditProvider.addZKStartStopAuditLog();
quorumPeer.join();
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Quorum Peer interrupted", e);
} finally {
// Always stop the metrics provider; a failure here is only logged.
try {
metricsProvider.stop();
} catch (Throwable error) {
LOG.warn("Error while stopping metrics", error);
}
}
}
- 冷启动恢复数据
QuorumPeer.java
java
/**
 * Starts this peer: checks that its id is in the view, loads the database,
 * starts the connection factory and the admin server, prepares leader
 * election, starts the JVM pause monitor and finally calls super.start().
 *
 * @throws RuntimeException if myid is not a key of getView()
 */
public synchronized void start() {
if (!getView().containsKey(myid)) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
// Cold-start recovery: load the on-disk data.
loadDataBase();
// Start the client connection factory.
startServerCnxnFactory();
try {
// Start the admin server; a failure is logged and startup continues.
adminServer.start();
} catch (AdminServerException e) {
LOG.warn("Problem starting AdminServer", e);
}
// Prepare the leader-election environment.
startLeaderElection();
startJvmPauseMonitor();
// NOTE(review): per the notes, the election itself runs once super.start() is called;
// the superclass is not shown here - confirm.
super.start();
}
- 加载数据
java
/**
 * Loads the on-disk database into memory and then reads (or initializes) the
 * currentEpoch and acceptedEpoch values, checking them against the epoch of
 * the last processed zxid.
 *
 * @throws RuntimeException wrapping any IOException raised while loading
 */
private void loadDataBase() {
try {
// Load the on-disk data into memory to rebuild the DataTree.
// (The notes distinguish two kinds of operations: transactional and non-transactional.)
zkDb.loadDataBase();
// load the epochs
long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
try {
currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
} catch (FileNotFoundException e) {
// pick a reasonable epoch number
// this should only happen once when moving to a
// new code version
currentEpoch = epochOfZxid;
LOG.info(
"{} not found! Creating with a reasonable default of {}. "
+ "This should only happen when you are upgrading your installation",
CURRENT_EPOCH_FILENAME,
currentEpoch);
writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
}
if (epochOfZxid > currentEpoch) {
// The data is ahead of the recorded current epoch: look for the temporary
// current-epoch file (CURRENT_EPOCH_FILENAME + TMP_EXTENSION) in the snapshot directory.
File currentTmp = new File(getTxnFactory().getSnapDir(),
CURRENT_EPOCH_FILENAME + AtomicFileOutputStream.TMP_EXTENSION);
if (currentTmp.exists()) {
long epochOfTmp = readLongFromFile(currentTmp.getName());
LOG.info("{} found. Setting current epoch to {}.", currentTmp, epochOfTmp);
setCurrentEpoch(epochOfTmp);
} else {
throw new IOException(
"The current epoch, " + ZxidUtils.zxidToString(currentEpoch)
+ ", is older than the last zxid, " + lastProcessedZxid);
}
}
try {
acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
} catch (FileNotFoundException e) {
// pick a reasonable epoch number
// this should only happen once when moving to a
// new code version
acceptedEpoch = epochOfZxid;
LOG.info(
"{} not found! Creating with a reasonable default of {}. "
+ "This should only happen when you are upgrading your installation",
ACCEPTED_EPOCH_FILENAME,
acceptedEpoch);
writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
}
// The accepted epoch must never be behind the current epoch.
if (acceptedEpoch < currentEpoch) {
throw new IOException("The accepted epoch, "
+ ZxidUtils.zxidToString(acceptedEpoch)
+ " is less than the current epoch, "
+ ZxidUtils.zxidToString(currentEpoch));
}
} catch (IOException ie) {
LOG.error("Unable to load database on disk", ie);
throw new RuntimeException("Unable to run quorum server ", ie);
}
}
- 加载磁盘数据到内存,恢复DataTree
数据恢复过程:
(1)从快照文件中恢复大部分数据,并得到一个 lastProcessedZxid。
(2)再从编辑日志(事务日志)中执行 replay,执行到最后一条日志并更新 lastProcessedZxid。
(3)最终得到 DataTree 和 lastProcessedZxid,表示数据恢复完成。
java
/**
 * Loads the on-disk database into memory and then reads (or initializes) the
 * currentEpoch and acceptedEpoch values, checking them against the epoch of
 * the last processed zxid.
 *
 * @throws RuntimeException wrapping any IOException raised while loading
 */
private void loadDataBase() {
try {
// The notes distinguish two kinds of ZooKeeper operations: transactional and non-transactional.
zkDb.loadDataBase();
// 1. Transactional operations, e.g. zk.create(): each is assigned a globally unique zxid.
//    A zxid is 64 bits: the high 32 bits are the epoch (identifies one leader's term),
//    the low 32 bits are the transaction counter.
// 2. Non-transactional operations, e.g. zk.getData().
// load the epochs
long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
try {
currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
} catch (FileNotFoundException e) {
// pick a reasonable epoch number
// this should only happen once when moving to a
// new code version
currentEpoch = epochOfZxid;
LOG.info(
"{} not found! Creating with a reasonable default of {}. "
+ "This should only happen when you are upgrading your installation",
CURRENT_EPOCH_FILENAME,
currentEpoch);
writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
}
if (epochOfZxid > currentEpoch) {
// The data is ahead of the recorded current epoch: look for the temporary
// current-epoch file (CURRENT_EPOCH_FILENAME + TMP_EXTENSION) in the snapshot directory.
File currentTmp = new File(getTxnFactory().getSnapDir(),
CURRENT_EPOCH_FILENAME + AtomicFileOutputStream.TMP_EXTENSION);
if (currentTmp.exists()) {
long epochOfTmp = readLongFromFile(currentTmp.getName());
LOG.info("{} found. Setting current epoch to {}.", currentTmp, epochOfTmp);
setCurrentEpoch(epochOfTmp);
} else {
throw new IOException(
"The current epoch, " + ZxidUtils.zxidToString(currentEpoch)
+ ", is older than the last zxid, " + lastProcessedZxid);
}
}
try {
acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
} catch (FileNotFoundException e) {
// pick a reasonable epoch number
// this should only happen once when moving to a
// new code version
acceptedEpoch = epochOfZxid;
LOG.info(
"{} not found! Creating with a reasonable default of {}. "
+ "This should only happen when you are upgrading your installation",
ACCEPTED_EPOCH_FILENAME,
acceptedEpoch);
writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
}
// The accepted epoch must never be behind the current epoch.
if (acceptedEpoch < currentEpoch) {
throw new IOException("The accepted epoch, "
+ ZxidUtils.zxidToString(acceptedEpoch)
+ " is less than the current epoch, "
+ ZxidUtils.zxidToString(currentEpoch));
}
} catch (IOException ie) {
LOG.error("Unable to load database on disk", ie);
throw new RuntimeException("Unable to run quorum server ", ie);
}
}
- ZKDatabase.java的loadDataBase()方法
java
/**
 * Restores the in-memory database through {@code snapLog.restore}, marks the
 * database as initialized and records how long the restore took.
 *
 * @return the zxid returned by {@code snapLog.restore}
 * @throws IOException if the restore fails
 */
public long loadDataBase() throws IOException {
    final long begin = Time.currentElapsedTime();

    final long restoredZxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
    initialized = true;

    final long elapsedMs = Time.currentElapsedTime() - begin;
    ServerMetrics.getMetrics().DB_INIT_TIME.add(elapsedMs);
    LOG.info(
        "Snapshot loaded in {} ms, highest zxid is 0x{}, digest is {}",
        elapsedMs,
        Long.toHexString(restoredZxid),
        dataTree.getTreeDigest());
    return restoredZxid;
}
- FileTxnSnapLog.java源码
java
/**
 * Rebuilds the data tree: first from the snapshot files, then by replaying the
 * transaction log on top of it.
 *
 * @param dt the data tree to populate
 * @param sessions the session-id to timeout map to populate
 * @param listener callback invoked for every replayed transaction
 * @return the highest zxid after replay; {@code 0} when no snapshot and no log
 *         exist and an empty database is trusted (an empty snapshot is saved);
 *         {@code -1} when the tree is empty and that is not trusted
 * @throws IOException on read errors, or when a transaction log exists without
 *         any snapshot and {@code trustEmptySnapshot} is not set
 */
public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
    final long snapLoadStart = Time.currentElapsedTime();
    // Step 1: restore the snapshot data into the DataTree.
    final long snapshotZxid = snapLog.deserialize(dt, sessions);
    ServerMetrics.getMetrics().STARTUP_SNAP_LOAD_TIME.add(Time.currentElapsedTime() - snapLoadStart);

    final FileTxnLog logProbe = new FileTxnLog(dataDir);

    // An "initialize" marker file next to the data directory is consumed (deleted)
    // here and makes an empty database acceptable; otherwise autoCreateDB decides.
    final File initFile = new File(dataDir.getParent(), "initialize");
    final boolean initFileConsumed = Files.deleteIfExists(initFile.toPath());
    if (initFileConsumed) {
        LOG.info("Initialize file found, an empty database will not block voting participation");
    }
    final boolean trustEmptyDB = initFileConsumed || autoCreateDB;

    // Step 2: replay the transaction log into the DataTree.
    final RestoreFinalizer replayLog = () -> {
        final long highestZxid = fastForwardFromEdits(dt, sessions, listener);
        // The snapshot digest is cleared once the txn with its zxid has been
        // replayed. If it is still set, the log either does not reach that
        // zxid or the txn is missing.
        final DataTree.ZxidDigest pendingDigest = dt.getDigestFromLoadedSnapshot();
        if (pendingDigest != null) {
            LOG.warn(
                "Highest txn zxid 0x{} is not covering the snapshot digest zxid 0x{}, "
                    + "which might lead to inconsistent state",
                Long.toHexString(highestZxid),
                Long.toHexString(pendingDigest.getZxid()));
        }
        return highestZxid;
    };

    // Normal case: a snapshot was loaded, so just replay the log on top of it.
    if (snapshotZxid != -1L) {
        return replayLog.run();
    }

    // No snapshot was found (ZOOKEEPER-2325).
    if (logProbe.getLastLoggedZxid() != -1) {
        // ZOOKEEPER-3056: escape hatch for upgrades from old versions
        // (3.4.x, pre 3.5.3) that have a txn log but no snapshot.
        if (trustEmptySnapshot) {
            LOG.warn("{}This should only be allowed during upgrading.", EMPTY_SNAPSHOT_WARNING);
            return replayLog.run();
        }
        throw new IOException(EMPTY_SNAPSHOT_WARNING + "Something is broken!");
    }

    if (!trustEmptyDB) {
        // Possibly missing data: report zxid -1.
        LOG.warn("Unexpected empty data tree, setting zxid to -1");
        dt.lastProcessedZxid = -1L;
        return -1L;
    }

    /* TODO: (br33d) we should either put a ConcurrentHashMap on restore()
     * or use Map on save() */
    save(dt, (ConcurrentHashMap<Long, Integer>) sessions, false);
    // The database is known to be empty.
    return 0L;
}
- 查找deserialize()实现类
FileSnap.java
java
/**
 * Loads the first readable snapshot, out of at most 100 candidates, into the
 * data tree.
 *
 * @param dt the data tree to populate
 * @param sessions the session-id to timeout map to populate
 * @return the zxid taken from the loaded snapshot's file name, or {@code -1}
 *         when there are no candidate snapshots at all
 * @throws IOException if candidates exist but none of them could be read
 */
public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException {
    // Only the 100 candidates returned here are tried; if none of them
    // loads we give up.
    final List<File> candidates = findNValidSnapshots(100);
    if (candidates.isEmpty()) {
        return -1L;
    }

    File snap = null;
    long snapZxid = -1;
    boolean foundValid = false;
    // Try the candidates in list order until one deserializes cleanly.
    for (File candidate : candidates) {
        snap = candidate;
        LOG.info("Reading snapshot {}", snap);
        // The zxid is encoded in the snapshot file name.
        snapZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
        try (CheckedInputStream snapIS = SnapStream.getInputStream(snap)) {
            final InputArchive ia = BinaryInputArchive.getArchive(snapIS);
            // Deserialize the snapshot content into the DataTree.
            deserialize(dt, sessions, ia);
            SnapStream.checkSealIntegrity(snapIS, ia);

            // The digest was appended after the CRC for backward compatibility
            // (older code can still read snapshots that include it), so a second
            // CRC check follows the digest when one is present.
            if (dt.deserializeZxidDigest(ia, snapZxid)) {
                SnapStream.checkSealIntegrity(snapIS, ia);
            }

            foundValid = true;
            break;
        } catch (IOException e) {
            LOG.warn("problem reading snap file {}", snap, e);
        }
    }

    if (!foundValid) {
        throw new IOException("Not able to find valid snapshots in " + snapDir);
    }

    dt.lastProcessedZxid = snapZxid;
    lastSnapshotInfo = new SnapshotInfo(dt.lastProcessedZxid, snap.lastModified() / 1000);

    // When the snapshot carried a digest, compare it right away so that an
    // inconsistency is found as early as possible.
    if (dt.getDigestFromLoadedSnapshot() != null) {
        dt.compareSnapshotDigests(dt.lastProcessedZxid);
    }
    return dt.lastProcessedZxid;
}
- 校验文件格式
java
/**
 * Validates the snapshot file header and then loads the snapshot body.
 *
 * @param dt the data tree to populate
 * @param sessions the session-id to timeout map to populate
 * @param ia the archive positioned at the start of the snapshot
 * @throws IOException if the header magic does not equal {@code SNAP_MAGIC}
 *         or reading fails
 */
public void deserialize(DataTree dt, Map<Long, Integer> sessions, InputArchive ia) throws IOException {
    final FileHeader fileHeader = new FileHeader();
    fileHeader.deserialize(ia, "fileheader");

    final boolean magicMatches = fileHeader.getMagic() == SNAP_MAGIC;
    if (!magicMatches) {
        throw new IOException("mismatching magic headers " + fileHeader.getMagic() + " != " + FileSnap.SNAP_MAGIC);
    }

    // Header is fine: restore the snapshot data into the DataTree.
    SerializeUtils.deserializeSnapshot(dt, ia, sessions);
}
- 反序列化快照 SerializeUtils.java
java
/**
 * Reads the session table (a count followed by id/timeout pairs) from the
 * archive into {@code sessions}, then deserializes the data tree itself.
 *
 * @param dt the data tree to populate
 * @param ia the archive to read from
 * @param sessions receives one entry per session stored in the snapshot
 * @throws IOException if reading from the archive fails
 */
public static void deserializeSnapshot(DataTree dt, InputArchive ia, Map<Long, Integer> sessions) throws IOException {
    final int sessionCount = ia.readInt("count");
    for (int remaining = sessionCount; remaining > 0; remaining--) {
        final long sessionId = ia.readLong("id");
        final int timeout = ia.readInt("timeout");
        sessions.put(sessionId, timeout);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(
                LOG,
                ZooTrace.SESSION_TRACE_MASK,
                "loadData --- session in archive: " + sessionId + " with timeout: " + timeout);
        }
    }
    dt.deserialize(ia, "tree");
}
- 从根节点开始逐个节点进行恢复快照数据 DataTree.java
java
/**
 * Rebuilds this tree from a snapshot archive: the ACL cache first, then one
 * (path, node) record after another until the terminating path "/" is read.
 *
 * @param ia the archive to read from
 * @param tag unused by this method body
 * @throws IOException if reading fails or a node's parent has not been
 *         restored before the node itself
 */
public void deserialize(InputArchive ia, String tag) throws IOException {
    aclCache.deserialize(ia);
    nodes.clear();
    pTrie.clear();
    nodeDataSize.set(0);

    // Restore every DataNode stored in the snapshot; the path "/" ends the list.
    for (String path = ia.readString("path"); !"/".equals(path); path = ia.readString("path")) {
        final DataNode node = new DataNode();
        ia.readRecord(node, "node");
        nodes.put(path, node);
        synchronized (node) {
            aclCache.addUsage(node.acl);
        }

        final int lastSlash = path.lastIndexOf('/');
        if (lastSlash == -1) {
            // A path without any slash is the root node.
            root = node;
            continue;
        }

        // Attach the node to its parent, which must already have been restored.
        final String parentPath = path.substring(0, lastSlash);
        final DataNode parent = nodes.get(parentPath);
        if (parent == null) {
            throw new IOException("Invalid Datatree, unable to find "
                + "parent "
                + parentPath
                + " of path "
                + path);
        }
        parent.addChild(path.substring(lastSlash + 1));

        // Index container, TTL and ephemeral nodes by their ephemeral owner.
        final long eowner = node.stat.getEphemeralOwner();
        final EphemeralType ephemeralType = EphemeralType.get(eowner);
        if (ephemeralType == EphemeralType.CONTAINER) {
            containers.add(path);
        } else if (ephemeralType == EphemeralType.TTL) {
            ttls.add(path);
        } else if (eowner != 0) {
            ephemerals.computeIfAbsent(eowner, owner -> new HashSet<String>()).add(path);
        }
    }

    // The root's digest was already counted under the "" path, so register
    // "/" without touching the digest again.
    nodes.putWithoutDigest("/", root);

    nodeDataSize.set(approximateDataSize());

    // Deserialization is done: rebuild the quota path trie, update the
    // stat nodes and drop ACL cache entries that are no longer used.
    setupQuota();
    aclCache.purgeUnused();
}
4. 冷启动数据恢复编辑日志
- 回到 FileTxnSnapLog.java 类中的 restore() 方法
java
/**
 * Excerpt of restore(): the "......" lines mark code elided by these notes.
 * Shown here to highlight the transaction-log replay step (the finalizer).
 */
public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
long snapLoadingStartTime = Time.currentElapsedTime();
// Restore the snapshot data into the DataTree.
long deserializeResult = snapLog.deserialize(dt, sessions);
......
// Replay the transaction log into the DataTree.
RestoreFinalizer finalizer = () -> {
long highestZxid = fastForwardFromEdits(dt, sessions, listener);
// The snapshotZxidDigest will reset after replaying the txn of the
// zxid in the snapshotZxidDigest, if it's not reset to null after
// restoring, it means either there are not enough txns to cover that
// zxid or that txn is missing
DataTree.ZxidDigest snapshotZxidDigest = dt.getDigestFromLoadedSnapshot();
if (snapshotZxidDigest != null) {
LOG.warn(
"Highest txn zxid 0x{} is not covering the snapshot digest zxid 0x{}, "
+ "which might lead to inconsistent state",
Long.toHexString(highestZxid),
Long.toHexString(snapshotZxidDigest.getZxid()));
}
return highestZxid;
};
......
return finalizer.run();
}
- 在此之前,已经从快照文件中恢复了大部分数据,接下来只需从编辑日志中开始恢复
java
/**
 * Replays the transaction log on top of an already restored data tree,
 * starting right after {@code dt.lastProcessedZxid}.
 *
 * @param dt the data tree to apply transactions to
 * @param sessions the session map kept in step with session transactions
 * @param listener notified after each transaction has been applied
 * @return the highest zxid seen in the log, or {@code dt.lastProcessedZxid}
 *         when the iterator yields no header
 * @throws IOException on read errors, or when a transaction fails with
 *         {@code NoNodeException}
 */
public long fastForwardFromEdits(
        DataTree dt,
        Map<Long, Integer> sessions,
        PlayBackListener listener) throws IOException {
    // The snapshot covers everything up to lastProcessedZxid, so start one past it.
    final TxnIterator itr = txnLog.read(dt.lastProcessedZxid + 1);
    // Starts at the restored zxid and follows the header zxids as the log is replayed.
    long highestZxid = dt.lastProcessedZxid;
    int txnLoaded = 0;
    final long startTime = Time.currentElapsedTime();
    try {
        // The iterator already points at the first valid txn when it is created.
        do {
            final TxnHeader hdr = itr.getHeader();
            if (hdr == null) {
                // empty logs
                return dt.lastProcessedZxid;
            }

            final boolean outOfOrder = highestZxid != 0 && hdr.getZxid() < highestZxid;
            if (outOfOrder) {
                LOG.error("{}(highestZxid) > {}(next log) for type {}", highestZxid, hdr.getZxid(), hdr.getType());
            } else {
                highestZxid = hdr.getZxid();
            }

            try {
                // Apply this log entry to the DataTree and verify its digest.
                processTransaction(hdr, dt, sessions, itr.getTxn());
                dt.compareDigest(hdr, itr.getTxn(), itr.getDigest());
                txnLoaded++;
            } catch (KeeperException.NoNodeException e) {
                throw new IOException("Failed to process transaction type: "
                    + hdr.getType()
                    + " error: "
                    + e.getMessage(),
                    e);
            }
            listener.onTxnLoaded(hdr, itr.getTxn(), itr.getDigest());
        } while (itr.next());
    } finally {
        if (itr != null) {
            itr.close();
        }
    }

    final long loadTime = Time.currentElapsedTime() - startTime;
    LOG.info("{} txns loaded in {} ms", txnLoaded, loadTime);
    ServerMetrics.getMetrics().STARTUP_TXNS_LOADED.add(txnLoaded);
    ServerMetrics.getMetrics().STARTUP_TXNS_LOAD_TIME.add(loadTime);

    return highestZxid;
}
- 处理编辑日志,每个分支都执行processTxn()方法
java
/**
 * Applies one logged transaction: session create/close transactions first
 * update the {@code sessions} map, and every transaction type is then handed
 * to {@code dt.processTxn}.
 *
 * @param hdr header of the transaction
 * @param dt the data tree to apply it to
 * @param sessions the session-id to timeout map
 * @param txn the transaction body
 * @throws KeeperException.NoNodeException declared by the signature
 */
public void processTransaction(
        TxnHeader hdr,
        DataTree dt,
        Map<Long, Integer> sessions,
        Record txn) throws KeeperException.NoNodeException {
    // Session bookkeeping comes first; only session transactions touch the map.
    switch (hdr.getType()) {
        case OpCode.createSession:
            sessions.put(hdr.getClientId(), ((CreateSessionTxn) txn).getTimeOut());
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(
                    LOG,
                    ZooTrace.SESSION_TRACE_MASK,
                    "playLog --- create session in log: 0x" + Long.toHexString(hdr.getClientId())
                        + " with timeout: " + ((CreateSessionTxn) txn).getTimeOut());
            }
            break;
        case OpCode.closeSession:
            sessions.remove(hdr.getClientId());
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(
                    LOG,
                    ZooTrace.SESSION_TRACE_MASK,
                    "playLog --- close session in log: 0x" + Long.toHexString(hdr.getClientId()));
            }
            break;
        default:
            break;
    }

    // Every type goes through the data tree, which also gives it the chance
    // to sync its lastProcessedZxid.
    final ProcessTxnResult rc = dt.processTxn(hdr, txn);

    // A failed transaction is only logged here; replay continues.
    if (rc.err != Code.OK.intValue()) {
        LOG.debug("Ignoring processTxn failure hdr: {}, error: {}, path: {}", hdr.getType(), rc.err, rc.path);
    }
}
- 实际处理编辑日志方法,按照节点操作类型进行恢复数据
java
/**
 * Applies a top-level transaction; delegates to the three-argument overload
 * with {@code isSubTxn} set to {@code false}.
 *
 * @param header header of the transaction
 * @param txn the transaction body
 * @return the result produced by the three-argument overload
 */
public ProcessTxnResult processTxn(TxnHeader header, Record txn) {
    final boolean isSubTxn = false;
    return processTxn(header, txn, isSubTxn);
}
/**
 * Applies one transaction to this tree, dispatching on the header's op code.
 *
 * @param header header of the transaction (client id, cxid, zxid, time, type)
 * @param txn the transaction body; cast to the record type matching the op code
 * @param isSubTxn true when called for a sub-transaction of a multi; such calls
 *        skip the lastProcessedZxid / digest bookkeeping at the end
 * @return a result carrying the header fields plus path, stat, error code and,
 *         for a multi, the per-sub-transaction results
 */
public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
ProcessTxnResult rc = new ProcessTxnResult();
try {
// Copy the header fields into the result and start from "no error".
rc.clientId = header.getClientId();
rc.cxid = header.getCxid();
rc.zxid = header.getZxid();
rc.type = header.getType();
rc.err = 0;
rc.multiResult = null;
switch (header.getType()) {
case OpCode.create:
CreateTxn createTxn = (CreateTxn) txn;
rc.path = createTxn.getPath();
// Create the node; the ephemeral owner is the client id for ephemeral nodes, 0 otherwise.
createNode(
createTxn.getPath(),
createTxn.getData(),
createTxn.getAcl(),
createTxn.getEphemeral() ? header.getClientId() : 0,
createTxn.getParentCVersion(),
header.getZxid(),
header.getTime(),
null);
break;
case OpCode.create2:
// Same as create, but a Stat is passed to createNode and returned in the result.
CreateTxn create2Txn = (CreateTxn) txn;
rc.path = create2Txn.getPath();
Stat stat = new Stat();
createNode(
create2Txn.getPath(),
create2Txn.getData(),
create2Txn.getAcl(),
create2Txn.getEphemeral() ? header.getClientId() : 0,
create2Txn.getParentCVersion(),
header.getZxid(),
header.getTime(),
stat);
rc.stat = stat;
break;
case OpCode.createTTL:
// TTL node: the ephemeral owner value is derived from the TTL.
CreateTTLTxn createTtlTxn = (CreateTTLTxn) txn;
rc.path = createTtlTxn.getPath();
stat = new Stat();
createNode(
createTtlTxn.getPath(),
createTtlTxn.getData(),
createTtlTxn.getAcl(),
EphemeralType.TTL.toEphemeralOwner(createTtlTxn.getTtl()),
createTtlTxn.getParentCVersion(),
header.getZxid(),
header.getTime(),
stat);
rc.stat = stat;
break;
case OpCode.createContainer:
// Container node: uses the fixed CONTAINER_EPHEMERAL_OWNER value as owner.
CreateContainerTxn createContainerTxn = (CreateContainerTxn) txn;
rc.path = createContainerTxn.getPath();
stat = new Stat();
createNode(
createContainerTxn.getPath(),
createContainerTxn.getData(),
createContainerTxn.getAcl(),
EphemeralType.CONTAINER_EPHEMERAL_OWNER,
createContainerTxn.getParentCVersion(),
header.getZxid(),
header.getTime(),
stat);
rc.stat = stat;
break;
case OpCode.delete:
case OpCode.deleteContainer:
DeleteTxn deleteTxn = (DeleteTxn) txn;
rc.path = deleteTxn.getPath();
deleteNode(deleteTxn.getPath(), header.getZxid());
break;
// reconfig shares the setData handling: its txn is cast to SetDataTxn as well.
case OpCode.reconfig:
case OpCode.setData:
SetDataTxn setDataTxn = (SetDataTxn) txn;
rc.path = setDataTxn.getPath();
rc.stat = setData(
setDataTxn.getPath(),
setDataTxn.getData(),
setDataTxn.getVersion(),
header.getZxid(),
header.getTime());
break;
case OpCode.setACL:
SetACLTxn setACLTxn = (SetACLTxn) txn;
rc.path = setACLTxn.getPath();
rc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(), setACLTxn.getVersion());
break;
case OpCode.closeSession:
long sessionId = header.getClientId();
// With a txn body, the paths to delete come from the txn and the session's
// ephemerals entry is removed here; without one, the two-argument killSession is used.
if (txn != null) {
killSession(sessionId, header.getZxid(),
ephemerals.remove(sessionId),
((CloseSessionTxn) txn).getPaths2Delete());
} else {
killSession(sessionId, header.getZxid());
}
break;
case OpCode.error:
// An error txn only carries its error code into the result.
ErrorTxn errTxn = (ErrorTxn) txn;
rc.err = errTxn.getErr();
break;
case OpCode.check:
CheckVersionTxn checkTxn = (CheckVersionTxn) txn;
rc.path = checkTxn.getPath();
break;
case OpCode.multi:
MultiTxn multiTxn = (MultiTxn) txn;
List<Txn> txns = multiTxn.getTxns();
rc.multiResult = new ArrayList<ProcessTxnResult>();
// First pass: the multi counts as failed if any sub-transaction is an error.
boolean failed = false;
for (Txn subtxn : txns) {
if (subtxn.getType() == OpCode.error) {
failed = true;
break;
}
}
// Second pass: decode each sub-transaction and process it as a sub-txn.
// post_failed becomes true once an error sub-transaction has been reached.
boolean post_failed = false;
for (Txn subtxn : txns) {
ByteBuffer bb = ByteBuffer.wrap(subtxn.getData());
Record record = null;
// Pick the record class that matches the sub-transaction's type.
switch (subtxn.getType()) {
case OpCode.create:
case OpCode.create2:
record = new CreateTxn();
break;
case OpCode.createTTL:
record = new CreateTTLTxn();
break;
case OpCode.createContainer:
record = new CreateContainerTxn();
break;
case OpCode.delete:
case OpCode.deleteContainer:
record = new DeleteTxn();
break;
case OpCode.setData:
record = new SetDataTxn();
break;
case OpCode.error:
record = new ErrorTxn();
post_failed = true;
break;
case OpCode.check:
record = new CheckVersionTxn();
break;
default:
throw new IOException("Invalid type of op: " + subtxn.getType());
}
assert (record != null);
ByteBufferInputStream.byteBuffer2Record(bb, record);
// In a failed multi, every non-error sub-transaction is turned into an ErrorTxn:
// code OK before the first error entry, RUNTIMEINCONSISTENCY after it.
if (failed && subtxn.getType() != OpCode.error) {
int ec = post_failed ? Code.RUNTIMEINCONSISTENCY.intValue() : Code.OK.intValue();
subtxn.setType(OpCode.error);
record = new ErrorTxn(ec);
}
assert !failed || (subtxn.getType() == OpCode.error);
// The sub-transaction reuses the parent's header fields with its own type.
TxnHeader subHdr = new TxnHeader(
header.getClientId(),
header.getCxid(),
header.getZxid(),
header.getTime(),
subtxn.getType());
ProcessTxnResult subRc = processTxn(subHdr, record, true);
rc.multiResult.add(subRc);
// The multi result keeps the first non-zero sub-transaction error.
if (subRc.err != 0 && rc.err == 0) {
rc.err = subRc.err;
}
}
break;
}
} catch (KeeperException e) {
LOG.debug("Failed: {}:{}", header, txn, e);
rc.err = e.code().intValue();
} catch (IOException e) {
// Only logged; rc.err is left unchanged on this path.
LOG.debug("Failed: {}:{}", header, txn, e);
}
// A create that failed with NODEEXISTS still updates the parent's cversion/pzxid
// through setCversionPzxid.
if (header.getType() == OpCode.create && rc.err == Code.NODEEXISTS.intValue()) {
LOG.debug("Adjusting parent cversion for Txn: {} path: {} err: {}", header.getType(), rc.path, rc.err);
int lastSlash = rc.path.lastIndexOf('/');
String parentName = rc.path.substring(0, lastSlash);
CreateTxn cTxn = (CreateTxn) txn;
try {
setCversionPzxid(parentName, cTxn.getParentCVersion(), header.getZxid());
} catch (KeeperException.NoNodeException e) {
LOG.error("Failed to set parent cversion for: {}", parentName, e);
rc.err = e.code().intValue();
}
} else if (rc.err != Code.OK.intValue()) {
LOG.debug("Ignoring processTxn failure hdr: {} : error: {}", header.getType(), rc.err);
}
// Only top-level transactions advance lastProcessedZxid and take part in digest bookkeeping.
if (!isSubTxn) {
if (rc.zxid > lastProcessedZxid) {
lastProcessedZxid = rc.zxid;
}
if (digestFromLoadedSnapshot != null) {
compareSnapshotDigests(rc.zxid);
} else {
// only start recording digest when we're not in fuzzy state
logZxidDigest(rc.zxid, getTreeDigest());
}
}
return rc;
}