HDFS-7225. Remove stale block invalidation work when DN re-registers with different UUID. (Zhe Zhang and Andrew Wang)
This commit is contained in:
parent
79301e80d7
commit
406c09ad11
@ -453,6 +453,9 @@ Release 2.7.0 - UNRELEASED
|
||||
HDFS-7406. SimpleHttpProxyHandler puts incorrect "Connection: Close"
|
||||
header. (wheat9)
|
||||
|
||||
HDFS-7225. Remove stale block invalidation work when DN re-registers with
|
||||
different UUID. (Zhe Zhang and Andrew Wang)
|
||||
|
||||
Release 2.6.0 - 2014-11-18
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -1111,6 +1111,18 @@ private void addToInvalidates(Block b) {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove all block invalidation tasks under this datanode UUID;
|
||||
* used when a datanode registers with a new UUID and the old one
|
||||
* is wiped.
|
||||
*/
|
||||
void removeFromInvalidates(final DatanodeInfo datanode) {
|
||||
if (!namesystem.isPopulatingReplQueues()) {
|
||||
return;
|
||||
}
|
||||
invalidateBlocks.remove(datanode);
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark the block belonging to datanode as corrupt
|
||||
* @param blk Block to be marked as corrupt
|
||||
@ -3382,7 +3394,14 @@ private int invalidateWorkForOneNode(DatanodeInfo dn) {
|
||||
return 0;
|
||||
}
|
||||
try {
|
||||
toInvalidate = invalidateBlocks.invalidateWork(datanodeManager.getDatanode(dn));
|
||||
DatanodeDescriptor dnDescriptor = datanodeManager.getDatanode(dn);
|
||||
if (dnDescriptor == null) {
|
||||
LOG.warn("DataNode " + dn + " cannot be found with UUID " +
|
||||
dn.getDatanodeUuid() + ", removing block invalidation work.");
|
||||
invalidateBlocks.remove(dn);
|
||||
return 0;
|
||||
}
|
||||
toInvalidate = invalidateBlocks.invalidateWork(dnDescriptor);
|
||||
|
||||
if (toInvalidate == null) {
|
||||
return 0;
|
||||
|
@ -600,6 +600,8 @@ private void wipeDatanode(final DatanodeID node) {
|
||||
synchronized (datanodeMap) {
|
||||
host2DatanodeMap.remove(datanodeMap.remove(key));
|
||||
}
|
||||
// Also remove all block invalidation tasks under this node
|
||||
blockManager.removeFromInvalidates(new DatanodeInfo(node));
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(getClass().getSimpleName() + ".wipeDatanode("
|
||||
+ node + "): storage " + key
|
||||
|
@ -17,66 +17,161 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||
import org.apache.hadoop.util.VersionInfo;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.internal.util.reflection.Whitebox;
|
||||
|
||||
/**
|
||||
* Test that BlockManager computes block invalidation work correctly
|
||||
*/
|
||||
public class TestComputeInvalidateWork {
|
||||
/**
|
||||
* Test if {@link FSNamesystem#computeInvalidateWork(int)}
|
||||
* can schedule invalidate work correctly
|
||||
*/
|
||||
@Test
|
||||
public void testCompInvalidate() throws Exception {
|
||||
final Configuration conf = new HdfsConfiguration();
|
||||
final int NUM_OF_DATANODES = 3;
|
||||
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_OF_DATANODES).build();
|
||||
try {
|
||||
cluster.waitActive();
|
||||
final FSNamesystem namesystem = cluster.getNamesystem();
|
||||
final BlockManager bm = namesystem.getBlockManager();
|
||||
final int blockInvalidateLimit = bm.getDatanodeManager().blockInvalidateLimit;
|
||||
final DatanodeDescriptor[] nodes = bm.getDatanodeManager(
|
||||
).getHeartbeatManager().getDatanodes();
|
||||
assertEquals(nodes.length, NUM_OF_DATANODES);
|
||||
|
||||
namesystem.writeLock();
|
||||
try {
|
||||
for (int i=0; i<nodes.length; i++) {
|
||||
for(int j=0; j<3*blockInvalidateLimit+1; j++) {
|
||||
Block block = new Block(i*(blockInvalidateLimit+1)+j, 0,
|
||||
GenerationStamp.LAST_RESERVED_STAMP);
|
||||
bm.addToInvalidates(block, nodes[i]);
|
||||
}
|
||||
}
|
||||
private Configuration conf;
|
||||
private final int NUM_OF_DATANODES = 3;
|
||||
private MiniDFSCluster cluster;
|
||||
private FSNamesystem namesystem;
|
||||
private BlockManager bm;
|
||||
private DatanodeDescriptor[] nodes;
|
||||
|
||||
assertEquals(blockInvalidateLimit*NUM_OF_DATANODES,
|
||||
bm.computeInvalidateWork(NUM_OF_DATANODES+1));
|
||||
assertEquals(blockInvalidateLimit*NUM_OF_DATANODES,
|
||||
bm.computeInvalidateWork(NUM_OF_DATANODES));
|
||||
assertEquals(blockInvalidateLimit*(NUM_OF_DATANODES-1),
|
||||
bm.computeInvalidateWork(NUM_OF_DATANODES-1));
|
||||
int workCount = bm.computeInvalidateWork(1);
|
||||
if (workCount == 1) {
|
||||
assertEquals(blockInvalidateLimit+1, bm.computeInvalidateWork(2));
|
||||
} else {
|
||||
assertEquals(workCount, blockInvalidateLimit);
|
||||
assertEquals(2, bm.computeInvalidateWork(2));
|
||||
}
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
}
|
||||
} finally {
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
conf = new HdfsConfiguration();
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_OF_DATANODES)
|
||||
.build();
|
||||
cluster.waitActive();
|
||||
namesystem = cluster.getNamesystem();
|
||||
bm = namesystem.getBlockManager();
|
||||
nodes = bm.getDatanodeManager().getHeartbeatManager().getDatanodes();
|
||||
assertEquals(nodes.length, NUM_OF_DATANODES);
|
||||
}
|
||||
|
||||
@After
|
||||
public void teardown() throws Exception {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test if {@link BlockManager#computeInvalidateWork(int)}
|
||||
* can schedule invalidate work correctly
|
||||
*/
|
||||
@Test(timeout=120000)
|
||||
public void testCompInvalidate() throws Exception {
|
||||
final int blockInvalidateLimit = bm.getDatanodeManager()
|
||||
.blockInvalidateLimit;
|
||||
namesystem.writeLock();
|
||||
try {
|
||||
for (int i=0; i<nodes.length; i++) {
|
||||
for(int j=0; j<3*blockInvalidateLimit+1; j++) {
|
||||
Block block = new Block(i*(blockInvalidateLimit+1)+j, 0,
|
||||
GenerationStamp.LAST_RESERVED_STAMP);
|
||||
bm.addToInvalidates(block, nodes[i]);
|
||||
}
|
||||
}
|
||||
|
||||
assertEquals(blockInvalidateLimit*NUM_OF_DATANODES,
|
||||
bm.computeInvalidateWork(NUM_OF_DATANODES+1));
|
||||
assertEquals(blockInvalidateLimit*NUM_OF_DATANODES,
|
||||
bm.computeInvalidateWork(NUM_OF_DATANODES));
|
||||
assertEquals(blockInvalidateLimit*(NUM_OF_DATANODES-1),
|
||||
bm.computeInvalidateWork(NUM_OF_DATANODES-1));
|
||||
int workCount = bm.computeInvalidateWork(1);
|
||||
if (workCount == 1) {
|
||||
assertEquals(blockInvalidateLimit+1, bm.computeInvalidateWork(2));
|
||||
} else {
|
||||
assertEquals(workCount, blockInvalidateLimit);
|
||||
assertEquals(2, bm.computeInvalidateWork(2));
|
||||
}
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reformatted DataNodes will replace the original UUID in the
|
||||
* {@link DatanodeManager#datanodeMap}. This tests if block
|
||||
* invalidation work on the original DataNode can be skipped.
|
||||
*/
|
||||
@Test(timeout=120000)
|
||||
public void testDatanodeReformat() throws Exception {
|
||||
namesystem.writeLock();
|
||||
try {
|
||||
Block block = new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP);
|
||||
bm.addToInvalidates(block, nodes[0]);
|
||||
// Change the datanode UUID to emulate a reformation
|
||||
nodes[0].setDatanodeUuidForTesting("fortesting");
|
||||
// Since UUID has changed, the invalidation work should be skipped
|
||||
assertEquals(0, bm.computeInvalidateWork(1));
|
||||
assertEquals(0, bm.getPendingDeletionBlocksCount());
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=12000)
|
||||
public void testDatanodeReRegistration() throws Exception {
|
||||
// Create a test file
|
||||
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||
final Path path = new Path("/testRR");
|
||||
// Create a file and shutdown the DNs, which populates InvalidateBlocks
|
||||
DFSTestUtil.createFile(dfs, path, dfs.getDefaultBlockSize(),
|
||||
(short) NUM_OF_DATANODES, 0xED0ED0);
|
||||
for (DataNode dn : cluster.getDataNodes()) {
|
||||
dn.shutdown();
|
||||
}
|
||||
dfs.delete(path, false);
|
||||
namesystem.writeLock();
|
||||
InvalidateBlocks invalidateBlocks;
|
||||
int expected = NUM_OF_DATANODES;
|
||||
try {
|
||||
invalidateBlocks = (InvalidateBlocks) Whitebox
|
||||
.getInternalState(cluster.getNamesystem().getBlockManager(),
|
||||
"invalidateBlocks");
|
||||
assertEquals("Expected invalidate blocks to be the number of DNs",
|
||||
(long) expected, invalidateBlocks.numBlocks());
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
}
|
||||
// Re-register each DN and see that it wipes the invalidation work
|
||||
for (DataNode dn : cluster.getDataNodes()) {
|
||||
DatanodeID did = dn.getDatanodeId();
|
||||
did.setDatanodeUuidForTesting(UUID.randomUUID().toString());
|
||||
DatanodeRegistration reg = new DatanodeRegistration(did,
|
||||
new StorageInfo(HdfsServerConstants.NodeType.DATA_NODE),
|
||||
new ExportedBlockKeys(),
|
||||
VersionInfo.getVersion());
|
||||
namesystem.writeLock();
|
||||
try {
|
||||
bm.getDatanodeManager().registerDatanode(reg);
|
||||
expected--;
|
||||
assertEquals("Expected number of invalidate blocks to decrease",
|
||||
(long) expected, invalidateBlocks.numBlocks());
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user