HDFS-14617. Improve oiv tool to parse fsimage file in parallel with delimited format. (#2918). Contributed by Hongbing Wang.
Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
This commit is contained in:
parent
708a0ce21b
commit
88975496d8
@ -107,6 +107,7 @@ public class OfflineImageViewerPB {
|
|||||||
+ " Delimited outputs. If not set, the processor\n"
|
+ " Delimited outputs. If not set, the processor\n"
|
||||||
+ " constructs the namespace in memory \n"
|
+ " constructs the namespace in memory \n"
|
||||||
+ " before outputting text.\n"
|
+ " before outputting text.\n"
|
||||||
|
+ "-m,--multiThread <arg> Use multiThread to process sub-sections.\n"
|
||||||
+ "-h,--help Display usage information and exit\n";
|
+ "-h,--help Display usage information and exit\n";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -132,6 +133,7 @@ private static Options buildOptions() {
|
|||||||
options.addOption("delimiter", true, "");
|
options.addOption("delimiter", true, "");
|
||||||
options.addOption("sp", false, "");
|
options.addOption("sp", false, "");
|
||||||
options.addOption("t", "temp", true, "");
|
options.addOption("t", "temp", true, "");
|
||||||
|
options.addOption("m", "multiThread", true, "");
|
||||||
|
|
||||||
return options;
|
return options;
|
||||||
}
|
}
|
||||||
@ -185,6 +187,7 @@ public static int run(String[] args) throws Exception {
|
|||||||
String delimiter = cmd.getOptionValue("delimiter",
|
String delimiter = cmd.getOptionValue("delimiter",
|
||||||
PBImageTextWriter.DEFAULT_DELIMITER);
|
PBImageTextWriter.DEFAULT_DELIMITER);
|
||||||
String tempPath = cmd.getOptionValue("t", "");
|
String tempPath = cmd.getOptionValue("t", "");
|
||||||
|
int threads = Integer.parseInt(cmd.getOptionValue("m", "1"));
|
||||||
|
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
PrintStream out = null;
|
PrintStream out = null;
|
||||||
@ -227,15 +230,14 @@ public static int run(String[] args) throws Exception {
|
|||||||
boolean printStoragePolicy = cmd.hasOption("sp");
|
boolean printStoragePolicy = cmd.hasOption("sp");
|
||||||
try (PBImageDelimitedTextWriter writer =
|
try (PBImageDelimitedTextWriter writer =
|
||||||
new PBImageDelimitedTextWriter(out, delimiter,
|
new PBImageDelimitedTextWriter(out, delimiter,
|
||||||
tempPath, printStoragePolicy);
|
tempPath, printStoragePolicy, threads, outputFile)) {
|
||||||
RandomAccessFile r = new RandomAccessFile(inputFile, "r")) {
|
writer.visit(inputFile);
|
||||||
writer.visit(r);
|
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case "DETECTCORRUPTION":
|
case "DETECTCORRUPTION":
|
||||||
try (PBImageCorruptionDetector detector =
|
try (PBImageCorruptionDetector detector =
|
||||||
new PBImageCorruptionDetector(out, delimiter, tempPath)) {
|
new PBImageCorruptionDetector(out, delimiter, tempPath)) {
|
||||||
detector.visit(new RandomAccessFile(inputFile, "r"));
|
detector.visit(inputFile);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
|
@ -337,7 +337,7 @@ public void afterOutput() throws IOException {
|
|||||||
if (parentId != -1) {
|
if (parentId != -1) {
|
||||||
entryBuilder.setParentId(parentId);
|
entryBuilder.setParentId(parentId);
|
||||||
}
|
}
|
||||||
printIfNotEmpty(entryBuilder.build());
|
printIfNotEmpty(serialOutStream(), entryBuilder.build());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -146,7 +146,13 @@ public String build() {
|
|||||||
PBImageDelimitedTextWriter(PrintStream out, String delimiter,
|
PBImageDelimitedTextWriter(PrintStream out, String delimiter,
|
||||||
String tempPath, boolean printStoragePolicy)
|
String tempPath, boolean printStoragePolicy)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
super(out, delimiter, tempPath);
|
this(out, delimiter, tempPath, printStoragePolicy, 1, "-");
|
||||||
|
}
|
||||||
|
|
||||||
|
PBImageDelimitedTextWriter(PrintStream out, String delimiter,
|
||||||
|
String tempPath, boolean printStoragePolicy, int threads,
|
||||||
|
String parallelOut) throws IOException {
|
||||||
|
super(out, delimiter, tempPath, threads, parallelOut);
|
||||||
this.printStoragePolicy = printStoragePolicy;
|
this.printStoragePolicy = printStoragePolicy;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -21,17 +21,25 @@
|
|||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.PrintStream;
|
import java.io.PrintStream;
|
||||||
import java.io.RandomAccessFile;
|
import java.io.RandomAccessFile;
|
||||||
import java.io.UnsupportedEncodingException;
|
import java.io.UnsupportedEncodingException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.channels.FileChannel;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@ -455,20 +463,22 @@ public String getParentPath(long inode) throws IOException {
|
|||||||
return "/";
|
return "/";
|
||||||
}
|
}
|
||||||
long parent = getFromDirChildMap(inode);
|
long parent = getFromDirChildMap(inode);
|
||||||
if (!dirPathCache.containsKey(parent)) {
|
byte[] bytes = dirMap.get(toBytes(parent));
|
||||||
byte[] bytes = dirMap.get(toBytes(parent));
|
synchronized (this) {
|
||||||
if (parent != INodeId.ROOT_INODE_ID && bytes == null) {
|
if (!dirPathCache.containsKey(parent)) {
|
||||||
// The parent is an INodeReference, which is generated from snapshot.
|
if (parent != INodeId.ROOT_INODE_ID && bytes == null) {
|
||||||
// For delimited oiv tool, no need to print out metadata in snapshots.
|
// The parent is an INodeReference, which is generated from snapshot.
|
||||||
throw PBImageTextWriter.createIgnoredSnapshotException(inode);
|
// For delimited oiv tool, no need to print out metadata in snapshots.
|
||||||
|
throw PBImageTextWriter.createIgnoredSnapshotException(inode);
|
||||||
|
}
|
||||||
|
String parentName = toString(bytes);
|
||||||
|
String parentPath =
|
||||||
|
new Path(getParentPath(parent),
|
||||||
|
parentName.isEmpty() ? "/" : parentName).toString();
|
||||||
|
dirPathCache.put(parent, parentPath);
|
||||||
}
|
}
|
||||||
String parentName = toString(bytes);
|
return dirPathCache.get(parent);
|
||||||
String parentPath =
|
|
||||||
new Path(getParentPath(parent),
|
|
||||||
parentName.isEmpty() ? "/" : parentName).toString();
|
|
||||||
dirPathCache.put(parent, parentPath);
|
|
||||||
}
|
}
|
||||||
return dirPathCache.get(parent);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -493,9 +503,12 @@ public long getParentId(long id) throws IOException {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private SerialNumberManager.StringTable stringTable;
|
private SerialNumberManager.StringTable stringTable;
|
||||||
private PrintStream out;
|
private final PrintStream out;
|
||||||
private MetadataMap metadataMap = null;
|
private MetadataMap metadataMap = null;
|
||||||
private String delimiter;
|
private String delimiter;
|
||||||
|
private File filename;
|
||||||
|
private int numThreads;
|
||||||
|
private String parallelOutputFile;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Construct a PB FsImage writer to generate text file.
|
* Construct a PB FsImage writer to generate text file.
|
||||||
@ -503,8 +516,8 @@ public long getParentId(long id) throws IOException {
|
|||||||
* @param tempPath the path to store metadata. If it is empty, store metadata
|
* @param tempPath the path to store metadata. If it is empty, store metadata
|
||||||
* in memory instead.
|
* in memory instead.
|
||||||
*/
|
*/
|
||||||
PBImageTextWriter(PrintStream out, String delimiter, String tempPath)
|
PBImageTextWriter(PrintStream out, String delimiter, String tempPath,
|
||||||
throws IOException {
|
int numThreads, String parallelOutputFile) throws IOException {
|
||||||
this.out = out;
|
this.out = out;
|
||||||
this.delimiter = delimiter;
|
this.delimiter = delimiter;
|
||||||
if (tempPath.isEmpty()) {
|
if (tempPath.isEmpty()) {
|
||||||
@ -512,6 +525,17 @@ public long getParentId(long id) throws IOException {
|
|||||||
} else {
|
} else {
|
||||||
metadataMap = new LevelDBMetadataMap(tempPath);
|
metadataMap = new LevelDBMetadataMap(tempPath);
|
||||||
}
|
}
|
||||||
|
this.numThreads = numThreads;
|
||||||
|
this.parallelOutputFile = parallelOutputFile;
|
||||||
|
}
|
||||||
|
|
||||||
|
PBImageTextWriter(PrintStream out, String delimiter, String tempPath)
|
||||||
|
throws IOException {
|
||||||
|
this(out, delimiter, tempPath, 1, "-");
|
||||||
|
}
|
||||||
|
|
||||||
|
protected PrintStream serialOutStream() {
|
||||||
|
return out;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -562,7 +586,9 @@ void append(StringBuffer buffer, String field) {
|
|||||||
*/
|
*/
|
||||||
abstract protected void afterOutput() throws IOException;
|
abstract protected void afterOutput() throws IOException;
|
||||||
|
|
||||||
public void visit(RandomAccessFile file) throws IOException {
|
public void visit(String filePath) throws IOException {
|
||||||
|
filename = new File(filePath);
|
||||||
|
RandomAccessFile file = new RandomAccessFile(filePath, "r");
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
if (!FSImageUtil.checkFileFormat(file)) {
|
if (!FSImageUtil.checkFileFormat(file)) {
|
||||||
throw new IOException("Unrecognized FSImage");
|
throw new IOException("Unrecognized FSImage");
|
||||||
@ -642,21 +668,122 @@ long getParentId(long id) throws IOException {
|
|||||||
private void output(Configuration conf, FileSummary summary,
|
private void output(Configuration conf, FileSummary summary,
|
||||||
FileInputStream fin, ArrayList<FileSummary.Section> sections)
|
FileInputStream fin, ArrayList<FileSummary.Section> sections)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
ArrayList<FileSummary.Section> allINodeSubSections =
|
||||||
|
getINodeSubSections(sections);
|
||||||
|
if (numThreads > 1 && !parallelOutputFile.equals("-") &&
|
||||||
|
allINodeSubSections.size() > 1) {
|
||||||
|
outputInParallel(conf, summary, allINodeSubSections);
|
||||||
|
} else {
|
||||||
|
LOG.info("Serial output due to threads num: {}, parallel output file: {}, " +
|
||||||
|
"subSections: {}.", numThreads, parallelOutputFile, allINodeSubSections.size());
|
||||||
|
outputInSerial(conf, summary, fin, sections);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void outputInSerial(Configuration conf, FileSummary summary,
|
||||||
|
FileInputStream fin, ArrayList<FileSummary.Section> sections)
|
||||||
|
throws IOException {
|
||||||
InputStream is;
|
InputStream is;
|
||||||
long startTime = Time.monotonicNow();
|
long startTime = Time.monotonicNow();
|
||||||
out.println(getHeader());
|
serialOutStream().println(getHeader());
|
||||||
for (FileSummary.Section section : sections) {
|
for (FileSummary.Section section : sections) {
|
||||||
if (SectionName.fromString(section.getName()) == SectionName.INODE) {
|
if (SectionName.fromString(section.getName()) == SectionName.INODE) {
|
||||||
fin.getChannel().position(section.getOffset());
|
fin.getChannel().position(section.getOffset());
|
||||||
is = FSImageUtil.wrapInputStreamForCompression(conf,
|
is = FSImageUtil.wrapInputStreamForCompression(conf,
|
||||||
summary.getCodec(), new BufferedInputStream(new LimitInputStream(
|
summary.getCodec(), new BufferedInputStream(new LimitInputStream(
|
||||||
fin, section.getLength())));
|
fin, section.getLength())));
|
||||||
outputINodes(is);
|
INodeSection s = INodeSection.parseDelimitedFrom(is);
|
||||||
|
LOG.info("Found {} INodes in the INode section", s.getNumInodes());
|
||||||
|
int count = outputINodes(is, serialOutStream());
|
||||||
|
LOG.info("Outputted {} INodes.", count);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
afterOutput();
|
afterOutput();
|
||||||
long timeTaken = Time.monotonicNow() - startTime;
|
long timeTaken = Time.monotonicNow() - startTime;
|
||||||
LOG.debug("Time to output inodes: {}ms", timeTaken);
|
LOG.debug("Time to output inodes: {} ms", timeTaken);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* STEP1: Multi-threaded process sub-sections.
|
||||||
|
* Given n (n>1) threads to process k (k>=n) sections,
|
||||||
|
* output parsed results of each section to tmp file in order.
|
||||||
|
* STEP2: Merge tmp files.
|
||||||
|
*/
|
||||||
|
private void outputInParallel(Configuration conf, FileSummary summary,
|
||||||
|
ArrayList<FileSummary.Section> subSections)
|
||||||
|
throws IOException {
|
||||||
|
int nThreads = Integer.min(numThreads, subSections.size());
|
||||||
|
LOG.info("Outputting in parallel with {} sub-sections using {} threads",
|
||||||
|
subSections.size(), nThreads);
|
||||||
|
final CopyOnWriteArrayList<IOException> exceptions = new CopyOnWriteArrayList<>();
|
||||||
|
CountDownLatch latch = new CountDownLatch(subSections.size());
|
||||||
|
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
|
||||||
|
AtomicLong expectedINodes = new AtomicLong(0);
|
||||||
|
AtomicLong totalParsed = new AtomicLong(0);
|
||||||
|
String codec = summary.getCodec();
|
||||||
|
String[] paths = new String[subSections.size()];
|
||||||
|
|
||||||
|
for (int i = 0; i < subSections.size(); i++) {
|
||||||
|
paths[i] = parallelOutputFile + ".tmp." + i;
|
||||||
|
int index = i;
|
||||||
|
executorService.submit(() -> {
|
||||||
|
LOG.info("Output iNodes of section-{}", index);
|
||||||
|
InputStream is = null;
|
||||||
|
try (PrintStream outStream = new PrintStream(paths[index], "UTF-8")) {
|
||||||
|
long startTime = Time.monotonicNow();
|
||||||
|
is = getInputStreamForSection(subSections.get(index), codec, conf);
|
||||||
|
if (index == 0) {
|
||||||
|
// The first iNode section has a header which must be processed first
|
||||||
|
INodeSection s = INodeSection.parseDelimitedFrom(is);
|
||||||
|
expectedINodes.set(s.getNumInodes());
|
||||||
|
}
|
||||||
|
totalParsed.addAndGet(outputINodes(is, outStream));
|
||||||
|
long timeTaken = Time.monotonicNow() - startTime;
|
||||||
|
LOG.info("Time to output iNodes of section-{}: {} ms", index, timeTaken);
|
||||||
|
} catch (Exception e) {
|
||||||
|
exceptions.add(new IOException(e));
|
||||||
|
} finally {
|
||||||
|
latch.countDown();
|
||||||
|
try {
|
||||||
|
if (is != null) {
|
||||||
|
is.close();
|
||||||
|
}
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
LOG.warn("Failed to close the input stream, ignoring", ioe);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
latch.await();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
LOG.error("Interrupted waiting for countdown latch", e);
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
executorService.shutdown();
|
||||||
|
if (exceptions.size() != 0) {
|
||||||
|
LOG.error("Failed to output INode sub-sections, {} exception(s) occurred.",
|
||||||
|
exceptions.size());
|
||||||
|
throw exceptions.get(0);
|
||||||
|
}
|
||||||
|
if (totalParsed.get() != expectedINodes.get()) {
|
||||||
|
throw new IOException("Expected to parse " + expectedINodes + " in parallel, " +
|
||||||
|
"but parsed " + totalParsed.get() + ". The image may be corrupt.");
|
||||||
|
}
|
||||||
|
LOG.info("Completed outputting all INode sub-sections to {} tmp files.",
|
||||||
|
subSections.size());
|
||||||
|
|
||||||
|
try (PrintStream ps = new PrintStream(parallelOutputFile, "UTF-8")) {
|
||||||
|
ps.println(getHeader());
|
||||||
|
}
|
||||||
|
|
||||||
|
// merge tmp files
|
||||||
|
long startTime = Time.monotonicNow();
|
||||||
|
mergeFiles(paths, parallelOutputFile);
|
||||||
|
long timeTaken = Time.monotonicNow() - startTime;
|
||||||
|
LOG.info("Completed all stages. Time to merge files: {} ms", timeTaken);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected PermissionStatus getPermission(long perm) {
|
protected PermissionStatus getPermission(long perm) {
|
||||||
@ -763,22 +890,27 @@ protected void buildNamespace(InputStream in, List<Long> refIdList)
|
|||||||
LOG.info("Scanned {} INode directories to build namespace.", count);
|
LOG.info("Scanned {} INode directories to build namespace.", count);
|
||||||
}
|
}
|
||||||
|
|
||||||
void printIfNotEmpty(String line) {
|
void printIfNotEmpty(PrintStream outStream, String line) {
|
||||||
if (!line.isEmpty()) {
|
if (!line.isEmpty()) {
|
||||||
out.println(line);
|
outStream.println(line);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void outputINodes(InputStream in) throws IOException {
|
private int outputINodes(InputStream in, PrintStream outStream)
|
||||||
INodeSection s = INodeSection.parseDelimitedFrom(in);
|
throws IOException {
|
||||||
LOG.info("Found {} INodes in the INode section", s.getNumInodes());
|
|
||||||
long ignored = 0;
|
long ignored = 0;
|
||||||
long ignoredSnapshots = 0;
|
long ignoredSnapshots = 0;
|
||||||
for (int i = 0; i < s.getNumInodes(); ++i) {
|
// As the input stream is a LimitInputStream, the reading will stop when
|
||||||
|
// EOF is encountered at the end of the stream.
|
||||||
|
int count = 0;
|
||||||
|
while (true) {
|
||||||
INode p = INode.parseDelimitedFrom(in);
|
INode p = INode.parseDelimitedFrom(in);
|
||||||
|
if (p == null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
String parentPath = metadataMap.getParentPath(p.getId());
|
String parentPath = metadataMap.getParentPath(p.getId());
|
||||||
printIfNotEmpty(getEntry(parentPath, p));
|
printIfNotEmpty(outStream, getEntry(parentPath, p));
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
ignored++;
|
ignored++;
|
||||||
if (!(ioe instanceof IgnoreSnapshotException)) {
|
if (!(ioe instanceof IgnoreSnapshotException)) {
|
||||||
@ -790,16 +922,16 @@ private void outputINodes(InputStream in) throws IOException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
count++;
|
||||||
if (LOG.isDebugEnabled() && i % 100000 == 0) {
|
if (LOG.isDebugEnabled() && count % 100000 == 0) {
|
||||||
LOG.debug("Outputted {} INodes.", i);
|
LOG.debug("Outputted {} INodes.", count);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (ignored > 0) {
|
if (ignored > 0) {
|
||||||
LOG.warn("Ignored {} nodes, including {} in snapshots. Please turn on"
|
LOG.warn("Ignored {} nodes, including {} in snapshots. Please turn on"
|
||||||
+ " debug log for details", ignored, ignoredSnapshots);
|
+ " debug log for details", ignored, ignoredSnapshots);
|
||||||
}
|
}
|
||||||
LOG.info("Outputted {} INodes.", s.getNumInodes());
|
return count;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static IgnoreSnapshotException createIgnoredSnapshotException(
|
private static IgnoreSnapshotException createIgnoredSnapshotException(
|
||||||
@ -822,4 +954,79 @@ public int getStoragePolicy(
|
|||||||
}
|
}
|
||||||
return HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
|
return HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ArrayList<FileSummary.Section> getINodeSubSections(
|
||||||
|
ArrayList<FileSummary.Section> sections) {
|
||||||
|
ArrayList<FileSummary.Section> subSections = new ArrayList<>();
|
||||||
|
Iterator<FileSummary.Section> iter = sections.iterator();
|
||||||
|
while (iter.hasNext()) {
|
||||||
|
FileSummary.Section s = iter.next();
|
||||||
|
if (SectionName.fromString(s.getName()) == SectionName.INODE_SUB) {
|
||||||
|
subSections.add(s);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return subSections;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Given a FSImage FileSummary.section, return a LimitInput stream set to
|
||||||
|
* the starting position of the section and limited to the section length.
|
||||||
|
* @param section The FileSummary.Section containing the offset and length
|
||||||
|
* @param compressionCodec The compression codec in use, if any
|
||||||
|
* @return An InputStream for the given section
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private InputStream getInputStreamForSection(FileSummary.Section section,
|
||||||
|
String compressionCodec, Configuration conf)
|
||||||
|
throws IOException {
|
||||||
|
// channel of RandomAccessFile is not thread safe, use File
|
||||||
|
FileInputStream fin = new FileInputStream(filename);
|
||||||
|
try {
|
||||||
|
FileChannel channel = fin.getChannel();
|
||||||
|
channel.position(section.getOffset());
|
||||||
|
InputStream in = new BufferedInputStream(new LimitInputStream(fin,
|
||||||
|
section.getLength()));
|
||||||
|
|
||||||
|
in = FSImageUtil.wrapInputStreamForCompression(conf,
|
||||||
|
compressionCodec, in);
|
||||||
|
return in;
|
||||||
|
} catch (IOException e) {
|
||||||
|
fin.close();
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param srcPaths Source files of contents to be merged
|
||||||
|
* @param resultPath Merged file path
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static void mergeFiles(String[] srcPaths, String resultPath)
|
||||||
|
throws IOException {
|
||||||
|
if (srcPaths == null || srcPaths.length < 1) {
|
||||||
|
LOG.warn("no source files to merge.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
File[] files = new File[srcPaths.length];
|
||||||
|
for (int i = 0; i < srcPaths.length; i++) {
|
||||||
|
files[i] = new File(srcPaths[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
File resultFile = new File(resultPath);
|
||||||
|
try (FileChannel resultChannel =
|
||||||
|
new FileOutputStream(resultFile, true).getChannel()) {
|
||||||
|
for (File file : files) {
|
||||||
|
try (FileChannel src = new FileInputStream(file).getChannel()) {
|
||||||
|
resultChannel.transferFrom(src, resultChannel.size(), src.size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (File file : files) {
|
||||||
|
if (!file.delete() && file.exists()) {
|
||||||
|
LOG.warn("delete tmp file: {} returned false", file);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -83,8 +83,10 @@
|
|||||||
import org.apache.hadoop.hdfs.server.namenode.FsImageProto;
|
import org.apache.hadoop.hdfs.server.namenode.FsImageProto;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
|
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
|
||||||
|
import org.apache.hadoop.hdfs.util.MD5FileUtils;
|
||||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.io.MD5Hash;
|
||||||
import org.apache.hadoop.io.erasurecode.ECSchema;
|
import org.apache.hadoop.io.erasurecode.ECSchema;
|
||||||
import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
|
import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
@ -122,6 +124,7 @@
|
|||||||
import static org.apache.hadoop.fs.permission.FsAction.ALL;
|
import static org.apache.hadoop.fs.permission.FsAction.ALL;
|
||||||
import static org.apache.hadoop.fs.permission.FsAction.EXECUTE;
|
import static org.apache.hadoop.fs.permission.FsAction.EXECUTE;
|
||||||
import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE;
|
import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE;
|
||||||
|
import static org.apache.hadoop.hdfs.MiniDFSCluster.HDFS_MINIDFS_BASEDIR;
|
||||||
import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry;
|
import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry;
|
||||||
import static org.apache.hadoop.hdfs.tools.offlineImageViewer.PBImageXmlWriter.ERASURE_CODING_SECTION_NAME;
|
import static org.apache.hadoop.hdfs.tools.offlineImageViewer.PBImageXmlWriter.ERASURE_CODING_SECTION_NAME;
|
||||||
import static org.apache.hadoop.hdfs.tools.offlineImageViewer.PBImageXmlWriter.ERASURE_CODING_SECTION_POLICY;
|
import static org.apache.hadoop.hdfs.tools.offlineImageViewer.PBImageXmlWriter.ERASURE_CODING_SECTION_POLICY;
|
||||||
@ -186,6 +189,12 @@ public static void createOriginalFSImage() throws IOException {
|
|||||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
|
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
|
||||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL,
|
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL,
|
||||||
"RULE:[2:$1@$0](JobTracker@.*FOO.COM)s/@.*//" + "DEFAULT");
|
"RULE:[2:$1@$0](JobTracker@.*FOO.COM)s/@.*//" + "DEFAULT");
|
||||||
|
// fsimage with sub-section conf
|
||||||
|
conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY, "true");
|
||||||
|
conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY, "1");
|
||||||
|
conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY, "4");
|
||||||
|
conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY, "4");
|
||||||
|
|
||||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
DistributedFileSystem hdfs = cluster.getFileSystem();
|
DistributedFileSystem hdfs = cluster.getFileSystem();
|
||||||
@ -791,6 +800,13 @@ public void testPBDelimitedWriter() throws IOException, InterruptedException {
|
|||||||
new FileSystemTestHelper().getTestRootDir() + "/delimited.db");
|
new FileSystemTestHelper().getTestRootDir() + "/delimited.db");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testParallelPBDelimitedWriter() throws Exception {
|
||||||
|
testParallelPBDelimitedWriter(""); // Test in memory db.
|
||||||
|
testParallelPBDelimitedWriter(new FileSystemTestHelper().getTestRootDir()
|
||||||
|
+ "/parallel-delimited.db");
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCorruptionOutputEntryBuilder() throws IOException {
|
public void testCorruptionOutputEntryBuilder() throws IOException {
|
||||||
PBImageCorruptionDetector corrDetector =
|
PBImageCorruptionDetector corrDetector =
|
||||||
@ -882,11 +898,10 @@ private void testPBDelimitedWriter(String db)
|
|||||||
final String DELIMITER = "\t";
|
final String DELIMITER = "\t";
|
||||||
ByteArrayOutputStream output = new ByteArrayOutputStream();
|
ByteArrayOutputStream output = new ByteArrayOutputStream();
|
||||||
|
|
||||||
try (PrintStream o = new PrintStream(output);
|
try (PrintStream o = new PrintStream(output)) {
|
||||||
RandomAccessFile r = new RandomAccessFile(originalFsimage, "r")) {
|
|
||||||
PBImageDelimitedTextWriter v =
|
PBImageDelimitedTextWriter v =
|
||||||
new PBImageDelimitedTextWriter(o, DELIMITER, db);
|
new PBImageDelimitedTextWriter(o, DELIMITER, db);
|
||||||
v.visit(r);
|
v.visit(originalFsimage.getAbsolutePath());
|
||||||
}
|
}
|
||||||
|
|
||||||
Set<String> fileNames = new HashSet<>();
|
Set<String> fileNames = new HashSet<>();
|
||||||
@ -920,6 +935,37 @@ private void testPBDelimitedWriter(String db)
|
|||||||
assertEquals(writtenFiles.keySet(), fileNames);
|
assertEquals(writtenFiles.keySet(), fileNames);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void testParallelPBDelimitedWriter(String db) throws Exception{
|
||||||
|
String delimiter = "\t";
|
||||||
|
int numThreads = 4;
|
||||||
|
|
||||||
|
File parallelDelimitedOut = new File(tempDir, "parallelDelimitedOut");
|
||||||
|
if (OfflineImageViewerPB.run(new String[] {"-p", "Delimited",
|
||||||
|
"-i", originalFsimage.getAbsolutePath(),
|
||||||
|
"-o", parallelDelimitedOut.getAbsolutePath(),
|
||||||
|
"-delimiter", delimiter,
|
||||||
|
"-t", db,
|
||||||
|
"-m", String.valueOf(numThreads)}) != 0) {
|
||||||
|
throw new IOException("oiv returned failure outputting in parallel.");
|
||||||
|
}
|
||||||
|
MD5Hash parallelMd5 = MD5FileUtils.computeMd5ForFile(parallelDelimitedOut);
|
||||||
|
|
||||||
|
File serialDelimitedOut = new File(tempDir, "serialDelimitedOut");
|
||||||
|
if (db != "") {
|
||||||
|
db = db + "/../serial.db";
|
||||||
|
}
|
||||||
|
if (OfflineImageViewerPB.run(new String[] {"-p", "Delimited",
|
||||||
|
"-i", originalFsimage.getAbsolutePath(),
|
||||||
|
"-o", serialDelimitedOut.getAbsolutePath(),
|
||||||
|
"-t", db,
|
||||||
|
"-delimiter", delimiter}) != 0) {
|
||||||
|
throw new IOException("oiv returned failure outputting in serial.");
|
||||||
|
}
|
||||||
|
MD5Hash serialMd5 = MD5FileUtils.computeMd5ForFile(serialDelimitedOut);
|
||||||
|
|
||||||
|
assertEquals(parallelMd5, serialMd5);
|
||||||
|
}
|
||||||
|
|
||||||
private void testPBCorruptionDetector(String db)
|
private void testPBCorruptionDetector(String db)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
final String delimiter = "\t";
|
final String delimiter = "\t";
|
||||||
@ -928,7 +974,7 @@ private void testPBCorruptionDetector(String db)
|
|||||||
try (PrintStream o = new PrintStream(output)) {
|
try (PrintStream o = new PrintStream(output)) {
|
||||||
PBImageCorruptionDetector v =
|
PBImageCorruptionDetector v =
|
||||||
new PBImageCorruptionDetector(o, delimiter, db);
|
new PBImageCorruptionDetector(o, delimiter, db);
|
||||||
v.visit(new RandomAccessFile(originalFsimage, "r"));
|
v.visit(originalFsimage.getAbsolutePath());
|
||||||
}
|
}
|
||||||
|
|
||||||
try (
|
try (
|
||||||
@ -1024,7 +1070,7 @@ private String testCorruptionDetectorRun(int runNumber,
|
|||||||
try (PrintStream o = new PrintStream(output)) {
|
try (PrintStream o = new PrintStream(output)) {
|
||||||
PBImageCorruptionDetector v =
|
PBImageCorruptionDetector v =
|
||||||
new PBImageCorruptionDetector(o, ",", db);
|
new PBImageCorruptionDetector(o, ",", db);
|
||||||
v.visit(new RandomAccessFile(corruptedImage, "r"));
|
v.visit(corruptedImage.getAbsolutePath());
|
||||||
}
|
}
|
||||||
return output.toString();
|
return output.toString();
|
||||||
}
|
}
|
||||||
@ -1212,6 +1258,9 @@ public void testReverseXmlWithoutSnapshotDiffSection() throws Throwable {
|
|||||||
public void testFileDistributionCalculatorForException() throws Exception {
|
public void testFileDistributionCalculatorForException() throws Exception {
|
||||||
File fsimageFile = null;
|
File fsimageFile = null;
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
|
// Avoid using the same cluster dir to cause the global originalFsimage
|
||||||
|
// file to be cleared.
|
||||||
|
conf.set(HDFS_MINIDFS_BASEDIR, GenericTestUtils.getRandomizedTempPath());
|
||||||
HashMap<String, FileStatus> files = Maps.newHashMap();
|
HashMap<String, FileStatus> files = Maps.newHashMap();
|
||||||
|
|
||||||
// Create a initial fsimage file
|
// Create a initial fsimage file
|
||||||
|
@ -239,7 +239,7 @@ public void testPBDelimitedWriterForAcl() throws Exception {
|
|||||||
try (PrintStream o = new PrintStream(output)) {
|
try (PrintStream o = new PrintStream(output)) {
|
||||||
PBImageDelimitedTextWriter v =
|
PBImageDelimitedTextWriter v =
|
||||||
new PBImageDelimitedTextWriter(o, DELIMITER, ""); // run in memory.
|
new PBImageDelimitedTextWriter(o, DELIMITER, ""); // run in memory.
|
||||||
v.visit(new RandomAccessFile(originalFsimage, "r"));
|
v.visit(originalFsimage.getAbsolutePath());
|
||||||
}
|
}
|
||||||
|
|
||||||
try (
|
try (
|
||||||
|
Loading…
Reference in New Issue
Block a user