HDFS-4725. Fix HDFS file handle leaks in FSEditLog, NameNode, OfflineEditsBinaryLoader and some tests. Contributed by Chris Nauroth

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1470771 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 2013-04-23 01:18:36 +00:00
parent 009af54d51
commit fd24c6e833
9 changed files with 191 additions and 153 deletions
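
The fixes all use the same resource-cleanup idiom: the work happens inside a try block and the handle is released in a finally block, usually through org.apache.hadoop.io.IOUtils.cleanup(Log, Closeable...), which closes each non-null stream and logs, rather than rethrows, any failure from close(). The sketch below only illustrates that idiom; the class LeakFreeRead and its readAll helper are invented for this note and are not part of the patch.

import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IOUtils;

public class LeakFreeRead {
  private static final Log LOG = LogFactory.getLog(LeakFreeRead.class);

  // Read len bytes from a file without leaking the stream: the handle is
  // released in finally even if readFully() throws, and close() failures
  // are only logged by IOUtils.cleanup, never rethrown.
  static byte[] readAll(String name, int len) throws IOException {
    DataInputStream in = new DataInputStream(new FileInputStream(name));
    try {
      byte[] buf = new byte[len];
      in.readFully(buf);
      return buf;
    } finally {
      IOUtils.cleanup(LOG, in);  // null-safe varargs close
    }
  }
}

The same shape shows up below in FSEditLog.close(), NameNode.stop(), OfflineEditsBinaryLoader.loadEdits(), DFSTestUtil.loadFile() and the touched tests.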

@@ -360,6 +360,9 @@ Trunk (Unreleased)
     HDFS-4674. TestBPOfferService fails on Windows due to failure parsing
     datanode data directory as URI. (Chris Nauroth via suresh)
 
+    HDFS-4725. Fix HDFS file handle leaks in FSEditLog, NameNode,
+    OfflineEditsBinaryLoader and some tests. (Chris Nauroth via szetszwo)
+
   BREAKDOWN OF HDFS-347 SUBTASKS AND RELATED JIRAS
 
     HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes.

@@ -317,21 +317,23 @@ synchronized void close() {
       LOG.debug("Closing log when already closed");
       return;
     }
-    if (state == State.IN_SEGMENT) {
-      assert editLogStream != null;
-      waitForSyncToFinish();
-      endCurrentLogSegment(true);
-    }
-    if (journalSet != null && !journalSet.isEmpty()) {
-      try {
-        journalSet.close();
-      } catch (IOException ioe) {
-        LOG.warn("Error closing journalSet", ioe);
-      }
-    }
-    state = State.CLOSED;
+    try {
+      if (state == State.IN_SEGMENT) {
+        assert editLogStream != null;
+        waitForSyncToFinish();
+        endCurrentLogSegment(true);
+      }
+    } finally {
+      if (journalSet != null && !journalSet.isEmpty()) {
+        try {
+          journalSet.close();
+        } catch (IOException ioe) {
+          LOG.warn("Error closing journalSet", ioe);
+        }
+      }
+      state = State.CLOSED;
+    }
   }
@@ -583,6 +585,7 @@ public void logSync() {
             "due to " + e.getMessage() + ". " +
             "Unsynced transactions: " + (txid - synctxid);
           LOG.fatal(msg, new Exception());
+          IOUtils.cleanup(LOG, journalSet);
           terminate(1, msg);
         }
       } finally {
@@ -606,6 +609,7 @@ public void logSync() {
           "Could not sync enough journals to persistent storage. "
           + "Unsynced transactions: " + (txid - synctxid);
         LOG.fatal(msg, new Exception());
+        IOUtils.cleanup(LOG, journalSet);
         terminate(1, msg);
       }
     }

@@ -651,13 +651,14 @@ public void stop() {
       }
     } catch (ServiceFailedException e) {
       LOG.warn("Encountered exception while exiting state ", e);
-    }
-    stopCommonServices();
-    if (metrics != null) {
-      metrics.shutdown();
-    }
-    if (namesystem != null) {
-      namesystem.shutdown();
+    } finally {
+      stopCommonServices();
+      if (metrics != null) {
+        metrics.shutdown();
+      }
+      if (namesystem != null) {
+        namesystem.shutdown();
+      }
     }
   }

@@ -27,6 +27,7 @@
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
 import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
+import org.apache.hadoop.io.IOUtils;
 
 /**
  * OfflineEditsBinaryLoader loads edits from a binary edits file
@@ -59,43 +60,49 @@ public OfflineEditsBinaryLoader(OfflineEditsVisitor visitor,
    */
   @Override
   public void loadEdits() throws IOException {
-    visitor.start(inputStream.getVersion());
-    while (true) {
-      try {
-        FSEditLogOp op = inputStream.readOp();
-        if (op == null)
-          break;
-        if (fixTxIds) {
-          if (nextTxId <= 0) {
-            nextTxId = op.getTransactionId();
-            if (nextTxId <= 0) {
-              nextTxId = 1;
-            }
-          }
-          op.setTransactionId(nextTxId);
-          nextTxId++;
-        }
-        visitor.visitOp(op);
-      } catch (IOException e) {
-        if (!recoveryMode) {
-          // Tell the visitor to clean up, then re-throw the exception
-          LOG.error("Got IOException at position " + inputStream.getPosition());
-          visitor.close(e);
-          throw e;
-        }
-        LOG.error("Got IOException while reading stream! Resyncing.", e);
-        inputStream.resync();
-      } catch (RuntimeException e) {
-        if (!recoveryMode) {
-          // Tell the visitor to clean up, then re-throw the exception
-          LOG.error("Got RuntimeException at position " + inputStream.getPosition());
-          visitor.close(e);
-          throw e;
-        }
-        LOG.error("Got RuntimeException while reading stream! Resyncing.", e);
-        inputStream.resync();
-      }
-    }
-    visitor.close(null);
+    try {
+      visitor.start(inputStream.getVersion());
+      while (true) {
+        try {
+          FSEditLogOp op = inputStream.readOp();
+          if (op == null)
+            break;
+          if (fixTxIds) {
+            if (nextTxId <= 0) {
+              nextTxId = op.getTransactionId();
+              if (nextTxId <= 0) {
+                nextTxId = 1;
+              }
+            }
+            op.setTransactionId(nextTxId);
+            nextTxId++;
+          }
+          visitor.visitOp(op);
+        } catch (IOException e) {
+          if (!recoveryMode) {
+            // Tell the visitor to clean up, then re-throw the exception
+            LOG.error("Got IOException at position " +
+                inputStream.getPosition());
+            visitor.close(e);
+            throw e;
+          }
+          LOG.error("Got IOException while reading stream! Resyncing.", e);
+          inputStream.resync();
+        } catch (RuntimeException e) {
+          if (!recoveryMode) {
+            // Tell the visitor to clean up, then re-throw the exception
+            LOG.error("Got RuntimeException at position " +
+                inputStream.getPosition());
+            visitor.close(e);
+            throw e;
+          }
+          LOG.error("Got RuntimeException while reading stream! Resyncing.", e);
+          inputStream.resync();
+        }
+      }
+      visitor.close(null);
+    } finally {
+      IOUtils.cleanup(LOG, inputStream);
+    }
   }
 }

@@ -48,6 +48,8 @@
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -91,6 +93,8 @@
 /** Utilities for HDFS tests */
 public class DFSTestUtil {
+  private static final Log LOG = LogFactory.getLog(DFSTestUtil.class);
+
   private static Random gen = new Random();
   private static String[] dirNames = {
@@ -723,7 +727,11 @@ public static byte[] loadFile(String filename) throws IOException {
     File file = new File(filename);
     DataInputStream in = new DataInputStream(new FileInputStream(file));
     byte[] content = new byte[(int)file.length()];
-    in.readFully(content);
+    try {
+      in.readFully(content);
+    } finally {
+      IOUtils.cleanup(LOG, in);
+    }
     return content;
   }

@@ -631,44 +631,48 @@ public void testGetFileBlockStorageLocationsBatching() throws Exception {
         true);
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(2).build();
-    DistributedFileSystem fs = cluster.getFileSystem();
-    // Create two files
-    Path tmpFile1 = new Path("/tmpfile1.dat");
-    Path tmpFile2 = new Path("/tmpfile2.dat");
-    DFSTestUtil.createFile(fs, tmpFile1, 1024, (short) 2, 0xDEADDEADl);
-    DFSTestUtil.createFile(fs, tmpFile2, 1024, (short) 2, 0xDEADDEADl);
-    // Get locations of blocks of both files and concat together
-    BlockLocation[] blockLocs1 = fs.getFileBlockLocations(tmpFile1, 0, 1024);
-    BlockLocation[] blockLocs2 = fs.getFileBlockLocations(tmpFile2, 0, 1024);
-    BlockLocation[] blockLocs = (BlockLocation[]) ArrayUtils.addAll(blockLocs1,
-        blockLocs2);
-    // Fetch VolumeBlockLocations in batch
-    BlockStorageLocation[] locs = fs.getFileBlockStorageLocations(Arrays
-        .asList(blockLocs));
-    int counter = 0;
-    // Print out the list of ids received for each block
-    for (BlockStorageLocation l : locs) {
-      for (int i = 0; i < l.getVolumeIds().length; i++) {
-        VolumeId id = l.getVolumeIds()[i];
-        String name = l.getNames()[i];
-        if (id != null) {
-          System.out.println("Datanode " + name + " has block " + counter
-              + " on volume id " + id.toString());
-        }
-      }
-      counter++;
-    }
-    assertEquals("Expected two HdfsBlockLocations for two 1-block files", 2,
-        locs.length);
-    for (BlockStorageLocation l : locs) {
-      assertEquals("Expected two replicas for each block", 2,
-          l.getVolumeIds().length);
-      for (int i = 0; i < l.getVolumeIds().length; i++) {
-        VolumeId id = l.getVolumeIds()[i];
-        String name = l.getNames()[i];
-        assertTrue("Expected block to be valid on datanode " + name,
-            id.isValid());
-      }
-    }
+    try {
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // Create two files
+      Path tmpFile1 = new Path("/tmpfile1.dat");
+      Path tmpFile2 = new Path("/tmpfile2.dat");
+      DFSTestUtil.createFile(fs, tmpFile1, 1024, (short) 2, 0xDEADDEADl);
+      DFSTestUtil.createFile(fs, tmpFile2, 1024, (short) 2, 0xDEADDEADl);
+      // Get locations of blocks of both files and concat together
+      BlockLocation[] blockLocs1 = fs.getFileBlockLocations(tmpFile1, 0, 1024);
+      BlockLocation[] blockLocs2 = fs.getFileBlockLocations(tmpFile2, 0, 1024);
+      BlockLocation[] blockLocs = (BlockLocation[]) ArrayUtils.addAll(blockLocs1,
+          blockLocs2);
+      // Fetch VolumeBlockLocations in batch
+      BlockStorageLocation[] locs = fs.getFileBlockStorageLocations(Arrays
+          .asList(blockLocs));
+      int counter = 0;
+      // Print out the list of ids received for each block
+      for (BlockStorageLocation l : locs) {
+        for (int i = 0; i < l.getVolumeIds().length; i++) {
+          VolumeId id = l.getVolumeIds()[i];
+          String name = l.getNames()[i];
+          if (id != null) {
+            System.out.println("Datanode " + name + " has block " + counter
+                + " on volume id " + id.toString());
+          }
+        }
+        counter++;
+      }
+      assertEquals("Expected two HdfsBlockLocations for two 1-block files", 2,
+          locs.length);
+      for (BlockStorageLocation l : locs) {
+        assertEquals("Expected two replicas for each block", 2,
+            l.getVolumeIds().length);
+        for (int i = 0; i < l.getVolumeIds().length; i++) {
+          VolumeId id = l.getVolumeIds()[i];
+          String name = l.getNames()[i];
+          assertTrue("Expected block to be valid on datanode " + name,
+              id.isValid());
+        }
+      }
+    } finally {
+      cluster.shutdown();
+    }
   }
 
@@ -683,27 +687,31 @@ public void testGetFileBlockStorageLocationsError() throws Exception {
         true);
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(2).build();
-    cluster.getDataNodes();
-    DistributedFileSystem fs = cluster.getFileSystem();
-    // Create a file
-    Path tmpFile = new Path("/tmpfile1.dat");
-    DFSTestUtil.createFile(fs, tmpFile, 1024, (short) 2, 0xDEADDEADl);
-    // Get locations of blocks of the file
-    BlockLocation[] blockLocs = fs.getFileBlockLocations(tmpFile, 0, 1024);
-    // Stop a datanode to simulate a failure
-    cluster.stopDataNode(0);
-    // Fetch VolumeBlockLocations
-    BlockStorageLocation[] locs = fs.getFileBlockStorageLocations(Arrays
-        .asList(blockLocs));
-
-    assertEquals("Expected one HdfsBlockLocation for one 1-block file", 1,
-        locs.length);
-
-    for (BlockStorageLocation l : locs) {
-      assertEquals("Expected two replicas for each block", 2,
-          l.getVolumeIds().length);
-      assertTrue("Expected one valid and one invalid replica",
-          (l.getVolumeIds()[0].isValid()) ^ (l.getVolumeIds()[1].isValid()));
-    }
+    try {
+      cluster.getDataNodes();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // Create a file
+      Path tmpFile = new Path("/tmpfile1.dat");
+      DFSTestUtil.createFile(fs, tmpFile, 1024, (short) 2, 0xDEADDEADl);
+      // Get locations of blocks of the file
+      BlockLocation[] blockLocs = fs.getFileBlockLocations(tmpFile, 0, 1024);
+      // Stop a datanode to simulate a failure
+      cluster.stopDataNode(0);
+      // Fetch VolumeBlockLocations
+      BlockStorageLocation[] locs = fs.getFileBlockStorageLocations(Arrays
+          .asList(blockLocs));
+
+      assertEquals("Expected one HdfsBlockLocation for one 1-block file", 1,
+          locs.length);
+
+      for (BlockStorageLocation l : locs) {
+        assertEquals("Expected two replicas for each block", 2,
+            l.getVolumeIds().length);
+        assertTrue("Expected one valid and one invalid replica",
+            (l.getVolumeIds()[0].isValid()) ^ (l.getVolumeIds()[1].isValid()));
+      }
+    } finally {
+      cluster.shutdown();
+    }
   }
 
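
The test hunks in this commit all take the same shape: the MiniDFSCluster is built first, the body of the test runs inside a try block, and cluster.shutdown() moves into a finally block so a failed assertion or an unexpected exception no longer leaves the cluster's file handles open. A minimal, self-contained sketch of that shape follows; the test class name and the file path in it are invented for illustration and do not appear in the patch.

import static org.junit.Assert.assertTrue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.Test;

public class TestClusterShutdownPattern {
  @Test
  public void testClusterAlwaysShutsDown() throws Exception {
    final Configuration conf = new HdfsConfiguration();
    final MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    try {
      // Test body: an assertion failure or IOException thrown here
      // still reaches the finally block below.
      FileSystem fs = cluster.getFileSystem();
      Path f = new Path("/pattern.dat");
      DFSTestUtil.createFile(fs, f, 1024, (short) 1, 0L);
      assertTrue(fs.exists(f));
    } finally {
      cluster.shutdown();  // releases the cluster's file handles and ports
    }
  }
}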

@@ -782,49 +782,53 @@ private static void checkContentSummary(final ContentSummary expected,
   public void testMaxSpaceQuotas() throws Exception {
     final Configuration conf = new HdfsConfiguration();
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
-    final FileSystem fs = cluster.getFileSystem();
-    assertTrue("Not a HDFS: "+fs.getUri(),
-                fs instanceof DistributedFileSystem);
-    final DistributedFileSystem dfs = (DistributedFileSystem)fs;
-
-    // create test directory
-    final Path testFolder = new Path("/testFolder");
-    assertTrue(dfs.mkdirs(testFolder));
-
-    // setting namespace quota to Long.MAX_VALUE - 1 should work
-    dfs.setQuota(testFolder, Long.MAX_VALUE - 1, 10);
-    ContentSummary c = dfs.getContentSummary(testFolder);
-    assertTrue("Quota not set properly", c.getQuota() == Long.MAX_VALUE - 1);
-
-    // setting diskspace quota to Long.MAX_VALUE - 1 should work
-    dfs.setQuota(testFolder, 10, Long.MAX_VALUE - 1);
-    c = dfs.getContentSummary(testFolder);
-    assertTrue("Quota not set properly", c.getSpaceQuota() == Long.MAX_VALUE - 1);
-
-    // setting namespace quota to Long.MAX_VALUE should not work + no error
-    dfs.setQuota(testFolder, Long.MAX_VALUE, 10);
-    c = dfs.getContentSummary(testFolder);
-    assertTrue("Quota should not have changed", c.getQuota() == 10);
-
-    // setting diskspace quota to Long.MAX_VALUE should not work + no error
-    dfs.setQuota(testFolder, 10, Long.MAX_VALUE);
-    c = dfs.getContentSummary(testFolder);
-    assertTrue("Quota should not have changed", c.getSpaceQuota() == 10);
-
-    // setting namespace quota to Long.MAX_VALUE + 1 should not work + error
-    try {
-      dfs.setQuota(testFolder, Long.MAX_VALUE + 1, 10);
-      fail("Exception not thrown");
-    } catch (IllegalArgumentException e) {
-      // Expected
-    }
-
-    // setting diskspace quota to Long.MAX_VALUE + 1 should not work + error
-    try {
-      dfs.setQuota(testFolder, 10, Long.MAX_VALUE + 1);
-      fail("Exception not thrown");
-    } catch (IllegalArgumentException e) {
-      // Expected
-    }
+    try {
+      final FileSystem fs = cluster.getFileSystem();
+      assertTrue("Not a HDFS: "+fs.getUri(),
+                  fs instanceof DistributedFileSystem);
+      final DistributedFileSystem dfs = (DistributedFileSystem)fs;
+
+      // create test directory
+      final Path testFolder = new Path("/testFolder");
+      assertTrue(dfs.mkdirs(testFolder));
+
+      // setting namespace quota to Long.MAX_VALUE - 1 should work
+      dfs.setQuota(testFolder, Long.MAX_VALUE - 1, 10);
+      ContentSummary c = dfs.getContentSummary(testFolder);
+      assertTrue("Quota not set properly", c.getQuota() == Long.MAX_VALUE - 1);
+
+      // setting diskspace quota to Long.MAX_VALUE - 1 should work
+      dfs.setQuota(testFolder, 10, Long.MAX_VALUE - 1);
+      c = dfs.getContentSummary(testFolder);
+      assertTrue("Quota not set properly", c.getSpaceQuota() == Long.MAX_VALUE - 1);
+
+      // setting namespace quota to Long.MAX_VALUE should not work + no error
+      dfs.setQuota(testFolder, Long.MAX_VALUE, 10);
+      c = dfs.getContentSummary(testFolder);
+      assertTrue("Quota should not have changed", c.getQuota() == 10);
+
+      // setting diskspace quota to Long.MAX_VALUE should not work + no error
+      dfs.setQuota(testFolder, 10, Long.MAX_VALUE);
+      c = dfs.getContentSummary(testFolder);
+      assertTrue("Quota should not have changed", c.getSpaceQuota() == 10);
+
+      // setting namespace quota to Long.MAX_VALUE + 1 should not work + error
+      try {
+        dfs.setQuota(testFolder, Long.MAX_VALUE + 1, 10);
+        fail("Exception not thrown");
+      } catch (IllegalArgumentException e) {
+        // Expected
+      }
+
+      // setting diskspace quota to Long.MAX_VALUE + 1 should not work + error
+      try {
+        dfs.setQuota(testFolder, 10, Long.MAX_VALUE + 1);
+        fail("Exception not thrown");
+      } catch (IllegalArgumentException e) {
+        // Expected
+      }
+    } finally {
+      cluster.shutdown();
+    }
   }
 

@@ -255,7 +255,6 @@ private void invalidateEditsDirAtIndex(int index,
       doThrow(new IOException("fail on setReadyToFlush()")).when(spyElos)
           .setReadyToFlush();
     }
-    doNothing().when(spyElos).abort();
   }
 
   private EditLogFileOutputStream spyOnStream(JournalAndStream jas) {

@@ -530,7 +530,11 @@ public void testCorruptImageFallback() throws IOException {
         .manageDataDfsDirs(false)
         .manageNameDfsDirs(false)
         .build();
-    cluster.waitActive();
+    try {
+      cluster.waitActive();
+    } finally {
+      cluster.shutdown();
+    }
   }
 
   /**