HDFS-11180. Intermittent deadlock in NameNode when failover happens.
This commit is contained in:
parent
1f7613be95
commit
e0fa49234f
@ -108,6 +108,33 @@
|
||||
<Class name="org.apache.hadoop.hdfs.server.namenode.FSEditLog" />
|
||||
<Field name="journalSet" />
|
||||
<Bug pattern="IS2_INCONSISTENT_SYNC" />
|
||||
</Match>
|
||||
<!--
|
||||
FSEditLog#getTotalSyncCount is not synchronized because this method is
|
||||
used by metrics. NullPointerException can happen and it is ignored.
|
||||
-->
|
||||
<Match>
|
||||
<Class name="org.apache.hadoop.hdfs.server.namenode.FSEditLog" />
|
||||
<Field name="editLogStream" />
|
||||
<Bug pattern="IS2_INCONSISTENT_SYNC" />
|
||||
</Match>
|
||||
<!--
|
||||
FSEditLog#isOpenForWriteWithoutLock and FSEditLog#isSegmentOpenWithoutLock
|
||||
are not synchronized because these methods are used by metrics.
|
||||
-->
|
||||
<Match>
|
||||
<Class name="org.apache.hadoop.hdfs.server.namenode.FSEditLog" />
|
||||
<Field name="state" />
|
||||
<Bug pattern="IS2_INCONSISTENT_SYNC" />
|
||||
</Match>
|
||||
<!--
|
||||
All of the threads which update/increment txid are synchronized,
|
||||
so make txid volatile instead of AtomicLong.
|
||||
-->
|
||||
<Match>
|
||||
<Class name="org.apache.hadoop.hdfs.server.namenode.FSEditLog" />
|
||||
<Field name="txid" />
|
||||
<Bug pattern="VO_VOLATILE_INCREMENT" />
|
||||
</Match>
|
||||
<!--
|
||||
This method isn't performance-critical and is much clearer to write as it's written.
|
||||
|
@ -155,14 +155,16 @@ private enum State {
|
||||
private EditLogOutputStream editLogStream = null;
|
||||
|
||||
// a monotonically increasing counter that represents transactionIds.
|
||||
private long txid = 0;
|
||||
// All of the threads which update/increment txid are synchronized,
|
||||
// so make txid volatile instead of AtomicLong.
|
||||
private volatile long txid = 0;
|
||||
|
||||
// stores the last synced transactionId.
|
||||
private long synctxid = 0;
|
||||
|
||||
// the first txid of the log that's currently open for writing.
|
||||
// If this value is N, we are currently writing to edits_inprogress_N
|
||||
private long curSegmentTxId = HdfsServerConstants.INVALID_TXID;
|
||||
private volatile long curSegmentTxId = HdfsServerConstants.INVALID_TXID;
|
||||
|
||||
// the time of printing the statistics to the log file.
|
||||
private long lastPrintTime;
|
||||
@ -339,6 +341,17 @@ synchronized boolean isOpenForWrite() {
|
||||
state == State.BETWEEN_LOG_SEGMENTS;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return true if the log is currently open in write mode.
|
||||
* This method is not synchronized and must be used only for metrics.
|
||||
* @return true if the log is currently open in write mode, regardless
|
||||
* of whether it actually has an open segment.
|
||||
*/
|
||||
boolean isOpenForWriteWithoutLock() {
|
||||
return state == State.IN_SEGMENT ||
|
||||
state == State.BETWEEN_LOG_SEGMENTS;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if the log is open in write mode and has a segment open
|
||||
* ready to take edits.
|
||||
@ -347,6 +360,16 @@ synchronized boolean isSegmentOpen() {
|
||||
return state == State.IN_SEGMENT;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return true the state is IN_SEGMENT.
|
||||
* This method is not synchronized and must be used only for metrics.
|
||||
* @return true if the log is open in write mode and has a segment open
|
||||
* ready to take edits.
|
||||
*/
|
||||
boolean isSegmentOpenWithoutLock() {
|
||||
return state == State.IN_SEGMENT;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if the log is open in read mode.
|
||||
*/
|
||||
@ -523,6 +546,15 @@ public synchronized long getLastWrittenTxId() {
|
||||
return txid;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the transaction ID of the last transaction written to the log.
|
||||
* This method is not synchronized and must be used only for metrics.
|
||||
* @return The transaction ID of the last transaction written to the log
|
||||
*/
|
||||
long getLastWrittenTxIdWithoutLock() {
|
||||
return txid;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the first transaction ID in the current log segment
|
||||
*/
|
||||
@ -532,6 +564,15 @@ synchronized long getCurSegmentTxId() {
|
||||
return curSegmentTxId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the first transaction ID in the current log segment.
|
||||
* This method is not synchronized and must be used only for metrics.
|
||||
* @return The first transaction ID in the current log segment
|
||||
*/
|
||||
long getCurSegmentTxIdWithoutLock() {
|
||||
return curSegmentTxId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the transaction ID to use for the next transaction written.
|
||||
*/
|
||||
@ -1182,7 +1223,9 @@ void logRemoveXAttrs(String src, List<XAttr> xAttrs, boolean toLogRpcIds) {
|
||||
/**
|
||||
* Get all the journals this edit log is currently operating on.
|
||||
*/
|
||||
synchronized List<JournalAndStream> getJournals() {
|
||||
List<JournalAndStream> getJournals() {
|
||||
// The list implementation is CopyOnWriteArrayList,
|
||||
// so we don't need to synchronize this method.
|
||||
return journalSet.getAllJournalStreams();
|
||||
}
|
||||
|
||||
@ -1190,7 +1233,7 @@ synchronized List<JournalAndStream> getJournals() {
|
||||
* Used only by tests.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
synchronized public JournalSet getJournalSet() {
|
||||
public JournalSet getJournalSet() {
|
||||
return journalSet;
|
||||
}
|
||||
|
||||
@ -1731,11 +1774,18 @@ public void restart() {
|
||||
* Return total number of syncs happened on this edit log.
|
||||
* @return long - count
|
||||
*/
|
||||
public synchronized long getTotalSyncCount() {
|
||||
if (editLogStream != null) {
|
||||
return editLogStream.getNumSync();
|
||||
} else {
|
||||
public long getTotalSyncCount() {
|
||||
// Avoid NPE as possible.
|
||||
if (editLogStream == null) {
|
||||
return 0;
|
||||
}
|
||||
long count = 0;
|
||||
try {
|
||||
count = editLogStream.getNumSync();
|
||||
} catch (NullPointerException ignore) {
|
||||
// This method is used for metrics, so we don't synchronize it.
|
||||
// Therefore NPE can happen even if there is a null check before.
|
||||
}
|
||||
return count;
|
||||
}
|
||||
}
|
||||
|
@ -924,7 +924,7 @@ public void saveLegacyOIVImage(FSNamesystem source, String targetDir,
|
||||
Canceler canceler) throws IOException {
|
||||
FSImageCompression compression =
|
||||
FSImageCompression.createCompression(conf);
|
||||
long txid = getLastAppliedOrWrittenTxId();
|
||||
long txid = getCorrectLastAppliedOrWrittenTxId();
|
||||
SaveNamespaceContext ctx = new SaveNamespaceContext(source, txid,
|
||||
canceler);
|
||||
FSImageFormat.Saver saver = new FSImageFormat.Saver(ctx);
|
||||
@ -1019,7 +1019,7 @@ public synchronized boolean saveNamespace(long timeWindow, long txGap,
|
||||
final long checkpointTxId = image.getCheckpointTxId();
|
||||
final long checkpointAge = Time.now() - imageFile.lastModified();
|
||||
if (checkpointAge <= timeWindow * 1000 &&
|
||||
checkpointTxId >= this.getLastAppliedOrWrittenTxId() - txGap) {
|
||||
checkpointTxId >= this.getCorrectLastAppliedOrWrittenTxId() - txGap) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@ -1046,7 +1046,7 @@ public synchronized void saveNamespace(FSNamesystem source, NameNodeFile nnf,
|
||||
if (editLogWasOpen) {
|
||||
editLog.endCurrentLogSegment(true);
|
||||
}
|
||||
long imageTxId = getLastAppliedOrWrittenTxId();
|
||||
long imageTxId = getCorrectLastAppliedOrWrittenTxId();
|
||||
if (!addToCheckpointing(imageTxId)) {
|
||||
throw new IOException(
|
||||
"FS image is being downloaded from another NN at txid " + imageTxId);
|
||||
@ -1417,6 +1417,15 @@ public synchronized long getLastAppliedTxId() {
|
||||
}
|
||||
|
||||
public long getLastAppliedOrWrittenTxId() {
|
||||
return Math.max(lastAppliedTxId,
|
||||
editLog != null ? editLog.getLastWrittenTxIdWithoutLock() : 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* This method holds a lock of FSEditLog to get the correct value.
|
||||
* This method must not be used for metrics.
|
||||
*/
|
||||
public long getCorrectLastAppliedOrWrittenTxId() {
|
||||
return Math.max(lastAppliedTxId,
|
||||
editLog != null ? editLog.getLastWrittenTxId() : 0);
|
||||
}
|
||||
|
@ -3654,7 +3654,7 @@ nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
|
||||
//create ha status
|
||||
final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
|
||||
haContext.getState().getServiceState(),
|
||||
getFSImage().getLastAppliedOrWrittenTxId());
|
||||
getFSImage().getCorrectLastAppliedOrWrittenTxId());
|
||||
|
||||
return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo,
|
||||
blockReportLeaseId);
|
||||
@ -3779,7 +3779,7 @@ public NameNodeEditLogRoller(long rollThreshold, int sleepIntervalMs) {
|
||||
public void run() {
|
||||
while (fsRunning && shouldRun) {
|
||||
try {
|
||||
long numEdits = getTransactionsSinceLastLogRoll();
|
||||
long numEdits = getCorrectTransactionsSinceLastLogRoll();
|
||||
if (numEdits > rollThreshold) {
|
||||
FSNamesystem.LOG.info("NameNode rolling its own edit log because"
|
||||
+ " number of edits in open segment exceeds threshold of "
|
||||
@ -3928,6 +3928,19 @@ public long getTransactionsSinceLastCheckpoint() {
|
||||
@Metric({"TransactionsSinceLastLogRoll",
|
||||
"Number of transactions since last edit log roll"})
|
||||
public long getTransactionsSinceLastLogRoll() {
|
||||
if (isInStandbyState() || !getEditLog().isSegmentOpenWithoutLock()) {
|
||||
return 0;
|
||||
} else {
|
||||
return getEditLog().getLastWrittenTxIdWithoutLock() -
|
||||
getEditLog().getCurSegmentTxIdWithoutLock() + 1;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the correct number of transactions since last edit log roll.
|
||||
* This method holds a lock of FSEditLog and must not be used for metrics.
|
||||
*/
|
||||
private long getCorrectTransactionsSinceLastLogRoll() {
|
||||
if (isInStandbyState() || !getEditLog().isSegmentOpen()) {
|
||||
return 0;
|
||||
} else {
|
||||
@ -3938,7 +3951,7 @@ public long getTransactionsSinceLastLogRoll() {
|
||||
|
||||
@Metric({"LastWrittenTransactionId", "Transaction ID written to the edit log"})
|
||||
public long getLastWrittenTransactionId() {
|
||||
return getEditLog().getLastWrittenTxId();
|
||||
return getEditLog().getLastWrittenTxIdWithoutLock();
|
||||
}
|
||||
|
||||
@Metric({"LastCheckpointTime",
|
||||
@ -5613,7 +5626,9 @@ public String getNameJournalStatus() {
|
||||
List<Map<String, String>> jasList = new ArrayList<Map<String, String>>();
|
||||
FSEditLog log = getFSImage().getEditLog();
|
||||
if (log != null) {
|
||||
boolean openForWrite = log.isOpenForWrite();
|
||||
// This flag can be false because we cannot hold a lock of FSEditLog
|
||||
// for metrics.
|
||||
boolean openForWrite = log.isOpenForWriteWithoutLock();
|
||||
for (JournalAndStream jas : log.getJournals()) {
|
||||
final Map<String, String> jasMap = new HashMap<String, String>();
|
||||
String manager = jas.getManager().toString();
|
||||
|
@ -1215,7 +1215,7 @@ public long getTransactionID() throws IOException {
|
||||
checkNNStartup();
|
||||
namesystem.checkOperation(OperationCategory.UNCHECKED);
|
||||
namesystem.checkSuperuserPrivilege();
|
||||
return namesystem.getFSImage().getLastAppliedOrWrittenTxId();
|
||||
return namesystem.getFSImage().getCorrectLastAppliedOrWrittenTxId();
|
||||
}
|
||||
|
||||
@Override // NamenodeProtocol
|
||||
|
@ -172,7 +172,7 @@ private void doCheckpoint(boolean sendCheckpoint) throws InterruptedException, I
|
||||
FSImage img = namesystem.getFSImage();
|
||||
|
||||
long prevCheckpointTxId = img.getStorage().getMostRecentCheckpointTxId();
|
||||
long thisCheckpointTxId = img.getLastAppliedOrWrittenTxId();
|
||||
long thisCheckpointTxId = img.getCorrectLastAppliedOrWrittenTxId();
|
||||
assert thisCheckpointTxId >= prevCheckpointTxId;
|
||||
if (thisCheckpointTxId == prevCheckpointTxId) {
|
||||
LOG.info("A checkpoint was triggered but the Standby Node has not " +
|
||||
@ -315,7 +315,7 @@ static int getCanceledCount() {
|
||||
|
||||
private long countUncheckpointedTxns() {
|
||||
FSImage img = namesystem.getFSImage();
|
||||
return img.getLastAppliedOrWrittenTxId() -
|
||||
return img.getCorrectLastAppliedOrWrittenTxId() -
|
||||
img.getStorage().getMostRecentCheckpointTxId();
|
||||
}
|
||||
|
||||
|
@ -162,6 +162,30 @@ public void testWithFSNamesystemWriteLock() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
// The test makes sure JMX request can be processed even if FSEditLog
|
||||
// is synchronized.
|
||||
@Test
|
||||
public void testWithFSEditLogLock() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
cluster = new MiniDFSCluster.Builder(conf).build();
|
||||
cluster.waitActive();
|
||||
synchronized (cluster.getNameNode().getFSImage().getEditLog()) {
|
||||
MBeanClient client = new MBeanClient();
|
||||
client.start();
|
||||
client.join(20000);
|
||||
assertTrue("JMX calls are blocked when FSEditLog" +
|
||||
" is synchronized by another thread", client.succeeded);
|
||||
client.interrupt();
|
||||
}
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 120000)
|
||||
public void testFsEditLogMetrics() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
|
Loading…
Reference in New Issue
Block a user