From e91aec930fd033e71b8bcecbb8e7873c28697f0b Mon Sep 17 00:00:00 2001 From: hfutatzhanghb Date: Fri, 8 Dec 2023 17:15:58 +0800 Subject: [PATCH] HADOOP-18989. Use thread pool to improve the speed of creating control files in TestDFSIO (#6294). Contributed by farmmamba. Signed-off-by: Shuyan Zhang --- .../java/org/apache/hadoop/fs/TestDFSIO.java | 92 +++++++++++++++++-- 1 file changed, 82 insertions(+), 10 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java index 40f12295be..6ee143dcf4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java @@ -32,7 +32,16 @@ import java.util.Collection; import java.util.Date; import java.util.StringTokenizer; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -116,6 +125,10 @@ public class TestDFSIO implements Tool { "test.io.block.storage.policy"; private static final String ERASURE_CODE_POLICY_NAME_KEY = "test.io.erasure.code.policy"; + private ExecutorService excutorService = Executors.newFixedThreadPool( + 2 * Runtime.getRuntime().availableProcessors()); + private 
CompletionService completionService = + new ExecutorCompletionService<>(excutorService); static{ Configuration.addDefaultResource("hdfs-default.xml"); @@ -289,12 +302,43 @@ public void testTruncate() throws Exception { bench.analyzeResult(fs, TestType.TEST_TYPE_TRUNCATE, execTime); } + private class ControlFileCreateTask implements Runnable { + private SequenceFile.Writer writer = null; + private String name; + private long nrBytes; + + ControlFileCreateTask(SequenceFile.Writer writer, String name, + long nrBytes) { + this.writer = writer; + this.name = name; + this.nrBytes = nrBytes; + } + + @Override + public void run() { + try { + writer.append(new Text(name), new LongWritable(nrBytes)); + } catch (Exception e) { + LOG.error(e.getLocalizedMessage()); + } finally { + if (writer != null) { + try { + writer.close(); + } catch (IOException e) { + LOG.error(e.toString()); + } + } + writer = null; + } + } + } + @SuppressWarnings("deprecation") private void createControlFile(FileSystem fs, long nrBytes, // in bytes int nrFiles ) throws IOException { - LOG.info("creating control file: "+nrBytes+" bytes, "+nrFiles+" files"); + LOG.info("creating control file: " + nrBytes + " bytes, " + nrFiles + " files"); final int maxDirItems = config.getInt( DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY, DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_DEFAULT); @@ -308,7 +352,7 @@ private void createControlFile(FileSystem fs, fs.delete(controlDir, true); - for(int i=0; i < nrFiles; i++) { + for (int i = 0; i < nrFiles; i++) { String name = getFileName(i); Path controlFile = new Path(controlDir, "in_file_" + name); SequenceFile.Writer writer = null; @@ -316,19 +360,42 @@ private void createControlFile(FileSystem fs, writer = SequenceFile.createWriter(fs, config, controlFile, Text.class, LongWritable.class, CompressionType.NONE); - writer.append(new Text(name), new LongWritable(nrBytes)); + Runnable controlFileCreateTask = new ControlFileCreateTask(writer, name, nrBytes); + 
completionService.submit(controlFileCreateTask, "success"); } catch(Exception e) { throw new IOException(e.getLocalizedMessage()); - } finally { - if (writer != null) { - writer.close(); - } - writer = null; } } - LOG.info("created control files for: " + nrFiles + " files"); + + boolean isSuccess = false; + int count = 0; + for (int i = 0; i < nrFiles; i++) { + try { + // Since control file is quite small, we use 3 minutes here. + Future future = completionService.poll(3, TimeUnit.MINUTES); + if (future != null) { + future.get(3, TimeUnit.MINUTES); + count++; + } else { + break; + } + } catch (ExecutionException | InterruptedException | TimeoutException e) { + throw new IOException(e); + } + } + + if (count == nrFiles) { + isSuccess = true; + } + + if (isSuccess) { + LOG.info("created control files for: " + nrFiles + " files"); + } else { + throw new IOException("Create control files timeout."); + } } + private static String getFileName(int fIdx) { return BASE_FILE_NAME + Integer.toString(fIdx); } @@ -865,7 +932,12 @@ public int run(String[] args) throws IOException { cleanup(fs); return 0; } - createControlFile(fs, nrBytes, nrFiles); + try { + createControlFile(fs, nrBytes, nrFiles); + } catch (IOException e) { + LOG.warn(e.toString()); + throw new IOException(e); + } long tStart = System.currentTimeMillis(); switch(testType) { case TEST_TYPE_WRITE: