diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java index 21f621adbe..031106134c 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java @@ -260,7 +260,8 @@ long copyBytes(CopyListingFileStatus source2, long sourceOffset, boolean finished = false; try { inStream = getInputStream(source, context.getConfiguration()); - int bytesRead = readBytes(inStream, buf, sourceOffset); + seekIfRequired(inStream, sourceOffset); + int bytesRead = readBytes(inStream, buf); while (bytesRead >= 0) { if (chunkLength > 0 && (totalBytesRead + bytesRead) >= chunkLength) { @@ -276,7 +277,7 @@ long copyBytes(CopyListingFileStatus source2, long sourceOffset, if (finished) { break; } - bytesRead = readBytes(inStream, buf, sourceOffset); + bytesRead = readBytes(inStream, buf); } outStream.close(); outStream = null; @@ -299,13 +300,20 @@ private void updateContextStatus(long totalBytesRead, Mapper.Context context, context.setStatus(message.toString()); } - private static int readBytes(ThrottledInputStream inStream, byte buf[], - long position) throws IOException { + private static int readBytes(ThrottledInputStream inStream, byte buf[]) + throws IOException { try { - if (position == 0) { - return inStream.read(buf); - } else { - return inStream.read(position, buf, 0, buf.length); + return inStream.read(buf); + } catch (IOException e) { + throw new CopyReadException(e); + } + } + + private static void seekIfRequired(ThrottledInputStream inStream, + long sourceOffset) throws IOException { + try { + if (sourceOffset != inStream.getPos()) { + inStream.seek(sourceOffset); } } catch (IOException e) { throw new CopyReadException(e); diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java index 2d2f10c90b..4d3676a669 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java @@ -18,7 +18,7 @@ package org.apache.hadoop.tools.util; -import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.Seekable; import java.io.IOException; import java.io.InputStream; @@ -33,7 +33,7 @@ * (Thus, while the read-rate might exceed the maximum for a given short interval, * the average tends towards the specified maximum, overall.) */ -public class ThrottledInputStream extends InputStream { +public class ThrottledInputStream extends InputStream implements Seekable { private final InputStream rawStream; private final float maxBytesPerSec; @@ -95,25 +95,6 @@ public int read(byte[] b, int off, int len) throws IOException { return readLen; } - /** - * Read bytes starting from the specified position. This requires rawStream is - * an instance of {@link PositionedReadable}. - */ - public int read(long position, byte[] buffer, int offset, int length) - throws IOException { - if (!(rawStream instanceof PositionedReadable)) { - throw new UnsupportedOperationException( - "positioned read is not supported by the internal stream"); - } - throttle(); - int readLen = ((PositionedReadable) rawStream).read(position, buffer, - offset, length); - if (readLen != -1) { - bytesRead += readLen; - } - return readLen; - } - private void throttle() throws IOException { while (getBytesPerSec() > maxBytesPerSec) { try { @@ -165,4 +146,29 @@ public String toString() { ", totalSleepTime=" + totalSleepTime + '}'; } + + private void checkSeekable() throws IOException { + if (!(rawStream instanceof Seekable)) { + throw new UnsupportedOperationException( + "seek operations are unsupported by the internal stream"); + } + } + + @Override + public void seek(long pos) throws IOException { + checkSeekable(); + ((Seekable) rawStream).seek(pos); + } + + @Override + public long getPos() throws IOException { + checkSeekable(); + return ((Seekable) rawStream).getPos(); + } + + @Override + public boolean seekToNewSource(long targetPos) throws IOException { + checkSeekable(); + return ((Seekable) rawStream).seekToNewSource(targetPos); + } } diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java index fd998c82c6..da51326454 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java @@ -42,6 +42,7 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.tools.CopyListingFileStatus; @@ -55,6 +56,10 @@ import org.junit.BeforeClass; import org.junit.Test; +import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; + public class TestCopyMapper { private static final Log LOG = LogFactory.getLog(TestCopyMapper.class); private static List pathList = new ArrayList(); @@ -248,7 +253,11 @@ public void testCopyWithAppend() throws Exception { // do the distcp again with -update and -append option CopyMapper copyMapper = new CopyMapper(); - StubContext stubContext = new StubContext(getConfiguration(), null, 0); + Configuration conf = getConfiguration(); + // set the buffer size to 1/10th the size of the file. + conf.setInt(DistCpOptionSwitch.COPY_BUFFER_SIZE.getConfigLabel(), + DEFAULT_FILE_SIZE/10); + StubContext stubContext = new StubContext(conf, null, 0); Mapper.Context context = stubContext.getContext(); // Enable append @@ -257,6 +266,10 @@ public void testCopyWithAppend() throws Exception { copyMapper.setup(context); int numFiles = 0; + MetricsRecordBuilder rb = + getMetrics(cluster.getDataNodes().get(0).getMetrics().name()); + String readCounter = "ReadsFromLocalClient"; + long readsFromClient = getLongCounter(readCounter, rb); for (Path path: pathList) { if (fs.getFileStatus(path).isFile()) { numFiles++; @@ -274,6 +287,15 @@ public void testCopyWithAppend() throws Exception { .getValue()); Assert.assertEquals(numFiles, stubContext.getReporter(). getCounter(CopyMapper.Counter.COPY).getValue()); + rb = getMetrics(cluster.getDataNodes().get(0).getMetrics().name()); + /* + * added as part of HADOOP-15292 to ensure that multiple readBlock() + * operations are not performed to read a block from a single Datanode. + * assert assumes that there is only one block per file, and that the number + * of files appended to in appendSourceData() above is captured by the + * variable numFiles. + */ + assertCounter(readCounter, readsFromClient + numFiles, rb); } private void testCopy(boolean preserveChecksum) throws Exception {