HDFS-16286. Add a debug tool to verify the correctness of erasure coding on file (#3593)

(cherry picked from commit a21895a5b3)
This commit is contained in:
daimin 2021-11-04 04:19:56 +08:00 committed by S O'Donnell
parent 9cf841b1a6
commit 2844b98558
3 changed files with 350 additions and 8 deletions

View File

@ -24,15 +24,41 @@
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.client.impl.BlockReaderRemote;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
@ -69,6 +95,7 @@ public class DebugAdmin extends Configured implements Tool {
new VerifyMetaCommand(),
new ComputeMetaCommand(),
new RecoverLeaseCommand(),
new VerifyECCommand(),
new HelpCommand()
};
@ -387,6 +414,209 @@ int run(List<String> args) throws IOException {
}
}
/**
* The command for verifying the correctness of erasure coding on an erasure coded file.
*/
private class VerifyECCommand extends DebugCommand {
private DFSClient client;
private int dataBlkNum;
private int parityBlkNum;
private int cellSize;
private boolean useDNHostname;
private CachingStrategy cachingStrategy;
private int stripedReadBufferSize;
private CompletionService<Integer> readService;
private RawErasureEncoder encoder;
private BlockReader[] blockReaders;
VerifyECCommand() {
super("verifyEC",
"verifyEC -file <file>",
" Verify HDFS erasure coding on all block groups of the file.");
}
int run(List<String> args) throws IOException {
if (args.size() < 2) {
System.out.println(usageText);
System.out.println(helpText + System.lineSeparator());
return 1;
}
String file = StringUtils.popOptionWithArgument("-file", args);
Path path = new Path(file);
DistributedFileSystem dfs = AdminHelper.getDFS(getConf());
this.client = dfs.getClient();
FileStatus fileStatus;
try {
fileStatus = dfs.getFileStatus(path);
} catch (FileNotFoundException e) {
System.err.println("File " + file + " does not exist.");
return 1;
}
if (!fileStatus.isFile()) {
System.err.println("File " + file + " is not a regular file.");
return 1;
}
if (!dfs.isFileClosed(path)) {
System.err.println("File " + file + " is not closed.");
return 1;
}
this.useDNHostname = getConf().getBoolean(DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
this.cachingStrategy = CachingStrategy.newDefaultStrategy();
this.stripedReadBufferSize = getConf().getInt(
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY,
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_DEFAULT);
LocatedBlocks locatedBlocks = client.getLocatedBlocks(file, 0, fileStatus.getLen());
if (locatedBlocks.getErasureCodingPolicy() == null) {
System.err.println("File " + file + " is not erasure coded.");
return 1;
}
ErasureCodingPolicy ecPolicy = locatedBlocks.getErasureCodingPolicy();
this.dataBlkNum = ecPolicy.getNumDataUnits();
this.parityBlkNum = ecPolicy.getNumParityUnits();
this.cellSize = ecPolicy.getCellSize();
this.encoder = CodecUtil.createRawEncoder(getConf(), ecPolicy.getCodecName(),
new ErasureCoderOptions(
ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits()));
int blockNum = dataBlkNum + parityBlkNum;
this.readService = new ExecutorCompletionService<>(
DFSUtilClient.getThreadPoolExecutor(blockNum, blockNum, 60,
new LinkedBlockingQueue<>(), "read-", false));
this.blockReaders = new BlockReader[dataBlkNum + parityBlkNum];
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
System.out.println("Checking EC block group: blk_" + locatedBlock.getBlock().getBlockId());
LocatedStripedBlock blockGroup = (LocatedStripedBlock) locatedBlock;
try {
verifyBlockGroup(blockGroup);
System.out.println("Status: OK");
} catch (Exception e) {
System.err.println("Status: ERROR, message: " + e.getMessage());
return 1;
} finally {
closeBlockReaders();
}
}
System.out.println("\nAll EC block group status: OK");
return 0;
}
private void verifyBlockGroup(LocatedStripedBlock blockGroup) throws Exception {
final LocatedBlock[] indexedBlocks = StripedBlockUtil.parseStripedBlockGroup(blockGroup,
cellSize, dataBlkNum, parityBlkNum);
int blockNumExpected = Math.min(dataBlkNum,
(int) ((blockGroup.getBlockSize() - 1) / cellSize + 1)) + parityBlkNum;
if (blockGroup.getBlockIndices().length < blockNumExpected) {
throw new Exception("Block group is under-erasure-coded.");
}
long maxBlockLen = 0L;
DataChecksum checksum = null;
for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
LocatedBlock block = indexedBlocks[i];
if (block == null) {
blockReaders[i] = null;
continue;
}
if (block.getBlockSize() > maxBlockLen) {
maxBlockLen = block.getBlockSize();
}
BlockReader blockReader = createBlockReader(block.getBlock(),
block.getLocations()[0], block.getBlockToken());
if (checksum == null) {
checksum = blockReader.getDataChecksum();
} else {
assert checksum.equals(blockReader.getDataChecksum());
}
blockReaders[i] = blockReader;
}
assert checksum != null;
int bytesPerChecksum = checksum.getBytesPerChecksum();
int bufferSize = stripedReadBufferSize < bytesPerChecksum ? bytesPerChecksum :
stripedReadBufferSize - stripedReadBufferSize % bytesPerChecksum;
final ByteBuffer[] buffers = new ByteBuffer[dataBlkNum + parityBlkNum];
final ByteBuffer[] outputs = new ByteBuffer[parityBlkNum];
for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
buffers[i] = ByteBuffer.allocate(bufferSize);
}
for (int i = 0; i < parityBlkNum; i++) {
outputs[i] = ByteBuffer.allocate(bufferSize);
}
long positionInBlock = 0L;
while (positionInBlock < maxBlockLen) {
final int toVerifyLen = (int) Math.min(bufferSize, maxBlockLen - positionInBlock);
List<Future<Integer>> futures = new ArrayList<>(dataBlkNum + parityBlkNum);
for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
final int fi = i;
futures.add(this.readService.submit(() -> {
BlockReader blockReader = blockReaders[fi];
ByteBuffer buffer = buffers[fi];
buffer.clear();
buffer.limit(toVerifyLen);
int readLen = 0;
if (blockReader != null) {
int toRead = buffer.remaining();
while (readLen < toRead) {
int nread = blockReader.read(buffer);
if (nread <= 0) {
break;
}
readLen += nread;
}
}
while (buffer.hasRemaining()) {
buffer.put((byte) 0);
}
buffer.flip();
return readLen;
}));
}
for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
futures.get(i).get(1, TimeUnit.MINUTES);
}
ByteBuffer[] inputs = new ByteBuffer[dataBlkNum];
System.arraycopy(buffers, 0, inputs, 0, dataBlkNum);
for (int i = 0; i < parityBlkNum; i++) {
outputs[i].clear();
outputs[i].limit(toVerifyLen);
}
this.encoder.encode(inputs, outputs);
for (int i = 0; i < parityBlkNum; i++) {
if (!buffers[dataBlkNum + i].equals(outputs[i])) {
throw new Exception("EC compute result not match.");
}
}
positionInBlock += toVerifyLen;
}
}
private BlockReader createBlockReader(ExtendedBlock block, DatanodeInfo dnInfo,
Token<BlockTokenIdentifier> token) throws IOException {
InetSocketAddress dnAddress = NetUtils.createSocketAddr(dnInfo.getXferAddr(useDNHostname));
Peer peer = client.newConnectedPeer(dnAddress, token, dnInfo);
return BlockReaderRemote.newBlockReader(
"dummy", block, token, 0,
block.getNumBytes(), true, "", peer, dnInfo,
null, cachingStrategy, -1, getConf());
}
private void closeBlockReaders() {
for (int i = 0; i < blockReaders.length; i++) {
if (blockReaders[i] != null) {
IOUtils.closeStream(blockReaders[i]);
blockReaders[i] = null;
}
}
}
}
/**
* The command for getting help about other commands.
*/

