diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
index e20f20626a..f0adc78214 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
@@ -171,4 +171,10 @@ public final class DistCpConstants {
 
   /** Filename of sorted target listing. */
   public static final String TARGET_SORTED_FILE = "target_sorted.seq";
+
+  public static final String LENGTH_MISMATCH_ERROR_MSG =
+      "Mismatch in length of source:";
+
+  public static final String CHECKSUM_MISMATCH_ERROR_MSG =
+      "Checksum mismatch between ";
 }
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
index 546062ff8e..139bd08fd7 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
@@ -252,7 +252,7 @@ public class CopyCommitter extends FileOutputCommitter {
         // This is the last chunk of the splits, consolidate allChunkPaths
         try {
           concatFileChunks(conf, srcFileStatus.getPath(), targetFile,
-              allChunkPaths);
+              allChunkPaths, srcFileStatus);
         } catch (IOException e) {
           // If the concat failed because a chunk file doesn't exist,
           // then we assume that the CopyMapper has skipped copying this
@@ -609,7 +609,8 @@ public class CopyCommitter extends FileOutputCommitter {
    * Concat the passed chunk files into one and rename it the targetFile.
    */
   private void concatFileChunks(Configuration conf, Path sourceFile,
-      Path targetFile, LinkedList<Path> allChunkPaths)
+      Path targetFile, LinkedList<Path> allChunkPaths,
+      CopyListingFileStatus srcFileStatus)
       throws IOException {
     if (allChunkPaths.size() == 1) {
       return;
@@ -637,8 +638,9 @@ public class CopyCommitter extends FileOutputCommitter {
       LOG.debug("concat: result: " + dstfs.getFileStatus(firstChunkFile));
     }
     rename(dstfs, firstChunkFile, targetFile);
-    DistCpUtils.compareFileLengthsAndChecksums(
-        srcfs, sourceFile, null, dstfs, targetFile, skipCrc);
+    DistCpUtils.compareFileLengthsAndChecksums(srcFileStatus.getLen(),
+        srcfs, sourceFile, null, dstfs,
+        targetFile, skipCrc, srcFileStatus.getLen());
   }
 
   /**
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
index 336779eef2..f3c5b4ba7a 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
@@ -139,7 +139,6 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
   public void map(Text relPath, CopyListingFileStatus sourceFileStatus,
           Context context) throws IOException, InterruptedException {
     Path sourcePath = sourceFileStatus.getPath();
-
     if (LOG.isDebugEnabled())
       LOG.debug("DistCpMapper::map(): Received " + sourcePath + ", " + relPath);
 
@@ -354,7 +353,7 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
     if (sameLength && sameBlockSize) {
       return skipCrc ||
           DistCpUtils.checksumsAreEqual(sourceFS, source.getPath(), null,
-              targetFS, target.getPath());
+              targetFS, target.getPath(), source.getLen());
     } else {
       return false;
     }
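The three files above share one idea: verification should trust the file length recorded in the copy listing (CopyListingFileStatus.getLen()) instead of re-reading the live FileStatus, which can observe bytes appended to the source after the listing was taken. A minimal sketch of that contract follows; the helper and its name are invented for illustration (assuming the usual org.apache.hadoop.fs imports) and are not part of this patch:

    // Hypothetical helper: pin length verification to the listed length.
    static void verifyAgainstListing(CopyListingFileStatus listed,
        FileSystem targetFS, Path target) throws IOException {
      long expected = listed.getLen();   // snapshot taken at listing time
      long actual = targetFS.getFileStatus(target).getLen();
      if (expected != actual) {
        throw new IOException(DistCpConstants.LENGTH_MISMATCH_ERROR_MSG
            + listed.getPath() + " (" + expected + ") and target:"
            + target + " (" + actual + ")");
      }
    }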
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
index fa9193077a..4683cdda78 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
@@ -143,8 +143,9 @@ public class RetriableFileCopyCommand extends RetriableCommand {
           offset, context, fileAttributes, sourceChecksum);
 
       if (!source.isSplit()) {
-        DistCpUtils.compareFileLengthsAndChecksums(sourceFS, sourcePath,
-            sourceChecksum, targetFS, targetPath, skipCrc);
+        DistCpUtils.compareFileLengthsAndChecksums(source.getLen(), sourceFS,
+            sourcePath, sourceChecksum, targetFS,
+            targetPath, skipCrc, source.getLen());
       }
       // it's not append or direct write (preferred for s3a) case, thus we first
       // write to a temporary file, then rename it to the target path.
@@ -247,24 +248,27 @@
     boolean finished = false;
     try {
       inStream = getInputStream(source, context.getConfiguration());
+      long fileLength = source2.getLen();
+      int numBytesToRead = (int) getNumBytesToRead(fileLength, sourceOffset,
+          bufferSize);
       seekIfRequired(inStream, sourceOffset);
-      int bytesRead = readBytes(inStream, buf);
-      while (bytesRead >= 0) {
+      int bytesRead = readBytes(inStream, buf, numBytesToRead);
+      while (bytesRead > 0) {
        if (chunkLength > 0 &&
            (totalBytesRead + bytesRead) >= chunkLength) {
          bytesRead = (int)(chunkLength - totalBytesRead);
          finished = true;
        }
        totalBytesRead += bytesRead;
-        if (action == FileAction.APPEND) {
-          sourceOffset += bytesRead;
-        }
+        sourceOffset += bytesRead;
        outStream.write(buf, 0, bytesRead);
        updateContextStatus(totalBytesRead, context, source2);
        if (finished) {
          break;
        }
-        bytesRead = readBytes(inStream, buf);
+        numBytesToRead = (int) getNumBytesToRead(fileLength, sourceOffset,
+            bufferSize);
+        bytesRead = readBytes(inStream, buf, numBytesToRead);
       }
       outStream.close();
       outStream = null;
@@ -274,6 +278,15 @@
     return totalBytesRead;
   }
 
+  @VisibleForTesting
+  long getNumBytesToRead(long fileLength, long position, long bufLength) {
+    if (position + bufLength < fileLength) {
+      return bufLength;
+    } else {
+      return fileLength - position;
+    }
+  }
+
   private void updateContextStatus(long totalBytesRead,
       Mapper.Context context, CopyListingFileStatus source2) {
     StringBuilder message = new StringBuilder(DistCpUtils.getFormatter()
@@ -287,10 +300,11 @@
     context.setStatus(message.toString());
   }
 
-  private static int readBytes(ThrottledInputStream inStream, byte buf[])
+  private static int readBytes(ThrottledInputStream inStream, byte[] buf,
+      int numBytes)
       throws IOException {
     try {
-      return inStream.read(buf);
+      return inStream.read(buf, 0, numBytes);
     } catch (IOException e) {
       throw new CopyReadException(e);
     }
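The copy loop above now computes, before every read, how many bytes it may still consume, so the copy stops exactly at the listed length even while the source keeps growing; the old loop read until EOF (bytesRead >= 0) and could run past it. Note also that sourceOffset now advances unconditionally (previously only for APPEND), because the offset drives the per-read bound. A standalone sketch of the same bounded-read pattern, simplified to plain java.io streams with no throttling, chunked copies, or retry handling:

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    final class BoundedCopy {
      static long copy(InputStream in, OutputStream out, long fileLength,
          long position, byte[] buf) throws IOException {
        long total = 0;
        while (position < fileLength) {
          int want = (int) Math.min(buf.length, fileLength - position);
          int got = in.read(buf, 0, want);  // never request past fileLength
          if (got <= 0) {
            break;                          // source ended early
          }
          out.write(buf, 0, got);
          position += got;
          total += got;
        }
        return total;
      }
    }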
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
index 3ba9802f88..73c49bb8f1 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.tools.DistCpConstants;
 import org.apache.hadoop.tools.CopyListing.AclsNotSupportedException;
 import org.apache.hadoop.tools.CopyListing.XAttrsNotSupportedException;
 import org.apache.hadoop.tools.CopyListingFileStatus;
@@ -565,13 +566,15 @@
    * @throws IOException if there's an exception while retrieving checksums.
    */
   public static boolean checksumsAreEqual(FileSystem sourceFS, Path source,
-      FileChecksum sourceChecksum, FileSystem targetFS, Path target)
+      FileChecksum sourceChecksum,
+      FileSystem targetFS,
+      Path target, long sourceLen)
       throws IOException {
     FileChecksum targetChecksum = null;
     try {
       sourceChecksum = sourceChecksum != null ? sourceChecksum
-          : sourceFS.getFileChecksum(source);
+          : sourceFS.getFileChecksum(source, sourceLen);
       if (sourceChecksum != null) {
         // iff there's a source checksum, look for one at the destination.
         targetChecksum = targetFS.getFileChecksum(target);
       }
@@ -595,23 +598,22 @@
    * @param skipCrc The flag to indicate whether to skip checksums.
    * @throws IOException if there's a mismatch in file lengths or checksums.
    */
-  public static void compareFileLengthsAndChecksums(
-      FileSystem sourceFS, Path source, FileChecksum sourceChecksum,
-      FileSystem targetFS, Path target, boolean skipCrc) throws IOException {
-    long srcLen = sourceFS.getFileStatus(source).getLen();
-    long tgtLen = targetFS.getFileStatus(target).getLen();
-    if (srcLen != tgtLen) {
+  public static void compareFileLengthsAndChecksums(long srcLen,
+      FileSystem sourceFS, Path source, FileChecksum sourceChecksum,
+      FileSystem targetFS, Path target, boolean skipCrc,
+      long targetLen) throws IOException {
+    if (srcLen != targetLen) {
       throw new IOException(
-          "Mismatch in length of source:" + source + " (" + srcLen
-              + ") and target:" + target + " (" + tgtLen + ")");
+          DistCpConstants.LENGTH_MISMATCH_ERROR_MSG + source + " (" + srcLen
+              + ") and target:" + target + " (" + targetLen + ")");
     }
 
     //At this point, src & dest lengths are same. if length==0, we skip checksum
     if ((srcLen != 0) && (!skipCrc)) {
       if (!checksumsAreEqual(sourceFS, source, sourceChecksum,
-          targetFS, target)) {
+          targetFS, target, srcLen)) {
         StringBuilder errorMessage =
-            new StringBuilder("Checksum mismatch between ")
+            new StringBuilder(DistCpConstants.CHECKSUM_MISMATCH_ERROR_MSG)
                 .append(source).append(" and ").append(target).append(".");
         boolean addSkipHint = false;
         String srcScheme = sourceFS.getScheme();
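Two things change in DistCpUtils: compareFileLengthsAndChecksums no longer looks up either file's length itself (both expected lengths arrive as parameters), and checksumsAreEqual limits the source checksum to sourceLen via FileSystem.getFileChecksum(Path, long), which checksums a file from its beginning up to the given length, so bytes appended after the listing no longer perturb the comparison. A sketch of a call site under the new signature (variable names are illustrative only):

    long listedLen = srcStatus.getLen(); // length recorded in the copy listing
    DistCpUtils.compareFileLengthsAndChecksums(
        listedLen,                       // expected source length
        sourceFS, sourcePath,
        null,                            // let the utility recompute the checksum
        targetFS, targetPath,
        false,                           // skipCrc
        listedLen);                      // bytes the copy actually wrote

Since the bounded-read loop writes exactly the listed number of bytes, call sites can pass the listed length on both sides.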
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
index f4566a6e55..11118c1f72 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
@@ -473,9 +473,12 @@
       if (!skipCrc) {
         Assert.fail("Expected commit to fail");
       }
+      Path sourcePath = new Path(sourceBase + srcFilename);
+      CopyListingFileStatus sourceCurrStatus =
+          new CopyListingFileStatus(fs.getFileStatus(sourcePath));
       Assert.assertFalse(DistCpUtils.checksumsAreEqual(
           fs, new Path(sourceBase + srcFilename), null,
-          fs, new Path(targetBase + srcFilename)));
+          fs, new Path(targetBase + srcFilename), sourceCurrStatus.getLen()));
     } catch(IOException exception) {
       if (skipCrc) {
         LOG.error("Unexpected exception is found", exception);
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
index b4a267db6c..51eebbb2a3 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
@@ -21,11 +21,16 @@ package org.apache.hadoop.tools.mapred;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,6 +58,8 @@ import org.apache.hadoop.tools.DistCpOptions;
 import org.apache.hadoop.tools.StubContext;
 import org.apache.hadoop.tools.util.DistCpUtils;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.StringUtils;
+
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -444,6 +451,55 @@ public class TestCopyMapper {
     }
   }
 
+  @Test(timeout = 40000)
+  public void testCopyWhileAppend() throws Exception {
+    deleteState();
+    mkdirs(SOURCE_PATH + "/1");
+    touchFile(SOURCE_PATH + "/1/3");
+    CopyMapper copyMapper = new CopyMapper();
+    StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+    Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
+        stubContext.getContext();
+    copyMapper.setup(context);
+    final Path path = new Path(SOURCE_PATH + "/1/3");
+    int manyBytes = 100000000;
+    appendFile(path, manyBytes);
+    ScheduledExecutorService scheduledExecutorService =
+        Executors.newSingleThreadScheduledExecutor();
+    Runnable task = new Runnable() {
+      public void run() {
+        try {
+          int maxAppendAttempts = 20;
+          int appendCount = 0;
+          while (appendCount < maxAppendAttempts) {
+            appendFile(path, 1000);
+            Thread.sleep(200);
+            appendCount++;
+          }
+        } catch (IOException | InterruptedException e) {
+          LOG.error("Exception encountered ", e);
+          Assert.fail("Test failed: " + e.getMessage());
+        }
+      }
+    };
+    scheduledExecutorService.schedule(task, 10, TimeUnit.MILLISECONDS);
+    try {
+      copyMapper.map(new Text(DistCpUtils.getRelativePath(
+          new Path(SOURCE_PATH), path)),
+          new CopyListingFileStatus(cluster.getFileSystem().getFileStatus(
+              path)), context);
+    } catch (Exception ex) {
+      LOG.error("Exception encountered ", ex);
+      String exceptionAsString = StringUtils.stringifyException(ex);
+      if (exceptionAsString.contains(DistCpConstants.LENGTH_MISMATCH_ERROR_MSG)
+          || exceptionAsString.contains(DistCpConstants.CHECKSUM_MISMATCH_ERROR_MSG)) {
+        Assert.fail("Test failed: " + exceptionAsString);
+      }
+    } finally {
+      scheduledExecutorService.shutdown();
+    }
+  }
+
   @Test(timeout=40000)
   public void testMakeDirFailure() {
     try {
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java
index 1f8a915f5f..d29447b903 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.tools.CopyListingFileStatus;
 import org.apache.hadoop.tools.mapred.CopyMapper.FileAction;
+
+import org.junit.Assert;
 import org.junit.Test;
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
@@ -57,5 +59,26 @@ public class TestRetriableFileCopyCommand {
     }
     assertNotNull("close didn't fail", actualEx);
     assertEquals(expectedEx, actualEx);
-  }
+  }
+
+  @Test(timeout = 40000)
+  public void testGetNumBytesToRead() {
+    long pos = 100;
+    long buffLength = 1024;
+    long fileLength = 2058;
+    RetriableFileCopyCommand retriableFileCopyCommand =
+        new RetriableFileCopyCommand("Testing NumBytesToRead ",
+            FileAction.OVERWRITE);
+    long numBytes = retriableFileCopyCommand
+        .getNumBytesToRead(fileLength, pos, buffLength);
+    Assert.assertEquals(1024, numBytes);
+    pos += numBytes;
+    numBytes = retriableFileCopyCommand
+        .getNumBytesToRead(fileLength, pos, buffLength);
+    Assert.assertEquals(934, numBytes);
+    pos += numBytes;
+    numBytes = retriableFileCopyCommand
+        .getNumBytesToRead(fileLength, pos, buffLength);
+    Assert.assertEquals(0, numBytes);
+  }
 }
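The expected values in testGetNumBytesToRead follow directly from the bounded-read arithmetic: with fileLength = 2058 and buffLength = 1024, the first read at position 100 returns a full buffer (100 + 1024 < 2058), the second at position 1124 is clipped to 2058 - 1124 = 934, and the third at position 2058 returns 0, which terminates the copy loop now that its condition is bytesRead > 0.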
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java
index 5cf184052b..6ce8e3e1ea 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java
@@ -33,8 +33,8 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.tools.ECAdmin;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.DistCpConstants;
 import org.apache.hadoop.tools.DistCpOptionSwitch;
 import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
 import org.apache.hadoop.util.ToolRunner;
@@ -63,6 +63,7 @@ import static org.apache.hadoop.fs.permission.FsAction.READ;
 import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE;
 import static org.apache.hadoop.fs.permission.FsAction.READ_WRITE;
 import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -1208,7 +1209,7 @@
   }
 
   @Test
-  public void testCompareFileLengthsAndChecksums() throws IOException {
+  public void testCompareFileLengthsAndChecksums() throws Throwable {
     String base = "/tmp/verify-checksum/";
 
     long srcSeed = System.currentTimeMillis();
@@ -1224,22 +1225,18 @@
     Path dstWithLen0 = new Path(base + "dstLen0");
     fs.create(srcWithLen0).close();
     fs.create(dstWithLen0).close();
-    DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithLen0,
-        null, fs, dstWithLen0, false);
+    DistCpUtils.compareFileLengthsAndChecksums(0, fs, srcWithLen0,
+        null, fs, dstWithLen0, false, 0);
 
     // different lengths comparison
     Path srcWithLen1 = new Path(base + "srcLen1");
     Path dstWithLen2 = new Path(base + "dstLen2");
     DFSTestUtil.createFile(fs, srcWithLen1, 1, replFactor, srcSeed);
     DFSTestUtil.createFile(fs, dstWithLen2, 2, replFactor, srcSeed);
-    try {
-      DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithLen1,
-          null, fs, dstWithLen2, false);
-      Assert.fail("Expected different lengths comparison to fail!");
-    } catch (IOException e) {
-      GenericTestUtils.assertExceptionContains(
-          "Mismatch in length", e);
-    }
+
+    intercept(IOException.class, DistCpConstants.LENGTH_MISMATCH_ERROR_MSG,
+        () -> DistCpUtils.compareFileLengthsAndChecksums(1, fs,
+            srcWithLen1, null, fs, dstWithLen2, false, 2));
 
     // checksums matched
     Path srcWithChecksum1 = new Path(base + "srcChecksum1");
@@ -1248,28 +1245,24 @@
         replFactor, srcSeed);
     DFSTestUtil.createFile(fs, dstWithChecksum1, 1024,
         replFactor, srcSeed);
-    DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithChecksum1,
-        null, fs, dstWithChecksum1, false);
-    DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithChecksum1,
+    DistCpUtils.compareFileLengthsAndChecksums(1024, fs, srcWithChecksum1,
+        null, fs, dstWithChecksum1, false, 1024);
+    DistCpUtils.compareFileLengthsAndChecksums(1024, fs, srcWithChecksum1,
         fs.getFileChecksum(srcWithChecksum1), fs, dstWithChecksum1,
-        false);
+        false, 1024);
 
     // checksums mismatched
     Path dstWithChecksum2 = new Path(base + "dstChecksum2");
     DFSTestUtil.createFile(fs, dstWithChecksum2, 1024,
         replFactor, dstSeed);
-    try {
-      DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithChecksum1,
-          null, fs, dstWithChecksum2, false);
-      Assert.fail("Expected different checksums comparison to fail!");
-    } catch (IOException e) {
-      GenericTestUtils.assertExceptionContains(
-          "Checksum mismatch", e);
-    }
+    intercept(IOException.class, DistCpConstants.CHECKSUM_MISMATCH_ERROR_MSG,
+        () -> DistCpUtils.compareFileLengthsAndChecksums(1024, fs,
+            srcWithChecksum1, null, fs, dstWithChecksum2,
+            false, 1024));
 
     // checksums mismatched but skipped
-    DistCpUtils.compareFileLengthsAndChecksums(fs, srcWithChecksum1,
-        null, fs, dstWithChecksum2, true);
+    DistCpUtils.compareFileLengthsAndChecksums(1024, fs, srcWithChecksum1,
+        null, fs, dstWithChecksum2, true, 1024);
   }
 
   private static Random rand = new Random();
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtilsWithCombineMode.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtilsWithCombineMode.java
index 5d44ab0a32..306ac08e05 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtilsWithCombineMode.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtilsWithCombineMode.java
@@ -109,7 +109,7 @@ public class TestDistCpUtilsWithCombineMode {
     DFSTestUtil.createFile(fs, dst, 256, 1024, 1024, rf, seed);
 
     // then compare
-    DistCpUtils.compareFileLengthsAndChecksums(fs, src,
-        null, fs, dst, false);
+    DistCpUtils.compareFileLengthsAndChecksums(1024, fs, src,
+        null, fs, dst, false, 1024);
   }
 }
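A note on the literal lengths in the last two test files: DFSTestUtil.createFile(fs, dst, 256, 1024, 1024, rf, seed) creates a 1024-byte file (the middle arguments are buffer size 256, file length 1024, block size 1024), so the 1024 passed twice to compareFileLengthsAndChecksums is simply that created length, supplied once as the expected source length and once as the expected target length, mirroring how the production call sites pass CopyListingFileStatus.getLen() for both.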