HDFS-8308. Erasure Coding: NameNode may get blocked in waitForLoadingFSImage() when loading editlog. Contributed by Jing Zhao.

Jing Zhao 2015-04-30 19:42:29 -07:00 committed by Zhe Zhang
parent 1a31f1c303
commit 5059958bf4
6 changed files with 52 additions and 37 deletions
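
Context for the fix: FSNamesystem#waitForLoadingFSImage() blocks its caller until the FSImage has been flagged as loaded, and edit-log replay runs before that flag is set. Any operation on the replay path that calls back into a method gated by that wait can therefore never return. A minimal sketch of the blocking pattern, assuming a flag-plus-condition gate; everything here except the waitForLoadingFSImage() name is illustrative, not the actual FSNamesystem internals:

  import java.util.concurrent.locks.Condition;
  import java.util.concurrent.locks.ReentrantLock;

  public class ImageLoadGateSketch {
    private volatile boolean imageLoaded = false;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition imageLoadedCond = lock.newCondition();

    // Blocks the caller until the FSImage has been flagged as loaded.
    void waitForLoadingFSImage() throws InterruptedException {
      lock.lock();
      try {
        while (!imageLoaded) {
          imageLoadedCond.await();
        }
      } finally {
        lock.unlock();
      }
    }

    // Called once startup (image load plus edit-log replay) has finished.
    void setImageLoaded() {
      lock.lock();
      try {
        imageLoaded = true;
        imageLoadedCond.signalAll();
      } finally {
        lock.unlock();
      }
    }

    // The hang: an operation invoked while replaying the edit log runs
    // before setImageLoaded(), so waiting on the flag can never succeed.
    void applyEditLogOp() throws InterruptedException {
      waitForLoadingFSImage(); // NameNode startup blocks on this call
    }
  }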

View File

@@ -152,3 +152,6 @@
     HDFS-8183. Erasure Coding: Improve DFSStripedOutputStream closing of
     datastreamer threads. (Rakesh R via Zhe Zhang)
+
+    HDFS-8308. Erasure Coding: NameNode may get blocked in waitForLoadingFSImage()
+    when loading editlog. (jing9)

View File

@@ -79,7 +79,8 @@ ECZoneInfo getECZoneInfo(INodesInPath iip) throws IOException {
     for (XAttr xAttr : xAttrs) {
       if (XATTR_ERASURECODING_ZONE.equals(XAttrHelper.getPrefixName(xAttr))) {
         String schemaName = new String(xAttr.getValue());
-        ECSchema schema = dir.getFSNamesystem().getECSchema(schemaName);
+        ECSchema schema = dir.getFSNamesystem().getSchemaManager()
+            .getSchema(schemaName);
         return new ECZoneInfo(inode.getFullPathName(), schema);
       }
     }
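
This one-line reroute is the heart of the fix: getECZoneInfo() runs on the edit-log replay path, and the old FSNamesystem#getECSchema wrapper appears to be gated behind the image-loaded wait (an assumption drawn from the commit title; the diff itself only shows the call being redirected). Asking the schema manager directly is a plain lookup with no gate. A hedged sketch of the two call paths, with simplified stand-in types:

  import java.util.Map;
  import java.util.concurrent.ConcurrentHashMap;

  // Illustrative stand-ins; only the call shapes mirror the diff above.
  class SchemaManagerSketch {
    private final Map<String, Object> schemas = new ConcurrentHashMap<>();

    // New path: a plain map lookup, safe to call during edit-log replay.
    Object getSchema(String name) {
      return schemas.get(name);
    }
  }

  class NamesystemSketch {
    private final SchemaManagerSketch schemaManager = new SchemaManagerSketch();
    private volatile boolean imageLoaded = false; // see the gate sketch above

    SchemaManagerSketch getSchemaManager() {
      return schemaManager;
    }

    // Old path: gated like waitForLoadingFSImage(); during replay the flag
    // is still false, so this never returns.
    Object getECSchema(String name) throws InterruptedException {
      while (!imageLoaded) {
        Thread.sleep(10);
      }
      return schemaManager.getSchema(name);
    }
  }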

View File

@@ -7529,9 +7529,9 @@ BatchedListEntries<EncryptionZone> listEncryptionZones(long prevId)
   /**
    * Create an erasure coding zone on directory src.
-   * @param schema ECSchema for the erasure coding zone
-   * @param src the path of a directory which will be the root of the
+   * @param srcArg the path of a directory which will be the root of the
    * erasure coding zone. The directory must be empty.
+   * @param schema ECSchema for the erasure coding zone
    *
    * @throws AccessControlException if the caller is not the superuser.
    * @throws UnresolvedLinkException if the path can't be resolved.

View File

@@ -1948,4 +1948,16 @@ public static Block addStripedBlockToFile(List<DataNode> dataNodes,
     lastBlock.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS);
     return lastBlock;
   }
+
+  /**
+   * Because currently DFSStripedOutputStream does not support hflush/hsync,
+   * tests can use this method to flush all the buffered data to DataNodes.
+   */
+  public static void writeAndFlushStripedOutputStream(
+      DFSStripedOutputStream out, int chunkSize) throws IOException {
+    // FSOutputSummer.BUFFER_NUM_CHUNKS == 9
+    byte[] toWrite = new byte[chunkSize * 9 + 1];
+    out.write(toWrite);
+    out.flushInternal();
+  }
 }
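
The helper works around the missing hflush/hsync by writing one byte more than FSOutputSummer buffers (9 chunks of chunkSize bytes each, per the comment above), forcing the buffered data out, and then calling flushInternal() to drain it to the DataNodes. A sketch of how a test would drive it, mirroring the call sites further down in this commit; the /striped path and wrapper class are hypothetical, and DFS_BYTES_PER_CHECKSUM_DEFAULT (512) is the default checksum chunk size:

  import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;

  import org.apache.hadoop.fs.FSDataOutputStream;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hdfs.DFSStripedOutputStream;
  import org.apache.hadoop.hdfs.DFSTestUtil;
  import org.apache.hadoop.hdfs.DistributedFileSystem;

  class StripedFlushUsage {
    // dfs points at a running MiniDFSCluster in which /striped is an erasure
    // coding zone (setup omitted; the test classes below show the wiring).
    static void writeStripedData(DistributedFileSystem dfs) throws Exception {
      FSDataOutputStream out = dfs.create(new Path("/striped/file"), (short) 1);
      try {
        // Writes chunkSize * 9 + 1 bytes so the summer must drain its
        // 9-chunk buffer, then flushInternal() pushes packets to DataNodes.
        DFSTestUtil.writeAndFlushStripedOutputStream(
            (DFSStripedOutputStream) out.getWrappedStream(),
            DFS_BYTES_PER_CHECKSUM_DEFAULT);
      } finally {
        out.close();
      }
    }
  }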

View File

@@ -35,8 +35,6 @@
 import static org.junit.Assert.*;
 
 public class TestErasureCodingZones {
-  private final int NUM_OF_DATANODES = 3;
-  private Configuration conf;
   private MiniDFSCluster cluster;
   private DistributedFileSystem fs;
   private static final int BLOCK_SIZE = 1024;
@@ -44,10 +42,10 @@ public class TestErasureCodingZones {
   @Before
   public void setupCluster() throws IOException {
-    conf = new HdfsConfiguration();
+    Configuration conf = new HdfsConfiguration();
     conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
     cluster = new MiniDFSCluster.Builder(conf).
-        numDataNodes(NUM_OF_DATANODES).build();
+        numDataNodes(1).build();
     cluster.waitActive();
     fs = cluster.getFileSystem();
     namesystem = cluster.getNamesystem();

View File

@@ -19,6 +19,7 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSStripedOutputStream;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -55,6 +56,7 @@
 import java.util.List;
 import java.util.UUID;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
 import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
 import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
 import static org.junit.Assert.assertEquals;
@@ -103,36 +105,31 @@ public void testAllocateBlockId() throws Exception {
     Assert.assertEquals(firstId + HdfsConstants.MAX_BLOCKS_IN_GROUP, secondId);
   }
 
-  @Test
+  @Test (timeout=60000)
   public void testAddStripedBlock() throws Exception {
     final Path file = new Path("/file1");
     // create an empty file
     FSDataOutputStream out = null;
     try {
       out = dfs.create(file, (short) 1);
+      DFSTestUtil.writeAndFlushStripedOutputStream(
+          (DFSStripedOutputStream) out.getWrappedStream(),
+          DFS_BYTES_PER_CHECKSUM_DEFAULT);
 
       FSDirectory fsdir = cluster.getNamesystem().getFSDirectory();
       INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile();
-      LocatedBlock newBlock = cluster.getNamesystem().getAdditionalBlock(
-          file.toString(), fileNode.getId(), dfs.getClient().getClientName(),
-          null, null, null);
-      assertEquals(GROUP_SIZE, newBlock.getLocations().length);
-      assertEquals(GROUP_SIZE, newBlock.getStorageIDs().length);
 
       BlockInfo[] blocks = fileNode.getBlocks();
       assertEquals(1, blocks.length);
       Assert.assertTrue(blocks[0].isStriped());
 
       checkStripedBlockUC((BlockInfoStriped) fileNode.getLastBlock(), true);
-    } finally {
-      IOUtils.cleanup(null, out);
-    }
 
       // restart NameNode to check editlog
       cluster.restartNameNode(true);
-      FSDirectory fsdir = cluster.getNamesystem().getFSDirectory();
-      INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile();
-      BlockInfo[] blocks = fileNode.getBlocks();
+      fsdir = cluster.getNamesystem().getFSDirectory();
+      fileNode = fsdir.getINode4Write(file.toString()).asFile();
+      blocks = fileNode.getBlocks();
       assertEquals(1, blocks.length);
       Assert.assertTrue(blocks[0].isStriped());
       checkStripedBlockUC((BlockInfoStriped) fileNode.getLastBlock(), false);
@@ -149,6 +146,9 @@ public void testAddStripedBlock() throws Exception {
       assertEquals(1, blocks.length);
       Assert.assertTrue(blocks[0].isStriped());
       checkStripedBlockUC((BlockInfoStriped) fileNode.getLastBlock(), false);
+    } finally {
+      IOUtils.cleanup(null, out);
+    }
   }
 
   private void checkStripedBlockUC(BlockInfoStriped block,
@@ -190,11 +190,12 @@ public void testGetLocatedStripedBlocks() throws Exception {
     FSDataOutputStream out = null;
     try {
       out = dfs.create(file, (short) 1);
+      DFSTestUtil.writeAndFlushStripedOutputStream(
+          (DFSStripedOutputStream) out.getWrappedStream(),
+          DFS_BYTES_PER_CHECKSUM_DEFAULT);
 
       FSDirectory fsdir = cluster.getNamesystem().getFSDirectory();
       INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile();
-      cluster.getNamesystem().getAdditionalBlock(file.toString(),
-          fileNode.getId(), dfs.getClient().getClientName(), null, null, null);
+
       BlockInfoStripedUnderConstruction lastBlk =
           (BlockInfoStripedUnderConstruction) fileNode.getLastBlock();
       DatanodeInfo[] expectedDNs = DatanodeStorageInfo