HDFS-2684. Fix up some failing unit tests on HA branch. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1215241 13f79535-47bb-0310-9956-ffa450edef68
parent cdb9f01ad4
commit 371f4228e8
@@ -61,3 +61,5 @@ HDFS-2689. HA: BookKeeperEditLogInputStream doesn't implement isInProgress() (at
 HDFS-2602. NN should log newly-allocated blocks without losing BlockInfo (atm)
 
 HDFS-2667. Fix transition from active to standby (todd)
+
+HDFS-2684. Fix up some failing unit tests on HA branch (todd)
@@ -325,6 +325,8 @@ void registrationSucceeded(BPServiceActor bpServiceActor,
     } else {
       bpRegistration = reg;
     }
+
+    dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
   }
 
   /**
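On the HA branch, BPOfferService fronts one BPServiceActor per NameNode, and both actors register the same block pool; the hunk makes the DataNode callback fire after the if/else, so it runs whether this actor recorded the first registration or merely cross-checked a later one. A minimal, self-contained sketch of that pattern (the types and the consistency check here are illustrative stand-ins, not the Hadoop classes):

class BlockPoolRegistration {
  interface DataNodeCallback { void bpRegistrationSucceeded(String reg, String bpId); }

  private String bpRegistration;          // first successful registration wins
  private final String blockPoolId;
  private final DataNodeCallback dn;

  BlockPoolRegistration(String blockPoolId, DataNodeCallback dn) {
    this.blockPoolId = blockPoolId;
    this.dn = dn;
  }

  synchronized void registrationSucceeded(String reg) {
    if (bpRegistration != null) {
      // Both NameNodes must agree on the registration (simplified check).
      if (!bpRegistration.equals(reg)) {
        throw new IllegalStateException("inconsistent registration");
      }
    } else {
      bpRegistration = reg;
    }
    // The fix: notify after the if/else, unconditionally, so per-block-pool
    // initialization happens no matter which actor registered first.
    dn.bpRegistrationSucceeded(bpRegistration, blockPoolId);
  }
}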
@@ -199,6 +199,11 @@ public void stop() {
       checkpointManager.interrupt();
       checkpointManager = null;
     }
+
+    // Abort current log segment - otherwise the NN shutdown code
+    // will close it gracefully, which is incorrect.
+    getFSImage().getEditLog().abortCurrentLogSegment();
+
     // Stop name-node threads
     super.stop();
   }
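The distinction the new comment leans on: closing an edit log gracefully finalizes the in-progress segment, which only the node that owns the log should do, while aborting simply drops the stream and leaves the segment in-progress. A simplified, self-contained sketch of the two shutdown paths (illustrative only, not the Hadoop classes):

import java.io.IOException;

class EditLogSegmentSketch {
  private boolean open = true;
  private boolean finalized = false;

  // Graceful close: finalizes the segment, e.g. renames
  // edits_inprogress_N to edits_N-M. Correct only for the log's owner.
  void close() throws IOException {
    if (!open) throw new IOException("already closed");
    finalized = true;
    open = false;
  }

  // Abort: drop the stream and leave the segment in-progress,
  // to be recovered or discarded later.
  void abort() {
    open = false;
  }

  boolean isFinalized() { return finalized; }
}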
@@ -916,6 +916,7 @@ synchronized void abortCurrentLogSegment() {
       if (editLogStream != null) {
         editLogStream.abort();
         editLogStream = null;
+        state = State.BETWEEN_LOG_SEGMENTS;
       }
     } catch (IOException e) {
       LOG.warn("All journals failed to abort", e);
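Resetting the state is what lets a later startLogSegment() succeed after an abort; without it the log would still look mid-segment. A condensed sketch of the state machine this one-liner keeps consistent (the real FSEditLog has more states; this is illustrative):

class EditLogStateSketch {
  enum State { BETWEEN_LOG_SEGMENTS, IN_SEGMENT, CLOSED }

  private State state = State.BETWEEN_LOG_SEGMENTS;

  void startLogSegment() {
    if (state != State.BETWEEN_LOG_SEGMENTS) {
      throw new IllegalStateException("bad state: " + state);
    }
    state = State.IN_SEGMENT;
  }

  void abortCurrentLogSegment() {
    // Without this reset, a later startLogSegment() would trip the
    // precondition above even though no segment is actually open.
    state = State.BETWEEN_LOG_SEGMENTS;
  }
}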
@@ -495,7 +495,7 @@ void startActiveServices() throws IOException {
     try {
       FSEditLog editLog = dir.fsImage.getEditLog();
 
-      if (!editLog.isSegmentOpen()) {
+      if (!editLog.isOpenForWrite()) {
         // During startup, we're already open for write during initialization.
         // TODO(HA): consider adding a startup state?
         editLog.initJournalsForWrite();
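Note the semantic gap between the two predicates, judging from the comment in the hunk: isSegmentOpen() is true only while a log segment is actively open, whereas isOpenForWrite() is also true once the journals have been initialized for writing but no segment has started yet. In terms of the State sketch above, isSegmentOpen() corresponds to IN_SEGMENT only, while isOpenForWrite() also accepts BETWEEN_LOG_SEGMENTS. During startup the log is already open for write, so the old check would have re-run initJournalsForWrite().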
@@ -2774,7 +2774,7 @@ HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
       DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
           nodeReg, blockPoolId, capacity, dfsUsed, remaining, blockPoolUsed,
           xceiverCount, maxTransfer, failedVolumes);
-      if (cmds == null) {
+      if (cmds == null || cmds.length == 0) {
         DatanodeCommand cmd = upgradeManager.getBroadcastCommand();
         if (cmd != null) {
           cmds = new DatanodeCommand[] {cmd};
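The widened guard treats an empty command array the same as null, so the upgrade broadcast command still gets delivered when handleHeartbeat() has nothing else to send; the matching test change further down asserts cmds.length == 0, suggesting the HA branch now returns an empty array rather than null. A self-contained illustration of the hazard (hypothetical types, not the Hadoop ones):

class HeartbeatSketch {
  // On the HA branch the manager returns an empty array, not null.
  static String[] handleHeartbeat() { return new String[0]; }

  static String[] withBroadcast(String broadcast) {
    String[] cmds = handleHeartbeat();
    if (cmds == null || cmds.length == 0) {   // old guard: cmds == null only
      if (broadcast != null) {
        cmds = new String[] { broadcast };
      }
    }
    return cmds;
  }

  public static void main(String[] args) {
    // With the old null-only guard, the broadcast would have been dropped.
    System.out.println(withBroadcast("finalizeUpgrade")[0]);
  }
}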
@@ -52,7 +52,7 @@
  */
 public class TestDFSUpgrade {
 
-  private static final int EXPECTED_TXID = 33;
+  private static final int EXPECTED_TXID = 49;
   private static final Log LOG = LogFactory.getLog(TestDFSUpgrade.class.getName());
   private Configuration conf;
   private int testCounter = 0;
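EXPECTED_TXID pins the transaction ID this upgrade test expects after its fixed workload, so it has to move whenever the NameNode starts logging more edit operations for the same actions. The jump from 33 to 49 is presumably such a shift on the HA branch; for instance, HDFS-2602 in the changelog above makes the NN log newly-allocated blocks, which adds edit records per block written.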
@@ -302,7 +302,7 @@ private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long n
     testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L,
         "Cannot create a RBW block", true);
     // test PIPELINE_SETUP_APPEND on an existing block
-    newGS = newBlock.getGenerationStamp() + 1;
+    newGS = firstBlock.getGenerationStamp() + 1;
     testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_APPEND,
         newGS, "Cannot append to a RBW replica", true);
     // test PIPELINE_SETUP_APPEND on an existing block
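A straightforward test bug: the append is issued against firstBlock, so the bumped generation stamp must be derived from firstBlock as well; taking it from newBlock only gives the right answer when the two stamps happen to coincide.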
@@ -110,7 +110,7 @@ public void testHeartbeat() throws Exception {
 
         cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd, namesystem)
             .getCommands();
-        assertEquals(null, cmds);
+        assertEquals(0, cmds.length);
       }
     } finally {
       namesystem.writeUnlock();
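This is the test-side half of the handleHeartbeat change above: with nothing to send, the response now carries an empty DatanodeCommand array rather than null, so the assertion checks the length instead of comparing against null.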
@@ -240,9 +240,9 @@ public void testBackupNode() throws Exception {
   }
 
   void testCheckpoint(StartupOption op) throws Exception {
-    Path file1 = new Path("checkpoint.dat");
-    Path file2 = new Path("checkpoint2.dat");
-    Path file3 = new Path("backup.dat");
+    Path file1 = new Path("/checkpoint.dat");
+    Path file2 = new Path("/checkpoint2.dat");
+    Path file3 = new Path("/backup.dat");
 
     Configuration conf = new HdfsConfiguration();
     short replication = (short)conf.getInt("dfs.replication", 3);
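Making the paths absolute matters for the hunk that follows: file3.toUri().getPath() on a relative Path yields a relative string, which a direct namesystem lookup like getFileInfo() cannot resolve, since only the FileSystem client applies the working directory. A small demonstration using just org.apache.hadoop.fs.Path (no cluster needed):

import org.apache.hadoop.fs.Path;

public class PathResolutionSketch {
  public static void main(String[] args) {
    // Relative Path: the URI path is still relative.
    System.out.println(new Path("backup.dat").toUri().getPath());   // backup.dat
    // Absolute Path: usable as a raw namesystem path string.
    System.out.println(new Path("/backup.dat").toUri().getPath());  // /backup.dat
  }
}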
@@ -341,11 +341,13 @@ void testCheckpoint(StartupOption op) throws Exception {
       TestCheckpoint.checkFile(fileSys, file3, replication);
       // should also be on BN right away
       assertTrue("file3 does not exist on BackupNode",
-          op != StartupOption.BACKUP || bnFS.exists(file3));
+          op != StartupOption.BACKUP ||
+          backup.getNamesystem().getFileInfo(
+              file3.toUri().getPath(), false) != null);
 
     } catch(IOException e) {
       LOG.error("Error in TestBackupNode:", e);
-      assertTrue(e.getLocalizedMessage(), false);
+      throw new AssertionError(e);
     } finally {
       if(backup != null) backup.stop();
       if(fileSys != null) fileSys.close();
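Two separate cleanups in one hunk: the BackupNode check now queries its namesystem directly with getFileInfo() rather than going through the bnFS client handle, and the catch block rethrows the IOException wrapped in an AssertionError, preserving the original stack trace that assertTrue(e.getLocalizedMessage(), false) used to collapse into a one-line message.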
@@ -923,10 +923,12 @@ public void testSaveNamespace() throws IOException {
       throw new IOException(e);
     }
 
+    final int EXPECTED_TXNS_FIRST_SEG = 12;
+
     // the following steps should have happened:
-    // edits_inprogress_1 -> edits_1-8 (finalized)
-    // fsimage_8 created
-    // edits_inprogress_9 created
+    // edits_inprogress_1 -> edits_1-12 (finalized)
+    // fsimage_12 created
+    // edits_inprogress_13 created
     //
     for(URI uri : editsDirs) {
       File ed = new File(uri.getPath());
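Naming the segment length (EXPECTED_TXNS_FIRST_SEG = 12) makes the file-name arithmetic explicit: finalizing the first segment yields edits_1-12, the checkpoint writes fsimage_12, and the next segment opens as edits_inprogress_13, i.e. EXPECTED_TXNS_FIRST_SEG + 1. The hunk below applies the same constant to the assertions.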
@@ -938,19 +940,21 @@ public void testSaveNamespace() throws IOException {
           NNStorage.getInProgressEditsFileName(1));
       assertFalse(originalEdits.exists());
       File finalizedEdits = new File(curDir,
-          NNStorage.getFinalizedEditsFileName(1,8));
-      assertTrue(finalizedEdits.exists());
+          NNStorage.getFinalizedEditsFileName(1, EXPECTED_TXNS_FIRST_SEG));
+      GenericTestUtils.assertExists(finalizedEdits);
       assertTrue(finalizedEdits.length() > Integer.SIZE/Byte.SIZE);
 
-      assertTrue(new File(ed, "current/"
-          + NNStorage.getInProgressEditsFileName(9)).exists());
+      GenericTestUtils.assertExists(new File(ed, "current/"
+          + NNStorage.getInProgressEditsFileName(
+              EXPECTED_TXNS_FIRST_SEG + 1)));
     }
 
     Collection<URI> imageDirs = cluster.getNameDirs(0);
     for (URI uri : imageDirs) {
       File imageDir = new File(uri.getPath());
       File savedImage = new File(imageDir, "current/"
-          + NNStorage.getImageFileName(8));
+          + NNStorage.getImageFileName(
+              EXPECTED_TXNS_FIRST_SEG));
       assertTrue("Should have saved image at " + savedImage,
           savedImage.exists());
     }
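The switch from assertTrue(file.exists()) to GenericTestUtils.assertExists(file) is about failure diagnostics: a bare assertTrue carries no message at all when it fails, while the helper can name the missing file. A sketch of the helper's likely shape (an assumption; the real GenericTestUtils may differ):

import java.io.File;
import static org.junit.Assert.assertTrue;

public class AssertExistsSketch {
  // On failure this names the missing file, unlike a bare
  // assertTrue(f.exists()), which fails without any message.
  public static void assertExists(File f) {
    assertTrue("File " + f + " should exist", f.exists());
  }
}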