From 8528d5783d14ce777341d481894e6d4171b8fe24 Mon Sep 17 00:00:00 2001 From: xiaojunxiang Date: Thu, 28 Mar 2024 22:31:06 +0800 Subject: [PATCH] HDFS-17216. Distcp: When handle the small files, the bandwidth parameter will be invalid, fix this bug. (#6138) --- hadoop-tools/hadoop-distcp/pom.xml | 6 ++ .../tools/util/ThrottledInputStream.java | 9 +-- .../tools/util/TestThrottledInputStream.java | 75 ++++++++++++++++++- 3 files changed, 81 insertions(+), 9 deletions(-) diff --git a/hadoop-tools/hadoop-distcp/pom.xml b/hadoop-tools/hadoop-distcp/pom.xml index d0486a6794..fd592d2395 100644 --- a/hadoop-tools/hadoop-distcp/pom.xml +++ b/hadoop-tools/hadoop-distcp/pom.xml @@ -114,6 +114,12 @@ assertj-core test + + org.hamcrest + hamcrest-library + 1.3 + test + diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java index 4d3676a669..66cac955da 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java @@ -120,12 +120,11 @@ public long getTotalBytesRead() { * @return Read rate, in bytes/sec. */ public long getBytesPerSec() { - long elapsed = (System.currentTimeMillis() - startTime) / 1000; - if (elapsed == 0) { - return bytesRead; - } else { - return bytesRead / elapsed; + if (bytesRead == 0){ + return 0; } + float elapsed = (System.currentTimeMillis() - startTime) / 1000.0f; + return (long) (bytesRead / elapsed); } /** diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestThrottledInputStream.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestThrottledInputStream.java index 6a57217719..0b5ebf6c69 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestThrottledInputStream.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestThrottledInputStream.java @@ -22,6 +22,8 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.io.IOUtils; import org.junit.Assert; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.junit.Assert.assertThat; import org.junit.Test; import java.io.*; @@ -43,7 +45,9 @@ public void testRead() { tmpFile.deleteOnExit(); outFile.deleteOnExit(); - long maxBandwidth = copyAndAssert(tmpFile, outFile, 0, 1, -1, CB.BUFFER); + // Correction: we should use CB.ONE_C mode to calculate the maxBandwidth, + // because CB.ONE_C's speed is the lowest. + long maxBandwidth = copyAndAssert(tmpFile, outFile, 0, 1, -1, CB.ONE_C); copyAndAssert(tmpFile, outFile, maxBandwidth, 20, 0, CB.BUFFER); /* @@ -90,10 +94,16 @@ private long copyAndAssert(File tmpFile, File outFile, } LOG.info("{}", in); + /* + in.getBytesPerSec() should not be called repeatedly, + because each call will return a different value, + and because the program execution also takes time, + which magnifies the error of getBytesPerSec() + */ bandwidth = in.getBytesPerSec(); Assert.assertEquals(in.getTotalBytesRead(), tmpFile.length()); - Assert.assertTrue(in.getBytesPerSec() > maxBandwidth / (factor * 1.2)); - Assert.assertTrue(in.getTotalSleepTime() > sleepTime || in.getBytesPerSec() <= maxBPS); + Assert.assertTrue(bandwidth > maxBandwidth / (factor * 1.2)); + Assert.assertTrue(in.getTotalSleepTime() > sleepTime || bandwidth <= maxBPS); } finally { IOUtils.closeStream(in); IOUtils.closeStream(out); @@ -154,4 +164,61 @@ private void writeToFile(File tmpFile, long sizeInKB) throws IOException { IOUtils.closeStream(out); } } -} + + /** + * Distcp: When handle the small files, + * the bandwidth parameter will be invalid, fix this bug + */ + @Test + public void testFixThrottleInvalid() { + int testFileCnt = 100; + int fileSize = 19; + int bandwidth= 20; + File[] srcFiles = new File[testFileCnt]; + File destFile; + try { + destFile = createFile(testFileCnt * 100 * 1024); + destFile.deleteOnExit(); + + // create srcFile + for (int i = 0; i < srcFiles.length; i++) { + srcFiles[i] = createFile(fileSize * 1024); + srcFiles[i].deleteOnExit(); + } + + long begin = System.currentTimeMillis(); + LOG.info("begin: " + begin); + + // copy srcFiles + for (File srcFile : srcFiles) { + LOG.info("fileLength: " + srcFiles.length); + copyAndAssert(srcFile, destFile, bandwidth * 1024 * 1024); + } + + // Check whether the speed limit is successfully limited + long end = System.currentTimeMillis(); + LOG.info("end: " + end); + assertThat((int) (end - begin) / 1000, + greaterThanOrEqualTo(testFileCnt * fileSize / bandwidth)); + } catch (IOException e) { + LOG.error("Exception encountered ", e); + } + } + + private void copyAndAssert(File tmpFile, File outFile, long maxBPS) + throws IOException { + ThrottledInputStream in = new ThrottledInputStream(new FileInputStream(tmpFile), maxBPS); + OutputStream out = new FileOutputStream(outFile); + try { + copyBytes(in, out, BUFF_SIZE); + LOG.info("{}", in); + Assert.assertEquals(in.getTotalBytesRead(), tmpFile.length()); + + long bytesPerSec = in.getBytesPerSec(); + Assert.assertTrue(bytesPerSec < maxBPS); + } finally { + IOUtils.closeStream(in); + IOUtils.closeStream(out); + } + } +} \ No newline at end of file