MAPREDUCE-6032. Made MR jobs write job history files on the default FS when the current context’s FS is different. Contributed by Benjamin Zhitomirsky.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1618269 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
31e478905a
commit
84bc2fe402
@ -224,6 +224,9 @@ Release 2.6.0 - UNRELEASED
|
|||||||
|
|
||||||
MAPREDUCE-5999. Fix dead link in InputFormat javadoc (Akira AJISAKA via aw)
|
MAPREDUCE-5999. Fix dead link in InputFormat javadoc (Akira AJISAKA via aw)
|
||||||
|
|
||||||
|
MAPREDUCE-6032. Made MR jobs write job history files on the default FS when
|
||||||
|
the current context’s FS is different. (Benjamin Zhitomirsky via zjshen)
|
||||||
|
|
||||||
Release 2.5.0 - UNRELEASED
|
Release 2.5.0 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -73,6 +73,12 @@
|
|||||||
<groupId>org.apache.hadoop</groupId>
|
<groupId>org.apache.hadoop</groupId>
|
||||||
<artifactId>hadoop-mapreduce-client-shuffle</artifactId>
|
<artifactId>hadoop-mapreduce-client-shuffle</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
<artifactId>hadoop-hdfs</artifactId>
|
||||||
|
<type>test-jar</type>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
@ -28,13 +28,13 @@
|
|||||||
import java.util.TimerTask;
|
import java.util.TimerTask;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||||
|
import org.apache.hadoop.fs.FileContext;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
@ -74,7 +74,9 @@ public class JobHistoryEventHandler extends AbstractService
|
|||||||
|
|
||||||
private int eventCounter;
|
private int eventCounter;
|
||||||
|
|
||||||
//TODO Does the FS object need to be different ?
|
// Those file systems may differ from the job configuration
|
||||||
|
// See org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils
|
||||||
|
// #ensurePathInDefaultFileSystem
|
||||||
private FileSystem stagingDirFS; // log Dir FileSystem
|
private FileSystem stagingDirFS; // log Dir FileSystem
|
||||||
private FileSystem doneDirFS; // done Dir FileSystem
|
private FileSystem doneDirFS; // done Dir FileSystem
|
||||||
|
|
||||||
@ -141,7 +143,7 @@ protected void serviceInit(Configuration conf) throws Exception {
|
|||||||
//Check for the existence of the history staging dir. Maybe create it.
|
//Check for the existence of the history staging dir. Maybe create it.
|
||||||
try {
|
try {
|
||||||
stagingDirPath =
|
stagingDirPath =
|
||||||
FileSystem.get(conf).makeQualified(new Path(stagingDirStr));
|
FileContext.getFileContext(conf).makeQualified(new Path(stagingDirStr));
|
||||||
stagingDirFS = FileSystem.get(stagingDirPath.toUri(), conf);
|
stagingDirFS = FileSystem.get(stagingDirPath.toUri(), conf);
|
||||||
mkdir(stagingDirFS, stagingDirPath, new FsPermission(
|
mkdir(stagingDirFS, stagingDirPath, new FsPermission(
|
||||||
JobHistoryUtils.HISTORY_STAGING_DIR_PERMISSIONS));
|
JobHistoryUtils.HISTORY_STAGING_DIR_PERMISSIONS));
|
||||||
@ -154,7 +156,7 @@ protected void serviceInit(Configuration conf) throws Exception {
|
|||||||
//Check for the existence of intermediate done dir.
|
//Check for the existence of intermediate done dir.
|
||||||
Path doneDirPath = null;
|
Path doneDirPath = null;
|
||||||
try {
|
try {
|
||||||
doneDirPath = FileSystem.get(conf).makeQualified(new Path(doneDirStr));
|
doneDirPath = FileContext.getFileContext(conf).makeQualified(new Path(doneDirStr));
|
||||||
doneDirFS = FileSystem.get(doneDirPath.toUri(), conf);
|
doneDirFS = FileSystem.get(doneDirPath.toUri(), conf);
|
||||||
// This directory will be in a common location, or this may be a cluster
|
// This directory will be in a common location, or this may be a cluster
|
||||||
// meant for a single user. Creating based on the conf. Should ideally be
|
// meant for a single user. Creating based on the conf. Should ideally be
|
||||||
@ -194,7 +196,7 @@ protected void serviceInit(Configuration conf) throws Exception {
|
|||||||
//Check/create user directory under intermediate done dir.
|
//Check/create user directory under intermediate done dir.
|
||||||
try {
|
try {
|
||||||
doneDirPrefixPath =
|
doneDirPrefixPath =
|
||||||
FileSystem.get(conf).makeQualified(new Path(userDoneDirStr));
|
FileContext.getFileContext(conf).makeQualified(new Path(userDoneDirStr));
|
||||||
mkdir(doneDirFS, doneDirPrefixPath, new FsPermission(
|
mkdir(doneDirFS, doneDirPrefixPath, new FsPermission(
|
||||||
JobHistoryUtils.HISTORY_INTERMEDIATE_USER_DIR_PERMISSIONS));
|
JobHistoryUtils.HISTORY_INTERMEDIATE_USER_DIR_PERMISSIONS));
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
@ -28,6 +28,7 @@
|
|||||||
import static org.mockito.Mockito.never;
|
import static org.mockito.Mockito.never;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
@ -35,8 +36,13 @@
|
|||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.fs.FileContext;
|
import org.apache.hadoop.fs.FileContext;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.LocalFileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.mapreduce.Counters;
|
import org.apache.hadoop.mapreduce.Counters;
|
||||||
import org.apache.hadoop.mapreduce.JobID;
|
import org.apache.hadoop.mapreduce.JobID;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
@ -52,6 +58,10 @@
|
|||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
@ -60,6 +70,26 @@ public class TestJobHistoryEventHandler {
|
|||||||
|
|
||||||
private static final Log LOG = LogFactory
|
private static final Log LOG = LogFactory
|
||||||
.getLog(TestJobHistoryEventHandler.class);
|
.getLog(TestJobHistoryEventHandler.class);
|
||||||
|
private static MiniDFSCluster dfsCluster = null;
|
||||||
|
private static String coreSitePath;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpClass() throws Exception {
|
||||||
|
coreSitePath = "." + File.separator + "target" + File.separator +
|
||||||
|
"test-classes" + File.separator + "core-site.xml";
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
dfsCluster = new MiniDFSCluster.Builder(conf).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void cleanUpClass() throws Exception {
|
||||||
|
dfsCluster.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void cleanTest() throws Exception {
|
||||||
|
new File(coreSitePath).delete();
|
||||||
|
}
|
||||||
|
|
||||||
@Test (timeout=50000)
|
@Test (timeout=50000)
|
||||||
public void testFirstFlushOnCompletionEvent() throws Exception {
|
public void testFirstFlushOnCompletionEvent() throws Exception {
|
||||||
@ -325,6 +355,50 @@ public void testProcessDoneFilesNotLastAMRetry() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test (timeout=50000)
|
||||||
|
public void testDefaultFsIsUsedForHistory() throws Exception {
|
||||||
|
// Create default configuration pointing to the minicluster
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
|
||||||
|
dfsCluster.getURI().toString());
|
||||||
|
FileOutputStream os = new FileOutputStream(coreSitePath);
|
||||||
|
conf.writeXml(os);
|
||||||
|
os.close();
|
||||||
|
|
||||||
|
// simulate execution under a non-default namenode
|
||||||
|
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
|
||||||
|
"file:///");
|
||||||
|
|
||||||
|
TestParams t = new TestParams();
|
||||||
|
conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.dfsWorkDir);
|
||||||
|
|
||||||
|
JHEvenHandlerForTest realJheh =
|
||||||
|
new JHEvenHandlerForTest(t.mockAppContext, 0, false);
|
||||||
|
JHEvenHandlerForTest jheh = spy(realJheh);
|
||||||
|
jheh.init(conf);
|
||||||
|
|
||||||
|
try {
|
||||||
|
jheh.start();
|
||||||
|
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
|
||||||
|
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
|
||||||
|
|
||||||
|
handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
|
||||||
|
TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(),
|
||||||
|
new Counters(), new Counters())));
|
||||||
|
|
||||||
|
// If we got here then event handler worked but we don't know with which
|
||||||
|
// file system. Now we check that history stuff was written to minicluster
|
||||||
|
FileSystem dfsFileSystem = dfsCluster.getFileSystem();
|
||||||
|
assertTrue("Minicluster contains some history files",
|
||||||
|
dfsFileSystem.globStatus(new Path(t.dfsWorkDir + "/*")).length != 0);
|
||||||
|
FileSystem localFileSystem = LocalFileSystem.get(conf);
|
||||||
|
assertFalse("No history directory on non-default file system",
|
||||||
|
localFileSystem.exists(new Path(t.dfsWorkDir)));
|
||||||
|
} finally {
|
||||||
|
jheh.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void queueEvent(JHEvenHandlerForTest jheh, JobHistoryEvent event) {
|
private void queueEvent(JHEvenHandlerForTest jheh, JobHistoryEvent event) {
|
||||||
jheh.handle(event);
|
jheh.handle(event);
|
||||||
}
|
}
|
||||||
@ -372,6 +446,7 @@ private AppContext mockAppContext(ApplicationId appId, boolean isLastAMRetry) {
|
|||||||
private class TestParams {
|
private class TestParams {
|
||||||
boolean isLastAMRetry;
|
boolean isLastAMRetry;
|
||||||
String workDir = setupTestWorkDir();
|
String workDir = setupTestWorkDir();
|
||||||
|
String dfsWorkDir = "/" + this.getClass().getCanonicalName();
|
||||||
ApplicationId appId = ApplicationId.newInstance(200, 1);
|
ApplicationId appId = ApplicationId.newInstance(200, 1);
|
||||||
ApplicationAttemptId appAttemptId =
|
ApplicationAttemptId appAttemptId =
|
||||||
ApplicationAttemptId.newInstance(appId, 1);
|
ApplicationAttemptId.newInstance(appId, 1);
|
||||||
@ -451,10 +526,16 @@ public void testSigTermedFunctionality() throws IOException {
|
|||||||
class JHEvenHandlerForTest extends JobHistoryEventHandler {
|
class JHEvenHandlerForTest extends JobHistoryEventHandler {
|
||||||
|
|
||||||
private EventWriter eventWriter;
|
private EventWriter eventWriter;
|
||||||
|
private boolean mockHistoryProcessing = true;
|
||||||
public JHEvenHandlerForTest(AppContext context, int startCount) {
|
public JHEvenHandlerForTest(AppContext context, int startCount) {
|
||||||
super(context, startCount);
|
super(context, startCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public JHEvenHandlerForTest(AppContext context, int startCount, boolean mockHistoryProcessing) {
|
||||||
|
super(context, startCount);
|
||||||
|
this.mockHistoryProcessing = mockHistoryProcessing;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceStart() {
|
protected void serviceStart() {
|
||||||
}
|
}
|
||||||
@ -462,7 +543,12 @@ protected void serviceStart() {
|
|||||||
@Override
|
@Override
|
||||||
protected EventWriter createEventWriter(Path historyFilePath)
|
protected EventWriter createEventWriter(Path historyFilePath)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
this.eventWriter = mock(EventWriter.class);
|
if (mockHistoryProcessing) {
|
||||||
|
this.eventWriter = mock(EventWriter.class);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
this.eventWriter = super.createEventWriter(historyFilePath);
|
||||||
|
}
|
||||||
return this.eventWriter;
|
return this.eventWriter;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -475,8 +561,13 @@ public EventWriter getEventWriter() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void processDoneFiles(JobId jobId){
|
protected void processDoneFiles(JobId jobId) throws IOException {
|
||||||
// do nothing
|
if (!mockHistoryProcessing) {
|
||||||
|
super.processDoneFiles(jobId);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -22,20 +22,24 @@
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Calendar;
|
import java.util.Calendar;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.fs.FileContext;
|
import org.apache.hadoop.fs.FileContext;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.PathFilter;
|
import org.apache.hadoop.fs.PathFilter;
|
||||||
import org.apache.hadoop.fs.RemoteIterator;
|
import org.apache.hadoop.fs.RemoteIterator;
|
||||||
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.mapreduce.JobID;
|
import org.apache.hadoop.mapreduce.JobID;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
@ -117,6 +121,7 @@ public class JobHistoryUtils {
|
|||||||
public static final String TIMESTAMP_DIR_REGEX = "\\d{4}" + "\\" + Path.SEPARATOR + "\\d{2}" + "\\" + Path.SEPARATOR + "\\d{2}";
|
public static final String TIMESTAMP_DIR_REGEX = "\\d{4}" + "\\" + Path.SEPARATOR + "\\d{2}" + "\\" + Path.SEPARATOR + "\\d{2}";
|
||||||
public static final Pattern TIMESTAMP_DIR_PATTERN = Pattern.compile(TIMESTAMP_DIR_REGEX);
|
public static final Pattern TIMESTAMP_DIR_PATTERN = Pattern.compile(TIMESTAMP_DIR_REGEX);
|
||||||
private static final String TIMESTAMP_DIR_FORMAT = "%04d" + File.separator + "%02d" + File.separator + "%02d";
|
private static final String TIMESTAMP_DIR_FORMAT = "%04d" + File.separator + "%02d" + File.separator + "%02d";
|
||||||
|
private static final Log LOG = LogFactory.getLog(JobHistoryUtils.class);
|
||||||
|
|
||||||
private static final PathFilter CONF_FILTER = new PathFilter() {
|
private static final PathFilter CONF_FILTER = new PathFilter() {
|
||||||
@Override
|
@Override
|
||||||
@ -183,7 +188,7 @@ public static PathFilter getHistoryFileFilter() {
|
|||||||
Path stagingPath = MRApps.getStagingAreaDir(conf, user);
|
Path stagingPath = MRApps.getStagingAreaDir(conf, user);
|
||||||
Path path = new Path(stagingPath, jobId);
|
Path path = new Path(stagingPath, jobId);
|
||||||
String logDir = path.toString();
|
String logDir = path.toString();
|
||||||
return logDir;
|
return ensurePathInDefaultFileSystem(logDir, conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -200,7 +205,7 @@ public static String getConfiguredHistoryIntermediateDoneDirPrefix(
|
|||||||
MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
|
MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
|
||||||
+ "/history/done_intermediate";
|
+ "/history/done_intermediate";
|
||||||
}
|
}
|
||||||
return doneDirPrefix;
|
return ensurePathInDefaultFileSystem(doneDirPrefix, conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -216,7 +221,69 @@ public static String getConfiguredHistoryServerDoneDirPrefix(
|
|||||||
MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
|
MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
|
||||||
+ "/history/done";
|
+ "/history/done";
|
||||||
}
|
}
|
||||||
return doneDirPrefix;
|
return ensurePathInDefaultFileSystem(doneDirPrefix, conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get default file system URI for the cluster (used to ensure consistency
|
||||||
|
* of history done/staging locations) over different context
|
||||||
|
*
|
||||||
|
* @return Default file context
|
||||||
|
*/
|
||||||
|
private static FileContext getDefaultFileContext() {
|
||||||
|
// If FS_DEFAULT_NAME_KEY was set solely by core-default.xml then we ignore
|
||||||
|
// ignore it. This prevents defaulting history paths to file system specified
|
||||||
|
// by core-default.xml which would not make sense in any case. For a test
|
||||||
|
// case to exploit this functionality it should create core-site.xml
|
||||||
|
FileContext fc = null;
|
||||||
|
Configuration defaultConf = new Configuration();
|
||||||
|
String[] sources;
|
||||||
|
sources = defaultConf.getPropertySources(
|
||||||
|
CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY);
|
||||||
|
if (sources != null &&
|
||||||
|
(!Arrays.asList(sources).contains("core-default.xml") ||
|
||||||
|
sources.length > 1)) {
|
||||||
|
try {
|
||||||
|
fc = FileContext.getFileContext(defaultConf);
|
||||||
|
LOG.info("Default file system [" +
|
||||||
|
fc.getDefaultFileSystem().getUri() + "]");
|
||||||
|
} catch (UnsupportedFileSystemException e) {
|
||||||
|
LOG.error("Unable to create default file context [" +
|
||||||
|
defaultConf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY) +
|
||||||
|
"]",
|
||||||
|
e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
LOG.info("Default file system is set solely " +
|
||||||
|
"by core-default.xml therefore - ignoring");
|
||||||
|
}
|
||||||
|
|
||||||
|
return fc;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Ensure that path belongs to cluster's default file system unless
|
||||||
|
* 1. it is already fully qualified.
|
||||||
|
* 2. current job configuration uses default file system
|
||||||
|
* 3. running from a test case without core-site.xml
|
||||||
|
*
|
||||||
|
* @param sourcePath source path
|
||||||
|
* @param conf the job configuration
|
||||||
|
* @return full qualified path (if necessary) in default file system
|
||||||
|
*/
|
||||||
|
private static String ensurePathInDefaultFileSystem(String sourcePath, Configuration conf) {
|
||||||
|
Path path = new Path(sourcePath);
|
||||||
|
FileContext fc = getDefaultFileContext();
|
||||||
|
if (fc == null ||
|
||||||
|
fc.getDefaultFileSystem().getUri().toString().equals(
|
||||||
|
conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "")) ||
|
||||||
|
path.toUri().getAuthority() != null ||
|
||||||
|
path.toUri().getScheme()!= null) {
|
||||||
|
return sourcePath;
|
||||||
|
}
|
||||||
|
|
||||||
|
return fc.makeQualified(path).toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -19,42 +19,74 @@
|
|||||||
package org.apache.hadoop.mapreduce.v2.hs;
|
package org.apache.hadoop.mapreduce.v2.hs;
|
||||||
|
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
|
import java.util.UUID;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
|
import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
|
||||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
||||||
|
import org.apache.hadoop.test.CoreTestDriver;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.util.Clock;
|
import org.apache.hadoop.yarn.util.Clock;
|
||||||
import org.apache.hadoop.yarn.util.SystemClock;
|
import org.apache.hadoop.yarn.util.SystemClock;
|
||||||
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.TestName;
|
||||||
import java.util.UUID;
|
|
||||||
|
|
||||||
public class TestHistoryFileManager {
|
public class TestHistoryFileManager {
|
||||||
private static MiniDFSCluster dfsCluster = null;
|
private static MiniDFSCluster dfsCluster = null;
|
||||||
|
private static MiniDFSCluster dfsCluster2 = null;
|
||||||
|
private static String coreSitePath;
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public TestName name = new TestName();
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setUpClass() throws Exception {
|
public static void setUpClass() throws Exception {
|
||||||
|
coreSitePath = "." + File.separator + "target" + File.separator +
|
||||||
|
"test-classes" + File.separator + "core-site.xml";
|
||||||
Configuration conf = new HdfsConfiguration();
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
Configuration conf2 = new HdfsConfiguration();
|
||||||
dfsCluster = new MiniDFSCluster.Builder(conf).build();
|
dfsCluster = new MiniDFSCluster.Builder(conf).build();
|
||||||
|
conf2.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
|
||||||
|
conf.get(MiniDFSCluster.HDFS_MINIDFS_BASEDIR) + "_2");
|
||||||
|
dfsCluster2 = new MiniDFSCluster.Builder(conf2).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void cleanUpClass() throws Exception {
|
public static void cleanUpClass() throws Exception {
|
||||||
dfsCluster.shutdown();
|
dfsCluster.shutdown();
|
||||||
|
dfsCluster2.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void cleanTest() throws Exception {
|
||||||
|
new File(coreSitePath).delete();
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getDoneDirNameForTest() {
|
||||||
|
return "/" + name.getMethodName();
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getIntermediateDoneDirNameForTest() {
|
||||||
|
return "/intermediate_" + name.getMethodName();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testTryCreateHistoryDirs(Configuration conf, boolean expected)
|
private void testTryCreateHistoryDirs(Configuration conf, boolean expected)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
conf.set(JHAdminConfig.MR_HISTORY_DONE_DIR, "/" + UUID.randomUUID());
|
conf.set(JHAdminConfig.MR_HISTORY_DONE_DIR, getDoneDirNameForTest());
|
||||||
conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR, "/" + UUID.randomUUID());
|
conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR, getIntermediateDoneDirNameForTest());
|
||||||
HistoryFileManager hfm = new HistoryFileManager();
|
HistoryFileManager hfm = new HistoryFileManager();
|
||||||
hfm.conf = conf;
|
hfm.conf = conf;
|
||||||
Assert.assertEquals(expected, hfm.tryCreatingHistoryDirs(false));
|
Assert.assertEquals(expected, hfm.tryCreatingHistoryDirs(false));
|
||||||
@ -75,6 +107,36 @@ public void testCreateDirsWithFileSystem() throws Exception {
|
|||||||
testTryCreateHistoryDirs(dfsCluster.getConfiguration(0), true);
|
testTryCreateHistoryDirs(dfsCluster.getConfiguration(0), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCreateDirsWithAdditionalFileSystem() throws Exception {
|
||||||
|
dfsCluster.getFileSystem().setSafeMode(
|
||||||
|
HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
|
||||||
|
dfsCluster2.getFileSystem().setSafeMode(
|
||||||
|
HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
|
||||||
|
Assert.assertFalse(dfsCluster.getFileSystem().isInSafeMode());
|
||||||
|
Assert.assertFalse(dfsCluster2.getFileSystem().isInSafeMode());
|
||||||
|
|
||||||
|
// Set default configuration to the first cluster
|
||||||
|
Configuration conf = new Configuration(false);
|
||||||
|
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
|
||||||
|
dfsCluster.getURI().toString());
|
||||||
|
FileOutputStream os = new FileOutputStream(coreSitePath);
|
||||||
|
conf.writeXml(os);
|
||||||
|
os.close();
|
||||||
|
|
||||||
|
testTryCreateHistoryDirs(dfsCluster2.getConfiguration(0), true);
|
||||||
|
|
||||||
|
// Directories should be created only in the default file system (dfsCluster)
|
||||||
|
Assert.assertTrue(dfsCluster.getFileSystem()
|
||||||
|
.exists(new Path(getDoneDirNameForTest())));
|
||||||
|
Assert.assertTrue(dfsCluster.getFileSystem()
|
||||||
|
.exists(new Path(getIntermediateDoneDirNameForTest())));
|
||||||
|
Assert.assertFalse(dfsCluster2.getFileSystem()
|
||||||
|
.exists(new Path(getDoneDirNameForTest())));
|
||||||
|
Assert.assertFalse(dfsCluster2.getFileSystem()
|
||||||
|
.exists(new Path(getIntermediateDoneDirNameForTest())));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCreateDirsWithFileSystemInSafeMode() throws Exception {
|
public void testCreateDirsWithFileSystemInSafeMode() throws Exception {
|
||||||
dfsCluster.getFileSystem().setSafeMode(
|
dfsCluster.getFileSystem().setSafeMode(
|
||||||
|
Loading…
Reference in New Issue
Block a user