HDFS-13544. Improve logging for JournalNode in federated cluster.
This commit is contained in:
parent
6653f4ba2e
commit
6beb25ab7e
@ -208,11 +208,12 @@ private synchronized EditLogFile scanStorageForLatestEdits() throws IOException
|
|||||||
while (!files.isEmpty()) {
|
while (!files.isEmpty()) {
|
||||||
EditLogFile latestLog = files.remove(files.size() - 1);
|
EditLogFile latestLog = files.remove(files.size() - 1);
|
||||||
latestLog.scanLog(Long.MAX_VALUE, false);
|
latestLog.scanLog(Long.MAX_VALUE, false);
|
||||||
LOG.info("Latest log is " + latestLog);
|
LOG.info("Latest log is " + latestLog + " ; journal id: " + journalId);
|
||||||
if (latestLog.getLastTxId() == HdfsServerConstants.INVALID_TXID) {
|
if (latestLog.getLastTxId() == HdfsServerConstants.INVALID_TXID) {
|
||||||
// the log contains no transactions
|
// the log contains no transactions
|
||||||
LOG.warn("Latest log " + latestLog + " has no transactions. " +
|
LOG.warn("Latest log " + latestLog + " has no transactions. " +
|
||||||
"moving it aside and looking for previous log");
|
"moving it aside and looking for previous log"
|
||||||
|
+ " ; journal id: " + journalId);
|
||||||
latestLog.moveAsideEmptyFile();
|
latestLog.moveAsideEmptyFile();
|
||||||
} else {
|
} else {
|
||||||
return latestLog;
|
return latestLog;
|
||||||
@ -230,7 +231,7 @@ void format(NamespaceInfo nsInfo) throws IOException {
|
|||||||
Preconditions.checkState(nsInfo.getNamespaceID() != 0,
|
Preconditions.checkState(nsInfo.getNamespaceID() != 0,
|
||||||
"can't format with uninitialized namespace info: %s",
|
"can't format with uninitialized namespace info: %s",
|
||||||
nsInfo);
|
nsInfo);
|
||||||
LOG.info("Formatting " + this + " with namespace info: " +
|
LOG.info("Formatting journal id : " + journalId + " with namespace info: " +
|
||||||
nsInfo);
|
nsInfo);
|
||||||
storage.format(nsInfo);
|
storage.format(nsInfo);
|
||||||
refreshCachedData();
|
refreshCachedData();
|
||||||
@ -323,7 +324,7 @@ synchronized NewEpochResponseProto newEpoch(
|
|||||||
// any other that we've promised.
|
// any other that we've promised.
|
||||||
if (epoch <= getLastPromisedEpoch()) {
|
if (epoch <= getLastPromisedEpoch()) {
|
||||||
throw new IOException("Proposed epoch " + epoch + " <= last promise " +
|
throw new IOException("Proposed epoch " + epoch + " <= last promise " +
|
||||||
getLastPromisedEpoch());
|
getLastPromisedEpoch() + " ; journal id: " + journalId);
|
||||||
}
|
}
|
||||||
|
|
||||||
updateLastPromisedEpoch(epoch);
|
updateLastPromisedEpoch(epoch);
|
||||||
@ -343,7 +344,8 @@ synchronized NewEpochResponseProto newEpoch(
|
|||||||
|
|
||||||
private void updateLastPromisedEpoch(long newEpoch) throws IOException {
|
private void updateLastPromisedEpoch(long newEpoch) throws IOException {
|
||||||
LOG.info("Updating lastPromisedEpoch from " + lastPromisedEpoch.get() +
|
LOG.info("Updating lastPromisedEpoch from " + lastPromisedEpoch.get() +
|
||||||
" to " + newEpoch + " for client " + Server.getRemoteIp());
|
" to " + newEpoch + " for client " + Server.getRemoteIp() +
|
||||||
|
" ; journal id: " + journalId);
|
||||||
lastPromisedEpoch.set(newEpoch);
|
lastPromisedEpoch.set(newEpoch);
|
||||||
|
|
||||||
// Since we have a new writer, reset the IPC serial - it will start
|
// Since we have a new writer, reset the IPC serial - it will start
|
||||||
@ -378,7 +380,7 @@ synchronized void journal(RequestInfo reqInfo,
|
|||||||
}
|
}
|
||||||
|
|
||||||
checkSync(curSegment != null,
|
checkSync(curSegment != null,
|
||||||
"Can't write, no segment open");
|
"Can't write, no segment open" + " ; journal id: " + journalId);
|
||||||
|
|
||||||
if (curSegmentTxId != segmentTxId) {
|
if (curSegmentTxId != segmentTxId) {
|
||||||
// Sanity check: it is possible that the writer will fail IPCs
|
// Sanity check: it is possible that the writer will fail IPCs
|
||||||
@ -389,17 +391,20 @@ synchronized void journal(RequestInfo reqInfo,
|
|||||||
// and throw an exception.
|
// and throw an exception.
|
||||||
JournalOutOfSyncException e = new JournalOutOfSyncException(
|
JournalOutOfSyncException e = new JournalOutOfSyncException(
|
||||||
"Writer out of sync: it thinks it is writing segment " + segmentTxId
|
"Writer out of sync: it thinks it is writing segment " + segmentTxId
|
||||||
+ " but current segment is " + curSegmentTxId);
|
+ " but current segment is " + curSegmentTxId
|
||||||
|
+ " ; journal id: " + journalId);
|
||||||
abortCurSegment();
|
abortCurSegment();
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
|
|
||||||
checkSync(nextTxId == firstTxnId,
|
checkSync(nextTxId == firstTxnId,
|
||||||
"Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId);
|
"Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId
|
||||||
|
+ " ; journal id: " + journalId);
|
||||||
|
|
||||||
long lastTxnId = firstTxnId + numTxns - 1;
|
long lastTxnId = firstTxnId + numTxns - 1;
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId);
|
LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId +
|
||||||
|
" ; journal id: " + journalId);
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the edit has already been marked as committed, we know
|
// If the edit has already been marked as committed, we know
|
||||||
@ -423,7 +428,7 @@ synchronized void journal(RequestInfo reqInfo,
|
|||||||
|
|
||||||
if (milliSeconds > WARN_SYNC_MILLIS_THRESHOLD) {
|
if (milliSeconds > WARN_SYNC_MILLIS_THRESHOLD) {
|
||||||
LOG.warn("Sync of transaction range " + firstTxnId + "-" + lastTxnId +
|
LOG.warn("Sync of transaction range " + firstTxnId + "-" + lastTxnId +
|
||||||
" took " + milliSeconds + "ms");
|
" took " + milliSeconds + "ms" + " ; journal id: " + journalId);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isLagging) {
|
if (isLagging) {
|
||||||
@ -455,7 +460,7 @@ private synchronized void checkRequest(RequestInfo reqInfo) throws IOException {
|
|||||||
if (reqInfo.getEpoch() < lastPromisedEpoch.get()) {
|
if (reqInfo.getEpoch() < lastPromisedEpoch.get()) {
|
||||||
throw new IOException("IPC's epoch " + reqInfo.getEpoch() +
|
throw new IOException("IPC's epoch " + reqInfo.getEpoch() +
|
||||||
" is less than the last promised epoch " +
|
" is less than the last promised epoch " +
|
||||||
lastPromisedEpoch.get());
|
lastPromisedEpoch.get() + " ; journal id: " + journalId);
|
||||||
} else if (reqInfo.getEpoch() > lastPromisedEpoch.get()) {
|
} else if (reqInfo.getEpoch() > lastPromisedEpoch.get()) {
|
||||||
// A newer client has arrived. Fence any previous writers by updating
|
// A newer client has arrived. Fence any previous writers by updating
|
||||||
// the promise.
|
// the promise.
|
||||||
@ -465,16 +470,16 @@ private synchronized void checkRequest(RequestInfo reqInfo) throws IOException {
|
|||||||
// Ensure that the IPCs are arriving in-order as expected.
|
// Ensure that the IPCs are arriving in-order as expected.
|
||||||
checkSync(reqInfo.getIpcSerialNumber() > currentEpochIpcSerial,
|
checkSync(reqInfo.getIpcSerialNumber() > currentEpochIpcSerial,
|
||||||
"IPC serial %s from client %s was not higher than prior highest " +
|
"IPC serial %s from client %s was not higher than prior highest " +
|
||||||
"IPC serial %s", reqInfo.getIpcSerialNumber(),
|
"IPC serial %s ; journal id: %s", reqInfo.getIpcSerialNumber(),
|
||||||
Server.getRemoteIp(),
|
Server.getRemoteIp(), currentEpochIpcSerial, journalId);
|
||||||
currentEpochIpcSerial);
|
|
||||||
currentEpochIpcSerial = reqInfo.getIpcSerialNumber();
|
currentEpochIpcSerial = reqInfo.getIpcSerialNumber();
|
||||||
|
|
||||||
if (reqInfo.hasCommittedTxId()) {
|
if (reqInfo.hasCommittedTxId()) {
|
||||||
Preconditions.checkArgument(
|
Preconditions.checkArgument(
|
||||||
reqInfo.getCommittedTxId() >= committedTxnId.get(),
|
reqInfo.getCommittedTxId() >= committedTxnId.get(),
|
||||||
"Client trying to move committed txid backward from " +
|
"Client trying to move committed txid backward from " +
|
||||||
committedTxnId.get() + " to " + reqInfo.getCommittedTxId());
|
committedTxnId.get() + " to " + reqInfo.getCommittedTxId() +
|
||||||
|
" ; journal id: " + journalId);
|
||||||
|
|
||||||
committedTxnId.set(reqInfo.getCommittedTxId());
|
committedTxnId.set(reqInfo.getCommittedTxId());
|
||||||
}
|
}
|
||||||
@ -486,7 +491,7 @@ private synchronized void checkWriteRequest(RequestInfo reqInfo) throws IOExcept
|
|||||||
if (reqInfo.getEpoch() != lastWriterEpoch.get()) {
|
if (reqInfo.getEpoch() != lastWriterEpoch.get()) {
|
||||||
throw new IOException("IPC's epoch " + reqInfo.getEpoch() +
|
throw new IOException("IPC's epoch " + reqInfo.getEpoch() +
|
||||||
" is not the current writer epoch " +
|
" is not the current writer epoch " +
|
||||||
lastWriterEpoch.get());
|
lastWriterEpoch.get() + " ; journal id: " + journalId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -497,7 +502,8 @@ public synchronized boolean isFormatted() {
|
|||||||
private void checkFormatted() throws JournalNotFormattedException {
|
private void checkFormatted() throws JournalNotFormattedException {
|
||||||
if (!isFormatted()) {
|
if (!isFormatted()) {
|
||||||
throw new JournalNotFormattedException("Journal " +
|
throw new JournalNotFormattedException("Journal " +
|
||||||
storage.getSingularStorageDir() + " not formatted");
|
storage.getSingularStorageDir() + " not formatted" +
|
||||||
|
" ; journal id: " + journalId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -542,7 +548,8 @@ public synchronized void startLogSegment(RequestInfo reqInfo, long txid,
|
|||||||
if (curSegment != null) {
|
if (curSegment != null) {
|
||||||
LOG.warn("Client is requesting a new log segment " + txid +
|
LOG.warn("Client is requesting a new log segment " + txid +
|
||||||
" though we are already writing " + curSegment + ". " +
|
" though we are already writing " + curSegment + ". " +
|
||||||
"Aborting the current segment in order to begin the new one.");
|
"Aborting the current segment in order to begin the new one." +
|
||||||
|
" ; journal id: " + journalId);
|
||||||
// The writer may have lost a connection to us and is now
|
// The writer may have lost a connection to us and is now
|
||||||
// re-connecting after the connection came back.
|
// re-connecting after the connection came back.
|
||||||
// We should abort our own old segment.
|
// We should abort our own old segment.
|
||||||
@ -556,7 +563,7 @@ public synchronized void startLogSegment(RequestInfo reqInfo, long txid,
|
|||||||
if (existing != null) {
|
if (existing != null) {
|
||||||
if (!existing.isInProgress()) {
|
if (!existing.isInProgress()) {
|
||||||
throw new IllegalStateException("Already have a finalized segment " +
|
throw new IllegalStateException("Already have a finalized segment " +
|
||||||
existing + " beginning at " + txid);
|
existing + " beginning at " + txid + " ; journal id: " + journalId);
|
||||||
}
|
}
|
||||||
|
|
||||||
// If it's in-progress, it should only contain one transaction,
|
// If it's in-progress, it should only contain one transaction,
|
||||||
@ -565,7 +572,8 @@ public synchronized void startLogSegment(RequestInfo reqInfo, long txid,
|
|||||||
existing.scanLog(Long.MAX_VALUE, false);
|
existing.scanLog(Long.MAX_VALUE, false);
|
||||||
if (existing.getLastTxId() != existing.getFirstTxId()) {
|
if (existing.getLastTxId() != existing.getFirstTxId()) {
|
||||||
throw new IllegalStateException("The log file " +
|
throw new IllegalStateException("The log file " +
|
||||||
existing + " seems to contain valid transactions");
|
existing + " seems to contain valid transactions" +
|
||||||
|
" ; journal id: " + journalId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -573,7 +581,7 @@ public synchronized void startLogSegment(RequestInfo reqInfo, long txid,
|
|||||||
if (curLastWriterEpoch != reqInfo.getEpoch()) {
|
if (curLastWriterEpoch != reqInfo.getEpoch()) {
|
||||||
LOG.info("Updating lastWriterEpoch from " + curLastWriterEpoch +
|
LOG.info("Updating lastWriterEpoch from " + curLastWriterEpoch +
|
||||||
" to " + reqInfo.getEpoch() + " for client " +
|
" to " + reqInfo.getEpoch() + " for client " +
|
||||||
Server.getRemoteIp());
|
Server.getRemoteIp() + " ; journal id: " + journalId);
|
||||||
lastWriterEpoch.set(reqInfo.getEpoch());
|
lastWriterEpoch.set(reqInfo.getEpoch());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -608,8 +616,8 @@ public synchronized void finalizeLogSegment(RequestInfo reqInfo, long startTxId,
|
|||||||
|
|
||||||
checkSync(nextTxId == endTxId + 1,
|
checkSync(nextTxId == endTxId + 1,
|
||||||
"Trying to finalize in-progress log segment %s to end at " +
|
"Trying to finalize in-progress log segment %s to end at " +
|
||||||
"txid %s but only written up to txid %s",
|
"txid %s but only written up to txid %s ; journal id: %s",
|
||||||
startTxId, endTxId, nextTxId - 1);
|
startTxId, endTxId, nextTxId - 1, journalId);
|
||||||
// No need to validate the edit log if the client is finalizing
|
// No need to validate the edit log if the client is finalizing
|
||||||
// the log segment that it was just writing to.
|
// the log segment that it was just writing to.
|
||||||
needsValidation = false;
|
needsValidation = false;
|
||||||
@ -618,25 +626,27 @@ public synchronized void finalizeLogSegment(RequestInfo reqInfo, long startTxId,
|
|||||||
FileJournalManager.EditLogFile elf = fjm.getLogFile(startTxId);
|
FileJournalManager.EditLogFile elf = fjm.getLogFile(startTxId);
|
||||||
if (elf == null) {
|
if (elf == null) {
|
||||||
throw new JournalOutOfSyncException("No log file to finalize at " +
|
throw new JournalOutOfSyncException("No log file to finalize at " +
|
||||||
"transaction ID " + startTxId);
|
"transaction ID " + startTxId + " ; journal id: " + journalId);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (elf.isInProgress()) {
|
if (elf.isInProgress()) {
|
||||||
if (needsValidation) {
|
if (needsValidation) {
|
||||||
LOG.info("Validating log segment " + elf.getFile() + " about to be " +
|
LOG.info("Validating log segment " + elf.getFile() + " about to be " +
|
||||||
"finalized");
|
"finalized ; journal id: " + journalId);
|
||||||
elf.scanLog(Long.MAX_VALUE, false);
|
elf.scanLog(Long.MAX_VALUE, false);
|
||||||
|
|
||||||
checkSync(elf.getLastTxId() == endTxId,
|
checkSync(elf.getLastTxId() == endTxId,
|
||||||
"Trying to finalize in-progress log segment %s to end at " +
|
"Trying to finalize in-progress log segment %s to end at " +
|
||||||
"txid %s but log %s on disk only contains up to txid %s",
|
"txid %s but log %s on disk only contains up to txid %s " +
|
||||||
startTxId, endTxId, elf.getFile(), elf.getLastTxId());
|
"; journal id: %s",
|
||||||
|
startTxId, endTxId, elf.getFile(), elf.getLastTxId(), journalId);
|
||||||
}
|
}
|
||||||
fjm.finalizeLogSegment(startTxId, endTxId);
|
fjm.finalizeLogSegment(startTxId, endTxId);
|
||||||
} else {
|
} else {
|
||||||
Preconditions.checkArgument(endTxId == elf.getLastTxId(),
|
Preconditions.checkArgument(endTxId == elf.getLastTxId(),
|
||||||
"Trying to re-finalize already finalized log " +
|
"Trying to re-finalize already finalized log " +
|
||||||
elf + " with different endTxId " + endTxId);
|
elf + " with different endTxId " + endTxId +
|
||||||
|
" ; journal id: " + journalId);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Once logs are finalized, a different length will never be decided.
|
// Once logs are finalized, a different length will never be decided.
|
||||||
@ -667,7 +677,8 @@ private void purgePaxosDecision(long segmentTxId) throws IOException {
|
|||||||
File paxosFile = storage.getPaxosFile(segmentTxId);
|
File paxosFile = storage.getPaxosFile(segmentTxId);
|
||||||
if (paxosFile.exists()) {
|
if (paxosFile.exists()) {
|
||||||
if (!paxosFile.delete()) {
|
if (!paxosFile.delete()) {
|
||||||
throw new IOException("Unable to delete paxos file " + paxosFile);
|
throw new IOException("Unable to delete paxos file " + paxosFile +
|
||||||
|
" ; journal id: " + journalId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -717,7 +728,7 @@ SegmentStateProto getSegmentInfo(long segmentTxId)
|
|||||||
}
|
}
|
||||||
if (elf.getLastTxId() == HdfsServerConstants.INVALID_TXID) {
|
if (elf.getLastTxId() == HdfsServerConstants.INVALID_TXID) {
|
||||||
LOG.info("Edit log file " + elf + " appears to be empty. " +
|
LOG.info("Edit log file " + elf + " appears to be empty. " +
|
||||||
"Moving it aside...");
|
"Moving it aside..." + " ; journal id: " + journalId);
|
||||||
elf.moveAsideEmptyFile();
|
elf.moveAsideEmptyFile();
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
@ -727,7 +738,7 @@ SegmentStateProto getSegmentInfo(long segmentTxId)
|
|||||||
.setIsInProgress(elf.isInProgress())
|
.setIsInProgress(elf.isInProgress())
|
||||||
.build();
|
.build();
|
||||||
LOG.info("getSegmentInfo(" + segmentTxId + "): " + elf + " -> " +
|
LOG.info("getSegmentInfo(" + segmentTxId + "): " + elf + " -> " +
|
||||||
TextFormat.shortDebugString(ret));
|
TextFormat.shortDebugString(ret) + " ; journal id: " + journalId);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -771,7 +782,7 @@ public synchronized PrepareRecoveryResponseProto prepareRecovery(
|
|||||||
|
|
||||||
PrepareRecoveryResponseProto resp = builder.build();
|
PrepareRecoveryResponseProto resp = builder.build();
|
||||||
LOG.info("Prepared recovery for segment " + segmentTxId + ": " +
|
LOG.info("Prepared recovery for segment " + segmentTxId + ": " +
|
||||||
TextFormat.shortDebugString(resp));
|
TextFormat.shortDebugString(resp) + " ; journal id: " + journalId);
|
||||||
return resp;
|
return resp;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -792,8 +803,8 @@ public synchronized void acceptRecovery(RequestInfo reqInfo,
|
|||||||
// at least one transaction.
|
// at least one transaction.
|
||||||
Preconditions.checkArgument(segment.getEndTxId() > 0 &&
|
Preconditions.checkArgument(segment.getEndTxId() > 0 &&
|
||||||
segment.getEndTxId() >= segmentTxId,
|
segment.getEndTxId() >= segmentTxId,
|
||||||
"bad recovery state for segment %s: %s",
|
"bad recovery state for segment %s: %s ; journal id: %s",
|
||||||
segmentTxId, TextFormat.shortDebugString(segment));
|
segmentTxId, TextFormat.shortDebugString(segment), journalId);
|
||||||
|
|
||||||
PersistedRecoveryPaxosData oldData = getPersistedPaxosData(segmentTxId);
|
PersistedRecoveryPaxosData oldData = getPersistedPaxosData(segmentTxId);
|
||||||
PersistedRecoveryPaxosData newData = PersistedRecoveryPaxosData.newBuilder()
|
PersistedRecoveryPaxosData newData = PersistedRecoveryPaxosData.newBuilder()
|
||||||
@ -806,8 +817,9 @@ public synchronized void acceptRecovery(RequestInfo reqInfo,
|
|||||||
// checkRequest() call above should filter non-increasing epoch numbers.
|
// checkRequest() call above should filter non-increasing epoch numbers.
|
||||||
if (oldData != null) {
|
if (oldData != null) {
|
||||||
alwaysAssert(oldData.getAcceptedInEpoch() <= reqInfo.getEpoch(),
|
alwaysAssert(oldData.getAcceptedInEpoch() <= reqInfo.getEpoch(),
|
||||||
"Bad paxos transition, out-of-order epochs.\nOld: %s\nNew: %s\n",
|
"Bad paxos transition, out-of-order epochs.\nOld: %s\nNew: " +
|
||||||
oldData, newData);
|
"%s\nJournalId: %s\n",
|
||||||
|
oldData, newData, journalId);
|
||||||
}
|
}
|
||||||
|
|
||||||
File syncedFile = null;
|
File syncedFile = null;
|
||||||
@ -817,7 +829,7 @@ public synchronized void acceptRecovery(RequestInfo reqInfo,
|
|||||||
currentSegment.getEndTxId() != segment.getEndTxId()) {
|
currentSegment.getEndTxId() != segment.getEndTxId()) {
|
||||||
if (currentSegment == null) {
|
if (currentSegment == null) {
|
||||||
LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
|
LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
|
||||||
": no current segment in place");
|
": no current segment in place ; journal id: " + journalId);
|
||||||
|
|
||||||
// Update the highest txid for lag metrics
|
// Update the highest txid for lag metrics
|
||||||
updateHighestWrittenTxId(Math.max(segment.getEndTxId(),
|
updateHighestWrittenTxId(Math.max(segment.getEndTxId(),
|
||||||
@ -825,7 +837,7 @@ public synchronized void acceptRecovery(RequestInfo reqInfo,
|
|||||||
} else {
|
} else {
|
||||||
LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
|
LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
|
||||||
": old segment " + TextFormat.shortDebugString(currentSegment) +
|
": old segment " + TextFormat.shortDebugString(currentSegment) +
|
||||||
" is not the right length");
|
" is not the right length ; journal id: " + journalId);
|
||||||
|
|
||||||
// Paranoid sanity check: if the new log is shorter than the log we
|
// Paranoid sanity check: if the new log is shorter than the log we
|
||||||
// currently have, we should not end up discarding any transactions
|
// currently have, we should not end up discarding any transactions
|
||||||
@ -838,14 +850,15 @@ public synchronized void acceptRecovery(RequestInfo reqInfo,
|
|||||||
" with new segment " +
|
" with new segment " +
|
||||||
TextFormat.shortDebugString(segment) +
|
TextFormat.shortDebugString(segment) +
|
||||||
": would discard already-committed txn " +
|
": would discard already-committed txn " +
|
||||||
committedTxnId.get());
|
committedTxnId.get() +
|
||||||
|
" ; journal id: " + journalId);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Another paranoid check: we should not be asked to synchronize a log
|
// Another paranoid check: we should not be asked to synchronize a log
|
||||||
// on top of a finalized segment.
|
// on top of a finalized segment.
|
||||||
alwaysAssert(currentSegment.getIsInProgress(),
|
alwaysAssert(currentSegment.getIsInProgress(),
|
||||||
"Should never be asked to synchronize a different log on top of an " +
|
"Should never be asked to synchronize a different log on top of " +
|
||||||
"already-finalized segment");
|
"an already-finalized segment ; journal id: " + journalId);
|
||||||
|
|
||||||
// If we're shortening the log, update our highest txid
|
// If we're shortening the log, update our highest txid
|
||||||
// used for lag metrics.
|
// used for lag metrics.
|
||||||
@ -858,7 +871,7 @@ public synchronized void acceptRecovery(RequestInfo reqInfo,
|
|||||||
} else {
|
} else {
|
||||||
LOG.info("Skipping download of log " +
|
LOG.info("Skipping download of log " +
|
||||||
TextFormat.shortDebugString(segment) +
|
TextFormat.shortDebugString(segment) +
|
||||||
": already have up-to-date logs");
|
": already have up-to-date logs ; journal id: " + journalId);
|
||||||
}
|
}
|
||||||
|
|
||||||
// This is one of the few places in the protocol where we have a single
|
// This is one of the few places in the protocol where we have a single
|
||||||
@ -890,12 +903,12 @@ public synchronized void acceptRecovery(RequestInfo reqInfo,
|
|||||||
}
|
}
|
||||||
|
|
||||||
LOG.info("Accepted recovery for segment " + segmentTxId + ": " +
|
LOG.info("Accepted recovery for segment " + segmentTxId + ": " +
|
||||||
TextFormat.shortDebugString(newData));
|
TextFormat.shortDebugString(newData) + " ; journal id: " + journalId);
|
||||||
}
|
}
|
||||||
|
|
||||||
private LongRange txnRange(SegmentStateProto seg) {
|
private LongRange txnRange(SegmentStateProto seg) {
|
||||||
Preconditions.checkArgument(seg.hasEndTxId(),
|
Preconditions.checkArgument(seg.hasEndTxId(),
|
||||||
"invalid segment: %s", seg);
|
"invalid segment: %s ; journal id: %s", seg, journalId);
|
||||||
return new LongRange(seg.getStartTxId(), seg.getEndTxId());
|
return new LongRange(seg.getStartTxId(), seg.getEndTxId());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -970,7 +983,7 @@ private void completeHalfDoneAcceptRecovery(
|
|||||||
if (tmp.exists()) {
|
if (tmp.exists()) {
|
||||||
File dst = storage.getInProgressEditLog(segmentId);
|
File dst = storage.getInProgressEditLog(segmentId);
|
||||||
LOG.info("Rolling forward previously half-completed synchronization: " +
|
LOG.info("Rolling forward previously half-completed synchronization: " +
|
||||||
tmp + " -> " + dst);
|
tmp + " -> " + dst + " ; journal id: " + journalId);
|
||||||
FileUtil.replaceFile(tmp, dst);
|
FileUtil.replaceFile(tmp, dst);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -991,8 +1004,8 @@ private PersistedRecoveryPaxosData getPersistedPaxosData(long segmentTxId)
|
|||||||
PersistedRecoveryPaxosData ret = PersistedRecoveryPaxosData.parseDelimitedFrom(in);
|
PersistedRecoveryPaxosData ret = PersistedRecoveryPaxosData.parseDelimitedFrom(in);
|
||||||
Preconditions.checkState(ret != null &&
|
Preconditions.checkState(ret != null &&
|
||||||
ret.getSegmentState().getStartTxId() == segmentTxId,
|
ret.getSegmentState().getStartTxId() == segmentTxId,
|
||||||
"Bad persisted data for segment %s: %s",
|
"Bad persisted data for segment %s: %s ; journal id: %s",
|
||||||
segmentTxId, ret);
|
segmentTxId, ret, journalId);
|
||||||
return ret;
|
return ret;
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.closeStream(in);
|
IOUtils.closeStream(in);
|
||||||
@ -1041,7 +1054,7 @@ public synchronized void doUpgrade(StorageInfo sInfo) throws IOException {
|
|||||||
storage.cTime = sInfo.cTime;
|
storage.cTime = sInfo.cTime;
|
||||||
int oldLV = storage.getLayoutVersion();
|
int oldLV = storage.getLayoutVersion();
|
||||||
storage.layoutVersion = sInfo.layoutVersion;
|
storage.layoutVersion = sInfo.layoutVersion;
|
||||||
LOG.info("Starting upgrade of edits directory: "
|
LOG.info("Starting upgrade of edits directory: " + storage.getRoot()
|
||||||
+ ".\n old LV = " + oldLV
|
+ ".\n old LV = " + oldLV
|
||||||
+ "; old CTime = " + oldCTime
|
+ "; old CTime = " + oldCTime
|
||||||
+ ".\n new LV = " + storage.getLayoutVersion()
|
+ ".\n new LV = " + storage.getLayoutVersion()
|
||||||
@ -1112,7 +1125,7 @@ synchronized boolean moveTmpSegmentToCurrent(File tmpFile, File finalFile,
|
|||||||
if (endTxId <= committedTxnId.get()) {
|
if (endTxId <= committedTxnId.get()) {
|
||||||
if (!finalFile.getParentFile().exists()) {
|
if (!finalFile.getParentFile().exists()) {
|
||||||
LOG.error(finalFile.getParentFile() + " doesn't exist. Aborting tmp " +
|
LOG.error(finalFile.getParentFile() + " doesn't exist. Aborting tmp " +
|
||||||
"segment move to current directory");
|
"segment move to current directory ; journal id: " + journalId);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
Files.move(tmpFile.toPath(), finalFile.toPath(),
|
Files.move(tmpFile.toPath(), finalFile.toPath(),
|
||||||
@ -1122,13 +1135,13 @@ synchronized boolean moveTmpSegmentToCurrent(File tmpFile, File finalFile,
|
|||||||
} else {
|
} else {
|
||||||
success = false;
|
success = false;
|
||||||
LOG.warn("Unable to move edits file from " + tmpFile + " to " +
|
LOG.warn("Unable to move edits file from " + tmpFile + " to " +
|
||||||
finalFile);
|
finalFile + " ; journal id: " + journalId);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
success = false;
|
success = false;
|
||||||
LOG.error("The endTxId of the temporary file is not less than the " +
|
LOG.error("The endTxId of the temporary file is not less than the " +
|
||||||
"last committed transaction id. Aborting move to final file" +
|
"last committed transaction id. Aborting move to final file" +
|
||||||
finalFile);
|
finalFile + " ; journal id: " + journalId);
|
||||||
}
|
}
|
||||||
|
|
||||||
return success;
|
return success;
|
||||||
|
Loading…
Reference in New Issue
Block a user