HDFS-5356. MiniDFSCluster should close all open FileSystems when shutdown() (Contributed by Rakesh R)
This commit is contained in:
parent
e5370477c2
commit
018893e81e
@ -327,6 +327,9 @@ Release 2.8.0 - UNRELEASED
|
|||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
||||||
|
HDFS-5356. MiniDFSCluster should close all open FileSystems when shutdown()
|
||||||
|
(Rakesh R via vinayakumarb)
|
||||||
|
|
||||||
Release 2.7.0 - UNRELEASED
|
Release 2.7.0 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -60,6 +60,7 @@
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
@ -118,6 +119,7 @@
|
|||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
import com.google.common.collect.Sets;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class creates a single-process DFS cluster for junit testing.
|
* This class creates a single-process DFS cluster for junit testing.
|
||||||
@ -523,7 +525,8 @@ public void setDnArgs(String ... args) {
|
|||||||
private boolean federation;
|
private boolean federation;
|
||||||
private boolean checkExitOnShutdown = true;
|
private boolean checkExitOnShutdown = true;
|
||||||
protected final int storagesPerDatanode;
|
protected final int storagesPerDatanode;
|
||||||
|
private Set<FileSystem> fileSystems = Sets.newHashSet();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A unique instance identifier for the cluster. This
|
* A unique instance identifier for the cluster. This
|
||||||
* is used to disambiguate HA filesystems in the case where
|
* is used to disambiguate HA filesystems in the case where
|
||||||
@ -1705,6 +1708,13 @@ public void shutdown() {
|
|||||||
* Shutdown all the nodes in the cluster.
|
* Shutdown all the nodes in the cluster.
|
||||||
*/
|
*/
|
||||||
public void shutdown(boolean deleteDfsDir) {
|
public void shutdown(boolean deleteDfsDir) {
|
||||||
|
shutdown(deleteDfsDir, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Shutdown all the nodes in the cluster.
|
||||||
|
*/
|
||||||
|
public void shutdown(boolean deleteDfsDir, boolean closeFileSystem) {
|
||||||
LOG.info("Shutting down the Mini HDFS Cluster");
|
LOG.info("Shutting down the Mini HDFS Cluster");
|
||||||
if (checkExitOnShutdown) {
|
if (checkExitOnShutdown) {
|
||||||
if (ExitUtil.terminateCalled()) {
|
if (ExitUtil.terminateCalled()) {
|
||||||
@ -1714,6 +1724,16 @@ public void shutdown(boolean deleteDfsDir) {
|
|||||||
throw new AssertionError("Test resulted in an unexpected exit");
|
throw new AssertionError("Test resulted in an unexpected exit");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (closeFileSystem) {
|
||||||
|
for (FileSystem fs : fileSystems) {
|
||||||
|
try {
|
||||||
|
fs.close();
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
LOG.warn("Exception while closing file system", ioe);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fileSystems.clear();
|
||||||
|
}
|
||||||
shutdownDataNodes();
|
shutdownDataNodes();
|
||||||
for (NameNodeInfo nnInfo : nameNodes) {
|
for (NameNodeInfo nnInfo : nameNodes) {
|
||||||
if (nnInfo == null) continue;
|
if (nnInfo == null) continue;
|
||||||
@ -2144,8 +2164,10 @@ public DistributedFileSystem getFileSystem() throws IOException {
|
|||||||
* Get a client handle to the DFS cluster for the namenode at given index.
|
* Get a client handle to the DFS cluster for the namenode at given index.
|
||||||
*/
|
*/
|
||||||
public DistributedFileSystem getFileSystem(int nnIndex) throws IOException {
|
public DistributedFileSystem getFileSystem(int nnIndex) throws IOException {
|
||||||
return (DistributedFileSystem)FileSystem.get(getURI(nnIndex),
|
DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(
|
||||||
nameNodes[nnIndex].conf);
|
getURI(nnIndex), nameNodes[nnIndex].conf);
|
||||||
|
fileSystems.add(dfs);
|
||||||
|
return dfs;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -2153,7 +2175,9 @@ public DistributedFileSystem getFileSystem(int nnIndex) throws IOException {
|
|||||||
* This simulating different threads working on different FileSystem instances.
|
* This simulating different threads working on different FileSystem instances.
|
||||||
*/
|
*/
|
||||||
public FileSystem getNewFileSystemInstance(int nnIndex) throws IOException {
|
public FileSystem getNewFileSystemInstance(int nnIndex) throws IOException {
|
||||||
return FileSystem.newInstance(getURI(nnIndex), nameNodes[nnIndex].conf);
|
FileSystem dfs = FileSystem.newInstance(getURI(nnIndex), nameNodes[nnIndex].conf);
|
||||||
|
fileSystems.add(dfs);
|
||||||
|
return dfs;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -675,7 +675,7 @@ public void testFileCreationNamenodeRestart() throws IOException {
|
|||||||
|
|
||||||
// restart cluster with the same namenode port as before.
|
// restart cluster with the same namenode port as before.
|
||||||
// This ensures that leases are persisted in fsimage.
|
// This ensures that leases are persisted in fsimage.
|
||||||
cluster.shutdown();
|
cluster.shutdown(false, false);
|
||||||
try {
|
try {
|
||||||
Thread.sleep(2*MAX_IDLE_TIME);
|
Thread.sleep(2*MAX_IDLE_TIME);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
@ -687,7 +687,7 @@ public void testFileCreationNamenodeRestart() throws IOException {
|
|||||||
|
|
||||||
// restart cluster yet again. This triggers the code to read in
|
// restart cluster yet again. This triggers the code to read in
|
||||||
// persistent leases from fsimage.
|
// persistent leases from fsimage.
|
||||||
cluster.shutdown();
|
cluster.shutdown(false, false);
|
||||||
try {
|
try {
|
||||||
Thread.sleep(5000);
|
Thread.sleep(5000);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
@ -519,8 +519,8 @@ private void restartClusterAndCheckImage(boolean compareQuota)
|
|||||||
File fsnAfter = new File(testDir, "dumptree_after");
|
File fsnAfter = new File(testDir, "dumptree_after");
|
||||||
|
|
||||||
SnapshotTestHelper.dumpTree2File(fsdir, fsnBefore);
|
SnapshotTestHelper.dumpTree2File(fsdir, fsnBefore);
|
||||||
|
|
||||||
cluster.shutdown();
|
cluster.shutdown(false, false);
|
||||||
cluster = new MiniDFSCluster.Builder(conf).format(false)
|
cluster = new MiniDFSCluster.Builder(conf).format(false)
|
||||||
.numDataNodes(REPL).build();
|
.numDataNodes(REPL).build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
|
Loading…
Reference in New Issue
Block a user