From a7371a779c591893700df1df279330589474960c Mon Sep 17 00:00:00 2001 From: Szilard Nemeth Date: Thu, 1 Aug 2019 13:01:30 +0200 Subject: [PATCH] MAPREDUCE-7225: Fix broken current folder expansion during MR job start. Contributed by Peter Bacsko. --- .../hadoop/mapreduce/JobResourceUploader.java | 25 +++++++- .../mapreduce/TestJobResourceUploader.java | 64 ++++++++++++++++++- 2 files changed, 85 insertions(+), 4 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java index e106a5465e..c8686d7162 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java @@ -59,6 +59,8 @@ class JobResourceUploader { protected static final Logger LOG = LoggerFactory.getLogger(JobResourceUploader.class); + private static final String ROOT_PATH = "/"; + private final boolean useWildcard; private final FileSystem jtFs; private SharedCacheClient scClient = null; @@ -674,9 +676,30 @@ Path copyRemoteFiles(Path parentDir, Path originalPath, if (FileUtil.compareFs(remoteFs, jtFs)) { return originalPath; } + + boolean root = false; + if (ROOT_PATH.equals(originalPath.toUri().getPath())) { + // "/" needs special treatment + root = true; + } else { + // If originalPath ends in a "/", then remove it so + // that originalPath.getName() does not return an empty string + String uriString = originalPath.toUri().toString(); + if (uriString.endsWith("/")) { + try { + URI strippedURI = + new URI(uriString.substring(0, uriString.length() - 1)); + originalPath = new Path(strippedURI); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Error processing URI", e); + } + } + } + // this might have name collisions. copy will throw an exception // parse the original path to create new path - Path newPath = new Path(parentDir, originalPath.getName()); + Path newPath = root ? + parentDir : new Path(parentDir, originalPath.getName()); FileUtil.copy(remoteFs, originalPath, jtFs, newPath, false, conf); jtFs.setReplication(newPath, replication); jtFs.makeQualified(newPath); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java index 80338970d2..bbfe2fb03c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java @@ -18,16 +18,19 @@ package org.apache.hadoop.mapreduce; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.spy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import java.io.IOException; import java.net.URI; +import java.net.URISyntaxException; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; @@ -46,8 +49,10 @@ import org.apache.hadoop.mapred.JobConf; import org.junit.Assert; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.mockito.verification.VerificationMode; + /** * A class for unit testing JobResourceUploader. */ @@ -375,6 +380,50 @@ public void testErasureCodingDisabled() throws IOException { testErasureCodingSetting(false); } + @Test + public void testOriginalPathEndsInSlash() + throws IOException, URISyntaxException { + testOriginalPathWithTrailingSlash( + new Path(new URI("file:/local/mapred/test/")), + new Path("hdfs://localhost:1234/home/hadoop/test/")); + } + + @Test + public void testOriginalPathIsRoot() throws IOException, URISyntaxException { + testOriginalPathWithTrailingSlash( + new Path(new URI("file:/")), + new Path("hdfs://localhost:1234/home/hadoop/")); + } + + private void testOriginalPathWithTrailingSlash(Path path, + Path expectedRemotePath) throws IOException, URISyntaxException { + Path dstPath = new Path("hdfs://localhost:1234/home/hadoop/"); + DistributedFileSystem fs = mock(DistributedFileSystem.class); + // make sure that FileUtils.copy() doesn't try to copy anything + when(fs.mkdirs(any(Path.class))).thenReturn(false); + when(fs.getUri()).thenReturn(dstPath.toUri()); + + JobResourceUploader uploader = new StubedUploader(fs, true, true); + JobConf jConf = new JobConf(); + Path originalPath = spy(path); + FileSystem localFs = mock(FileSystem.class); + FileStatus fileStatus = mock(FileStatus.class); + when(localFs.getFileStatus(any(Path.class))).thenReturn(fileStatus); + when(fileStatus.isDirectory()).thenReturn(true); + when(fileStatus.getPath()).thenReturn(originalPath); + + doReturn(localFs).when(originalPath) + .getFileSystem(any(Configuration.class)); + when(localFs.getUri()).thenReturn(path.toUri()); + + uploader.copyRemoteFiles(dstPath, + originalPath, jConf, (short) 1); + + ArgumentCaptor pathCaptor = ArgumentCaptor.forClass(Path.class); + verify(fs).makeQualified(pathCaptor.capture()); + Assert.assertEquals("Path", expectedRemotePath, pathCaptor.getValue()); + } + private void testErasureCodingSetting(boolean defaultBehavior) throws IOException { JobConf jConf = new JobConf(); @@ -387,7 +436,7 @@ private void testErasureCodingSetting(boolean defaultBehavior) DistributedFileSystem fs = mock(DistributedFileSystem.class); Path path = new Path("/"); when(fs.makeQualified(any(Path.class))).thenReturn(path); - JobResourceUploader uploader = new StubedUploader(fs, true); + JobResourceUploader uploader = new StubedUploader(fs, true, false); Job job = Job.getInstance(jConf); uploader.uploadResources(job, new Path("/test")); @@ -728,6 +777,8 @@ private String buildPathStringSub(String pathPrefix, String processedPath, } private class StubedUploader extends JobResourceUploader { + private boolean callOriginalCopy = false; + StubedUploader(JobConf conf) throws IOException { this(conf, false); } @@ -736,8 +787,10 @@ private class StubedUploader extends JobResourceUploader { super(FileSystem.getLocal(conf), useWildcard); } - StubedUploader(FileSystem fs, boolean useWildcard) throws IOException { + StubedUploader(FileSystem fs, boolean useWildcard, + boolean callOriginalCopy) throws IOException { super(fs, useWildcard); + this.callOriginalCopy = callOriginalCopy; } @Override @@ -757,7 +810,12 @@ boolean mkdirs(FileSystem fs, Path dir, FsPermission permission) @Override Path copyRemoteFiles(Path parentDir, Path originalPath, Configuration conf, short replication) throws IOException { - return new Path(destinationPathPrefix + originalPath.getName()); + if (callOriginalCopy) { + return super.copyRemoteFiles( + parentDir, originalPath, conf, replication); + } else { + return new Path(destinationPathPrefix + originalPath.getName()); + } } @Override