HDFS-14195. OIV: print out storage policy id in oiv Delimited output. Contributed by Wang, Xinglong.

Wei-Chiu Chuang 2019-08-09 15:37:29 -07:00
parent 98dd7c48ff
commit 865021b8c9
8 changed files with 281 additions and 24 deletions

BlockStoragePolicySuite.java

@@ -149,4 +149,9 @@ public static XAttr buildXAttr(byte policyId) {
public static String getStoragePolicyXAttrPrefixedName() {
return XAttrHelper.getPrefixedName(XAttrNS, STORAGE_POLICY_XATTR_NAME);
}
public static boolean isStoragePolicyXAttr(XAttr xattr) {
return xattr != null && xattr.getNameSpace() == XAttrNS
&& xattr.getName().equals(STORAGE_POLICY_XATTR_NAME);
}
}
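For context, isStoragePolicyXAttr is the read-side counterpart of buildXAttr above: the policy id travels as the one-byte value of the storage-policy xattr, which is how getStoragePolicy in PBImageTextWriter (further down) reads it back. A minimal round-trip sketch; the literal 12 (All_SSD's id) is illustrative only:

XAttr sp = BlockStoragePolicySuite.buildXAttr((byte) 12); // value is the single byte 12
assert BlockStoragePolicySuite.isStoragePolicyXAttr(sp);
assert sp.getValue()[0] == 12;
assert !BlockStoragePolicySuite.isStoragePolicyXAttr(null); // helper is null-safe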

OfflineImageViewerPB.java

@@ -79,6 +79,7 @@ public class OfflineImageViewerPB {
+ " to both inodes and inodes-under-construction, separated by a\n"
+ " delimiter. The default delimiter is \\t, though this may be\n"
+ " changed via the -delimiter argument.\n"
+ " -sp print storage policy, used by delimiter only.\n"
+ " * DetectCorruption: Detect potential corruption of the image by\n"
+ " selectively loading parts of it and actively searching for\n"
+ " inconsistencies. Outputs a summary of the found corruptions\n"
@@ -129,6 +130,7 @@ private static Options buildOptions() {
options.addOption("format", false, "");
options.addOption("addr", true, "");
options.addOption("delimiter", true, "");
options.addOption("sp", false, "");
options.addOption("t", "temp", true, "");
return options;
@@ -222,8 +224,10 @@ public static int run(String[] args) throws Exception {
}
break;
case "DELIMITED":
boolean printStoragePolicy = cmd.hasOption("sp");
try (PBImageDelimitedTextWriter writer =
new PBImageDelimitedTextWriter(out, delimiter, tempPath);
new PBImageDelimitedTextWriter(out, delimiter,
tempPath, printStoragePolicy);
RandomAccessFile r = new RandomAccessFile(inputFile, "r")) {
writer.visit(r);
}
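With the option wired through run(), the feature is driven from the command line; a typical invocation (input and output paths are placeholders):

hdfs oiv -p Delimited -sp -i /path/to/fsimage -o /tmp/fsimage_delimited.tsv

Omitting -sp leaves the previous 12-column output untouched, so existing consumers of the Delimited format are unaffected.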

PBImageDelimitedTextWriter.java

@@ -19,6 +19,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.INode;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.INodeDirectory;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.INodeFile;
@@ -44,6 +45,7 @@
*/
public class PBImageDelimitedTextWriter extends PBImageTextWriter {
private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm";
private boolean printStoragePolicy;
static class OutputEntryBuilder {
private final SimpleDateFormat dateFormatter =
@@ -59,6 +61,7 @@ static class OutputEntryBuilder {
private long fileSize = 0;
private long nsQuota = 0;
private long dsQuota = 0;
private int storagePolicy = 0;
private String dirPermission = "-";
private PermissionStatus permissionStatus;
@@ -79,6 +82,7 @@ static class OutputEntryBuilder {
if (file.hasAcl() && file.getAcl().getEntriesCount() > 0) {
aclPermission = "+";
}
storagePolicy = file.getStoragePolicyID();
break;
case DIRECTORY:
INodeDirectory dir = inode.getDirectory();
@@ -90,12 +94,14 @@ static class OutputEntryBuilder {
if (dir.hasAcl() && dir.getAcl().getEntriesCount() > 0) {
aclPermission = "+";
}
storagePolicy = writer.getStoragePolicy(dir.getXAttrs());
break;
case SYMLINK:
INodeSymlink s = inode.getSymlink();
modificationTime = s.getModificationTime();
accessTime = s.getAccessTime();
permissionStatus = writer.getPermission(s.getPermission());
storagePolicy = HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
break;
default:
break;
@@ -125,13 +131,23 @@ public String build() {
permissionStatus.getPermission().toString() + aclPermission);
writer.append(buffer, permissionStatus.getUserName());
writer.append(buffer, permissionStatus.getGroupName());
if (writer.printStoragePolicy) {
writer.append(buffer, storagePolicy);
}
return buffer.substring(1);
}
}
PBImageDelimitedTextWriter(PrintStream out, String delimiter, String tempPath)
throws IOException {
this(out, delimiter, tempPath, false);
}
PBImageDelimitedTextWriter(PrintStream out, String delimiter,
String tempPath, boolean printStoragePolicy)
throws IOException {
super(out, delimiter, tempPath);
this.printStoragePolicy = printStoragePolicy;
}
@Override
@@ -162,6 +178,9 @@ public String getHeader() {
append(buffer, "Permission");
append(buffer, "UserName");
append(buffer, "GroupName");
if (printStoragePolicy) {
append(buffer, "StoragePolicyId");
}
return buffer.toString();
}
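With -sp, the header (and every data row) gains one trailing field after GroupName. Assuming the standard Delimited columns, the header fields are now: Path, Replication, ModificationTime, AccessTime, PreferredBlockSize, BlocksCount, FileSize, NSQUOTA, DSQUOTA, Permission, UserName, GroupName, StoragePolicyId (joined by the delimiter, \t by default). StoragePolicyId is thus the 13th field (index 12), which is what the new test parses below.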

PBImageTextWriter.java

@@ -24,7 +24,10 @@
import org.apache.commons.text.StringEscapeUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.namenode.FSImageFormatPBINode;
import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf;
import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.SectionName;
@@ -805,4 +808,16 @@ private static IgnoreSnapshotException createIgnoredSnapshotException(
}
return new IgnoreSnapshotException();
}
public int getStoragePolicy(
INodeSection.XAttrFeatureProto xattrFeatureProto) {
List<XAttr> xattrs =
FSImageFormatPBINode.Loader.loadXAttrs(xattrFeatureProto, stringTable);
for (XAttr xattr : xattrs) {
if (BlockStoragePolicySuite.isStoragePolicyXAttr(xattr)) {
return xattr.getValue()[0];
}
}
return HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
}
}
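Taken together with OutputEntryBuilder above, the policy id is resolved differently per inode type; a condensed sketch mirroring the switch (not standalone code):

// FILE: the id is stored inline in the INodeFile protobuf
storagePolicy = inode.getFile().getStoragePolicyID();
// DIRECTORY: the id lives only in the storage-policy xattr, hence getStoragePolicy()
storagePolicy = writer.getStoragePolicy(inode.getDirectory().getXAttrs());
// SYMLINK (and anything without an explicit policy): BLOCK_STORAGE_POLICY_ID_UNSPECIFIED (0)

Note the tool reports the id physically recorded in the image; it does not resolve the effective policy a path would inherit from an ancestor, which is why sub_dir_wo_sp under an All_SSD parent still prints 0 in the test data.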

DFSTestUtil.java

@@ -368,6 +368,25 @@ public void createFiles(FileSystem fs, String topdir) throws IOException {
createFiles(fs, topdir, (short)3);
}
public static String readResoucePlainFile(
String fileName) throws IOException {
File file = new File(System.getProperty(
"test.cache.data", "build/test/cache"), fileName);
StringBuilder s = new StringBuilder();
try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
String line;
while ((line = reader.readLine()) != null) {
line = line.trim();
if (line.length() <= 0 || line.startsWith("#")) {
continue;
}
s.append(line);
s.append("\n");
}
}
return s.toString();
}
public static byte[] readFileAsBytes(FileSystem fs, Path fileName) throws IOException {
try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
IOUtils.copyBytes(fs.open(fileName), os, 1024);
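The extraction matters because the expected-output resources (such as testStoragePolicy.csv below) embed the Apache license header as # comment lines; readResoucePlainFile trims each line and drops blanks and # comments, so tests compare against the data rows alone:

String expected = DFSTestUtil.readResoucePlainFile("testStoragePolicy.csv");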

TestOfflineImageViewer.java

@@ -55,7 +55,6 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
@@ -1015,28 +1014,12 @@ private String testCorruptionDetectorRun(int runNumber,
return output.toString();
}
private String readExpectedFile(String fileName) throws IOException {
File file = new File(System.getProperty(
"test.cache.data", "build/test/cache"), fileName);
BufferedReader reader = new BufferedReader(new FileReader(file));
String line;
StringBuilder s = new StringBuilder();
while ((line = reader.readLine()) != null) {
line = line.trim();
if (line.length() <= 0 || line.startsWith("#")) {
continue;
}
s.append(line);
s.append("\n");
}
return s.toString();
}
@Test
public void testCorruptionDetectionSingleFileCorruption() throws Exception {
List<Long> corruptions = Collections.singletonList(FILE_NODE_ID_1);
String result = testCorruptionDetectorRun(1, corruptions, "");
String expected = readExpectedFile("testSingleFileCorruption.csv");
String expected = DFSTestUtil.readResoucePlainFile(
"testSingleFileCorruption.csv");
assertEquals(expected, result);
result = testCorruptionDetectorRun(2, corruptions,
new FileSystemTestHelper().getTestRootDir() + "/corruption2.db");
@@ -1048,7 +1031,8 @@ public void testCorruptionDetectionMultipleFileCorruption() throws Exception {
List<Long> corruptions = Arrays.asList(FILE_NODE_ID_1, FILE_NODE_ID_2,
FILE_NODE_ID_3);
String result = testCorruptionDetectorRun(3, corruptions, "");
String expected = readExpectedFile("testMultipleFileCorruption.csv");
String expected = DFSTestUtil.readResoucePlainFile(
"testMultipleFileCorruption.csv");
assertEquals(expected, result);
result = testCorruptionDetectorRun(4, corruptions,
new FileSystemTestHelper().getTestRootDir() + "/corruption4.db");
@@ -1059,7 +1043,8 @@ public void testCorruptionDetectionSingleFolderCorruption() throws Exception {
public void testCorruptionDetectionSingleFolderCorruption() throws Exception {
List<Long> corruptions = Collections.singletonList(DIR_NODE_ID);
String result = testCorruptionDetectorRun(5, corruptions, "");
String expected = readExpectedFile("testSingleFolderCorruption.csv");
String expected = DFSTestUtil.readResoucePlainFile(
"testSingleFolderCorruption.csv");
assertEquals(expected, result);
result = testCorruptionDetectorRun(6, corruptions,
new FileSystemTestHelper().getTestRootDir() + "/corruption6.db");
@@ -1071,7 +1056,8 @@ public void testCorruptionDetectionMultipleCorruption() throws Exception {
List<Long> corruptions = Arrays.asList(FILE_NODE_ID_1, FILE_NODE_ID_2,
FILE_NODE_ID_3, DIR_NODE_ID);
String result = testCorruptionDetectorRun(7, corruptions, "");
String expected = readExpectedFile("testMultipleCorruption.csv");
String expected = DFSTestUtil.readResoucePlainFile(
"testMultipleCorruption.csv");
assertEquals(expected, result);
result = testCorruptionDetectorRun(8, corruptions,
new FileSystemTestHelper().getTestRootDir() + "/corruption8.db");

TestOfflineImageViewerForStoragePolicy.java

@@ -0,0 +1,183 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.tools.offlineImageViewer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.ALLSSD_STORAGE_POLICY_NAME;
import static org.junit.Assert.assertEquals;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
/**
* Tests OfflineImageViewer when the input fsimage contains HDFS storage policy entries.
*/
public class TestOfflineImageViewerForStoragePolicy {
private static final Logger LOG =
LoggerFactory.getLogger(TestOfflineImageViewerForStoragePolicy.class);
private static File originalFsimage = null;
private static File tempDir;
/**
* Create a populated namespace for later testing. Save its contents to a
* data structure and store its fsimage location.
*/
@BeforeClass
public static void createOriginalFSImage() throws IOException {
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, true);
File[] nnDirs = MiniDFSCluster.getNameNodeDirectory(
MiniDFSCluster.getBaseDirectory(), 0, 0);
tempDir = nnDirs[0];
cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
DistributedFileSystem hdfs = cluster.getFileSystem();
Path dir = new Path("/dir_wo_sp");
hdfs.mkdirs(dir);
dir = new Path("/dir_wo_sp/sub_dir_wo_sp");
hdfs.mkdirs(dir);
dir = new Path("/dir_wo_sp/sub_dir_w_sp_allssd");
hdfs.mkdirs(dir);
hdfs.setStoragePolicy(dir, ALLSSD_STORAGE_POLICY_NAME);
Path file = new Path("/dir_wo_sp/file_wo_sp");
try (FSDataOutputStream o = hdfs.create(file)) {
o.write(123);
o.close();
}
file = new Path("/dir_wo_sp/file_w_sp_allssd");
try (FSDataOutputStream o = hdfs.create(file)) {
o.write(123);
o.close();
hdfs.setStoragePolicy(file, HdfsConstants.ALLSSD_STORAGE_POLICY_NAME);
}
dir = new Path("/dir_w_sp_allssd");
hdfs.mkdirs(dir);
hdfs.setStoragePolicy(dir, HdfsConstants.ALLSSD_STORAGE_POLICY_NAME);
dir = new Path("/dir_w_sp_allssd/sub_dir_wo_sp");
hdfs.mkdirs(dir);
file = new Path("/dir_w_sp_allssd/file_wo_sp");
try (FSDataOutputStream o = hdfs.create(file)) {
o.write(123);
o.close();
}
dir = new Path("/dir_w_sp_allssd/sub_dir_w_sp_hot");
hdfs.mkdirs(dir);
hdfs.setStoragePolicy(dir, HdfsConstants.HOT_STORAGE_POLICY_NAME);
// Write results to the fsimage file
hdfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER, false);
hdfs.saveNamespace();
// Determine the location of the fsimage file
originalFsimage = FSImageTestUtil.findLatestImageFile(FSImageTestUtil
.getFSImage(cluster.getNameNode()).getStorage().getStorageDir(0));
if (originalFsimage == null) {
throw new RuntimeException("Didn't generate or can't find fsimage");
}
LOG.debug("original FS image file is " + originalFsimage);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@AfterClass
public static void deleteOriginalFSImage() throws IOException {
if (originalFsimage != null && originalFsimage.exists()) {
originalFsimage.delete();
}
}
@Test
public void testPBDelimitedWriterForStoragePolicy() throws Exception {
String expected = DFSTestUtil.readResoucePlainFile(
"testStoragePolicy.csv");
String result = readStoragePolicyFromFsimageFile();
assertEquals(expected, result);
}
private String readStoragePolicyFromFsimageFile() throws Exception {
StringBuilder builder = new StringBuilder();
ByteArrayOutputStream output = new ByteArrayOutputStream();
String delimiter = "\t";
File delimitedOutput = new File(tempDir, "delimitedOutput");
if (OfflineImageViewerPB.run(new String[] {"-p", "Delimited",
"-i", originalFsimage.getAbsolutePath(),
"-o", delimitedOutput.getAbsolutePath(),
"-sp"}) != 0) {
throw new IOException("oiv returned failure creating " +
"delimited output with -sp.");
}
try (InputStream input = new FileInputStream(delimitedOutput);
BufferedReader reader =
new BufferedReader(new InputStreamReader(input))) {
String line;
boolean header = true;
while ((line = reader.readLine()) != null) {
String[] fields = line.split(delimiter);
if (!header) {
String path = fields[0];
int storagePolicy = Integer.parseInt(fields[12]);
builder.append(path).append(",").append(storagePolicy).append("\n");
}
header = false;
}
}
return builder.toString();
}
}
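The reader above skips the header row, splits each line on the tab delimiter, and pulls fields[12] — the StoragePolicyId column added by -sp — rebuilding each entry as a path,id line (e.g. /dir_w_sp_allssd,12), which is exactly the row format of the expected resource below.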

testStoragePolicy.csv

@@ -0,0 +1,26 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#dir,storage policy
/,0
/dir_wo_sp,0
/dir_wo_sp/sub_dir_wo_sp,0
/dir_wo_sp/sub_dir_w_sp_allssd,12
/dir_wo_sp/file_wo_sp,0
/dir_wo_sp/file_w_sp_allssd,12
/dir_w_sp_allssd,12
/dir_w_sp_allssd/sub_dir_wo_sp,0
/dir_w_sp_allssd/file_wo_sp,0
/dir_w_sp_allssd/sub_dir_w_sp_hot,7
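For reference, the ids in this file map to constants in org.apache.hadoop.hdfs.protocol.HdfsConstants: 0 = BLOCK_STORAGE_POLICY_ID_UNSPECIFIED (no explicit policy set), 7 = HOT_STORAGE_POLICY_ID (Hot), 12 = ALLSSD_STORAGE_POLICY_ID (All_SSD).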