MAPREDUCE-7225: Fix broken current folder expansion during MR job start. Contributed by Peter Bacsko.

This commit is contained in:
Szilard Nemeth 2019-08-01 13:01:30 +02:00
parent 89b102f916
commit a7371a779c
2 changed files with 85 additions and 4 deletions

View File

@ -59,6 +59,8 @@
class JobResourceUploader {
protected static final Logger LOG =
LoggerFactory.getLogger(JobResourceUploader.class);
private static final String ROOT_PATH = "/";
private final boolean useWildcard;
private final FileSystem jtFs;
private SharedCacheClient scClient = null;
@ -674,9 +676,30 @@ Path copyRemoteFiles(Path parentDir, Path originalPath,
if (FileUtil.compareFs(remoteFs, jtFs)) {
return originalPath;
}
boolean root = false;
if (ROOT_PATH.equals(originalPath.toUri().getPath())) {
// "/" needs special treatment
root = true;
} else {
// If originalPath ends in a "/", then remove it so
// that originalPath.getName() does not return an empty string
String uriString = originalPath.toUri().toString();
if (uriString.endsWith("/")) {
try {
URI strippedURI =
new URI(uriString.substring(0, uriString.length() - 1));
originalPath = new Path(strippedURI);
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Error processing URI", e);
}
}
}
// this might have name collisions. copy will throw an exception
// parse the original path to create new path
Path newPath = new Path(parentDir, originalPath.getName());
Path newPath = root ?
parentDir : new Path(parentDir, originalPath.getName());
FileUtil.copy(remoteFs, originalPath, jtFs, newPath, false, conf);
jtFs.setReplication(newPath, replication);
jtFs.makeQualified(newPath);

View File

@ -18,16 +18,19 @@
package org.apache.hadoop.mapreduce;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.spy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
@ -46,8 +49,10 @@
import org.apache.hadoop.mapred.JobConf;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.verification.VerificationMode;
/**
* A class for unit testing JobResourceUploader.
*/
@ -375,6 +380,50 @@ public void testErasureCodingDisabled() throws IOException {
testErasureCodingSetting(false);
}
@Test
public void testOriginalPathEndsInSlash()
throws IOException, URISyntaxException {
testOriginalPathWithTrailingSlash(
new Path(new URI("file:/local/mapred/test/")),
new Path("hdfs://localhost:1234/home/hadoop/test/"));
}
@Test
public void testOriginalPathIsRoot() throws IOException, URISyntaxException {
testOriginalPathWithTrailingSlash(
new Path(new URI("file:/")),
new Path("hdfs://localhost:1234/home/hadoop/"));
}
private void testOriginalPathWithTrailingSlash(Path path,
Path expectedRemotePath) throws IOException, URISyntaxException {
Path dstPath = new Path("hdfs://localhost:1234/home/hadoop/");
DistributedFileSystem fs = mock(DistributedFileSystem.class);
// make sure that FileUtils.copy() doesn't try to copy anything
when(fs.mkdirs(any(Path.class))).thenReturn(false);
when(fs.getUri()).thenReturn(dstPath.toUri());
JobResourceUploader uploader = new StubedUploader(fs, true, true);
JobConf jConf = new JobConf();
Path originalPath = spy(path);
FileSystem localFs = mock(FileSystem.class);
FileStatus fileStatus = mock(FileStatus.class);
when(localFs.getFileStatus(any(Path.class))).thenReturn(fileStatus);
when(fileStatus.isDirectory()).thenReturn(true);
when(fileStatus.getPath()).thenReturn(originalPath);
doReturn(localFs).when(originalPath)
.getFileSystem(any(Configuration.class));
when(localFs.getUri()).thenReturn(path.toUri());
uploader.copyRemoteFiles(dstPath,
originalPath, jConf, (short) 1);
ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
verify(fs).makeQualified(pathCaptor.capture());
Assert.assertEquals("Path", expectedRemotePath, pathCaptor.getValue());
}
private void testErasureCodingSetting(boolean defaultBehavior)
throws IOException {
JobConf jConf = new JobConf();
@ -387,7 +436,7 @@ private void testErasureCodingSetting(boolean defaultBehavior)
DistributedFileSystem fs = mock(DistributedFileSystem.class);
Path path = new Path("/");
when(fs.makeQualified(any(Path.class))).thenReturn(path);
JobResourceUploader uploader = new StubedUploader(fs, true);
JobResourceUploader uploader = new StubedUploader(fs, true, false);
Job job = Job.getInstance(jConf);
uploader.uploadResources(job, new Path("/test"));
@ -728,6 +777,8 @@ private String buildPathStringSub(String pathPrefix, String processedPath,
}
private class StubedUploader extends JobResourceUploader {
private boolean callOriginalCopy = false;
StubedUploader(JobConf conf) throws IOException {
this(conf, false);
}
@ -736,8 +787,10 @@ private class StubedUploader extends JobResourceUploader {
super(FileSystem.getLocal(conf), useWildcard);
}
StubedUploader(FileSystem fs, boolean useWildcard) throws IOException {
StubedUploader(FileSystem fs, boolean useWildcard,
boolean callOriginalCopy) throws IOException {
super(fs, useWildcard);
this.callOriginalCopy = callOriginalCopy;
}
@Override
@ -757,7 +810,12 @@ boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
@Override
Path copyRemoteFiles(Path parentDir, Path originalPath, Configuration conf,
short replication) throws IOException {
return new Path(destinationPathPrefix + originalPath.getName());
if (callOriginalCopy) {
return super.copyRemoteFiles(
parentDir, originalPath, conf, replication);
} else {
return new Path(destinationPathPrefix + originalPath.getName());
}
}
@Override