HDFS-8145. Fix the editlog corruption exposed by failed TestAddStripedBlocks. Contributed by Jing Zhao.

This commit is contained in:
Jing Zhao 2015-04-17 18:13:47 -07:00 committed by Zhe Zhang
parent ab76e1fe36
commit f6e1160ef1
8 changed files with 29 additions and 48 deletions

View File

@ -244,13 +244,6 @@ public int numNodes() {
return num; return num;
} }
@Override
public void write(DataOutput out) throws IOException {
out.writeShort(dataBlockNum);
out.writeShort(parityBlockNum);
super.write(out);
}
/** /**
* Convert a complete block to an under construction block. * Convert a complete block to an under construction block.
* @return BlockInfoUnderConstruction - an under construction block. * @return BlockInfoUnderConstruction - an under construction block.

View File

@ -54,10 +54,6 @@ public ErasureCodingZoneManager(FSDirectory dir) {
this.dir = dir; this.dir = dir;
} }
boolean getECPolicy(INodesInPath iip) throws IOException {
return getECSchema(iip) != null;
}
ECSchema getECSchema(INodesInPath iip) throws IOException { ECSchema getECSchema(INodesInPath iip) throws IOException {
ECZoneInfo ecZoneInfo = getECZoneInfo(iip); ECZoneInfo ecZoneInfo = getECZoneInfo(iip);
return ecZoneInfo == null ? null : ecZoneInfo.getSchema(); return ecZoneInfo == null ? null : ecZoneInfo.getSchema();
@ -109,7 +105,7 @@ XAttr createErasureCodingZone(String src, ECSchema schema)
throw new IOException("Attempt to create an erasure coding zone " + throw new IOException("Attempt to create an erasure coding zone " +
"for a file."); "for a file.");
} }
if (getECPolicy(srcIIP)) { if (getECSchema(srcIIP) != null) {
throw new IOException("Directory " + src + " is already in an " + throw new IOException("Directory " + src + " is already in an " +
"erasure coding zone."); "erasure coding zone.");
} }
@ -132,8 +128,10 @@ XAttr createErasureCodingZone(String src, ECSchema schema)
void checkMoveValidity(INodesInPath srcIIP, INodesInPath dstIIP, String src) void checkMoveValidity(INodesInPath srcIIP, INodesInPath dstIIP, String src)
throws IOException { throws IOException {
assert dir.hasReadLock(); assert dir.hasReadLock();
if (getECPolicy(srcIIP) final ECSchema srcSchema = getECSchema(srcIIP);
!= getECPolicy(dstIIP)) { final ECSchema dstSchema = getECSchema(dstIIP);
if ((srcSchema != null && !srcSchema.equals(dstSchema)) ||
(dstSchema != null && !dstSchema.equals(srcSchema))) {
throw new IOException( throw new IOException(
src + " can't be moved because the source and destination have " + src + " can't be moved because the source and destination have " +
"different erasure coding policies."); "different erasure coding policies.");

View File

@ -1237,7 +1237,7 @@ XAttr createErasureCodingZone(String src, ECSchema schema)
} }
} }
public boolean getECPolicy(INodesInPath iip) throws IOException { public boolean isInECZone(INodesInPath iip) throws IOException {
return getECSchema(iip) != null; return getECSchema(iip) != null;
} }

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@ -416,7 +417,7 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
newFile.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID); newFile.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID);
newFile.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID); newFile.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID);
// TODO whether the file is striped should later be retrieved from iip // TODO whether the file is striped should later be retrieved from iip
updateBlocks(fsDir, addCloseOp, iip, newFile, fsDir.getECPolicy(iip)); updateBlocks(fsDir, addCloseOp, iip, newFile, fsDir.isInECZone(iip));
break; break;
} }
case OP_CLOSE: { case OP_CLOSE: {
@ -437,7 +438,7 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
file.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID); file.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID);
file.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID); file.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID);
// TODO whether the file is striped should later be retrieved from iip // TODO whether the file is striped should later be retrieved from iip
updateBlocks(fsDir, addCloseOp, iip, file, fsDir.getECPolicy(iip)); updateBlocks(fsDir, addCloseOp, iip, file, fsDir.isInECZone(iip));
// Now close the file // Now close the file
if (!file.isUnderConstruction() && if (!file.isUnderConstruction() &&
@ -496,7 +497,7 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path); INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path);
// Update in-memory data structures // Update in-memory data structures
// TODO whether the file is striped should later be retrieved from iip // TODO whether the file is striped should later be retrieved from iip
updateBlocks(fsDir, updateOp, iip, oldFile, fsDir.getECPolicy(iip)); updateBlocks(fsDir, updateOp, iip, oldFile, fsDir.isInECZone(iip));
if (toAddRetryCache) { if (toAddRetryCache) {
fsNamesys.addCacheEntry(updateOp.rpcClientId, updateOp.rpcCallId); fsNamesys.addCacheEntry(updateOp.rpcClientId, updateOp.rpcCallId);
@ -514,7 +515,7 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path); INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path);
// add the new block to the INodeFile // add the new block to the INodeFile
// TODO whether the file is striped should later be retrieved from iip // TODO whether the file is striped should later be retrieved from iip
addNewBlock(addBlockOp, oldFile, fsDir.getECPolicy(iip)); addNewBlock(addBlockOp, oldFile, fsDir.isInECZone(iip));
break; break;
} }
case OP_SET_REPLICATION: { case OP_SET_REPLICATION: {
@ -1079,8 +1080,9 @@ private void updateBlocks(FSDirectory fsDir, BlockListUpdatingOp op,
// is only executed when loading edits written by prior // is only executed when loading edits written by prior
// versions of Hadoop. Current versions always log // versions of Hadoop. Current versions always log
// OP_ADD operations as each block is allocated. // OP_ADD operations as each block is allocated.
newBI = new BlockInfoContiguous(newBlock, newBI = isStriped ? new BlockInfoStriped(newBlock,
file.getPreferredBlockReplication()); HdfsConstants.NUM_DATA_BLOCKS, HdfsConstants.NUM_PARITY_BLOCKS) :
new BlockInfoContiguous(newBlock, file.getPreferredBlockReplication());
} }
fsNamesys.getBlockManager().addBlockCollectionWithCheck(newBI, file); fsNamesys.getBlockManager().addBlockCollectionWithCheck(newBI, file);
file.addBlock(newBI); file.addBlock(newBI);

View File

@ -771,10 +771,8 @@ INode loadINode(final byte[] localName, boolean isSnapshotINode,
if (isStriped) { if (isStriped) {
blocks = new Block[numBlocks]; blocks = new Block[numBlocks];
for (int j = 0; j < numBlocks; j++) { for (int j = 0; j < numBlocks; j++) {
short dataBlockNum = in.readShort();
short parityBlockNum = in.readShort();
blocks[j] = new BlockInfoStriped(new Block(), blocks[j] = new BlockInfoStriped(new Block(),
dataBlockNum, parityBlockNum); HdfsConstants.NUM_DATA_BLOCKS, HdfsConstants.NUM_PARITY_BLOCKS);
blocks[j].readFields(in); blocks[j].readFields(in);
} }
} else { } else {

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LayoutVersion; import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
@ -139,17 +140,15 @@ static INodeFile readINodeUnderConstruction(
blocksStriped = new BlockInfoStriped[numBlocks]; blocksStriped = new BlockInfoStriped[numBlocks];
int i = 0; int i = 0;
for (; i < numBlocks - 1; i++) { for (; i < numBlocks - 1; i++) {
short dataBlockNum = in.readShort(); blocksStriped[i] = new BlockInfoStriped(new Block(),
short parityBlockNum = in.readShort(); HdfsConstants.NUM_DATA_BLOCKS,
blocksStriped[i] = new BlockInfoStriped(new Block(), dataBlockNum, HdfsConstants.NUM_PARITY_BLOCKS);
parityBlockNum);
blocksStriped[i].readFields(in); blocksStriped[i].readFields(in);
} }
if (numBlocks > 0) { if (numBlocks > 0) {
short dataBlockNum = in.readShort();
short parityBlockNum = in.readShort();
blocksStriped[i] = new BlockInfoStripedUnderConstruction(new Block(), blocksStriped[i] = new BlockInfoStripedUnderConstruction(new Block(),
dataBlockNum, parityBlockNum, BlockUCState.UNDER_CONSTRUCTION, null); HdfsConstants.NUM_DATA_BLOCKS, HdfsConstants.NUM_PARITY_BLOCKS,
BlockUCState.UNDER_CONSTRUCTION, null);
blocksStriped[i].readFields(in); blocksStriped[i].readFields(in);
} }
} else { } else {

View File

@ -21,7 +21,6 @@
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox; import org.mockito.internal.util.reflection.Whitebox;
@ -43,12 +42,8 @@ public class TestBlockInfoStriped {
private static final int TOTAL_NUM_BLOCKS = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; private static final int TOTAL_NUM_BLOCKS = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
private static final long BASE_ID = -1600; private static final long BASE_ID = -1600;
private static final Block baseBlock = new Block(BASE_ID); private static final Block baseBlock = new Block(BASE_ID);
private BlockInfoStriped info; private final BlockInfoStriped info = new BlockInfoStriped(baseBlock,
NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
@Before
public void setup() {
info = new BlockInfoStriped(baseBlock, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
}
private Block[] createReportedBlocks(int num) { private Block[] createReportedBlocks(int num) {
Block[] blocks = new Block[num]; Block[] blocks = new Block[num];
@ -230,17 +225,14 @@ public void testWrite() {
long blkID = 1; long blkID = 1;
long numBytes = 1; long numBytes = 1;
long generationStamp = 1; long generationStamp = 1;
short dataBlockNum = 6; ByteBuffer byteBuffer = ByteBuffer.allocate(Long.SIZE / Byte.SIZE * 3);
short parityBlockNum = 3; byteBuffer.putLong(blkID).putLong(numBytes).putLong(generationStamp);
ByteBuffer byteBuffer = ByteBuffer.allocate(Long.SIZE/Byte.SIZE*3
+ Short.SIZE/Byte.SIZE*2);
byteBuffer.putShort(dataBlockNum).putShort(parityBlockNum)
.putLong(blkID).putLong(numBytes).putLong(generationStamp);
ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
DataOutput out = new DataOutputStream(byteStream); DataOutput out = new DataOutputStream(byteStream);
BlockInfoStriped blk = new BlockInfoStriped(new Block(1,1,1), BlockInfoStriped blk = new BlockInfoStriped(new Block(blkID, numBytes,
(short)6,(short)3); generationStamp), NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
try { try {
blk.write(out); blk.write(out);
} catch(Exception ex) { } catch(Exception ex) {
@ -249,5 +241,4 @@ public void testWrite() {
assertEquals(byteBuffer.array().length, byteStream.toByteArray().length); assertEquals(byteBuffer.array().length, byteStream.toByteArray().length);
assertArrayEquals(byteBuffer.array(), byteStream.toByteArray()); assertArrayEquals(byteBuffer.array(), byteStream.toByteArray());
} }
} }

View File

@ -158,7 +158,7 @@ private void testSaveAndLoadStripedINodeFile(FSNamesystem fsn, Configuration con
for (int i = 0; i < stripedBlks.length; i++) { for (int i = 0; i < stripedBlks.length; i++) {
stripedBlks[i] = new BlockInfoStriped( stripedBlks[i] = new BlockInfoStriped(
new Block(stripedBlkId + i, preferredBlockSize, timestamp), new Block(stripedBlkId + i, preferredBlockSize, timestamp),
(short) 6, (short) 3); HdfsConstants.NUM_DATA_BLOCKS, HdfsConstants.NUM_PARITY_BLOCKS);
file.getStripedBlocksFeature().addBlock(stripedBlks[i]); file.getStripedBlocksFeature().addBlock(stripedBlks[i]);
} }