diff --git a/hadoop-tools/hadoop-distcp/pom.xml b/hadoop-tools/hadoop-distcp/pom.xml
index d0486a6794..fd592d2395 100644
--- a/hadoop-tools/hadoop-distcp/pom.xml
+++ b/hadoop-tools/hadoop-distcp/pom.xml
@@ -114,6 +114,12 @@
assertj-core
test
+
+ org.hamcrest
+ hamcrest-library
+ 1.3
+ test
+
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java
index 4d3676a669..66cac955da 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java
@@ -120,12 +120,11 @@ public long getTotalBytesRead() {
* @return Read rate, in bytes/sec.
*/
public long getBytesPerSec() {
- long elapsed = (System.currentTimeMillis() - startTime) / 1000;
- if (elapsed == 0) {
- return bytesRead;
- } else {
- return bytesRead / elapsed;
+ if (bytesRead == 0){
+ return 0;
}
+ float elapsed = (System.currentTimeMillis() - startTime) / 1000.0f;
+ return (long) (bytesRead / elapsed);
}
/**
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestThrottledInputStream.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestThrottledInputStream.java
index 6a57217719..0b5ebf6c69 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestThrottledInputStream.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestThrottledInputStream.java
@@ -22,6 +22,8 @@
import org.slf4j.LoggerFactory;
import org.apache.hadoop.io.IOUtils;
import org.junit.Assert;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.junit.Assert.assertThat;
import org.junit.Test;
import java.io.*;
@@ -43,7 +45,9 @@ public void testRead() {
tmpFile.deleteOnExit();
outFile.deleteOnExit();
- long maxBandwidth = copyAndAssert(tmpFile, outFile, 0, 1, -1, CB.BUFFER);
+ // Correction: we should use CB.ONE_C mode to calculate the maxBandwidth,
+ // because CB.ONE_C's speed is the lowest.
+ long maxBandwidth = copyAndAssert(tmpFile, outFile, 0, 1, -1, CB.ONE_C);
copyAndAssert(tmpFile, outFile, maxBandwidth, 20, 0, CB.BUFFER);
/*
@@ -90,10 +94,16 @@ private long copyAndAssert(File tmpFile, File outFile,
}
LOG.info("{}", in);
+ /*
+ in.getBytesPerSec() should not be called repeatedly,
+ because each call will return a different value,
+ and because the program execution also takes time,
+ which magnifies the error of getBytesPerSec()
+ */
bandwidth = in.getBytesPerSec();
Assert.assertEquals(in.getTotalBytesRead(), tmpFile.length());
- Assert.assertTrue(in.getBytesPerSec() > maxBandwidth / (factor * 1.2));
- Assert.assertTrue(in.getTotalSleepTime() > sleepTime || in.getBytesPerSec() <= maxBPS);
+ Assert.assertTrue(bandwidth > maxBandwidth / (factor * 1.2));
+ Assert.assertTrue(in.getTotalSleepTime() > sleepTime || bandwidth <= maxBPS);
} finally {
IOUtils.closeStream(in);
IOUtils.closeStream(out);
@@ -154,4 +164,61 @@ private void writeToFile(File tmpFile, long sizeInKB) throws IOException {
IOUtils.closeStream(out);
}
}
-}
+
+ /**
+ * Distcp: When handle the small files,
+ * the bandwidth parameter will be invalid, fix this bug
+ */
+ @Test
+ public void testFixThrottleInvalid() {
+ int testFileCnt = 100;
+ int fileSize = 19;
+ int bandwidth= 20;
+ File[] srcFiles = new File[testFileCnt];
+ File destFile;
+ try {
+ destFile = createFile(testFileCnt * 100 * 1024);
+ destFile.deleteOnExit();
+
+ // create srcFile
+ for (int i = 0; i < srcFiles.length; i++) {
+ srcFiles[i] = createFile(fileSize * 1024);
+ srcFiles[i].deleteOnExit();
+ }
+
+ long begin = System.currentTimeMillis();
+ LOG.info("begin: " + begin);
+
+ // copy srcFiles
+ for (File srcFile : srcFiles) {
+ LOG.info("fileLength: " + srcFiles.length);
+ copyAndAssert(srcFile, destFile, bandwidth * 1024 * 1024);
+ }
+
+ // Check whether the speed limit is successfully limited
+ long end = System.currentTimeMillis();
+ LOG.info("end: " + end);
+ assertThat((int) (end - begin) / 1000,
+ greaterThanOrEqualTo(testFileCnt * fileSize / bandwidth));
+ } catch (IOException e) {
+ LOG.error("Exception encountered ", e);
+ }
+ }
+
+ private void copyAndAssert(File tmpFile, File outFile, long maxBPS)
+ throws IOException {
+ ThrottledInputStream in = new ThrottledInputStream(new FileInputStream(tmpFile), maxBPS);
+ OutputStream out = new FileOutputStream(outFile);
+ try {
+ copyBytes(in, out, BUFF_SIZE);
+ LOG.info("{}", in);
+ Assert.assertEquals(in.getTotalBytesRead(), tmpFile.length());
+
+ long bytesPerSec = in.getBytesPerSec();
+ Assert.assertTrue(bytesPerSec < maxBPS);
+ } finally {
+ IOUtils.closeStream(in);
+ IOUtils.closeStream(out);
+ }
+ }
+}
\ No newline at end of file