MAPREDUCE-2463. Job history files are not moved to done folder when job history location is hdfs. Contributed by Devaraj K
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1151722 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ffbe9e5972
commit
60c41c55e7
@ -359,6 +359,9 @@ Trunk (unreleased changes)
|
|||||||
MAPREDUCE-2732. Remove directly accessing FSNamesystem.LOG from
|
MAPREDUCE-2732. Remove directly accessing FSNamesystem.LOG from
|
||||||
TestCopyFiles and TestDistCh. (szetszwo)
|
TestCopyFiles and TestDistCh. (szetszwo)
|
||||||
|
|
||||||
|
MAPREDUCE-2463. Job history files are not moved to done folder when job
|
||||||
|
history location is hdfs. (Devaraj K via szetszwo)
|
||||||
|
|
||||||
Release 0.22.0 - Unreleased
|
Release 0.22.0 - Unreleased
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -43,6 +43,7 @@
|
|||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.mapred.JobConf;
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
@ -354,7 +355,8 @@ private void moveToDoneNow(Path fromPath, Path toPath) throws IOException {
|
|||||||
if (logDirFs.exists(fromPath)) {
|
if (logDirFs.exists(fromPath)) {
|
||||||
LOG.info("Moving " + fromPath.toString() + " to " +
|
LOG.info("Moving " + fromPath.toString() + " to " +
|
||||||
toPath.toString());
|
toPath.toString());
|
||||||
doneDirFs.moveFromLocalFile(fromPath, toPath);
|
FileUtil.copy(logDirFs, fromPath, doneDirFs, toPath, true, false,
|
||||||
|
jobTracker.getConf());
|
||||||
doneDirFs.setPermission(toPath,
|
doneDirFs.setPermission(toPath,
|
||||||
new FsPermission(JobHistory.HISTORY_FILE_PERMISSION));
|
new FsPermission(JobHistory.HISTORY_FILE_PERMISSION));
|
||||||
}
|
}
|
||||||
|
@ -76,6 +76,11 @@ public class TestJobHistory extends TestCase {
|
|||||||
|
|
||||||
private static String TEST_ROOT_DIR = new File(System.getProperty(
|
private static String TEST_ROOT_DIR = new File(System.getProperty(
|
||||||
"test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
|
"test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
|
||||||
|
|
||||||
|
private static final String LOG_DIR = System.getProperty("hadoop.log.dir");
|
||||||
|
|
||||||
|
private static final String LOCAL_LOG_DIR_URI = new File(LOG_DIR).toURI()
|
||||||
|
.toString().replace(' ', '+') + "/history";
|
||||||
|
|
||||||
private static final String DIGITS = "[0-9]+";
|
private static final String DIGITS = "[0-9]+";
|
||||||
|
|
||||||
@ -607,15 +612,32 @@ public static void validateJobHistoryFileContent(MiniMRCluster mr,
|
|||||||
assertTrue(jobInfo.getJobQueueName().equals(conf.getQueueName()));
|
assertTrue(jobInfo.getJobQueueName().equals(conf.getQueueName()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests the case where the log directory is on local disk, the done folder is on HDFS,
|
||||||
|
* and the default FS is local.
|
||||||
|
*/
|
||||||
public void testDoneFolderOnHDFS() throws IOException, InterruptedException {
|
public void testDoneFolderOnHDFS() throws IOException, InterruptedException {
|
||||||
runDoneFolderTest("history_done");
|
runDoneFolderTest("history_done", LOCAL_LOG_DIR_URI);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests the case where the log directory and done folder is on local disk
|
||||||
|
* and the default FS is local.
|
||||||
|
*/
|
||||||
public void testDoneFolderNotOnDefaultFileSystem() throws IOException, InterruptedException {
|
public void testDoneFolderNotOnDefaultFileSystem() throws IOException, InterruptedException {
|
||||||
runDoneFolderTest("file://" + System.getProperty("test.build.data", "tmp") + "/history_done");
|
runDoneFolderTest(TEST_ROOT_DIR + "/history_done", LOCAL_LOG_DIR_URI);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests the case where the log directory is on HDFS and done folder is on local disk
|
||||||
|
* and the default FS is local.
|
||||||
|
*/
|
||||||
|
public void testHistoryFolderOnHDFS() throws IOException, InterruptedException {
|
||||||
|
String logDir = "hdfs://localhost:%d/history";
|
||||||
|
runDoneFolderTest(TEST_ROOT_DIR + "/done", logDir);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void runDoneFolderTest(String doneFolder) throws IOException, InterruptedException {
|
private void runDoneFolderTest(String doneFolder, String historyFolder) throws IOException, InterruptedException {
|
||||||
MiniMRCluster mr = null;
|
MiniMRCluster mr = null;
|
||||||
MiniDFSCluster dfsCluster = null;
|
MiniDFSCluster dfsCluster = null;
|
||||||
try {
|
try {
|
||||||
@ -627,9 +649,11 @@ private void runDoneFolderTest(String doneFolder) throws IOException, Interrupte
|
|||||||
//set the done folder location
|
//set the done folder location
|
||||||
conf.set(JTConfig.JT_JOBHISTORY_COMPLETED_LOCATION, doneFolder);
|
conf.set(JTConfig.JT_JOBHISTORY_COMPLETED_LOCATION, doneFolder);
|
||||||
|
|
||||||
String logDir =
|
dfsCluster = new MiniDFSCluster(conf, 2, true, null);
|
||||||
"file:///" + new File(System.getProperty("hadoop.log.dir")).
|
String logDir = String.format(historyFolder, dfsCluster.getNameNodePort());
|
||||||
getAbsolutePath() + File.separator + "history";
|
|
||||||
|
//set the history folder location
|
||||||
|
conf.set(JTConfig.JT_JOBHISTORY_LOCATION, logDir);
|
||||||
|
|
||||||
Path logDirPath = new Path(logDir);
|
Path logDirPath = new Path(logDir);
|
||||||
FileSystem logDirFs = logDirPath.getFileSystem(conf);
|
FileSystem logDirFs = logDirPath.getFileSystem(conf);
|
||||||
@ -647,7 +671,6 @@ private void runDoneFolderTest(String doneFolder) throws IOException, Interrupte
|
|||||||
assertEquals("No of file in logDir not correct", 2,
|
assertEquals("No of file in logDir not correct", 2,
|
||||||
logDirFs.listStatus(logDirPath).length);
|
logDirFs.listStatus(logDirPath).length);
|
||||||
|
|
||||||
dfsCluster = new MiniDFSCluster(conf, 2, true, null);
|
|
||||||
mr = new MiniMRCluster(2, dfsCluster.getFileSystem().getUri().toString(),
|
mr = new MiniMRCluster(2, dfsCluster.getFileSystem().getUri().toString(),
|
||||||
3, null, null, conf);
|
3, null, null, conf);
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user