HADOOP-10129. Distcp may succeed when it fails (daryn)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1548175 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Daryn Sharp 2013-12-05 15:47:55 +00:00
parent 99aed805f8
commit 9ea61e4415
6 changed files with 113 additions and 7 deletions

View File

@ -460,6 +460,8 @@ Release 2.4.0 - UNRELEASED
HADOOP-10135 writes to swift fs over partition size leave temp files and
empty output file (David Dobbins via stevel)
HADOOP-10129. Distcp may succeed when it fails (daryn)
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES
@ -2355,6 +2357,8 @@ Release 0.23.10 - UNRELEASED
BUG FIXES
HADOOP-10129. Distcp may succeed when it fails (daryn)
Release 0.23.9 - 2013-07-08
INCOMPATIBLE CHANGES

View File

@ -95,6 +95,11 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -32,6 +32,8 @@
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;
import com.google.common.annotations.VisibleForTesting;
import java.io.*;
import java.util.Stack;
@ -107,12 +109,13 @@ protected void validatePaths(DistCpOptions options)
/** {@inheritDoc} */
@Override
public void doBuildListing(Path pathToListingFile, DistCpOptions options) throws IOException {
doBuildListing(getWriter(pathToListingFile), options);
}
SequenceFile.Writer fileListWriter = null;
@VisibleForTesting
public void doBuildListing(SequenceFile.Writer fileListWriter,
DistCpOptions options) throws IOException {
try {
fileListWriter = getWriter(pathToListingFile);
for (Path path: options.getSourcePaths()) {
FileSystem sourceFS = path.getFileSystem(getConf());
path = makeQualified(path);
@ -143,8 +146,10 @@ public void doBuildListing(Path pathToListingFile, DistCpOptions options) throws
localFile, options);
}
}
fileListWriter.close();
fileListWriter = null;
} finally {
IOUtils.closeStream(fileListWriter);
IOUtils.cleanup(LOG, fileListWriter);
}
}

View File

@ -30,6 +30,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.google.common.annotations.VisibleForTesting;
import java.io.*;
import java.util.EnumSet;
@ -176,7 +178,8 @@ private Path getTmpFile(Path target, Mapper.Context context) {
return new Path(root, ".distcp.tmp." + context.getTaskAttemptID().toString());
}
private long copyBytes(FileStatus sourceFileStatus, OutputStream outStream,
@VisibleForTesting
long copyBytes(FileStatus sourceFileStatus, OutputStream outStream,
int bufferSize, Mapper.Context context)
throws IOException {
Path source = sourceFileStatus.getPath();
@ -193,6 +196,8 @@ private long copyBytes(FileStatus sourceFileStatus, OutputStream outStream,
updateContextStatus(totalBytesRead, context, sourceFileStatus);
bytesRead = inStream.read(buf);
}
outStream.close();
outStream = null;
} finally {
IOUtils.cleanup(LOG, outStream, inStream);
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.tools;
import static org.mockito.Mockito.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
@ -36,6 +38,7 @@
import org.junit.BeforeClass;
import org.junit.AfterClass;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
@ -282,4 +285,29 @@ public void testBuildListingForSingleFile() {
IOUtils.closeStream(reader);
}
}
@Test
public void testFailOnCloseError() throws IOException {
File inFile = File.createTempFile("TestCopyListingIn", null);
inFile.deleteOnExit();
File outFile = File.createTempFile("TestCopyListingOut", null);
outFile.deleteOnExit();
List<Path> srcs = new ArrayList<Path>();
srcs.add(new Path(inFile.toURI()));
Exception expectedEx = new IOException("boom");
SequenceFile.Writer writer = mock(SequenceFile.Writer.class);
doThrow(expectedEx).when(writer).close();
SimpleCopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS);
DistCpOptions options = new DistCpOptions(srcs, new Path(outFile.toURI()));
Exception actualEx = null;
try {
listing.doBuildListing(writer, options);
} catch (Exception e) {
actualEx = e;
}
Assert.assertNotNull("close writer didn't fail", actualEx);
Assert.assertEquals(expectedEx, actualEx);
}
}

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.mapred;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.*;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
public class TestRetriableFileCopyCommand {
@SuppressWarnings("rawtypes")
@Test
public void testFailOnCloseError() throws Exception {
Mapper.Context context = mock(Mapper.Context.class);
doReturn(new Configuration()).when(context).getConfiguration();
Exception expectedEx = new IOException("boom");
OutputStream out = mock(OutputStream.class);
doThrow(expectedEx).when(out).close();
File f = File.createTempFile(this.getClass().getSimpleName(), null);
f.deleteOnExit();
FileStatus stat =
new FileStatus(1L, false, 1, 1024, 0, new Path(f.toURI()));
Exception actualEx = null;
try {
new RetriableFileCopyCommand("testFailOnCloseError")
.copyBytes(stat, out, 512, context);
} catch (Exception e) {
actualEx = e;
}
assertNotNull("close didn't fail", actualEx);
assertEquals(expectedEx, actualEx);
}
}