From d716084f4503bf826ef10424d7025ea1ff4ee104 Mon Sep 17 00:00:00 2001
From: Robert Kanter <rkanter@apache.org>
Date: Tue, 16 Jan 2018 10:45:45 -0800
Subject: [PATCH] MAPREDUCE-7032. Add the ability to specify a delayed
 replication count (miklos.szegedi@cloudera.com via rkanter)

---
 .../mapred/uploader/FrameworkUploader.java    | 124 ++++++++++++++++--
 .../uploader/TestFrameworkUploader.java       |  21 ++-
 2 files changed, 128 insertions(+), 17 deletions(-)
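
Notes (not part of the commit message): a minimal usage sketch for the new
options. Only the option names and the path#alias target syntax come from
this patch; the "mapred frameworkuploader" entry point, host name, paths
and values below are illustrative assumptions.

    # Upload the framework tarball with a low initial replication, raise
    # it to 10 once the upload finishes, and wait up to 60 seconds for
    # every block to reach at least 9 replicas.
    mapred frameworkuploader -fs hdfs://nn:8020 \
        -target /mapred/framework/mr-framework.tar.gz#mr-framework \
        -initialReplication 3 -finalReplication 10 \
        -acceptableReplication 9 -timeout 60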
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/FrameworkUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/FrameworkUploader.java
index 899689dc09..ee482d7e0c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/FrameworkUploader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/FrameworkUploader.java
@@ -25,6 +25,8 @@
 import org.apache.commons.compress.archivers.ArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -43,6 +45,8 @@
 import java.nio.file.Files;
 import java.nio.file.NotLinkException;
 import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -73,7 +77,15 @@ public class FrameworkUploader implements Runnable {
   @VisibleForTesting
   String target = null;
   @VisibleForTesting
-  short replication = 10;
+  Path targetPath = null;
+  @VisibleForTesting
+  short initialReplication = 3;
+  @VisibleForTesting
+  short finalReplication = 10;
+  @VisibleForTesting
+  short acceptableReplication = 9;
+  @VisibleForTesting
+  int timeout = 10;
   private boolean ignoreSymlink = false;
   @VisibleForTesting
@@ -101,9 +113,10 @@ public void run() {
       LOG.info(
           "Suggested mapreduce.application.classpath $PWD/" + alias + "/*");
       System.out.println("Suggested classpath $PWD/" + alias + "/*");
-    } catch (UploaderException|IOException e) {
+    } catch (UploaderException|IOException|InterruptedException e) {
       LOG.error("Error in execution " + e.getMessage());
       e.printStackTrace();
+      throw new RuntimeException(e);
     }
   }
@@ -147,7 +160,7 @@ void beginUpload() throws IOException, UploaderException {
     if (targetStream == null) {
       validateTargetPath();
       int lastIndex = target.indexOf('#');
-      Path targetPath =
+      targetPath =
           new Path(
               target.substring(
                   0, lastIndex == -1 ?
                       target.length() : lastIndex));
@@ -160,7 +173,7 @@ void beginUpload() throws IOException, UploaderException {
       targetStream = null;
       if (fileSystem instanceof DistributedFileSystem) {
         LOG.info("Set replication to " +
-            replication + " for path: " + targetPath);
+            initialReplication + " for path: " + targetPath);
         LOG.info("Disabling Erasure Coding for path: " + targetPath);
         DistributedFileSystem dfs = (DistributedFileSystem)fileSystem;
         DistributedFileSystem.HdfsDataOutputStreamBuilder builder =
@@ -168,13 +181,13 @@ void beginUpload() throws IOException, UploaderException {
             .overwrite(true)
             .ecPolicyName(
                 SystemErasureCodingPolicies.getReplicationPolicy().getName());
-        if (replication > 0) {
-          builder.replication(replication);
+        if (initialReplication > 0) {
+          builder.replication(initialReplication);
         }
         targetStream = builder.build();
       } else {
         LOG.warn("Cannot set replication to " +
-            replication + " for path: " + targetPath +
+            initialReplication + " for path: " + targetPath +
             " on a non-distributed fileystem " +
             fileSystem.getClass().getName());
       }
@@ -190,8 +203,70 @@ void beginUpload() throws IOException, UploaderException {
     }
   }
 
+  private long getSmallestReplicatedBlockCount()
+      throws IOException {
+    FileSystem fileSystem = targetPath.getFileSystem(new Configuration());
+    FileStatus status = fileSystem.getFileStatus(targetPath);
+    long length = status.getLen();
+    HashMap<Long, Integer> blockCount = new HashMap<>();
+
+    // Start with 0s for each offset
+    for (long offset = 0; offset < length; offset += status.getBlockSize()) {
+      blockCount.put(offset, 0);
+    }
+
+    // Count blocks
+    BlockLocation[] locations = fileSystem.getFileBlockLocations(
+        targetPath, 0, length);
+    for (BlockLocation location : locations) {
+      final int replicas = location.getHosts().length;
+      blockCount.compute(
+          location.getOffset(), (key, value) -> value + replicas);
+    }
+
+    // Print out the results
+    for (long offset = 0; offset < length; offset += status.getBlockSize()) {
+      LOG.info(String.format(
+          "Replication counts offset:%d blocks:%d",
+          offset, blockCount.get(offset)));
+    }
+
+    return Collections.min(blockCount.values());
+  }
+
+  private void endUpload()
+      throws IOException, InterruptedException {
+    FileSystem fileSystem = targetPath.getFileSystem(new Configuration());
+    if (fileSystem instanceof DistributedFileSystem) {
+      fileSystem.setReplication(targetPath, finalReplication);
+      LOG.info("Set replication to " +
+          finalReplication + " for path: " + targetPath);
+      long startTime = System.currentTimeMillis();
+      long endTime = startTime;
+      long currentReplication = 0;
+      while (endTime - startTime < timeout * 1000 &&
+          currentReplication < acceptableReplication) {
+        Thread.sleep(1000);
+        endTime = System.currentTimeMillis();
+        currentReplication = getSmallestReplicatedBlockCount();
+      }
+      if (endTime - startTime >= timeout * 1000) {
+        LOG.error(String.format(
+            "Timed out after %d seconds while waiting for acceptable" +
+            " replication of %d (current replication is %d)",
+            timeout, acceptableReplication, currentReplication));
+      }
+    } else {
+      LOG.info("Cannot set replication to " +
+          finalReplication + " for path: " + targetPath +
+          " on a non-distributed filesystem " +
+          fileSystem.getClass().getName());
+    }
+  }
+
   @VisibleForTesting
-  void buildPackage() throws IOException, UploaderException {
+  void buildPackage()
+      throws IOException, UploaderException, InterruptedException {
     beginUpload();
     LOG.info("Compressing tarball");
     try (TarArchiveOutputStream out = new TarArchiveOutputStream(
@@ -206,6 +281,7 @@ void buildPackage() throws IOException, UploaderException {
         out.closeArchiveEntry();
       }
     }
+    endUpload();
   } finally {
     if (targetStream != null) {
       targetStream.close();
@@ -378,8 +454,21 @@ boolean parseArguments(String[] args) throws IOException {
         .hasArg().create("target"));
     opts.addOption(OptionBuilder
         .withDescription(
-            "Desired replication count")
-        .hasArg().create("replication"));
+            "Desired initial replication count. Default 3.")
+        .hasArg().create("initialReplication"));
+    opts.addOption(OptionBuilder
+        .withDescription(
+            "Desired final replication count. Default 10.")
+        .hasArg().create("finalReplication"));
+    opts.addOption(OptionBuilder
+        .withDescription(
+            "Desired acceptable replication count. Default 9.")
+        .hasArg().create("acceptableReplication"));
+    opts.addOption(OptionBuilder
+        .withDescription(
+            "Desired timeout for the acceptable" +
+            " replication in seconds. Default 10.")
+        .hasArg().create("timeout"));
     opts.addOption(OptionBuilder
         .withDescription("Ignore symlinks into the same directory")
         .create("nosymlink"));
@@ -395,8 +484,19 @@ boolean parseArguments(String[] args) throws IOException {
         "whitelist", DefaultJars.DEFAULT_MR_JARS);
     blacklist = parser.getCommandLine().getOptionValue(
         "blacklist", DefaultJars.DEFAULT_EXCLUDED_MR_JARS);
-    replication = Short.parseShort(parser.getCommandLine().getOptionValue(
-        "replication", "10"));
+    initialReplication =
+        Short.parseShort(parser.getCommandLine().getOptionValue(
+            "initialReplication", "3"));
+    finalReplication =
+        Short.parseShort(parser.getCommandLine().getOptionValue(
+            "finalReplication", "10"));
+    acceptableReplication =
+        Short.parseShort(
+            parser.getCommandLine().getOptionValue(
+                "acceptableReplication", "9"));
+    timeout =
+        Integer.parseInt(
+            parser.getCommandLine().getOptionValue("timeout", "10"));
     if (parser.getCommandLine().hasOption("nosymlink")) {
       ignoreSymlink = true;
     }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/test/java/org/apache/hadoop/mapred/uploader/TestFrameworkUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/test/java/org/apache/hadoop/mapred/uploader/TestFrameworkUploader.java
index ef64bfe929..61c0b12f9c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/test/java/org/apache/hadoop/mapred/uploader/TestFrameworkUploader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/test/java/org/apache/hadoop/mapred/uploader/TestFrameworkUploader.java
@@ -104,7 +104,10 @@ public void testArguments() throws IOException {
         "-blacklist", "C",
         "-fs", "hdfs://C:8020",
         "-target", "D",
-        "-replication", "100"};
+        "-initialReplication", "100",
+        "-acceptableReplication", "120",
+        "-finalReplication", "140",
+        "-timeout", "10"};
     FrameworkUploader uploader = new FrameworkUploader();
     boolean success = uploader.parseArguments(args);
     Assert.assertTrue("Expected to print help", success);
@@ -116,8 +119,14 @@ public void testArguments() throws IOException {
         uploader.blacklist);
     Assert.assertEquals("Target mismatch",
         "hdfs://C:8020/D", uploader.target);
-    Assert.assertEquals("Replication mismatch", 100,
-        uploader.replication);
+    Assert.assertEquals("Initial replication mismatch", 100,
+        uploader.initialReplication);
+    Assert.assertEquals("Acceptable replication mismatch", 120,
+        uploader.acceptableReplication);
+    Assert.assertEquals("Final replication mismatch", 140,
+        uploader.finalReplication);
+    Assert.assertEquals("Timeout mismatch", 10,
+        uploader.timeout);
   }
 
   /**
@@ -176,7 +185,8 @@ public void testCollectPackages() throws IOException, UploaderException {
    * Test building a tarball from source jars.
    */
   @Test
-  public void testBuildTarBall() throws IOException, UploaderException {
+  public void testBuildTarBall()
+      throws IOException, UploaderException, InterruptedException {
     String[] testFiles = {"upload.tar", "upload.tar.gz"};
     for (String testFile: testFiles) {
       File parent = new File(testDir);
@@ -232,7 +242,8 @@ public void testBuildTarBall() throws IOException, UploaderException {
    * Test upload to HDFS.
    */
   @Test
-  public void testUpload() throws IOException, UploaderException {
+  public void testUpload()
+      throws IOException, UploaderException, InterruptedException {
     final String fileName = "/upload.tar.gz";
     File parent = new File(testDir);
     try {
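
Notes (not part of the patch): once the uploader reports success, the
resulting replication can be double-checked with stock HDFS tooling; the
path below is an illustrative assumption matching the earlier sketch:

    # Print the replication factor recorded for the uploaded tarball.
    hdfs dfs -stat "replication: %r" /mapred/framework/mr-framework.tar.gz

    # Show per-block replica counts and locations.
    hdfs fsck /mapred/framework/mr-framework.tar.gz -files -blocks -locations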