HDFS-11711. DN should not delete the block On "Too many open files" Exception. Contributed by Brahma Reddy Battula.
This commit is contained in:
parent
b65100c14b
commit
1869e1771c
@ -302,6 +302,7 @@ class BlockSender implements java.io.Closeable {
|
|||||||
LengthInputStream metaIn = null;
|
LengthInputStream metaIn = null;
|
||||||
boolean keepMetaInOpen = false;
|
boolean keepMetaInOpen = false;
|
||||||
try {
|
try {
|
||||||
|
DataNodeFaultInjector.get().throwTooManyOpenFiles();
|
||||||
metaIn = datanode.data.getMetaDataInputStream(block);
|
metaIn = datanode.data.getMetaDataInputStream(block);
|
||||||
if (!corruptChecksumOk || metaIn != null) {
|
if (!corruptChecksumOk || metaIn != null) {
|
||||||
if (metaIn == null) {
|
if (metaIn == null) {
|
||||||
@ -331,10 +332,14 @@ class BlockSender implements java.io.Closeable {
|
|||||||
LOG.warn("Could not find metadata file for " + block);
|
LOG.warn("Could not find metadata file for " + block);
|
||||||
}
|
}
|
||||||
} catch (FileNotFoundException e) {
|
} catch (FileNotFoundException e) {
|
||||||
// The replica is on its volume map but not on disk
|
if ((e.getMessage() != null) && !(e.getMessage()
|
||||||
datanode.notifyNamenodeDeletedBlock(block, replica.getStorageUuid());
|
.contains("Too many open files"))) {
|
||||||
datanode.data.invalidate(block.getBlockPoolId(),
|
// The replica is on its volume map but not on disk
|
||||||
new Block[]{block.getLocalBlock()});
|
datanode
|
||||||
|
.notifyNamenodeDeletedBlock(block, replica.getStorageUuid());
|
||||||
|
datanode.data.invalidate(block.getBlockPoolId(),
|
||||||
|
new Block[] {block.getLocalBlock()});
|
||||||
|
}
|
||||||
throw e;
|
throw e;
|
||||||
} finally {
|
} finally {
|
||||||
if (!keepMetaInOpen) {
|
if (!keepMetaInOpen) {
|
||||||
|
@ -21,6 +21,7 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -85,4 +86,7 @@ public void failPipeline(ReplicaInPipeline replicaInfo,
|
|||||||
public void startOfferService() throws Exception {}
|
public void startOfferService() throws Exception {}
|
||||||
|
|
||||||
public void endOfferService() throws Exception {}
|
public void endOfferService() throws Exception {}
|
||||||
|
|
||||||
|
public void throwTooManyOpenFiles() throws FileNotFoundException {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -24,8 +24,10 @@
|
|||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.management.ManagementFactory;
|
import java.lang.management.ManagementFactory;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@ -45,8 +47,10 @@
|
|||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||||
@ -365,4 +369,62 @@ public void testDatanodeActiveXceiversCount() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDNShouldNotDeleteBlockONTooManyOpenFiles()
|
||||||
|
throws Exception {
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
|
||||||
|
conf.setLong(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 1);
|
||||||
|
DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
|
||||||
|
MiniDFSCluster cluster =
|
||||||
|
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||||
|
final DataNodeFaultInjector injector =
|
||||||
|
Mockito.mock(DataNodeFaultInjector.class);
|
||||||
|
try {
|
||||||
|
// wait until the cluster is up
|
||||||
|
cluster.waitActive();
|
||||||
|
DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
|
Path p = new Path("/testShouldThrowTMP");
|
||||||
|
DFSTestUtil.writeFile(fs, p, new String("testdata"));
|
||||||
|
//Before DN throws too many open files
|
||||||
|
verifyBlockLocations(fs, p, 1);
|
||||||
|
Mockito.doThrow(new FileNotFoundException("Too many open files")).
|
||||||
|
when(injector).
|
||||||
|
throwTooManyOpenFiles();
|
||||||
|
DataNodeFaultInjector.set(injector);
|
||||||
|
ExtendedBlock b =
|
||||||
|
fs.getClient().getLocatedBlocks(p.toString(), 0).get(0).getBlock();
|
||||||
|
try {
|
||||||
|
new BlockSender(b, 0, -1, false, true, true,
|
||||||
|
cluster.getDataNodes().get(0), null,
|
||||||
|
CachingStrategy.newDefaultStrategy());
|
||||||
|
fail("Must throw FileNotFoundException");
|
||||||
|
} catch (FileNotFoundException fe) {
|
||||||
|
assertTrue("Should throw too many open files",
|
||||||
|
fe.getMessage().contains("Too many open files"));
|
||||||
|
}
|
||||||
|
cluster.triggerHeartbeats(); // IBR delete ack
|
||||||
|
//After DN throws too many open files
|
||||||
|
assertTrue(cluster.getDataNodes().get(0).getFSDataset().isValidBlock(b));
|
||||||
|
verifyBlockLocations(fs, p, 1);
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
DataNodeFaultInjector.set(oldInjector);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyBlockLocations(DistributedFileSystem fs, Path p,
|
||||||
|
int expected) throws IOException, TimeoutException, InterruptedException {
|
||||||
|
final LocatedBlock lb =
|
||||||
|
fs.getClient().getLocatedBlocks(p.toString(), 0).get(0);
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
return lb.getLocations().length == expected;
|
||||||
|
}
|
||||||
|
}, 1000, 6000);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user