View File

@ -697,6 +697,16 @@ Usage: `hdfs debug recoverLease -path <path> [-retries <num-retries>]`
Recover the lease on the specified path. The path must reside on an HDFS file system. The default number of retries is 1.
### `verifyEC`
Usage: `hdfs debug verifyEC -file <file>`
| COMMAND\_OPTION | Description |
|:---- |:---- |
| [`-file` *EC-file*] | HDFS EC file to be verified. |
Verify the correctness of erasure coding on an erasure coded file.
dfsadmin with ViewFsOverloadScheme
----------------------------------

View File

@ -17,15 +17,22 @@
*/
package org.apache.hadoop.hdfs.tools;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Before;
@ -34,6 +41,8 @@
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.PrintStream;
import java.util.List;
import java.util.Random;
import static org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil.*;
import static org.junit.Assert.assertEquals;
@ -44,23 +53,16 @@ public class TestDebugAdmin {
static private final String TEST_ROOT_DIR =
new File(System.getProperty("test.build.data", "/tmp"),
TestDebugAdmin.class.getSimpleName()).getAbsolutePath();
private Configuration conf = new Configuration();
private MiniDFSCluster cluster;
private DistributedFileSystem fs;
private DebugAdmin admin;
private DataNode datanode;
@Before
public void setUp() throws Exception {
final File testRoot = new File(TEST_ROOT_DIR);
testRoot.delete();
testRoot.mkdirs();
Configuration conf = new Configuration();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
fs = cluster.getFileSystem();
admin = new DebugAdmin(conf);
datanode = cluster.getDataNodes().get(0);
}
@After
@ -92,8 +94,11 @@ private String runCmd(String[] cmd) throws Exception {
@Test(timeout = 60000)
public void testRecoverLease() throws Exception {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
assertEquals("ret: 1, You must supply a -path argument to recoverLease.",
runCmd(new String[]{"recoverLease", "-retries", "1"}));
DistributedFileSystem fs = cluster.getFileSystem();
FSDataOutputStream out = fs.create(new Path("/foo"));
out.write(123);
out.close();
@ -103,6 +108,10 @@ public void testRecoverLease() throws Exception {
@Test(timeout = 60000)
public void testVerifyMetaCommand() throws Exception {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
DataNode datanode = cluster.getDataNodes().get(0);
DFSTestUtil.createFile(fs, new Path("/bar"), 1234, (short) 1, 0xdeadbeef);
FsDatasetSpi<?> fsd = datanode.getFSDataset();
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, new Path("/bar"));
@ -128,6 +137,10 @@ public void testVerifyMetaCommand() throws Exception {
@Test(timeout = 60000)
public void testComputeMetaCommand() throws Exception {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
DataNode datanode = cluster.getDataNodes().get(0);
DFSTestUtil.createFile(fs, new Path("/bar"), 1234, (short) 1, 0xdeadbeef);
FsDatasetSpi<?> fsd = datanode.getFSDataset();
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, new Path("/bar"));
@ -166,8 +179,97 @@ public void testComputeMetaCommand() throws Exception {
@Test(timeout = 60000)
public void testRecoverLeaseforFileNotFound() throws Exception {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
assertTrue(runCmd(new String[] {
"recoverLease", "-path", "/foo", "-retries", "2" }).contains(
"Giving up on recoverLease for /foo after 1 try"));
}
@Test(timeout = 60000)
public void testVerifyECCommand() throws Exception {
final ErasureCodingPolicy ecPolicy = SystemErasureCodingPolicies.getByID(
SystemErasureCodingPolicies.RS_3_2_POLICY_ID);
cluster = DFSTestUtil.setupCluster(conf, 6, 5, 0);
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
assertEquals("ret: 1, verifyEC -file <file> Verify HDFS erasure coding on " +
"all block groups of the file.", runCmd(new String[]{"verifyEC"}));
assertEquals("ret: 1, File /bar does not exist.",
runCmd(new String[]{"verifyEC", "-file", "/bar"}));
fs.create(new Path("/bar")).close();
assertEquals("ret: 1, File /bar is not erasure coded.",
runCmd(new String[]{"verifyEC", "-file", "/bar"}));
final Path ecDir = new Path("/ec");
fs.mkdir(ecDir, FsPermission.getDirDefault());
fs.enableErasureCodingPolicy(ecPolicy.getName());
fs.setErasureCodingPolicy(ecDir, ecPolicy.getName());
assertEquals("ret: 1, File /ec is not a regular file.",
runCmd(new String[]{"verifyEC", "-file", "/ec"}));
fs.create(new Path(ecDir, "foo"));
assertEquals("ret: 1, File /ec/foo is not closed.",
runCmd(new String[]{"verifyEC", "-file", "/ec/foo"}));
final short repl = 1;
final long k = 1024;
final long m = k * k;
final long seed = 0x1234567L;
DFSTestUtil.createFile(fs, new Path(ecDir, "foo_65535"), 65535, repl, seed);
assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_65535"})
.contains("All EC block group status: OK"));
DFSTestUtil.createFile(fs, new Path(ecDir, "foo_256k"), 256 * k, repl, seed);
assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_256k"})
.contains("All EC block group status: OK"));
DFSTestUtil.createFile(fs, new Path(ecDir, "foo_1m"), m, repl, seed);
assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_1m"})
.contains("All EC block group status: OK"));
DFSTestUtil.createFile(fs, new Path(ecDir, "foo_2m"), 2 * m, repl, seed);
assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_2m"})
.contains("All EC block group status: OK"));
DFSTestUtil.createFile(fs, new Path(ecDir, "foo_3m"), 3 * m, repl, seed);
assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_3m"})
.contains("All EC block group status: OK"));
DFSTestUtil.createFile(fs, new Path(ecDir, "foo_5m"), 5 * m, repl, seed);
assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_5m"})
.contains("All EC block group status: OK"));
DFSTestUtil.createFile(fs, new Path(ecDir, "foo_6m"), (int) k, 6 * m, m, repl, seed);
assertEquals("ret: 0, Checking EC block group: blk_x;Status: OK" +
"Checking EC block group: blk_x;Status: OK" +
"All EC block group status: OK",
runCmd(new String[]{"verifyEC", "-file", "/ec/foo_6m"})
.replaceAll("blk_-[0-9]+", "blk_x;"));
Path corruptFile = new Path(ecDir, "foo_corrupt");
DFSTestUtil.createFile(fs, corruptFile, 5841961, repl, seed);
List<LocatedBlock> blocks = DFSTestUtil.getAllBlocks(fs, corruptFile);
assertEquals(1, blocks.size());
LocatedStripedBlock blockGroup = (LocatedStripedBlock) blocks.get(0);
LocatedBlock[] indexedBlocks = StripedBlockUtil.parseStripedBlockGroup(blockGroup,
ecPolicy.getCellSize(), ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits());
// Try corrupt block 0 in block group.
LocatedBlock toCorruptLocatedBlock = indexedBlocks[0];
ExtendedBlock toCorruptBlock = toCorruptLocatedBlock.getBlock();
DataNode datanode = cluster.getDataNode(toCorruptLocatedBlock.getLocations()[0].getIpcPort());
File blockFile = getBlockFile(datanode.getFSDataset(),
toCorruptBlock.getBlockPoolId(), toCorruptBlock.getLocalBlock());
File metaFile = getMetaFile(datanode.getFSDataset(),
toCorruptBlock.getBlockPoolId(), toCorruptBlock.getLocalBlock());
// Write error bytes to block file and re-generate meta checksum.
byte[] errorBytes = new byte[2097152];
new Random(seed).nextBytes(errorBytes);
FileUtils.writeByteArrayToFile(blockFile, errorBytes);
metaFile.delete();
runCmd(new String[]{"computeMeta", "-block", blockFile.getAbsolutePath(),
"-out", metaFile.getAbsolutePath()});
assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_corrupt"})
.contains("Status: ERROR, message: EC compute result not match."));
}
}