HDFS-6289. HA failover can fail if there are pending DN messages for DNs which no longer exist. Contributed by Aaron T. Myers.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1591413 13f79535-47bb-0310-9956-ffa450edef68
parent 0689363343
commit e5d6fba47d
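In short: the standby NameNode queues datanode block messages it cannot yet act on, keyed by block, and before this patch those queued messages could outlive the datanode that sent them, failing a later failover. The fix drops a datanode's queued messages when the node is removed. A standalone sketch of the core idea, using hypothetical shapes rather than the NameNode's actual types:

import java.util.*;

public class RemoveStaleMessagesSketch {
  public static void main(String[] args) {
    // Per-block queues of reporting datanodes (a stand-in for the real
    // queues of ReportedBlockInfo messages).
    Map<Long, Queue<String>> queuesByBlock = new HashMap<>();
    queuesByBlock.put(1L, new LinkedList<>(Arrays.asList("dn-gone", "dn-live")));

    // When "dn-gone" is removed, rebuild each queue without its messages,
    // so no stale entry can be replayed during a failover.
    String removedNode = "dn-gone";
    for (Map.Entry<Long, Queue<String>> e : queuesByBlock.entrySet()) {
      Queue<String> kept = new LinkedList<>();
      for (String reporter : e.getValue()) {
        if (!reporter.equals(removedNode)) {
          kept.add(reporter); // keep messages from other datanodes
        }
      }
      e.setValue(kept);
    }
    System.out.println(queuesByBlock); // prints {1=[dn-live]}
  }
}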
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -421,6 +421,9 @@ Release 2.5.0 - UNRELEASED
     HDFS-6288. DFSInputStream Pread doesn't update ReadStatistics.
     (Juan Yu via wang)
 
+    HDFS-6289. HA failover can fail if there are pending DN messages for DNs
+    which no longer exist. (atm)
+
 Release 2.4.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1007,6 +1007,8 @@ void removeBlocksAssociatedTo(final DatanodeDescriptor node) {
     while(it.hasNext()) {
       removeStoredBlock(it.next(), node);
     }
+    // Remove all pending DN messages referencing this DN.
+    pendingDNMessages.removeAllMessagesForDatanode(node);
 
     node.resetBlocks();
     invalidateBlocks.remove(node.getDatanodeUuid());
@@ -1081,7 +1083,8 @@ private void markBlockAsCorrupt(BlockToMarkCorrupt b,
     DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
     if (node == null) {
       throw new IOException("Cannot mark " + b
-          + " as corrupt because datanode " + dn + " does not exist");
+          + " as corrupt because datanode " + dn + " (" + dn.getDatanodeUuid()
+          + ") does not exist");
     }
 
     BlockCollection bc = b.corrupted.getBlockCollection();
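With the added UUID, the exception ties the datanode's transfer address to its storage identity, which is exactly what changes when a DN is wiped and re-registers. An illustrative rendering of the message, composed from the string concatenation above with hypothetical address and UUID values:

Cannot mark <block> as corrupt because datanode 127.0.0.1:50010 (hypothetical-uuid-f7a3) does not exist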
@@ -1981,6 +1984,9 @@ private BlockInfo processReportedBlock(final DatanodeDescriptor dn,
       // If the block is an out-of-date generation stamp or state,
       // but we're the standby, we shouldn't treat it as corrupt,
       // but instead just queue it for later processing.
+      // TODO: Pretty confident this should be s/storedBlock/block below,
+      // since we should be postponing the info of the reported block, not
+      // the stored block. See HDFS-6289 for more context.
       queueReportedBlock(dn, storageID, storedBlock, reportedState,
           QUEUE_REASON_CORRUPT_STATE);
     } else {
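For reference, the rename the TODO describes would look like the following; it is not applied in this commit (see HDFS-6289 for the discussion):

-      queueReportedBlock(dn, storageID, storedBlock, reportedState,
+      queueReportedBlock(dn, storageID, block, reportedState,
           QUEUE_REASON_CORRUPT_STATE);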
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java
@@ -76,6 +76,27 @@ public String toString() {
     }
   }
 
+  /**
+   * Remove all pending DN messages which reference the given DN.
+   * @param dn the datanode whose messages we should remove.
+   */
+  void removeAllMessagesForDatanode(DatanodeDescriptor dn) {
+    for (Map.Entry<Block, Queue<ReportedBlockInfo>> entry :
+        queueByBlockId.entrySet()) {
+      Queue<ReportedBlockInfo> newQueue = Lists.newLinkedList();
+      Queue<ReportedBlockInfo> oldQueue = entry.getValue();
+      while (!oldQueue.isEmpty()) {
+        ReportedBlockInfo rbi = oldQueue.remove();
+        if (!rbi.getNode().equals(dn)) {
+          newQueue.add(rbi);
+        } else {
+          count--;
+        }
+      }
+      queueByBlockId.put(entry.getKey(), newQueue);
+    }
+  }
+
   void enqueueReportedBlock(DatanodeDescriptor dn, String storageID, Block block,
       ReplicaState reportedState) {
     block = new Block(block);
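A hedged usage sketch of the new method (staleDn, liveDn, storageID, and block are hypothetical variables; it also assumes the class's no-arg constructor and a count() accessor, which the count-- bookkeeping above implies):

PendingDataNodeMessages msgs = new PendingDataNodeMessages();
msgs.enqueueReportedBlock(staleDn, storageID, block, ReplicaState.FINALIZED);
msgs.enqueueReportedBlock(liveDn, storageID, block, ReplicaState.FINALIZED);

msgs.removeAllMessagesForDatanode(staleDn); // drops only staleDn's message
assert msgs.count() == 1;                   // liveDn's message survives

Rebuilding each queue, rather than removing entries in place, keeps the count bookkeeping to a single pass per queue.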
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
@@ -50,7 +50,7 @@ static File getMetaFile(File f, long gs) {
   }
 
   /** Find the corresponding meta data file from a given block file */
-  static File findMetaFile(final File blockFile) throws IOException {
+  public static File findMetaFile(final File blockFile) throws IOException {
     final String prefix = blockFile.getName() + "_";
     final File parent = blockFile.getParentFile();
     final File[] matches = parent.listFiles(new FilenameFilter() {
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -83,10 +83,12 @@
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
@@ -1705,6 +1707,14 @@ public static boolean corruptBlock(File blockFile) throws IOException {
     LOG.warn("Corrupting the block " + blockFile);
     return true;
   }
 
+  public static boolean changeGenStampOfBlock(int dnIndex, ExtendedBlock blk,
+      long newGenStamp) throws IOException {
+    File blockFile = getBlockFile(dnIndex, blk);
+    File metaFile = FsDatasetUtil.findMetaFile(blockFile);
+    return metaFile.renameTo(new File(DatanodeUtil.getMetaName(
+        blockFile.getAbsolutePath(), newGenStamp)));
+  }
+
   /*
    * Shutdown a particular datanode
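The helper works because HDFS encodes a replica's generation stamp only in the meta file's name, so renaming that file is enough to rewind the gen stamp the DN will report. A sketch of the convention, with an illustrative block ID and path:

// Replica layout for block 1073741825 at gen stamp 1000:
//   blk_1073741825            block data (name carries no gen stamp)
//   blk_1073741825_1000.meta  checksum meta (name carries the gen stamp)
File blockFile = new File("/dn/current/blk_1073741825"); // illustrative path
File oldMeta = FsDatasetUtil.findMetaFile(blockFile);    // ..._1000.meta
boolean renamed = oldMeta.renameTo(new File(
    DatanodeUtil.getMetaName(blockFile.getAbsolutePath(), 900)));
// The replica now appears to carry gen stamp 900 when reported.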
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPendingCorruptDnMessages.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URISyntaxException;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.util.ThreadUtil;
+import org.junit.Test;
+
+public class TestPendingCorruptDnMessages {
+
+  private static final Path filePath = new Path("/foo.txt");
+
+  @Test
+  public void testChangedStorageId() throws IOException, URISyntaxException,
+      InterruptedException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology())
+        .build();
+
+    try {
+      cluster.transitionToActive(0);
+
+      FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
+      OutputStream out = fs.create(filePath);
+      out.write("foo bar baz".getBytes());
+      out.close();
+
+      HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0),
+          cluster.getNameNode(1));
+
+      // Change the gen stamp of the block on datanode to go back in time (gen
+      // stamps start at 1000)
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
+      assertTrue(MiniDFSCluster.changeGenStampOfBlock(0, block, 900));
+
+      // Stop the DN so the replica with the changed gen stamp will be reported
+      // when this DN starts up.
+      DataNodeProperties dnProps = cluster.stopDataNode(0);
+
+      // Restart the namenode so that when the DN comes up it will see an initial
+      // block report.
+      cluster.restartNameNode(1, false);
+      assertTrue(cluster.restartDataNode(dnProps, true));
+
+      // Wait until the standby NN queues up the corrupt block in the pending DN
+      // message queue.
+      while (cluster.getNamesystem(1).getBlockManager()
+          .getPendingDataNodeMessageCount() < 1) {
+        ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
+      }
+
+      assertEquals(1, cluster.getNamesystem(1).getBlockManager()
+          .getPendingDataNodeMessageCount());
+      String oldStorageId = getRegisteredDatanodeUid(cluster, 1);
+
+      // Reformat/restart the DN.
+      assertTrue(wipeAndRestartDn(cluster, 0));
+
+      // Give the DN time to start up and register, which will cause the
+      // DatanodeManager to dissociate the old storage ID from the DN xfer addr.
+      String newStorageId = "";
+      do {
+        ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
+        newStorageId = getRegisteredDatanodeUid(cluster, 1);
+        System.out.println("====> oldStorageId: " + oldStorageId +
+            " newStorageId: " + newStorageId);
+      } while (newStorageId.equals(oldStorageId));
+
+      assertEquals(0, cluster.getNamesystem(1).getBlockManager()
+          .getPendingDataNodeMessageCount());
+
+      // Now try to fail over.
+      cluster.transitionToStandby(0);
+      cluster.transitionToActive(1);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  private static String getRegisteredDatanodeUid(
+      MiniDFSCluster cluster, int nnIndex) {
+    List<DatanodeDescriptor> registeredDatanodes = cluster.getNamesystem(nnIndex)
+        .getBlockManager().getDatanodeManager()
+        .getDatanodeListForReport(DatanodeReportType.ALL);
+    assertEquals(1, registeredDatanodes.size());
+    return registeredDatanodes.get(0).getDatanodeUuid();
+  }
+
+  private static boolean wipeAndRestartDn(MiniDFSCluster cluster, int dnIndex)
+      throws IOException {
+    // stop the DN, reformat it, then start it again with the same xfer port.
+    DataNodeProperties dnProps = cluster.stopDataNode(dnIndex);
+    cluster.formatDataNodeDirs();
+    return cluster.restartDataNode(dnProps, true);
+  }
+
+}