From 9ea61e44153b938309841b1499488360e9abd176 Mon Sep 17 00:00:00 2001 From: Daryn Sharp Date: Thu, 5 Dec 2013 15:47:55 +0000 Subject: [PATCH] HADOOP-10129. Distcp may succeed when it fails (daryn) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1548175 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 4 ++ hadoop-tools/hadoop-distcp/pom.xml | 5 ++ .../hadoop/tools/SimpleCopyListing.java | 17 ++++-- .../mapred/RetriableFileCopyCommand.java | 7 ++- .../apache/hadoop/tools/TestCopyListing.java | 28 +++++++++ .../mapred/TestRetriableFileCopyCommand.java | 59 +++++++++++++++++++ 6 files changed, 113 insertions(+), 7 deletions(-) create mode 100644 hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index a73b182635..b8576d7e75 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -460,6 +460,8 @@ Release 2.4.0 - UNRELEASED HADOOP-10135 writes to swift fs over partition size leave temp files and empty output file (David Dobbins via stevel) + HADOOP-10129. Distcp may succeed when it fails (daryn) + Release 2.3.0 - UNRELEASED INCOMPATIBLE CHANGES @@ -2355,6 +2357,8 @@ Release 0.23.10 - UNRELEASED BUG FIXES + HADOOP-10129. 
Distcp may succeed when it fails (daryn) + Release 0.23.9 - 2013-07-08 INCOMPATIBLE CHANGES diff --git a/hadoop-tools/hadoop-distcp/pom.xml b/hadoop-tools/hadoop-distcp/pom.xml index 1768a265d5..9284592ca6 100644 --- a/hadoop-tools/hadoop-distcp/pom.xml +++ b/hadoop-tools/hadoop-distcp/pom.xml @@ -95,6 +95,11 @@ <scope>test</scope> <type>test-jar</type> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java index 08552fbd60..c494995fb4 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java @@ -32,6 +32,8 @@ import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.security.Credentials; +import com.google.common.annotations.VisibleForTesting; + import java.io.*; import java.util.Stack; @@ -107,12 +109,13 @@ protected void validatePaths(DistCpOptions options) /** {@inheritDoc} */ @Override public void doBuildListing(Path pathToListingFile, DistCpOptions options) throws IOException { - - SequenceFile.Writer fileListWriter = null; - + doBuildListing(getWriter(pathToListingFile), options); + } + + @VisibleForTesting + public void doBuildListing(SequenceFile.Writer fileListWriter, + DistCpOptions options) throws IOException { try { - fileListWriter = getWriter(pathToListingFile); - for (Path path: options.getSourcePaths()) { FileSystem sourceFS = path.getFileSystem(getConf()); path = makeQualified(path); @@ -143,8 +146,10 @@ public void doBuildListing(Path pathToListingFile, DistCpOptions options) throws localFile, options); } } + fileListWriter.close(); + fileListWriter = null; } finally { - IOUtils.closeStream(fileListWriter); + IOUtils.cleanup(LOG, fileListWriter); } } diff --git
a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java index 87fb2d4511..580229cf8e 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java @@ -30,6 +30,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import com.google.common.annotations.VisibleForTesting; + import java.io.*; import java.util.EnumSet; @@ -176,7 +178,8 @@ private Path getTmpFile(Path target, Mapper.Context context) { return new Path(root, ".distcp.tmp." + context.getTaskAttemptID().toString()); } - private long copyBytes(FileStatus sourceFileStatus, OutputStream outStream, + @VisibleForTesting + long copyBytes(FileStatus sourceFileStatus, OutputStream outStream, int bufferSize, Mapper.Context context) throws IOException { Path source = sourceFileStatus.getPath(); @@ -193,6 +196,8 @@ private long copyBytes(FileStatus sourceFileStatus, OutputStream outStream, updateContextStatus(totalBytesRead, context, sourceFileStatus); bytesRead = inStream.read(buf); } + outStream.close(); + outStream = null; } finally { IOUtils.cleanup(LOG, outStream, inStream); } diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java index fb327c32f0..11cf7821e3 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java @@ -18,6 +18,8 @@ package org.apache.hadoop.tools; +import static org.mockito.Mockito.*; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import 
org.apache.hadoop.fs.Path; @@ -36,6 +38,7 @@ import org.junit.BeforeClass; import org.junit.AfterClass; +import java.io.File; import java.io.IOException; import java.io.OutputStream; import java.util.List; @@ -282,4 +285,29 @@ public void testBuildListingForSingleFile() { IOUtils.closeStream(reader); } } + + @Test + public void testFailOnCloseError() throws IOException { + File inFile = File.createTempFile("TestCopyListingIn", null); + inFile.deleteOnExit(); + File outFile = File.createTempFile("TestCopyListingOut", null); + outFile.deleteOnExit(); + List srcs = new ArrayList(); + srcs.add(new Path(inFile.toURI())); + + Exception expectedEx = new IOException("boom"); + SequenceFile.Writer writer = mock(SequenceFile.Writer.class); + doThrow(expectedEx).when(writer).close(); + + SimpleCopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS); + DistCpOptions options = new DistCpOptions(srcs, new Path(outFile.toURI())); + Exception actualEx = null; + try { + listing.doBuildListing(writer, options); + } catch (Exception e) { + actualEx = e; + } + Assert.assertNotNull("close writer didn't fail", actualEx); + Assert.assertEquals(expectedEx, actualEx); + } } diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java new file mode 100644 index 0000000000..c5ec513bec --- /dev/null +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.tools.mapred; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.*; +import org.junit.Test; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; + +public class TestRetriableFileCopyCommand { + @SuppressWarnings("rawtypes") + @Test + public void testFailOnCloseError() throws Exception { + Mapper.Context context = mock(Mapper.Context.class); + doReturn(new Configuration()).when(context).getConfiguration(); + + Exception expectedEx = new IOException("boom"); + OutputStream out = mock(OutputStream.class); + doThrow(expectedEx).when(out).close(); + + File f = File.createTempFile(this.getClass().getSimpleName(), null); + f.deleteOnExit(); + FileStatus stat = + new FileStatus(1L, false, 1, 1024, 0, new Path(f.toURI())); + + Exception actualEx = null; + try { + new RetriableFileCopyCommand("testFailOnCloseError") + .copyBytes(stat, out, 512, context); + } catch (Exception e) { + actualEx = e; + } + assertNotNull("close didn't fail", actualEx); + assertEquals(expectedEx, actualEx); + } +}