HDFS-5024. Make DatanodeProtocol#commitBlockSynchronization idempotent. Contributed by Arpit Agarwal.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1506789 13f79535-47bb-0310-9956-ffa450edef68
Jing Zhao 2013-07-25 00:32:40 +00:00
parent cc0a0fce63
commit f138ae68f9
4 changed files with 244 additions and 22 deletions
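For context, commitBlockSynchronization is sent by the DataNode that coordinates lease recovery, and the RPC layer may re-deliver the call after a lost response or a NameNode failover; tolerating that retry is the point of this patch. The sketch below is illustrative only and is not part of the commit: CommitBlockSynchronizationRetrySketch, commitWithRetry and RETRY_LIMIT are made-up names, and the trailing storage-ID parameter is assumed from the seven-argument calls in the new test further down.

    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.DatanodeID;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;

    /**
     * Illustrative only -- not part of this patch. Shows the kind of caller
     * behaviour that requires commitBlockSynchronization to be idempotent:
     * the recovery coordinator may re-send the identical call when it never
     * saw the reply to the first attempt.
     */
    class CommitBlockSynchronizationRetrySketch {
      private static final int RETRY_LIMIT = 3;  // hypothetical retry budget

      static void commitWithRetry(DatanodeProtocol namenode, ExtendedBlock lastBlock,
          long newGenStamp, long newLength, boolean closeFile, boolean deleteBlock,
          DatanodeID[] newTargets, String[] newTargetStorages) throws IOException {
        IOException lastFailure = null;
        for (int attempt = 0; attempt < RETRY_LIMIT; attempt++) {
          try {
            namenode.commitBlockSynchronization(lastBlock, newGenStamp, newLength,
                closeFile, deleteBlock, newTargets, newTargetStorages);
            return;  // applied (possibly already on an earlier attempt whose reply was lost)
          } catch (IOException e) {
            // A timeout or failover can land here even though the NameNode
            // already applied the first attempt; the retry re-sends the
            // identical call, which is only safe if the operation is
            // idempotent -- the behaviour this commit introduces.
            lastFailure = e;
          }
        }
        throw lastFailure;
      }
    }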

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -470,6 +470,9 @@ Release 2.1.0-beta - 2013-07-02
     HDFS-5020. Make DatanodeProtocol#blockReceivedAndDeleted idempotent.
     (jing9)
 
+    HDFS-5024. Make DatanodeProtocol#commitBlockSynchronization idempotent.
+    (Arpit Agarwal via jing9)
+
   OPTIMIZATIONS
 
     HDFS-4465. Optimize datanode ReplicasMap and ReplicaInfo. (atm)

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -3068,7 +3068,7 @@ void removePathAndBlocks(String src, BlocksMapUpdateInfo blocks,
     for (Block b : blocks.getToDeleteList()) {
       if (trackBlockCounts) {
-        BlockInfo bi = blockManager.getStoredBlock(b);
+        BlockInfo bi = getStoredBlock(b);
         if (bi.isComplete()) {
           numRemovedComplete++;
           if (bi.numNodes() >= blockManager.minReplication) {
@@ -3509,6 +3509,11 @@ private void finalizeINodeFileUnderConstruction(String src,
     blockManager.checkReplication(newFile);
   }
 
+  @VisibleForTesting
+  BlockInfo getStoredBlock(Block block) {
+    return blockManager.getStoredBlock(block);
+  }
+
   void commitBlockSynchronization(ExtendedBlock lastblock,
       long newgenerationstamp, long newlength,
       boolean closeFile, boolean deleteblock, DatanodeID[] newtargets,
@@ -3534,16 +3539,28 @@ void commitBlockSynchronization(ExtendedBlock lastblock,
           "Cannot commitBlockSynchronization while in safe mode",
           safeMode);
       }
-      final BlockInfo storedBlock = blockManager.getStoredBlock(ExtendedBlock
-          .getLocalBlock(lastblock));
+      final BlockInfo storedBlock = getStoredBlock(
+          ExtendedBlock.getLocalBlock(lastblock));
       if (storedBlock == null) {
-        throw new IOException("Block (=" + lastblock + ") not found");
+        if (deleteblock) {
+          // This may be a retry attempt so ignore the failure
+          // to locate the block.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Block (=" + lastblock + ") not found");
+          }
+          return;
+        } else {
+          throw new IOException("Block (=" + lastblock + ") not found");
+        }
       }
       INodeFile iFile = ((INode)storedBlock.getBlockCollection()).asFile();
       if (!iFile.isUnderConstruction() || storedBlock.isComplete()) {
-        throw new IOException("Unexpected block (=" + lastblock
-            + ") since the file (=" + iFile.getLocalName()
-            + ") is not under construction");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Unexpected block (=" + lastblock
+              + ") since the file (=" + iFile.getLocalName()
+              + ") is not under construction");
+        }
+        return;
       }
 
       long recoveryId =
@@ -3559,11 +3576,9 @@ void commitBlockSynchronization(ExtendedBlock lastblock,
       if (deleteblock) {
         Block blockToDel = ExtendedBlock.getLocalBlock(lastblock);
         boolean remove = pendingFile.removeLastBlock(blockToDel);
-        if (!remove) {
-          throw new IOException("Trying to delete non-existant block "
-              + blockToDel);
+        if (remove) {
+          blockManager.removeBlockFromMap(storedBlock);
         }
-        blockManager.removeBlockFromMap(storedBlock);
       }
       else {
         // update last block
@@ -3593,17 +3608,11 @@ void commitBlockSynchronization(ExtendedBlock lastblock,
         pendingFile.setLastBlock(storedBlock, descriptors);
       }
 
-      src = leaseManager.findPath(pendingFile);
       if (closeFile) {
-        // commit the last block and complete it if it has minimum replicas
-        commitOrCompleteLastBlock(pendingFile, storedBlock);
-
-        //remove lease, close file
-        finalizeINodeFileUnderConstruction(src, pendingFile,
-            Snapshot.findLatestSnapshot(pendingFile, null));
+        src = closeFileCommitBlocks(pendingFile, storedBlock);
       } else {
         // If this commit does not want to close the file, persist blocks
-        dir.persistBlocks(src, pendingFile);
+        src = persistBlocks(pendingFile);
       }
     } finally {
       writeUnlock();
@@ -3620,6 +3629,44 @@ void commitBlockSynchronization(ExtendedBlock lastblock,
     }
   }
 
+  /**
+   *
+   * @param pendingFile
+   * @param storedBlock
+   * @return Path of the file that was closed.
+   * @throws IOException
+   */
+  @VisibleForTesting
+  String closeFileCommitBlocks(INodeFileUnderConstruction pendingFile,
+      BlockInfo storedBlock)
+      throws IOException {
+    String src = leaseManager.findPath(pendingFile);
+
+    // commit the last block and complete it if it has minimum replicas
+    commitOrCompleteLastBlock(pendingFile, storedBlock);
+
+    //remove lease, close file
+    finalizeINodeFileUnderConstruction(src, pendingFile,
+        Snapshot.findLatestSnapshot(pendingFile, null));
+
+    return src;
+  }
+
+  /**
+   * Persist the block list for the given file.
+   *
+   * @param pendingFile
+   * @return Path to the given file.
+   * @throws IOException
+   */
+  @VisibleForTesting
+  String persistBlocks(INodeFileUnderConstruction pendingFile)
+      throws IOException {
+    String src = leaseManager.findPath(pendingFile);
+    dir.persistBlocks(src, pendingFile);
+    return src;
+  }
+
   /**
    * Renew the lease(s) held by the given client
@@ -4662,7 +4709,7 @@ public void decrementSafeBlockCount(Block b) {
     SafeModeInfo safeMode = this.safeMode;
     if (safeMode == null) // mostly true
       return;
-    BlockInfo storedBlock = blockManager.getStoredBlock(b);
+    BlockInfo storedBlock = getStoredBlock(b);
     if (storedBlock.isComplete()) {
       safeMode.decrementSafeBlockCount((short)blockManager.countNodes(b).liveReplicas());
     }
@@ -5279,7 +5326,7 @@ private INodeFileUnderConstruction checkUCBlock(ExtendedBlock block,
     }
 
     // check stored block state
-    BlockInfo storedBlock = blockManager.getStoredBlock(ExtendedBlock.getLocalBlock(block));
+    BlockInfo storedBlock = getStoredBlock(ExtendedBlock.getLocalBlock(block));
     if (storedBlock == null ||
         storedBlock.getBlockUCState() != BlockUCState.UNDER_CONSTRUCTION) {
       throw new IOException(block +

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java

@@ -166,7 +166,7 @@ public void errorReport(DatanodeRegistration registration,
   /**
    * Commit block synchronization in lease recovery
    */
-  @AtMostOnce
+  @Idempotent
   public void commitBlockSynchronization(ExtendedBlock block,
       long newgenerationstamp, long newlength,
       boolean closeFile, boolean deleteblock, DatanodeID[] newtargets,

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java (new file)

@@ -0,0 +1,172 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.*;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.junit.Test;
/**
 * Verify that FSNamesystem#commitBlockSynchronization is idempotent.
*/
public class TestCommitBlockSynchronization {
private static final long blockId = 100;
private static final long length = 200;
private static final long genStamp = 300;
private FSNamesystem makeNameSystemSpy(Block block,
INodeFileUnderConstruction file)
throws IOException {
Configuration conf = new Configuration();
FSImage image = new FSImage(conf);
DatanodeDescriptor[] targets = new DatanodeDescriptor[0];
FSNamesystem namesystem = new FSNamesystem(conf, image);
FSNamesystem namesystemSpy = spy(namesystem);
BlockInfoUnderConstruction blockInfo = new BlockInfoUnderConstruction(
block, 1, HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets);
blockInfo.setBlockCollection(file);
blockInfo.setGenerationStamp(genStamp);
blockInfo.initializeBlockRecovery(genStamp);
doReturn(true).when(file).removeLastBlock(any(Block.class));
doReturn(blockInfo).when(namesystemSpy).getStoredBlock(any(Block.class));
doReturn("").when(namesystemSpy).closeFileCommitBlocks(
any(INodeFileUnderConstruction.class),
any(BlockInfo.class));
doReturn("").when(namesystemSpy).persistBlocks(
any(INodeFileUnderConstruction.class));
doReturn(mock(FSEditLog.class)).when(namesystemSpy).getEditLog();
return namesystemSpy;
}
@Test
public void testCommitBlockSynchronization() throws IOException {
INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class);
Block block = new Block(blockId, length, genStamp);
FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
DatanodeID[] newTargets = new DatanodeID[0];
ExtendedBlock lastBlock = new ExtendedBlock();
namesystemSpy.commitBlockSynchronization(
lastBlock, genStamp, length, false,
false, newTargets, null);
// Repeat the call to make sure it does not throw
namesystemSpy.commitBlockSynchronization(
lastBlock, genStamp, length, false, false, newTargets, null);
// Simulate 'completing' the block.
BlockInfo completedBlockInfo = new BlockInfo(block, 1);
completedBlockInfo.setBlockCollection(file);
completedBlockInfo.setGenerationStamp(genStamp);
doReturn(completedBlockInfo).when(namesystemSpy)
.getStoredBlock(any(Block.class));
// Repeat the call to make sure it does not throw
namesystemSpy.commitBlockSynchronization(
lastBlock, genStamp, length, false, false, newTargets, null);
}
@Test
public void testCommitBlockSynchronization2() throws IOException {
INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class);
Block block = new Block(blockId, length, genStamp);
FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
DatanodeID[] newTargets = new DatanodeID[0];
ExtendedBlock lastBlock = new ExtendedBlock();
namesystemSpy.commitBlockSynchronization(
lastBlock, genStamp, length, false,
false, newTargets, null);
// Make sure the call fails if the generation stamp does not match
// the block recovery ID.
try {
namesystemSpy.commitBlockSynchronization(
lastBlock, genStamp - 1, length, false, false, newTargets, null);
fail("Failed to get expected IOException on generation stamp/" +
"recovery ID mismatch");
} catch (IOException ioe) {
// Expected exception.
}
}
@Test
public void testCommitBlockSynchronizationWithDelete() throws IOException {
INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class);
Block block = new Block(blockId, length, genStamp);
FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
DatanodeDescriptor[] targets = new DatanodeDescriptor[0];
DatanodeID[] newTargets = new DatanodeID[0];
ExtendedBlock lastBlock = new ExtendedBlock();
namesystemSpy.commitBlockSynchronization(
lastBlock, genStamp, length, false,
true, newTargets, null);
// Simulate removing the last block from the file.
doReturn(false).when(file).removeLastBlock(any(Block.class));
// Repeat the call to make sure it does not throw
namesystemSpy.commitBlockSynchronization(
lastBlock, genStamp, length, false, true, newTargets, null);
}
@Test
public void testCommitBlockSynchronizationWithClose() throws IOException {
INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class);
Block block = new Block(blockId, length, genStamp);
FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
DatanodeDescriptor[] targets = new DatanodeDescriptor[0];
DatanodeID[] newTargets = new DatanodeID[0];
ExtendedBlock lastBlock = new ExtendedBlock();
namesystemSpy.commitBlockSynchronization(
lastBlock, genStamp, length, true,
false, newTargets, null);
    // Repeat the call to make sure it does not throw
namesystemSpy.commitBlockSynchronization(
lastBlock, genStamp, length, true, false, newTargets, null);
BlockInfo completedBlockInfo = new BlockInfo(block, 1);
completedBlockInfo.setBlockCollection(file);
completedBlockInfo.setGenerationStamp(genStamp);
doReturn(completedBlockInfo).when(namesystemSpy)
.getStoredBlock(any(Block.class));
namesystemSpy.commitBlockSynchronization(
lastBlock, genStamp, length, true, false, newTargets, null);
}
}
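Assuming a standard Hadoop source checkout, the new test can be run on its own through Maven Surefire's test filter; the module path below is the usual location of the hadoop-hdfs module and is an assumption, not part of the commit.

    cd hadoop-hdfs-project/hadoop-hdfs
    mvn test -Dtest=TestCommitBlockSynchronization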