From 2844b9855876a5caeb84160487431d730a9eb661 Mon Sep 17 00:00:00 2001
From: daimin
Date: Thu, 4 Nov 2021 04:19:56 +0800
Subject: [PATCH] HDFS-16286. Add a debug tool to verify the correctness of
 erasure coding on file (#3593)

(cherry picked from commit a21895a5b3644944fe04cf558d593b96da0263fd)
---
 .../apache/hadoop/hdfs/tools/DebugAdmin.java  | 230 ++++++++++++++++++
 .../src/site/markdown/HDFSCommands.md         |  10 +
 .../hadoop/hdfs/tools/TestDebugAdmin.java     | 118 ++++++++-
 3 files changed, 350 insertions(+), 8 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java
index efe355d1b0..32e8248adc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java
@@ -24,15 +24,41 @@
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.Files;
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.client.impl.BlockReaderRemote;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -69,6 +95,7 @@ public class DebugAdmin extends Configured implements Tool {
       new VerifyMetaCommand(),
       new ComputeMetaCommand(),
       new RecoverLeaseCommand(),
+      new VerifyECCommand(),
       new HelpCommand()
   };
 
@@ -387,6 +414,209 @@ int run(List<String> args) throws IOException {
     }
   }
 
+  /**
+   * The command for verifying the correctness of erasure coding on an erasure coded file.
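+   * <p>
+   * For each block group of the file, the tool opens a remote block reader for
+   * every available data and parity block, reads them stripe by stripe,
+   * re-computes the parity from the data blocks with the file's erasure coding
+   * codec, and compares the result with the parity that was read.
+   * Example invocation: {@code hdfs debug verifyEC -file /ec/file1}.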
+   */
+  private class VerifyECCommand extends DebugCommand {
+    private DFSClient client;
+    private int dataBlkNum;
+    private int parityBlkNum;
+    private int cellSize;
+    private boolean useDNHostname;
+    private CachingStrategy cachingStrategy;
+    private int stripedReadBufferSize;
+    private CompletionService<Integer> readService;
+    private RawErasureEncoder encoder;
+    private BlockReader[] blockReaders;
+
+
+    VerifyECCommand() {
+      super("verifyEC",
+          "verifyEC -file <file>",
+          "  Verify HDFS erasure coding on all block groups of the file.");
+    }
+
+    int run(List<String> args) throws IOException {
+      if (args.size() < 2) {
+        System.out.println(usageText);
+        System.out.println(helpText + System.lineSeparator());
+        return 1;
+      }
+      String file = StringUtils.popOptionWithArgument("-file", args);
+      Path path = new Path(file);
+      DistributedFileSystem dfs = AdminHelper.getDFS(getConf());
+      this.client = dfs.getClient();
+
+      FileStatus fileStatus;
+      try {
+        fileStatus = dfs.getFileStatus(path);
+      } catch (FileNotFoundException e) {
+        System.err.println("File " + file + " does not exist.");
+        return 1;
+      }
+
+      if (!fileStatus.isFile()) {
+        System.err.println("File " + file + " is not a regular file.");
+        return 1;
+      }
+      if (!dfs.isFileClosed(path)) {
+        System.err.println("File " + file + " is not closed.");
+        return 1;
+      }
+      this.useDNHostname = getConf().getBoolean(DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
+          DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
+      this.cachingStrategy = CachingStrategy.newDefaultStrategy();
+      this.stripedReadBufferSize = getConf().getInt(
+          DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY,
+          DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_DEFAULT);
+
+      LocatedBlocks locatedBlocks = client.getLocatedBlocks(file, 0, fileStatus.getLen());
+      if (locatedBlocks.getErasureCodingPolicy() == null) {
+        System.err.println("File " + file + " is not erasure coded.");
+        return 1;
+      }
+      ErasureCodingPolicy ecPolicy = locatedBlocks.getErasureCodingPolicy();
+      this.dataBlkNum = ecPolicy.getNumDataUnits();
+      this.parityBlkNum = ecPolicy.getNumParityUnits();
+      this.cellSize = ecPolicy.getCellSize();
+      this.encoder = CodecUtil.createRawEncoder(getConf(), ecPolicy.getCodecName(),
+          new ErasureCoderOptions(
+              ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits()));
+      int blockNum = dataBlkNum + parityBlkNum;
+      this.readService = new ExecutorCompletionService<>(
+          DFSUtilClient.getThreadPoolExecutor(blockNum, blockNum, 60,
+              new LinkedBlockingQueue<>(), "read-", false));
+      this.blockReaders = new BlockReader[dataBlkNum + parityBlkNum];
+
+      for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
+        System.out.println("Checking EC block group: blk_" + locatedBlock.getBlock().getBlockId());
+        LocatedStripedBlock blockGroup = (LocatedStripedBlock) locatedBlock;
+
+        try {
+          verifyBlockGroup(blockGroup);
+          System.out.println("Status: OK");
+        } catch (Exception e) {
+          System.err.println("Status: ERROR, message: " + e.getMessage());
+          return 1;
+        } finally {
+          closeBlockReaders();
+        }
+      }
+      System.out.println("\nAll EC block group status: OK");
+      return 0;
+    }
+
+    private void verifyBlockGroup(LocatedStripedBlock blockGroup) throws Exception {
+      final LocatedBlock[] indexedBlocks = StripedBlockUtil.parseStripedBlockGroup(blockGroup,
+          cellSize, dataBlkNum, parityBlkNum);
+
+      int blockNumExpected = Math.min(dataBlkNum,
+          (int) ((blockGroup.getBlockSize() - 1) / cellSize + 1)) + parityBlkNum;
+      if (blockGroup.getBlockIndices().length < blockNumExpected) {
+        throw new Exception("Block group is under-erasure-coded.");
+      }
+
+      long maxBlockLen = 0L;
+      DataChecksum checksum = null;
+      for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
+        LocatedBlock block = indexedBlocks[i];
+        if (block == null) {
+          blockReaders[i] = null;
+          continue;
+        }
+        if (block.getBlockSize() > maxBlockLen) {
+          maxBlockLen = block.getBlockSize();
+        }
+        BlockReader blockReader = createBlockReader(block.getBlock(),
+            block.getLocations()[0], block.getBlockToken());
+        if (checksum == null) {
+          checksum = blockReader.getDataChecksum();
+        } else {
+          assert checksum.equals(blockReader.getDataChecksum());
+        }
+        blockReaders[i] = blockReader;
+      }
+      assert checksum != null;
+      int bytesPerChecksum = checksum.getBytesPerChecksum();
+      int bufferSize = stripedReadBufferSize < bytesPerChecksum ? bytesPerChecksum :
+          stripedReadBufferSize - stripedReadBufferSize % bytesPerChecksum;
+      final ByteBuffer[] buffers = new ByteBuffer[dataBlkNum + parityBlkNum];
+      final ByteBuffer[] outputs = new ByteBuffer[parityBlkNum];
+      for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
+        buffers[i] = ByteBuffer.allocate(bufferSize);
+      }
+      for (int i = 0; i < parityBlkNum; i++) {
+        outputs[i] = ByteBuffer.allocate(bufferSize);
+      }
+      long positionInBlock = 0L;
+      while (positionInBlock < maxBlockLen) {
+        final int toVerifyLen = (int) Math.min(bufferSize, maxBlockLen - positionInBlock);
+        List<Future<Integer>> futures = new ArrayList<>(dataBlkNum + parityBlkNum);
+        for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
+          final int fi = i;
+          futures.add(this.readService.submit(() -> {
+            BlockReader blockReader = blockReaders[fi];
+            ByteBuffer buffer = buffers[fi];
+            buffer.clear();
+            buffer.limit(toVerifyLen);
+            int readLen = 0;
+            if (blockReader != null) {
+              int toRead = buffer.remaining();
+              while (readLen < toRead) {
+                int nread = blockReader.read(buffer);
+                if (nread <= 0) {
+                  break;
+                }
+                readLen += nread;
+              }
+            }
+            while (buffer.hasRemaining()) {
+              buffer.put((byte) 0);
+            }
+            buffer.flip();
+            return readLen;
+          }));
+        }
+        for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
+          futures.get(i).get(1, TimeUnit.MINUTES);
+        }
+        ByteBuffer[] inputs = new ByteBuffer[dataBlkNum];
+        System.arraycopy(buffers, 0, inputs, 0, dataBlkNum);
+        for (int i = 0; i < parityBlkNum; i++) {
+          outputs[i].clear();
+          outputs[i].limit(toVerifyLen);
+        }
+        this.encoder.encode(inputs, outputs);
+        for (int i = 0; i < parityBlkNum; i++) {
+          if (!buffers[dataBlkNum + i].equals(outputs[i])) {
+            throw new Exception("EC compute result not match.");
+          }
+        }
+        positionInBlock += toVerifyLen;
+      }
+    }
+
+    private BlockReader createBlockReader(ExtendedBlock block, DatanodeInfo dnInfo,
+        Token<BlockTokenIdentifier> token) throws IOException {
+      InetSocketAddress dnAddress = NetUtils.createSocketAddr(dnInfo.getXferAddr(useDNHostname));
+      Peer peer = client.newConnectedPeer(dnAddress, token, dnInfo);
+      return BlockReaderRemote.newBlockReader(
+          "dummy", block, token, 0,
+          block.getNumBytes(), true, "", peer, dnInfo,
+          null, cachingStrategy, -1, getConf());
+    }
+
+    private void closeBlockReaders() {
+      for (int i = 0; i < blockReaders.length; i++) {
+        if (blockReaders[i] != null) {
+          IOUtils.closeStream(blockReaders[i]);
+          blockReaders[i] = null;
+        }
+      }
+    }
+
+  }
+
   /**
    * The command for getting help about other commands.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
index c24cc7ae46..82a5af9deb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
@@ -697,6 +697,16 @@ Usage: `hdfs debug recoverLease -path <path> [-retries <num-retries>]`
 
 Recover the lease on the specified path. The path must reside on an HDFS file system. The default number of retries is 1.
 
+### `verifyEC`
+
+Usage: `hdfs debug verifyEC -file <file>`
+
+| COMMAND\_OPTION | Description |
+|:---- |:---- |
+| [`-file` *EC-file*] | HDFS EC file to be verified. |
+
+Verify the correctness of erasure coding on an erasure coded file.
+
 dfsadmin with ViewFsOverloadScheme
 ----------------------------------
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDebugAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDebugAdmin.java
index 5c890a5481..8dd303d84d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDebugAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDebugAdmin.java
@@ -17,15 +17,22 @@
  */
 package org.apache.hadoop.hdfs.tools;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.io.IOUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -34,6 +41,8 @@
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.PrintStream;
+import java.util.List;
+import java.util.Random;
 
 import static org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil.*;
 import static org.junit.Assert.assertEquals;
@@ -44,23 +53,16 @@ public class TestDebugAdmin {
   static private final String TEST_ROOT_DIR =
       new File(System.getProperty("test.build.data", "/tmp"),
           TestDebugAdmin.class.getSimpleName()).getAbsolutePath();
-
+  private Configuration conf = new Configuration();
   private MiniDFSCluster cluster;
-  private DistributedFileSystem fs;
   private DebugAdmin admin;
-  private DataNode datanode;
 
   @Before
   public void setUp() throws Exception {
     final File testRoot = new File(TEST_ROOT_DIR);
     testRoot.delete();
     testRoot.mkdirs();
-    Configuration conf = new Configuration();
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
-    cluster.waitActive();
-    fs = cluster.getFileSystem();
     admin = new DebugAdmin(conf);
-    datanode = cluster.getDataNodes().get(0);
   }
 
   @After
@@ -92,8 +94,11 @@ private String runCmd(String[] cmd) throws Exception {
 
   @Test(timeout = 60000)
   public void testRecoverLease() throws Exception {
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
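+    // Each test now brings up its own MiniDFSCluster; wait for the DataNode
+    // to register before running the command under test.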
+    cluster.waitActive();
     assertEquals("ret: 1, You must supply a -path argument to recoverLease.",
         runCmd(new String[]{"recoverLease", "-retries", "1"}));
+    DistributedFileSystem fs = cluster.getFileSystem();
     FSDataOutputStream out = fs.create(new Path("/foo"));
     out.write(123);
     out.close();
@@ -103,6 +108,10 @@ public void testRecoverLease() throws Exception {
 
   @Test(timeout = 60000)
   public void testVerifyMetaCommand() throws Exception {
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    DataNode datanode = cluster.getDataNodes().get(0);
     DFSTestUtil.createFile(fs, new Path("/bar"), 1234, (short) 1, 0xdeadbeef);
     FsDatasetSpi<?> fsd = datanode.getFSDataset();
     ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, new Path("/bar"));
@@ -128,6 +137,10 @@ public void testVerifyMetaCommand() throws Exception {
 
   @Test(timeout = 60000)
   public void testComputeMetaCommand() throws Exception {
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    DataNode datanode = cluster.getDataNodes().get(0);
     DFSTestUtil.createFile(fs, new Path("/bar"), 1234, (short) 1, 0xdeadbeef);
     FsDatasetSpi<?> fsd = datanode.getFSDataset();
     ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, new Path("/bar"));
@@ -166,8 +179,97 @@ public void testComputeMetaCommand() throws Exception {
 
   @Test(timeout = 60000)
   public void testRecoverLeaseforFileNotFound() throws Exception {
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
     assertTrue(runCmd(new String[] {
         "recoverLease", "-path", "/foo", "-retries", "2" }).contains(
         "Giving up on recoverLease for /foo after 1 try"));
   }
+
+  @Test(timeout = 60000)
+  public void testVerifyECCommand() throws Exception {
+    final ErasureCodingPolicy ecPolicy = SystemErasureCodingPolicies.getByID(
+        SystemErasureCodingPolicies.RS_3_2_POLICY_ID);
+    cluster = DFSTestUtil.setupCluster(conf, 6, 5, 0);
+    cluster.waitActive();
+    DistributedFileSystem fs = cluster.getFileSystem();
+
+    assertEquals("ret: 1, verifyEC -file <file>  Verify HDFS erasure coding on " +
+        "all block groups of the file.", runCmd(new String[]{"verifyEC"}));
+
+    assertEquals("ret: 1, File /bar does not exist.",
+        runCmd(new String[]{"verifyEC", "-file", "/bar"}));
+
+    fs.create(new Path("/bar")).close();
+    assertEquals("ret: 1, File /bar is not erasure coded.",
+        runCmd(new String[]{"verifyEC", "-file", "/bar"}));
+
+
+    final Path ecDir = new Path("/ec");
+    fs.mkdir(ecDir, FsPermission.getDirDefault());
+    fs.enableErasureCodingPolicy(ecPolicy.getName());
+    fs.setErasureCodingPolicy(ecDir, ecPolicy.getName());
+
+    assertEquals("ret: 1, File /ec is not a regular file.",
+        runCmd(new String[]{"verifyEC", "-file", "/ec"}));
+
+    fs.create(new Path(ecDir, "foo"));
+    assertEquals("ret: 1, File /ec/foo is not closed.",
+        runCmd(new String[]{"verifyEC", "-file", "/ec/foo"}));
+
+    final short repl = 1;
+    final long k = 1024;
+    final long m = k * k;
+    final long seed = 0x1234567L;
+    DFSTestUtil.createFile(fs, new Path(ecDir, "foo_65535"), 65535, repl, seed);
+    assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_65535"})
+        .contains("All EC block group status: OK"));
+    DFSTestUtil.createFile(fs, new Path(ecDir, "foo_256k"), 256 * k, repl, seed);
+    assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_256k"})
+        .contains("All EC block group status: OK"));
+    DFSTestUtil.createFile(fs, new Path(ecDir, "foo_1m"), m, repl, seed);
+    assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_1m"})
+        .contains("All EC block group status: OK"));
+    DFSTestUtil.createFile(fs, new Path(ecDir, "foo_2m"), 2 * m, repl, seed);
+    assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_2m"})
+        .contains("All EC block group status: OK"));
+    DFSTestUtil.createFile(fs, new Path(ecDir, "foo_3m"), 3 * m, repl, seed);
+    assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_3m"})
+        .contains("All EC block group status: OK"));
+    DFSTestUtil.createFile(fs, new Path(ecDir, "foo_5m"), 5 * m, repl, seed);
+    assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_5m"})
+        .contains("All EC block group status: OK"));
+    DFSTestUtil.createFile(fs, new Path(ecDir, "foo_6m"), (int) k, 6 * m, m, repl, seed);
+    assertEquals("ret: 0, Checking EC block group: blk_x;Status: OK" +
+            "Checking EC block group: blk_x;Status: OK" +
+            "All EC block group status: OK",
+        runCmd(new String[]{"verifyEC", "-file", "/ec/foo_6m"})
+            .replaceAll("blk_-[0-9]+", "blk_x;"));
+
+    Path corruptFile = new Path(ecDir, "foo_corrupt");
+    DFSTestUtil.createFile(fs, corruptFile, 5841961, repl, seed);
+    List<LocatedBlock> blocks = DFSTestUtil.getAllBlocks(fs, corruptFile);
+    assertEquals(1, blocks.size());
+    LocatedStripedBlock blockGroup = (LocatedStripedBlock) blocks.get(0);
+    LocatedBlock[] indexedBlocks = StripedBlockUtil.parseStripedBlockGroup(blockGroup,
+        ecPolicy.getCellSize(), ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits());
+    // Try corrupt block 0 in block group.
+    LocatedBlock toCorruptLocatedBlock = indexedBlocks[0];
+    ExtendedBlock toCorruptBlock = toCorruptLocatedBlock.getBlock();
+    DataNode datanode = cluster.getDataNode(toCorruptLocatedBlock.getLocations()[0].getIpcPort());
+    File blockFile = getBlockFile(datanode.getFSDataset(),
+        toCorruptBlock.getBlockPoolId(), toCorruptBlock.getLocalBlock());
+    File metaFile = getMetaFile(datanode.getFSDataset(),
+        toCorruptBlock.getBlockPoolId(), toCorruptBlock.getLocalBlock());
+    // Write error bytes to block file and re-generate meta checksum.
+    byte[] errorBytes = new byte[2097152];
+    new Random(seed).nextBytes(errorBytes);
+    FileUtils.writeByteArrayToFile(blockFile, errorBytes);
+    metaFile.delete();
+    runCmd(new String[]{"computeMeta", "-block", blockFile.getAbsolutePath(),
+        "-out", metaFile.getAbsolutePath()});
+    assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_corrupt"})
+        .contains("Status: ERROR, message: EC compute result not match."));
+  }
+
 }