HDFS-7097. Allow block reports to be processed during checkpointing on standby name node. (kihwal via wang)
This commit is contained in:
parent
56f3eecc12
commit
f43a20c529
@ -486,6 +486,9 @@ Release 2.7.0 - UNRELEASED
|
||||
HDFS-7303. NN UI fails to distinguish datanodes on the same host.
|
||||
(Benoy Antony via wheat9)
|
||||
|
||||
HDFS-7097. Allow block reports to be processed during checkpointing on
|
||||
standby name node. (kihwal via wang)
|
||||
|
||||
Release 2.6.1 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -1183,9 +1183,11 @@ private static byte[] renameReservedRootComponentOnUpgrade(byte[] component,
|
||||
@Deprecated
|
||||
static class Saver {
|
||||
private static final int LAYOUT_VERSION = -51;
|
||||
public static final int CHECK_CANCEL_INTERVAL = 4096;
|
||||
private final SaveNamespaceContext context;
|
||||
/** Set to true once an image has been written */
|
||||
private boolean saved = false;
|
||||
private long checkCancelCounter = 0;
|
||||
|
||||
/** The MD5 checksum of the file that was written */
|
||||
private MD5Hash savedDigest;
|
||||
@ -1322,7 +1324,6 @@ private int saveChildren(ReadOnlyList<INode> children,
|
||||
// Write normal children INode.
|
||||
out.writeInt(children.size());
|
||||
int dirNum = 0;
|
||||
int i = 0;
|
||||
for(INode child : children) {
|
||||
// print all children first
|
||||
// TODO: for HDFS-5428, we cannot change the format/content of fsimage
|
||||
@ -1335,7 +1336,7 @@ private int saveChildren(ReadOnlyList<INode> children,
|
||||
&& child.asFile().isUnderConstruction()) {
|
||||
this.snapshotUCMap.put(child.getId(), child.asFile());
|
||||
}
|
||||
if (i++ % 50 == 0) {
|
||||
if (checkCancelCounter++ % CHECK_CANCEL_INTERVAL == 0) {
|
||||
context.checkCancelled();
|
||||
}
|
||||
}
|
||||
|
@ -496,6 +496,15 @@ private void logAuditEvent(boolean succeeded,
|
||||
/** Lock to protect FSNamesystem. */
|
||||
private final FSNamesystemLock fsLock;
|
||||
|
||||
/**
|
||||
* Checkpoint lock to protect FSNamesystem modification on standby NNs.
|
||||
* Unlike fsLock, it does not affect block updates. On active NNs, this lock
|
||||
* does not provide proper protection, because there are operations that
|
||||
* modify both block and name system state. Even on standby, fsLock is
|
||||
* used when block state changes need to be blocked.
|
||||
*/
|
||||
private final ReentrantLock cpLock;
|
||||
|
||||
/**
|
||||
* Used when this NN is in standby state to read from the shared edit log.
|
||||
*/
|
||||
@ -758,6 +767,8 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
|
||||
LOG.info("fsLock is fair:" + fair);
|
||||
fsLock = new FSNamesystemLock(fair);
|
||||
cond = fsLock.writeLock().newCondition();
|
||||
cpLock = new ReentrantLock();
|
||||
|
||||
this.fsImage = fsImage;
|
||||
try {
|
||||
resourceRecheckInterval = conf.getLong(
|
||||
@ -1554,6 +1565,22 @@ public int getWriteHoldCount() {
|
||||
return this.fsLock.getWriteHoldCount();
|
||||
}
|
||||
|
||||
/** Lock the checkpoint lock */
|
||||
public void cpLock() {
|
||||
this.cpLock.lock();
|
||||
}
|
||||
|
||||
/** Lock the checkpoint lock interruptibly */
|
||||
public void cpLockInterruptibly() throws InterruptedException {
|
||||
this.cpLock.lockInterruptibly();
|
||||
}
|
||||
|
||||
/** Unlock the checkpoint lock */
|
||||
public void cpUnlock() {
|
||||
this.cpLock.unlock();
|
||||
}
|
||||
|
||||
|
||||
NamespaceInfo getNamespaceInfo() {
|
||||
readLock();
|
||||
try {
|
||||
@ -5274,7 +5301,7 @@ void saveNamespace() throws AccessControlException, IOException {
|
||||
checkOperation(OperationCategory.UNCHECKED);
|
||||
checkSuperuserPrivilege();
|
||||
|
||||
|
||||
cpLock(); // Block if a checkpointing is in progress on standby.
|
||||
readLock();
|
||||
try {
|
||||
checkOperation(OperationCategory.UNCHECKED);
|
||||
@ -5286,6 +5313,7 @@ void saveNamespace() throws AccessControlException, IOException {
|
||||
getFSImage().saveNamespace(this);
|
||||
} finally {
|
||||
readUnlock();
|
||||
cpUnlock();
|
||||
}
|
||||
LOG.info("New namespace image has been created");
|
||||
}
|
||||
@ -5300,6 +5328,7 @@ boolean restoreFailedStorage(String arg) throws AccessControlException,
|
||||
StandbyException {
|
||||
checkSuperuserPrivilege();
|
||||
checkOperation(OperationCategory.UNCHECKED);
|
||||
cpLock(); // Block if a checkpointing is in progress on standby.
|
||||
writeLock();
|
||||
try {
|
||||
checkOperation(OperationCategory.UNCHECKED);
|
||||
@ -5314,6 +5343,7 @@ boolean restoreFailedStorage(String arg) throws AccessControlException,
|
||||
return val;
|
||||
} finally {
|
||||
writeUnlock();
|
||||
cpUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
@ -5324,12 +5354,14 @@ Date getStartTime() {
|
||||
void finalizeUpgrade() throws IOException {
|
||||
checkSuperuserPrivilege();
|
||||
checkOperation(OperationCategory.UNCHECKED);
|
||||
cpLock(); // Block if a checkpointing is in progress on standby.
|
||||
writeLock();
|
||||
try {
|
||||
checkOperation(OperationCategory.UNCHECKED);
|
||||
getFSImage().finalizeUpgrade(this.isHaEnabled() && inActiveState());
|
||||
} finally {
|
||||
writeUnlock();
|
||||
cpUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
@ -7542,6 +7574,11 @@ public ReentrantLock getLongReadLockForTests() {
|
||||
return fsLock.longReadLock;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public ReentrantLock getCpLockForTests() {
|
||||
return cpLock;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public SafeModeInfo getSafeModeInfoForTests() {
|
||||
return safeMode;
|
||||
|
@ -183,6 +183,8 @@ public void catchupDuringFailover() throws IOException {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
try {
|
||||
// It is already under the full name system lock and the checkpointer
|
||||
// thread is already stopped. No need to acquire any other lock.
|
||||
doTailEdits();
|
||||
} catch (InterruptedException e) {
|
||||
throw new IOException(e);
|
||||
@ -321,7 +323,15 @@ private void doWork() {
|
||||
if (!shouldRun) {
|
||||
break;
|
||||
}
|
||||
doTailEdits();
|
||||
// Prevent reading of name system while being modified. The full
|
||||
// name system lock will be acquired to further block even the block
|
||||
// state updates.
|
||||
namesystem.cpLockInterruptibly();
|
||||
try {
|
||||
doTailEdits();
|
||||
} finally {
|
||||
namesystem.cpUnlock();
|
||||
}
|
||||
} catch (EditLogInputException elie) {
|
||||
LOG.warn("Error while reading edits from disk. Will try again.", elie);
|
||||
} catch (InterruptedException ie) {
|
||||
|
@ -153,7 +153,10 @@ private void doCheckpoint() throws InterruptedException, IOException {
|
||||
final long txid;
|
||||
final NameNodeFile imageType;
|
||||
|
||||
namesystem.longReadLockInterruptibly();
|
||||
// Acquire cpLock to make sure no one is modifying the name system.
|
||||
// It does not need the full namesystem write lock, since the only thing
|
||||
// that modifies namesystem on standby node is edit log replaying.
|
||||
namesystem.cpLockInterruptibly();
|
||||
try {
|
||||
assert namesystem.getEditLog().isOpenForRead() :
|
||||
"Standby Checkpointer should only attempt a checkpoint when " +
|
||||
@ -190,7 +193,7 @@ private void doCheckpoint() throws InterruptedException, IOException {
|
||||
img.saveLegacyOIVImage(namesystem, outputDir, canceler);
|
||||
}
|
||||
} finally {
|
||||
namesystem.longReadUnlock();
|
||||
namesystem.cpUnlock();
|
||||
}
|
||||
|
||||
// Upload the saved checkpoint back to the active
|
||||
@ -226,8 +229,11 @@ public Void call() throws IOException {
|
||||
* minute or so.
|
||||
*/
|
||||
public void cancelAndPreventCheckpoints(String msg) throws ServiceFailedException {
|
||||
thread.preventCheckpointsFor(PREVENT_AFTER_CANCEL_MS);
|
||||
synchronized (cancelLock) {
|
||||
// The checkpointer thread takes this lock and checks if checkpointing is
|
||||
// postponed.
|
||||
thread.preventCheckpointsFor(PREVENT_AFTER_CANCEL_MS);
|
||||
|
||||
// Before beginning a checkpoint, the checkpointer thread
|
||||
// takes this lock, and creates a canceler object.
|
||||
// If the canceler is non-null, then a checkpoint is in
|
||||
|
@ -26,6 +26,7 @@
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
@ -91,7 +92,7 @@ public void setupCluster() throws Exception {
|
||||
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.nnTopology(topology)
|
||||
.numDataNodes(0)
|
||||
.numDataNodes(1)
|
||||
.build();
|
||||
cluster.waitActive();
|
||||
|
||||
@ -359,6 +360,13 @@ public void testStandbyExceptionThrownDuringCheckpoint() throws Exception {
|
||||
} catch (StandbyException se) {
|
||||
GenericTestUtils.assertExceptionContains("is not supported", se);
|
||||
}
|
||||
|
||||
// Make sure new incremental block reports are processed during
|
||||
// checkpointing on the SBN.
|
||||
assertEquals(0, cluster.getNamesystem(1).getPendingDataNodeMessageCount());
|
||||
doCreate();
|
||||
Thread.sleep(1000);
|
||||
assertTrue(cluster.getNamesystem(1).getPendingDataNodeMessageCount() > 0);
|
||||
|
||||
// Make sure that the checkpoint is still going on, implying that the client
|
||||
// RPC to the SBN happened during the checkpoint.
|
||||
@ -410,7 +418,7 @@ public void run() {
|
||||
|
||||
assertFalse(nn1.getNamesystem().getFsLockForTests().hasQueuedThreads());
|
||||
assertFalse(nn1.getNamesystem().getFsLockForTests().isWriteLocked());
|
||||
assertTrue(nn1.getNamesystem().getLongReadLockForTests().hasQueuedThreads());
|
||||
assertTrue(nn1.getNamesystem().getCpLockForTests().hasQueuedThreads());
|
||||
|
||||
// Get /jmx of the standby NN web UI, which will cause the FSNS read lock to
|
||||
// be taken.
|
||||
@ -437,6 +445,15 @@ private void doEdits(int start, int stop) throws IOException {
|
||||
fs.mkdirs(p);
|
||||
}
|
||||
}
|
||||
|
||||
private void doCreate() throws IOException {
|
||||
Path p = new Path("/testFile");
|
||||
fs.delete(p, false);
|
||||
FSDataOutputStream out = fs.create(p, (short)1);
|
||||
out.write(42);
|
||||
out.close();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* A codec which just slows down the saving of the image significantly
|
||||
|
Loading…
Reference in New Issue
Block a user