HDFS-2118. Couple dfs data dir improvements. Contributed by Eli Collins
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1141713 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5147e283ad
commit
3af51887b4
@ -552,6 +552,8 @@ Trunk (unreleased changes)
|
|||||||
|
|
||||||
HDFS-2056. Update fetchdt usage. (Tanping Wang via jitendra)
|
HDFS-2056. Update fetchdt usage. (Tanping Wang via jitendra)
|
||||||
|
|
||||||
|
HDFS-2118. Couple dfs data dir improvements. (eli)
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
||||||
HDFS-1955. FSImage.doUpgrade() was made too fault-tolerant by HDFS-1826.
|
HDFS-1955. FSImage.doUpgrade() was made too fault-tolerant by HDFS-1826.
|
||||||
|
@ -2186,20 +2186,21 @@ public class DataNode extends Configured
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// drop any (illegal) authority in the URI for backwards compatibility
|
// drop any (illegal) authority in the URI for backwards compatibility
|
||||||
File data = new File(dirURI.getPath());
|
File dir = new File(dirURI.getPath());
|
||||||
try {
|
try {
|
||||||
DiskChecker.checkDir(localFS, new Path(data.toURI()), permission);
|
DiskChecker.checkDir(localFS, new Path(dir.toURI()), permission);
|
||||||
dirs.add(data);
|
dirs.add(dir);
|
||||||
} catch (IOException e) {
|
} catch (IOException ioe) {
|
||||||
LOG.warn("Invalid directory in: "
|
LOG.warn("Invalid " + DFS_DATANODE_DATA_DIR_KEY + " "
|
||||||
+ DFS_DATANODE_DATA_DIR_KEY + ": ", e);
|
+ dir + " : ", ioe);
|
||||||
invalidDirs.append("\"").append(data.getCanonicalPath()).append("\" ");
|
invalidDirs.append("\"").append(dir.getCanonicalPath()).append("\" ");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (dirs.size() == 0)
|
if (dirs.size() == 0) {
|
||||||
throw new IOException("All directories in "
|
throw new IOException("All directories in "
|
||||||
+ DFS_DATANODE_DATA_DIR_KEY + " are invalid: "
|
+ DFS_DATANODE_DATA_DIR_KEY + " are invalid: "
|
||||||
+ invalidDirs);
|
+ invalidDirs);
|
||||||
|
}
|
||||||
return dirs;
|
return dirs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -125,7 +125,7 @@ class DataXceiver extends Receiver implements Runnable, FSConstants {
|
|||||||
DataNode getDataNode() {return datanode;}
|
DataNode getDataNode() {return datanode;}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Read/write data from/to the DataXceiveServer.
|
* Read/write data from/to the DataXceiverServer.
|
||||||
*/
|
*/
|
||||||
public void run() {
|
public void run() {
|
||||||
updateCurrentThreadName("Waiting for operation");
|
updateCurrentThreadName("Waiting for operation");
|
||||||
|
@ -146,10 +146,10 @@ class DataXceiverServer implements Runnable, FSConstants {
|
|||||||
} catch (SocketTimeoutException ignored) {
|
} catch (SocketTimeoutException ignored) {
|
||||||
// wake up to see if should continue to run
|
// wake up to see if should continue to run
|
||||||
} catch (IOException ie) {
|
} catch (IOException ie) {
|
||||||
LOG.warn(datanode.getMachineName() + ":DataXceiveServer: ", ie);
|
LOG.warn(datanode.getMachineName() + ":DataXceiverServer: ", ie);
|
||||||
} catch (Throwable te) {
|
} catch (Throwable te) {
|
||||||
LOG.error(datanode.getMachineName()
|
LOG.error(datanode.getMachineName()
|
||||||
+ ":DataXceiveServer: Exiting due to: ", te);
|
+ ":DataXceiverServer: Exiting due to: ", te);
|
||||||
datanode.shouldRun = false;
|
datanode.shouldRun = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -157,7 +157,7 @@ class DataXceiverServer implements Runnable, FSConstants {
|
|||||||
ss.close();
|
ss.close();
|
||||||
} catch (IOException ie) {
|
} catch (IOException ie) {
|
||||||
LOG.warn(datanode.getMachineName()
|
LOG.warn(datanode.getMachineName()
|
||||||
+ ":DataXceiveServer: Close exception due to: ", ie);
|
+ ":DataXceiverServer: Close exception due to: ", ie);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -167,7 +167,7 @@ class DataXceiverServer implements Runnable, FSConstants {
|
|||||||
try {
|
try {
|
||||||
this.ss.close();
|
this.ss.close();
|
||||||
} catch (IOException ie) {
|
} catch (IOException ie) {
|
||||||
LOG.warn(datanode.getMachineName() + ":DataXceiveServer.kill(): "
|
LOG.warn(datanode.getMachineName() + ":DataXceiverServer.kill(): "
|
||||||
+ StringUtils.stringifyException(ie));
|
+ StringUtils.stringifyException(ie));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1150,7 +1150,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|||||||
conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
|
conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
|
||||||
DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);
|
DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);
|
||||||
|
|
||||||
String[] dataDirs = conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
|
String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
|
||||||
|
|
||||||
int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length;
|
int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length;
|
||||||
|
|
||||||
|
@ -133,17 +133,17 @@ public class TestDataNodeVolumeFailureToleration {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Restart the cluster with a new volume tolerated value.
|
* Restart the datanodes with a new volume tolerated value.
|
||||||
* @param volTolerated
|
* @param volTolerated number of dfs data dir failures to tolerate
|
||||||
* @param manageCluster
|
* @param manageDfsDirs whether the mini cluster should manage data dirs
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private void restartCluster(int volTolerated, boolean manageCluster)
|
private void restartDatanodes(int volTolerated, boolean manageDfsDirs)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
//Make sure no datanode is running
|
//Make sure no datanode is running
|
||||||
cluster.shutdownDataNodes();
|
cluster.shutdownDataNodes();
|
||||||
conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, volTolerated);
|
conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, volTolerated);
|
||||||
cluster.startDataNodes(conf, 1, manageCluster, null, null);
|
cluster.startDataNodes(conf, 1, manageDfsDirs, null, null);
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -174,19 +174,14 @@ public class TestDataNodeVolumeFailureToleration {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests for a given volumes to be tolerated and volumes failed.
|
* Tests for a given volumes to be tolerated and volumes failed.
|
||||||
*
|
|
||||||
* @param volumesTolerated
|
|
||||||
* @param volumesFailed
|
|
||||||
* @param expectedBPServiceState
|
|
||||||
* @param clusterManaged
|
|
||||||
* @throws IOException
|
|
||||||
* @throws InterruptedException
|
|
||||||
*/
|
*/
|
||||||
private void testVolumeConfig(int volumesTolerated, int volumesFailed,
|
private void testVolumeConfig(int volumesTolerated, int volumesFailed,
|
||||||
boolean expectedBPServiceState, boolean clusterManaged)
|
boolean expectedBPServiceState, boolean manageDfsDirs)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
assumeTrue(!System.getProperty("os.name").startsWith("Windows"));
|
assumeTrue(!System.getProperty("os.name").startsWith("Windows"));
|
||||||
final int dnIndex = 0;
|
final int dnIndex = 0;
|
||||||
|
// Fail the current directory since invalid storage directory perms
|
||||||
|
// get fixed up automatically on datanode startup.
|
||||||
File[] dirs = {
|
File[] dirs = {
|
||||||
new File(MiniDFSCluster.getStorageDir(dnIndex, 0), "current"),
|
new File(MiniDFSCluster.getStorageDir(dnIndex, 0), "current"),
|
||||||
new File(MiniDFSCluster.getStorageDir(dnIndex, 1), "current") };
|
new File(MiniDFSCluster.getStorageDir(dnIndex, 1), "current") };
|
||||||
@ -195,11 +190,10 @@ public class TestDataNodeVolumeFailureToleration {
|
|||||||
for (int i = 0; i < volumesFailed; i++) {
|
for (int i = 0; i < volumesFailed; i++) {
|
||||||
prepareDirToFail(dirs[i]);
|
prepareDirToFail(dirs[i]);
|
||||||
}
|
}
|
||||||
restartCluster(volumesTolerated, clusterManaged);
|
restartDatanodes(volumesTolerated, manageDfsDirs);
|
||||||
assertEquals(expectedBPServiceState, cluster.getDataNodes().get(0)
|
assertEquals(expectedBPServiceState, cluster.getDataNodes().get(0)
|
||||||
.isBPServiceAlive(cluster.getNamesystem().getBlockPoolId()));
|
.isBPServiceAlive(cluster.getNamesystem().getBlockPoolId()));
|
||||||
} finally {
|
} finally {
|
||||||
// restore its old permission
|
|
||||||
for (File dir : dirs) {
|
for (File dir : dirs) {
|
||||||
FileUtil.chmod(dir.toString(), "755");
|
FileUtil.chmod(dir.toString(), "755");
|
||||||
}
|
}
|
||||||
@ -215,8 +209,7 @@ public class TestDataNodeVolumeFailureToleration {
|
|||||||
private void prepareDirToFail(File dir) throws IOException,
|
private void prepareDirToFail(File dir) throws IOException,
|
||||||
InterruptedException {
|
InterruptedException {
|
||||||
dir.mkdirs();
|
dir.mkdirs();
|
||||||
assertTrue("Couldn't chmod local vol", FileUtil
|
assertEquals("Couldn't chmod local vol", 0,
|
||||||
.chmod(dir.toString(), "000") == 0);
|
FileUtil.chmod(dir.toString(), "000"));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user