HDFS-8498. Blocks can be committed with wrong size. Contributed by Jing Zhao.

This commit is contained in:
Jing Zhao 2017-02-15 10:44:37 -08:00
parent 0fc6f38379
commit 627da6f717
3 changed files with 72 additions and 39 deletions

View File

@ -142,8 +142,6 @@ boolean continueRetryingOrThrow() throws InvalidEncryptionKeyException {
/** /**
* Record a connection exception. * Record a connection exception.
* @param e
* @throws InvalidEncryptionKeyException
*/ */
void recordFailure(final InvalidEncryptionKeyException e) void recordFailure(final InvalidEncryptionKeyException e)
throws InvalidEncryptionKeyException { throws InvalidEncryptionKeyException {
@ -178,9 +176,8 @@ void sendTransferBlock(final DatanodeInfo[] targets,
final StorageType[] targetStorageTypes, final StorageType[] targetStorageTypes,
final Token<BlockTokenIdentifier> blockToken) throws IOException { final Token<BlockTokenIdentifier> blockToken) throws IOException {
//send the TRANSFER_BLOCK request //send the TRANSFER_BLOCK request
new Sender(out) new Sender(out).transferBlock(block.getCurrentBlock(), blockToken,
.transferBlock(block, blockToken, dfsClient.clientName, targets, dfsClient.clientName, targets, targetStorageTypes);
targetStorageTypes);
out.flush(); out.flush();
//ack //ack
BlockOpResponseProto transferResponse = BlockOpResponseProto BlockOpResponseProto transferResponse = BlockOpResponseProto
@ -199,6 +196,42 @@ public void close() throws IOException {
} }
} }
static class BlockToWrite {
private ExtendedBlock currentBlock;
BlockToWrite(ExtendedBlock block) {
setCurrentBlock(block);
}
synchronized ExtendedBlock getCurrentBlock() {
return currentBlock == null ? null : new ExtendedBlock(currentBlock);
}
synchronized long getNumBytes() {
return currentBlock == null ? 0 : currentBlock.getNumBytes();
}
synchronized void setCurrentBlock(ExtendedBlock block) {
currentBlock = (block == null || block.getLocalBlock() == null) ?
null : new ExtendedBlock(block);
}
synchronized void setNumBytes(long numBytes) {
assert currentBlock != null;
currentBlock.setNumBytes(numBytes);
}
synchronized void setGenerationStamp(long generationStamp) {
assert currentBlock != null;
currentBlock.setGenerationStamp(generationStamp);
}
@Override
public synchronized String toString() {
return currentBlock == null ? "null" : currentBlock.toString();
}
}
/** /**
* Create a socket for a write pipeline * Create a socket for a write pipeline
* *
@ -440,7 +473,7 @@ synchronized void checkRestartingNodeDeadline(DatanodeInfo[] nodes) {
} }
private volatile boolean streamerClosed = false; private volatile boolean streamerClosed = false;
protected volatile ExtendedBlock block; // its length is number of bytes acked protected final BlockToWrite block; // its length is number of bytes acked
protected Token<BlockTokenIdentifier> accessToken; protected Token<BlockTokenIdentifier> accessToken;
private DataOutputStream blockStream; private DataOutputStream blockStream;
private DataInputStream blockReplyStream; private DataInputStream blockReplyStream;
@ -508,7 +541,7 @@ private DataStreamer(HdfsFileStatus stat, ExtendedBlock block,
ByteArrayManager byteArrayManage, ByteArrayManager byteArrayManage,
boolean isAppend, String[] favoredNodes, boolean isAppend, String[] favoredNodes,
EnumSet<AddBlockFlag> flags) { EnumSet<AddBlockFlag> flags) {
this.block = block; this.block = new BlockToWrite(block);
this.dfsClient = dfsClient; this.dfsClient = dfsClient;
this.src = src; this.src = src;
this.progress = progress; this.progress = progress;
@ -1322,7 +1355,7 @@ private void addDatanode2ExistingPipeline() throws IOException {
LocatedBlock lb; LocatedBlock lb;
//get a new datanode //get a new datanode
lb = dfsClient.namenode.getAdditionalDatanode( lb = dfsClient.namenode.getAdditionalDatanode(
src, stat.getFileId(), block, nodes, storageIDs, src, stat.getFileId(), block.getCurrentBlock(), nodes, storageIDs,
exclude.toArray(new DatanodeInfo[exclude.size()]), exclude.toArray(new DatanodeInfo[exclude.size()]),
1, dfsClient.clientName); 1, dfsClient.clientName);
// a new node was allocated by the namenode. Update nodes. // a new node was allocated by the namenode. Update nodes.
@ -1440,7 +1473,7 @@ protected void setupPipelineInternal(DatanodeInfo[] datanodes,
} // while } // while
if (success) { if (success) {
block = updatePipeline(newGS); updatePipeline(newGS);
} }
} }
@ -1536,21 +1569,22 @@ void failPacket4Testing() {
} }
private LocatedBlock updateBlockForPipeline() throws IOException { private LocatedBlock updateBlockForPipeline() throws IOException {
return dfsClient.namenode.updateBlockForPipeline(block, return dfsClient.namenode.updateBlockForPipeline(block.getCurrentBlock(),
dfsClient.clientName); dfsClient.clientName);
} }
static ExtendedBlock newBlock(ExtendedBlock b, final long newGS) { void updateBlockGS(final long newGS) {
return new ExtendedBlock(b.getBlockPoolId(), b.getBlockId(), block.setGenerationStamp(newGS);
b.getNumBytes(), newGS);
} }
/** update pipeline at the namenode */ /** update pipeline at the namenode */
ExtendedBlock updatePipeline(long newGS) throws IOException { private void updatePipeline(long newGS) throws IOException {
final ExtendedBlock newBlock = newBlock(block, newGS); final ExtendedBlock oldBlock = block.getCurrentBlock();
dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock, // the new GS has been propagated to all DN, it should be ok to update the
nodes, storageIDs); // local block state
return newBlock; updateBlockGS(newGS);
dfsClient.namenode.updatePipeline(dfsClient.clientName, oldBlock,
block.getCurrentBlock(), nodes, storageIDs);
} }
DatanodeInfo[] getExcludedNodes() { DatanodeInfo[] getExcludedNodes() {
@ -1570,31 +1604,29 @@ protected LocatedBlock nextBlockOutputStream() throws IOException {
StorageType[] storageTypes; StorageType[] storageTypes;
int count = dfsClient.getConf().getNumBlockWriteRetry(); int count = dfsClient.getConf().getNumBlockWriteRetry();
boolean success; boolean success;
ExtendedBlock oldBlock = block; final ExtendedBlock oldBlock = block.getCurrentBlock();
do { do {
errorState.resetInternalError(); errorState.resetInternalError();
lastException.clear(); lastException.clear();
DatanodeInfo[] excluded = getExcludedNodes(); DatanodeInfo[] excluded = getExcludedNodes();
block = oldBlock; lb = locateFollowingBlock(
lb = locateFollowingBlock(excluded.length > 0 ? excluded : null); excluded.length > 0 ? excluded : null, oldBlock);
block = lb.getBlock(); block.setCurrentBlock(lb.getBlock());
block.setNumBytes(0); block.setNumBytes(0);
bytesSent = 0; bytesSent = 0;
accessToken = lb.getBlockToken(); accessToken = lb.getBlockToken();
nodes = lb.getLocations(); nodes = lb.getLocations();
storageTypes = lb.getStorageTypes(); storageTypes = lb.getStorageTypes();
//
// Connect to first DataNode in the list. // Connect to first DataNode in the list.
//
success = createBlockOutputStream(nodes, storageTypes, 0L, false); success = createBlockOutputStream(nodes, storageTypes, 0L, false);
if (!success) { if (!success) {
LOG.warn("Abandoning " + block); LOG.warn("Abandoning " + block);
dfsClient.namenode.abandonBlock(block, stat.getFileId(), src, dfsClient.namenode.abandonBlock(block.getCurrentBlock(),
dfsClient.clientName); stat.getFileId(), src, dfsClient.clientName);
block = null; block.setCurrentBlock(null);
final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()]; final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()];
LOG.warn("Excluding datanode " + badNode); LOG.warn("Excluding datanode " + badNode);
excludedNodes.put(badNode, badNode); excludedNodes.put(badNode, badNode);
@ -1655,7 +1687,7 @@ boolean createBlockOutputStream(DatanodeInfo[] nodes,
// We cannot change the block length in 'block' as it counts the number // We cannot change the block length in 'block' as it counts the number
// of bytes ack'ed. // of bytes ack'ed.
ExtendedBlock blockCopy = new ExtendedBlock(block); ExtendedBlock blockCopy = block.getCurrentBlock();
blockCopy.setNumBytes(stat.getBlockSize()); blockCopy.setNumBytes(stat.getBlockSize());
boolean[] targetPinnings = getPinnings(nodes); boolean[] targetPinnings = getPinnings(nodes);
@ -1765,9 +1797,9 @@ private boolean[] getPinnings(DatanodeInfo[] nodes) {
} }
} }
private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) private LocatedBlock locateFollowingBlock(DatanodeInfo[] excluded,
throws IOException { ExtendedBlock oldBlock) throws IOException {
return DFSOutputStream.addBlock(excludedNodes, dfsClient, src, block, return DFSOutputStream.addBlock(excluded, dfsClient, src, oldBlock,
stat.getFileId(), favoredNodes, addBlockFlags); stat.getFileId(), favoredNodes, addBlockFlags);
} }
@ -1811,7 +1843,7 @@ private void backOffIfNecessary() throws InterruptedException {
* @return the block this streamer is writing to * @return the block this streamer is writing to
*/ */
ExtendedBlock getBlock() { ExtendedBlock getBlock() {
return block; return block.getCurrentBlock();
} }
/** /**
@ -2016,6 +2048,8 @@ void closeSocket() throws IOException {
@Override @Override
public String toString() { public String toString() {
return block == null? "block==null": "" + block.getLocalBlock(); final ExtendedBlock extendedBlock = block.getCurrentBlock();
return extendedBlock == null ?
"block==null" : "" + extendedBlock.getLocalBlock();
} }
} }

View File

@ -71,7 +71,7 @@ boolean isHealthy() {
@Override @Override
protected void endBlock() { protected void endBlock() {
coordinator.offerEndBlock(index, block); coordinator.offerEndBlock(index, block.getCurrentBlock());
super.endBlock(); super.endBlock();
} }
@ -93,7 +93,7 @@ private LocatedBlock getFollowingBlock() throws IOException {
protected LocatedBlock nextBlockOutputStream() throws IOException { protected LocatedBlock nextBlockOutputStream() throws IOException {
boolean success; boolean success;
LocatedBlock lb = getFollowingBlock(); LocatedBlock lb = getFollowingBlock();
block = lb.getBlock(); block.setCurrentBlock(lb.getBlock());
block.setNumBytes(0); block.setNumBytes(0);
bytesSent = 0; bytesSent = 0;
accessToken = lb.getBlockToken(); accessToken = lb.getBlockToken();
@ -105,7 +105,7 @@ protected LocatedBlock nextBlockOutputStream() throws IOException {
success = createBlockOutputStream(nodes, storageTypes, 0L, false); success = createBlockOutputStream(nodes, storageTypes, 0L, false);
if (!success) { if (!success) {
block = null; block.setCurrentBlock(null);
final DatanodeInfo badNode = nodes[getErrorState().getBadNodeIndex()]; final DatanodeInfo badNode = nodes[getErrorState().getBadNodeIndex()];
LOG.warn("Excluding datanode " + badNode); LOG.warn("Excluding datanode " + badNode);
excludedNodes.put(badNode, badNode); excludedNodes.put(badNode, badNode);
@ -161,7 +161,7 @@ protected void setupPipelineInternal(DatanodeInfo[] nodes,
success = coordinator.takeStreamerUpdateResult(index); success = coordinator.takeStreamerUpdateResult(index);
if (success) { if (success) {
// if all succeeded, update its block using the new GS // if all succeeded, update its block using the new GS
block = newBlock(block, newGS); updateBlockGS(newGS);
} else { } else {
// otherwise close the block stream and restart the recovery process // otherwise close the block stream and restart the recovery process
closeStream(); closeStream();

View File

@ -110,8 +110,7 @@ public void testCloseTwice() throws IOException {
* packet size < 64kB. See HDFS-7308 for details. * packet size < 64kB. See HDFS-7308 for details.
*/ */
@Test @Test
public void testComputePacketChunkSize() public void testComputePacketChunkSize() throws Exception {
throws Exception {
DistributedFileSystem fs = cluster.getFileSystem(); DistributedFileSystem fs = cluster.getFileSystem();
FSDataOutputStream os = fs.create(new Path("/test")); FSDataOutputStream os = fs.create(new Path("/test"));
DFSOutputStream dos = (DFSOutputStream) Whitebox.getInternalState(os, DFSOutputStream dos = (DFSOutputStream) Whitebox.getInternalState(os,