HDFS-11817. A faulty node can cause a lease leak and NPE on accessing data. Contributed by Kihwal Lee.
This commit is contained in:
parent
87590090c8
commit
2b5ad48762
@ -223,10 +223,17 @@ List<ReplicaUnderConstruction> getStaleReplicas(long genStamp) {
|
|||||||
* Initialize lease recovery for this block.
|
* Initialize lease recovery for this block.
|
||||||
* Find the first alive data-node starting from the previous primary and
|
* Find the first alive data-node starting from the previous primary and
|
||||||
* make it primary.
|
* make it primary.
|
||||||
|
* @param blockInfo Block to be recovered
|
||||||
|
* @param recoveryId Recovery ID (new gen stamp)
|
||||||
|
* @param startRecovery Issue recovery command to datanode if true.
|
||||||
*/
|
*/
|
||||||
public void initializeBlockRecovery(BlockInfo blockInfo, long recoveryId) {
|
public void initializeBlockRecovery(BlockInfo blockInfo, long recoveryId,
|
||||||
|
boolean startRecovery) {
|
||||||
setBlockUCState(BlockUCState.UNDER_RECOVERY);
|
setBlockUCState(BlockUCState.UNDER_RECOVERY);
|
||||||
blockRecoveryId = recoveryId;
|
blockRecoveryId = recoveryId;
|
||||||
|
if (!startRecovery) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (replicas.length == 0) {
|
if (replicas.length == 0) {
|
||||||
NameNode.blockStateChangeLog.warn("BLOCK*" +
|
NameNode.blockStateChangeLog.warn("BLOCK*" +
|
||||||
" BlockUnderConstructionFeature.initializeBlockRecovery:" +
|
" BlockUnderConstructionFeature.initializeBlockRecovery:" +
|
||||||
|
@ -642,10 +642,11 @@ public DatanodeStorageInfo[] getDatanodeStorageInfos(
|
|||||||
String format, Object... args) throws UnregisteredNodeException {
|
String format, Object... args) throws UnregisteredNodeException {
|
||||||
storageIDs = storageIDs == null ? new String[0] : storageIDs;
|
storageIDs = storageIDs == null ? new String[0] : storageIDs;
|
||||||
if (datanodeID.length != storageIDs.length) {
|
if (datanodeID.length != storageIDs.length) {
|
||||||
|
// Error for pre-2.0.0-alpha clients.
|
||||||
final String err = (storageIDs.length == 0?
|
final String err = (storageIDs.length == 0?
|
||||||
"Missing storageIDs: It is likely that the HDFS client,"
|
"Missing storageIDs: It is likely that the HDFS client,"
|
||||||
+ " who made this call, is running in an older version of Hadoop"
|
+ " who made this call, is running in an older version of Hadoop"
|
||||||
+ " which does not support storageIDs."
|
+ "(pre-2.0.0-alpha) which does not support storageIDs."
|
||||||
: "Length mismatched: storageIDs.length=" + storageIDs.length + " != "
|
: "Length mismatched: storageIDs.length=" + storageIDs.length + " != "
|
||||||
) + " datanodeID.length=" + datanodeID.length;
|
) + " datanodeID.length=" + datanodeID.length;
|
||||||
throw new HadoopIllegalArgumentException(
|
throw new HadoopIllegalArgumentException(
|
||||||
|
@ -270,7 +270,7 @@ static Block prepareFileForTruncate(FSNamesystem fsn, INodesInPath iip,
|
|||||||
}
|
}
|
||||||
if (shouldRecoverNow) {
|
if (shouldRecoverNow) {
|
||||||
truncatedBlockUC.getUnderConstructionFeature().initializeBlockRecovery(
|
truncatedBlockUC.getUnderConstructionFeature().initializeBlockRecovery(
|
||||||
truncatedBlockUC, newBlock.getGenerationStamp());
|
truncatedBlockUC, newBlock.getGenerationStamp(), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
return newBlock;
|
return newBlock;
|
||||||
|
@ -3250,7 +3250,7 @@ boolean internalReleaseLease(Lease lease, String src, INodesInPath iip,
|
|||||||
} else if(truncateRecovery) {
|
} else if(truncateRecovery) {
|
||||||
recoveryBlock.setGenerationStamp(blockRecoveryId);
|
recoveryBlock.setGenerationStamp(blockRecoveryId);
|
||||||
}
|
}
|
||||||
uc.initializeBlockRecovery(lastBlock, blockRecoveryId);
|
uc.initializeBlockRecovery(lastBlock, blockRecoveryId, true);
|
||||||
leaseManager.renewLease(lease);
|
leaseManager.renewLease(lease);
|
||||||
// Cannot close file right now, since the last block requires recovery.
|
// Cannot close file right now, since the last block requires recovery.
|
||||||
// This may potentially cause infinite loop in lease recovery
|
// This may potentially cause infinite loop in lease recovery
|
||||||
|
@ -475,9 +475,16 @@ synchronized boolean checkLeases() {
|
|||||||
if (!p.startsWith("/")) {
|
if (!p.startsWith("/")) {
|
||||||
throw new IOException("Invalid path in the lease " + p);
|
throw new IOException("Invalid path in the lease " + p);
|
||||||
}
|
}
|
||||||
boolean completed = fsnamesystem.internalReleaseLease(
|
boolean completed = false;
|
||||||
leaseToCheck, p, iip,
|
try {
|
||||||
HdfsServerConstants.NAMENODE_LEASE_HOLDER);
|
completed = fsnamesystem.internalReleaseLease(
|
||||||
|
leaseToCheck, p, iip,
|
||||||
|
HdfsServerConstants.NAMENODE_LEASE_HOLDER);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Cannot release the path " + p + " in the lease "
|
||||||
|
+ leaseToCheck + ". It will be retried.", e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
if (completed) {
|
if (completed) {
|
||||||
LOG.debug("Lease recovery for inode " + id + " is complete. " +
|
LOG.debug("Lease recovery for inode " + id + " is complete. " +
|
||||||
@ -491,7 +498,7 @@ synchronized boolean checkLeases() {
|
|||||||
needSync = true;
|
needSync = true;
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error("Cannot release the path " + p + " in the lease "
|
LOG.warn("Removing lease with an invalid path: " + p + ","
|
||||||
+ leaseToCheck, e);
|
+ leaseToCheck, e);
|
||||||
removing.add(id);
|
removing.add(id);
|
||||||
}
|
}
|
||||||
|
@ -50,7 +50,7 @@ public void testInitializeBlockRecovery() throws Exception {
|
|||||||
DFSTestUtil.resetLastUpdatesWithOffset(dd1, -3 * 1000);
|
DFSTestUtil.resetLastUpdatesWithOffset(dd1, -3 * 1000);
|
||||||
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000);
|
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000);
|
||||||
DFSTestUtil.resetLastUpdatesWithOffset(dd3, -2 * 1000);
|
DFSTestUtil.resetLastUpdatesWithOffset(dd3, -2 * 1000);
|
||||||
blockInfo.getUnderConstructionFeature().initializeBlockRecovery(blockInfo, 1);
|
blockInfo.getUnderConstructionFeature().initializeBlockRecovery(blockInfo, 1, true);
|
||||||
BlockInfo[] blockInfoRecovery = dd2.getLeaseRecoveryCommand(1);
|
BlockInfo[] blockInfoRecovery = dd2.getLeaseRecoveryCommand(1);
|
||||||
assertEquals(blockInfoRecovery[0], blockInfo);
|
assertEquals(blockInfoRecovery[0], blockInfo);
|
||||||
|
|
||||||
@ -58,7 +58,7 @@ public void testInitializeBlockRecovery() throws Exception {
|
|||||||
DFSTestUtil.resetLastUpdatesWithOffset(dd1, -2 * 1000);
|
DFSTestUtil.resetLastUpdatesWithOffset(dd1, -2 * 1000);
|
||||||
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000);
|
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000);
|
||||||
DFSTestUtil.resetLastUpdatesWithOffset(dd3, -3 * 1000);
|
DFSTestUtil.resetLastUpdatesWithOffset(dd3, -3 * 1000);
|
||||||
blockInfo.getUnderConstructionFeature().initializeBlockRecovery(blockInfo, 2);
|
blockInfo.getUnderConstructionFeature().initializeBlockRecovery(blockInfo, 2, true);
|
||||||
blockInfoRecovery = dd1.getLeaseRecoveryCommand(1);
|
blockInfoRecovery = dd1.getLeaseRecoveryCommand(1);
|
||||||
assertEquals(blockInfoRecovery[0], blockInfo);
|
assertEquals(blockInfoRecovery[0], blockInfo);
|
||||||
|
|
||||||
@ -66,7 +66,7 @@ public void testInitializeBlockRecovery() throws Exception {
|
|||||||
DFSTestUtil.resetLastUpdatesWithOffset(dd1, -2 * 1000);
|
DFSTestUtil.resetLastUpdatesWithOffset(dd1, -2 * 1000);
|
||||||
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000);
|
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000);
|
||||||
DFSTestUtil.resetLastUpdatesWithOffset(dd3, -3 * 1000);
|
DFSTestUtil.resetLastUpdatesWithOffset(dd3, -3 * 1000);
|
||||||
blockInfo.getUnderConstructionFeature().initializeBlockRecovery(blockInfo, 3);
|
blockInfo.getUnderConstructionFeature().initializeBlockRecovery(blockInfo, 3, true);
|
||||||
blockInfoRecovery = dd3.getLeaseRecoveryCommand(1);
|
blockInfoRecovery = dd3.getLeaseRecoveryCommand(1);
|
||||||
assertEquals(blockInfoRecovery[0], blockInfo);
|
assertEquals(blockInfoRecovery[0], blockInfo);
|
||||||
|
|
||||||
@ -75,7 +75,7 @@ public void testInitializeBlockRecovery() throws Exception {
|
|||||||
DFSTestUtil.resetLastUpdatesWithOffset(dd1, -2 * 1000);
|
DFSTestUtil.resetLastUpdatesWithOffset(dd1, -2 * 1000);
|
||||||
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000);
|
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000);
|
||||||
DFSTestUtil.resetLastUpdatesWithOffset(dd3, 0);
|
DFSTestUtil.resetLastUpdatesWithOffset(dd3, 0);
|
||||||
blockInfo.getUnderConstructionFeature().initializeBlockRecovery(blockInfo, 3);
|
blockInfo.getUnderConstructionFeature().initializeBlockRecovery(blockInfo, 3, true);
|
||||||
blockInfoRecovery = dd3.getLeaseRecoveryCommand(1);
|
blockInfoRecovery = dd3.getLeaseRecoveryCommand(1);
|
||||||
assertEquals(blockInfoRecovery[0], blockInfo);
|
assertEquals(blockInfoRecovery[0], blockInfo);
|
||||||
}
|
}
|
||||||
|
@ -38,12 +38,16 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
|
||||||
public class TestBlockUnderConstruction {
|
public class TestBlockUnderConstruction {
|
||||||
static final String BASE_DIR = "/test/TestBlockUnderConstruction";
|
static final String BASE_DIR = "/test/TestBlockUnderConstruction";
|
||||||
static final int BLOCK_SIZE = 8192; // same as TestFileCreation.blocksize
|
static final int BLOCK_SIZE = 8192; // same as TestFileCreation.blocksize
|
||||||
@ -183,4 +187,45 @@ public void testGetBlockLocations() throws IOException {
|
|||||||
// close file
|
// close file
|
||||||
out.close();
|
out.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A storage ID can be invalid if the storage failed or the node
|
||||||
|
* reregisters. When the node heart-beats, the storage report in it
|
||||||
|
* causes storage volumes to be added back. An invalid storage ID
|
||||||
|
* should not cause an NPE.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testEmptyExpectedLocations() throws Exception {
|
||||||
|
final NamenodeProtocols namenode = cluster.getNameNodeRpc();
|
||||||
|
final FSNamesystem fsn = cluster.getNamesystem();
|
||||||
|
final BlockManager bm = fsn.getBlockManager();
|
||||||
|
final Path p = new Path(BASE_DIR, "file2.dat");
|
||||||
|
final String src = p.toString();
|
||||||
|
final FSDataOutputStream out = TestFileCreation.createFile(hdfs, p, 1);
|
||||||
|
writeFile(p, out, 256);
|
||||||
|
out.hflush();
|
||||||
|
|
||||||
|
// make sure the block is readable
|
||||||
|
LocatedBlocks lbs = namenode.getBlockLocations(src, 0, 256);
|
||||||
|
LocatedBlock lastLB = lbs.getLocatedBlocks().get(0);
|
||||||
|
final Block b = lastLB.getBlock().getLocalBlock();
|
||||||
|
|
||||||
|
// fake a block recovery
|
||||||
|
long blockRecoveryId = bm.nextGenerationStamp(false);
|
||||||
|
BlockUnderConstructionFeature uc = bm.getStoredBlock(b).
|
||||||
|
getUnderConstructionFeature();
|
||||||
|
uc.initializeBlockRecovery(null, blockRecoveryId, false);
|
||||||
|
|
||||||
|
try {
|
||||||
|
String[] storages = { "invalid-storage-id1" };
|
||||||
|
fsn.commitBlockSynchronization(lastLB.getBlock(), blockRecoveryId, 256L,
|
||||||
|
true, false, lastLB.getLocations(), storages);
|
||||||
|
} catch (java.lang.IllegalStateException ise) {
|
||||||
|
// Although a failure is expected as of now, future commit policy
|
||||||
|
// changes may make it not fail. This is not critical to the test.
|
||||||
|
}
|
||||||
|
|
||||||
|
// Invalid storage should not trigger an exception.
|
||||||
|
lbs = namenode.getBlockLocations(src, 0, 256);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -73,7 +73,7 @@ private FSNamesystem makeNameSystemSpy(Block block, INodeFile file)
|
|||||||
blockInfo.setBlockCollectionId(file.getId());
|
blockInfo.setBlockCollectionId(file.getId());
|
||||||
blockInfo.setGenerationStamp(genStamp);
|
blockInfo.setGenerationStamp(genStamp);
|
||||||
blockInfo.getUnderConstructionFeature().initializeBlockRecovery(blockInfo,
|
blockInfo.getUnderConstructionFeature().initializeBlockRecovery(blockInfo,
|
||||||
genStamp);
|
genStamp, true);
|
||||||
doReturn(blockInfo).when(file).removeLastBlock(any(Block.class));
|
doReturn(blockInfo).when(file).removeLastBlock(any(Block.class));
|
||||||
doReturn(true).when(file).isUnderConstruction();
|
doReturn(true).when(file).isUnderConstruction();
|
||||||
doReturn(new BlockInfoContiguous[1]).when(file).getBlocks();
|
doReturn(new BlockInfoContiguous[1]).when(file).getBlocks();
|
||||||
|
Loading…
Reference in New Issue
Block a user