diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java index 09bceadbc5..4db1d4eb46 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.EnumSet; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -36,6 +37,7 @@ import org.apache.hadoop.tools.DistCpOptionSwitch; import org.apache.hadoop.tools.DistCpOptions; import org.apache.hadoop.tools.DistCpOptions.FileAttribute; +import org.apache.hadoop.tools.mapred.RetriableFileCopyCommand.CopyReadException; import org.apache.hadoop.tools.util.DistCpUtils; import org.apache.hadoop.util.StringUtils; @@ -251,8 +253,8 @@ private void handleFailures(IOException exception, LOG.error("Failure in copying " + sourceFileStatus.getPath() + " to " + target, exception); - if (ignoreFailures && exception.getCause() instanceof - RetriableFileCopyCommand.CopyReadException) { + if (ignoreFailures && + ExceptionUtils.indexOfType(exception, CopyReadException.class) != -1) { incrementCounter(context, Counter.FAIL, 1); incrementCounter(context, Counter.BYTESFAILED, sourceFileStatus.getLen()); context.write(null, new Text("FAIL: " + sourceFileStatus.getPath() + " - " + diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java index 4d0752fef1..866ad6e178 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java @@ -392,6 +392,8 @@ public void testMakeDirFailure() { public void testIgnoreFailures() { doTestIgnoreFailures(true); doTestIgnoreFailures(false); + doTestIgnoreFailuresDoubleWrapped(true); + doTestIgnoreFailuresDoubleWrapped(false); } @Test(timeout=40000) @@ -800,6 +802,89 @@ private void doTestIgnoreFailures(boolean ignoreFailures) { } } + /** + * This test covers the case where the CopyReadException is double-wrapped and + * the mapper should be able to ignore this nested read exception. + * @see #doTestIgnoreFailures + */ + private void doTestIgnoreFailuresDoubleWrapped(final boolean ignoreFailures) { + try { + deleteState(); + createSourceData(); + + final UserGroupInformation tmpUser = UserGroupInformation + .createRemoteUser("guest"); + + final CopyMapper copyMapper = new CopyMapper(); + + final Mapper.Context context = + tmpUser.doAs(new PrivilegedAction< + Mapper.Context>() { + @Override + public Mapper.Context + run() { + try { + StubContext stubContext = new StubContext( + getConfiguration(), null, 0); + return stubContext.getContext(); + } catch (Exception e) { + LOG.error("Exception encountered when get stub context", e); + throw new RuntimeException(e); + } + } + }); + + touchFile(SOURCE_PATH + "/src/file"); + mkdirs(TARGET_PATH); + cluster.getFileSystem().setPermission(new Path(SOURCE_PATH + "/src/file"), + new FsPermission(FsAction.NONE, FsAction.NONE, FsAction.NONE)); + cluster.getFileSystem().setPermission(new Path(TARGET_PATH), + new FsPermission((short)511)); + + context.getConfiguration().setBoolean( + DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), ignoreFailures); + + final FileSystem tmpFS = tmpUser.doAs(new PrivilegedAction() { + @Override + public FileSystem run() { + try { + return FileSystem.get(configuration); + } catch (IOException e) { + LOG.error("Exception encountered when get FileSystem.", e); + throw new RuntimeException(e); + } + } + }); + + tmpUser.doAs(new PrivilegedAction() { + @Override + public Integer run() { + try { + copyMapper.setup(context); + copyMapper.map(new Text("/src/file"), + new CopyListingFileStatus(tmpFS.getFileStatus( + new Path(SOURCE_PATH + "/src/file"))), + context); + Assert.assertTrue("Should have thrown an IOException if not " + + "ignoring failures", ignoreFailures); + } catch (IOException e) { + LOG.error("Unexpected exception encountered. ", e); + Assert.assertFalse("Should not have thrown an IOException if " + + "ignoring failures", ignoreFailures); + // the IOException is not thrown again as it's expected + } catch (Exception e) { + LOG.error("Exception encountered when the mapper copies file.", e); + throw new RuntimeException(e); + } + return null; + } + }); + } catch (Exception e) { + LOG.error("Unexpected exception encountered. ", e); + Assert.fail("Test failed: " + e.getMessage()); + } + } + private static void deleteState() throws IOException { pathList.clear(); nFiles = 0;