diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 5b51bce687..3d101d4f5a 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -992,6 +992,9 @@ Release 2.8.0 - UNRELEASED HADOOP-12051. ProtobufRpcEngine.invoke() should use Exception.toString() over getMessage() in logging/span events. (Varun Saxena via stevel) + HADOOP-12017. Hadoop archives command should use configurable replication + factor when closing (Bibin A Chundatt via vinayakumarb) + Release 2.7.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-tools/hadoop-archives/src/main/java/org/apache/hadoop/tools/HadoopArchives.java b/hadoop-tools/hadoop-archives/src/main/java/org/apache/hadoop/tools/HadoopArchives.java index 330830b41f..ee148503f1 100644 --- a/hadoop-tools/hadoop-archives/src/main/java/org/apache/hadoop/tools/HadoopArchives.java +++ b/hadoop-tools/hadoop-archives/src/main/java/org/apache/hadoop/tools/HadoopArchives.java @@ -100,15 +100,17 @@ public class HadoopArchives implements Tool { static final String SRC_PARENT_LABEL = NAME + ".parent.path"; /** the size of the blocks that will be created when archiving **/ static final String HAR_BLOCKSIZE_LABEL = NAME + ".block.size"; - /**the size of the part files that will be created when archiving **/ + /** the replication factor for the file in archiving. **/ + static final String HAR_REPLICATION_LABEL = NAME + ".replication.factor"; + /** the size of the part files that will be created when archiving **/ static final String HAR_PARTSIZE_LABEL = NAME + ".partfile.size"; /** size of each part file size **/ long partSize = 2 * 1024 * 1024 * 1024l; /** size of blocks in hadoop archives **/ long blockSize = 512 * 1024 * 1024l; - /** the desired replication degree; default is 10 **/ - short repl = 10; + /** the desired replication degree; default is 3 **/ + short repl = 3; private static final String usage = "archive" + " <-archiveName .har> <-p > [-r ]" + @@ -475,6 +477,7 @@ void archive(Path parentPath, List srcPaths, conf.setLong(HAR_PARTSIZE_LABEL, partSize); conf.set(DST_HAR_LABEL, archiveName); conf.set(SRC_PARENT_LABEL, parentPath.makeQualified(fs).toString()); + conf.setInt(HAR_REPLICATION_LABEL, repl); Path outputPath = new Path(dest, archiveName); FileOutputFormat.setOutputPath(conf, outputPath); FileSystem outFs = outputPath.getFileSystem(conf); @@ -549,8 +552,6 @@ void archive(Path parentPath, List srcPaths, } finally { srcWriter.close(); } - //increase the replication of src files - jobfs.setReplication(srcFiles, repl); conf.setInt(SRC_COUNT_LABEL, numFiles); conf.setLong(TOTAL_SIZE_LABEL, totalSize); int numMaps = (int)(totalSize/partSize); @@ -587,6 +588,7 @@ static class HArchivesMapper FileSystem destFs = null; byte[] buffer; int buf_size = 128 * 1024; + private int replication = 3; long blockSize = 512 * 1024 * 1024l; // configure the mapper and create @@ -595,7 +597,7 @@ static class HArchivesMapper // tmp files. public void configure(JobConf conf) { this.conf = conf; - + replication = conf.getInt(HAR_REPLICATION_LABEL, 3); // this is tightly tied to map reduce // since it does not expose an api // to get the partition @@ -712,6 +714,7 @@ public void map(LongWritable key, HarEntry value, public void close() throws IOException { // close the part files. partStream.close(); + destFs.setReplication(tmpOutput, (short) replication); } } @@ -732,6 +735,7 @@ static class HArchivesReducer implements Reducer listFiles = + fs.listFiles(new Path(archivePath.toString() + "/" + harName), false); + while (listFiles.hasNext()) { + LocatedFileStatus next = listFiles.next(); + if (!next.getPath().toString().endsWith("_SUCCESS")) { + assertEquals(next.getPath().toString(), 2, next.getReplication()); + } + } return fullHarPathStr; }