HADOOP-12017. Hadoop archives command should use configurable replication factor when closing (Contributed by Bibin A Chundatt)

Vinayakumar B 2015-07-22 10:25:49 +05:30
parent 31f117138a
commit 94c6a4aa85
4 changed files with 33 additions and 19 deletions

@@ -992,6 +992,9 @@ Release 2.8.0 - UNRELEASED
     HADOOP-12051. ProtobufRpcEngine.invoke() should use Exception.toString()
     over getMessage() in logging/span events. (Varun Saxena via stevel)
+    HADOOP-12017. Hadoop archives command should use configurable replication
+    factor when closing (Bibin A Chundatt via vinayakumarb)
 Release 2.7.2 - UNRELEASED
   INCOMPATIBLE CHANGES

@@ -100,15 +100,17 @@ public class HadoopArchives implements Tool {
   static final String SRC_PARENT_LABEL = NAME + ".parent.path";
   /** the size of the blocks that will be created when archiving **/
   static final String HAR_BLOCKSIZE_LABEL = NAME + ".block.size";
-  /**the size of the part files that will be created when archiving **/
+  /** the replication factor for the file in archiving. **/
+  static final String HAR_REPLICATION_LABEL = NAME + ".replication.factor";
+  /** the size of the part files that will be created when archiving **/
   static final String HAR_PARTSIZE_LABEL = NAME + ".partfile.size";
   /** size of each part file size **/
   long partSize = 2 * 1024 * 1024 * 1024l;
   /** size of blocks in hadoop archives **/
   long blockSize = 512 * 1024 * 1024l;
-  /** the desired replication degree; default is 10 **/
-  short repl = 10;
+  /** the desired replication degree; default is 3 **/
+  short repl = 3;
   private static final String usage = "archive"
   + " <-archiveName <NAME>.har> <-p <parent path>> [-r <replication factor>]" +
@@ -475,6 +477,7 @@ void archive(Path parentPath, List<Path> srcPaths,
     conf.setLong(HAR_PARTSIZE_LABEL, partSize);
     conf.set(DST_HAR_LABEL, archiveName);
     conf.set(SRC_PARENT_LABEL, parentPath.makeQualified(fs).toString());
+    conf.setInt(HAR_REPLICATION_LABEL, repl);
     Path outputPath = new Path(dest, archiveName);
     FileOutputFormat.setOutputPath(conf, outputPath);
     FileSystem outFs = outputPath.getFileSystem(conf);
@@ -549,8 +552,6 @@ void archive(Path parentPath, List<Path> srcPaths,
     } finally {
       srcWriter.close();
     }
-    //increase the replication of src files
-    jobfs.setReplication(srcFiles, repl);
     conf.setInt(SRC_COUNT_LABEL, numFiles);
     conf.setLong(TOTAL_SIZE_LABEL, totalSize);
     int numMaps = (int)(totalSize/partSize);
@@ -587,6 +588,7 @@ static class HArchivesMapper
     FileSystem destFs = null;
     byte[] buffer;
     int buf_size = 128 * 1024;
+    private int replication = 3;
     long blockSize = 512 * 1024 * 1024l;
     // configure the mapper and create
@@ -595,7 +597,7 @@ static class HArchivesMapper
     // tmp files.
     public void configure(JobConf conf) {
       this.conf = conf;
+      replication = conf.getInt(HAR_REPLICATION_LABEL, 3);
       // this is tightly tied to map reduce
       // since it does not expose an api
       // to get the partition
@@ -712,6 +714,7 @@ public void map(LongWritable key, HarEntry value,
     public void close() throws IOException {
       // close the part files.
       partStream.close();
+      destFs.setReplication(tmpOutput, (short) replication);
     }
   }
@@ -732,6 +735,7 @@ static class HArchivesReducer implements Reducer<IntWritable,
     private int numIndexes = 1000;
     private Path tmpOutputDir = null;
     private int written = 0;
+    private int replication = 3;
     private int keyVal = 0;
     // configure
@@ -740,6 +744,7 @@ public void configure(JobConf conf) {
       tmpOutputDir = FileOutputFormat.getWorkOutputPath(this.conf);
       masterIndex = new Path(tmpOutputDir, "_masterindex");
       index = new Path(tmpOutputDir, "_index");
+      replication = conf.getInt(HAR_REPLICATION_LABEL, 3);
       try {
         fs = masterIndex.getFileSystem(conf);
         if (fs.exists(masterIndex)) {
@@ -798,8 +803,8 @@ public void close() throws IOException {
       outStream.close();
       indexStream.close();
       // try increasing the replication
-      fs.setReplication(index, (short) 5);
-      fs.setReplication(masterIndex, (short) 5);
+      fs.setReplication(index, (short) replication);
+      fs.setReplication(masterIndex, (short) replication);
     }
   }
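The change follows a simple pattern: the driver records the requested replication in the JobConf, and each mapper and reducer reads it back in configure() and applies it to its finished output in close(). Below is a minimal sketch of that pattern, not taken from the patch itself; the class and method shapes are illustrative only, and the literal property key assumes NAME resolves to "har" in HadoopArchives.

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

class ReplicationAwareTask {
  private JobConf conf;
  private int replication = 3;   // default used when -r was not supplied

  void configure(JobConf conf) {
    this.conf = conf;
    // Same key as HadoopArchives.HAR_REPLICATION_LABEL, assuming NAME == "har".
    replication = conf.getInt("har.replication.factor", 3);
  }

  void close(Path finishedOutput) throws IOException {
    // The replication is applied only once the output has been fully written
    // and the stream closed, hence "when closing" in the issue title.
    FileSystem fs = finishedOutput.getFileSystem(conf);
    fs.setReplication(finishedOutput, (short) replication);
  }
}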

@@ -53,7 +53,7 @@ How to Create an Archive
 sections.
 -r indicates the desired replication factor; if this optional argument is
-not specified, a replication factor of 10 will be used.
+not specified, a replication factor of 3 will be used.
 If you just want to archive a single directory /foo/bar then you can just use
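For illustration, the option can also be driven programmatically through ToolRunner, mirroring the makeArchiveWithRepl() test shown further down. The archive name and paths below are hypothetical; the command-line equivalent would be roughly hadoop archive -archiveName foo.har -p /user/alice -r 2 * /user/alice/archives.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.tools.HadoopArchives;
import org.apache.hadoop.util.ToolRunner;

public class CreateHarWithReplication {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] harArgs = {
        "-archiveName", "foo.har",    // name of the archive to create
        "-p", "/user/alice",          // common parent of the sources (hypothetical path)
        "-r", "2",                    // desired replication factor for the archive's files
        "*",                          // archive everything under the parent
        "/user/alice/archives"        // destination directory (hypothetical path)
    };
    // A zero exit code means the part files, _index and _masterindex were
    // written and then set to replication 2 by the mapper/reducer close() calls.
    System.exit(ToolRunner.run(new HadoopArchives(conf), harArgs));
  }
}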

@@ -21,7 +21,6 @@
 import java.io.ByteArrayOutputStream;
 import java.io.FilterInputStream;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.io.PrintStream;
 import java.net.URI;
 import java.util.ArrayList;
@@ -39,7 +38,9 @@
 import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.HarFileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.JarFinder;
@@ -110,13 +111,9 @@ public void setUp() throws Exception {
     conf.set(CapacitySchedulerConfiguration.PREFIX
         + CapacitySchedulerConfiguration.ROOT + ".default."
         + CapacitySchedulerConfiguration.CAPACITY, "100");
-    dfscluster = new MiniDFSCluster
-        .Builder(conf)
-        .checkExitOnShutdown(true)
-        .numDataNodes(2)
-        .format(true)
-        .racks(null)
-        .build();
+    dfscluster =
+        new MiniDFSCluster.Builder(conf).checkExitOnShutdown(true)
+            .numDataNodes(3).format(true).racks(null).build();
     fs = dfscluster.getFileSystem();
@@ -753,12 +750,21 @@ private String makeArchiveWithRepl() throws Exception {
     final String harName = "foo.har";
     final String fullHarPathStr = prefix + harName;
-    final String[] args = { "-archiveName", harName, "-p", inputPathStr, "-r",
-        "3", "*", archivePath.toString() };
+    final String[] args =
+        { "-archiveName", harName, "-p", inputPathStr, "-r", "2", "*",
+            archivePath.toString() };
     System.setProperty(HadoopArchives.TEST_HADOOP_ARCHIVES_JAR_PATH,
         HADOOP_ARCHIVES_JAR);
     final HadoopArchives har = new HadoopArchives(conf);
     assertEquals(0, ToolRunner.run(har, args));
+    RemoteIterator<LocatedFileStatus> listFiles =
+        fs.listFiles(new Path(archivePath.toString() + "/" + harName), false);
+    while (listFiles.hasNext()) {
+      LocatedFileStatus next = listFiles.next();
+      if (!next.getPath().toString().endsWith("_SUCCESS")) {
+        assertEquals(next.getPath().toString(), 2, next.getReplication());
+      }
+    }
     return fullHarPathStr;
   }