HDFS-5008. Make ClientProtocol#abandonBlock() idempotent. Contributed by Jing Zhao.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1505761 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c1314eb2a3
commit
11c073134a
@ -461,6 +461,8 @@ Release 2.1.0-beta - 2013-07-02
|
|||||||
HDFS-5007. Replace hard-coded property keys with DFSConfigKeys fields.
|
HDFS-5007. Replace hard-coded property keys with DFSConfigKeys fields.
|
||||||
(Kousuke Saruta via jing9)
|
(Kousuke Saruta via jing9)
|
||||||
|
|
||||||
|
HDFS-5008. Make ClientProtocol#abandonBlock() idempotent. (jing9)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-4465. Optimize datanode ReplicasMap and ReplicaInfo. (atm)
|
HDFS-4465. Optimize datanode ReplicasMap and ReplicaInfo. (atm)
|
||||||
|
@ -274,8 +274,8 @@ public void setOwner(String src, String username, String groupname)
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* The client can give up on a block by calling abandonBlock().
|
* The client can give up on a block by calling abandonBlock().
|
||||||
* The client can then
|
* The client can then either obtain a new block, or complete or abandon the
|
||||||
* either obtain a new block, or complete or abandon the file.
|
* file.
|
||||||
* Any partial writes to the block will be discarded.
|
* Any partial writes to the block will be discarded.
|
||||||
*
|
*
|
||||||
* @throws AccessControlException If access is denied
|
* @throws AccessControlException If access is denied
|
||||||
@ -283,6 +283,7 @@ public void setOwner(String src, String username, String groupname)
|
|||||||
* @throws UnresolvedLinkException If <code>src</code> contains a symlink
|
* @throws UnresolvedLinkException If <code>src</code> contains a symlink
|
||||||
* @throws IOException If an I/O error occurred
|
* @throws IOException If an I/O error occurred
|
||||||
*/
|
*/
|
||||||
|
@Idempotent
|
||||||
public void abandonBlock(ExtendedBlock b, String src, String holder)
|
public void abandonBlock(ExtendedBlock b, String src, String holder)
|
||||||
throws AccessControlException, FileNotFoundException,
|
throws AccessControlException, FileNotFoundException,
|
||||||
UnresolvedLinkException, IOException;
|
UnresolvedLinkException, IOException;
|
||||||
|
@ -418,23 +418,27 @@ void closeFile(String path, INodeFile file) {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove a block from the file.
|
* Remove a block from the file.
|
||||||
|
* @return Whether the block exists in the corresponding file
|
||||||
*/
|
*/
|
||||||
void removeBlock(String path, INodeFileUnderConstruction fileNode,
|
boolean removeBlock(String path, INodeFileUnderConstruction fileNode,
|
||||||
Block block) throws IOException {
|
Block block) throws IOException {
|
||||||
waitForReady();
|
waitForReady();
|
||||||
|
|
||||||
writeLock();
|
writeLock();
|
||||||
try {
|
try {
|
||||||
unprotectedRemoveBlock(path, fileNode, block);
|
return unprotectedRemoveBlock(path, fileNode, block);
|
||||||
} finally {
|
} finally {
|
||||||
writeUnlock();
|
writeUnlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void unprotectedRemoveBlock(String path, INodeFileUnderConstruction fileNode,
|
boolean unprotectedRemoveBlock(String path,
|
||||||
Block block) throws IOException {
|
INodeFileUnderConstruction fileNode, Block block) throws IOException {
|
||||||
// modify file-> block and blocksMap
|
// modify file-> block and blocksMap
|
||||||
fileNode.removeLastBlock(block);
|
boolean removed = fileNode.removeLastBlock(block);
|
||||||
|
if (!removed) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
getBlockManager().removeBlockFromMap(block);
|
getBlockManager().removeBlockFromMap(block);
|
||||||
|
|
||||||
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
||||||
@ -446,6 +450,7 @@ void unprotectedRemoveBlock(String path, INodeFileUnderConstruction fileNode,
|
|||||||
// update space consumed
|
// update space consumed
|
||||||
final INodesInPath iip = rootDir.getINodesInPath4Write(path, true);
|
final INodesInPath iip = rootDir.getINodesInPath4Write(path, true);
|
||||||
updateCount(iip, 0, -fileNode.getBlockDiskspace(), true);
|
updateCount(iip, 0, -fileNode.getBlockDiskspace(), true);
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -39,6 +39,7 @@
|
|||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCloseOp;
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCloseOp;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllowSnapshotOp;
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllowSnapshotOp;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.BlockListUpdatingOp;
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.BlockListUpdatingOp;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp;
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp;
|
||||||
@ -55,6 +56,8 @@
|
|||||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp;
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV1Op;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetNSQuotaOp;
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetNSQuotaOp;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp;
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetPermissionsOp;
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetPermissionsOp;
|
||||||
@ -64,9 +67,6 @@
|
|||||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV1Op;
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op;
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
|
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
|
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
|
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
|
||||||
@ -74,7 +74,6 @@
|
|||||||
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
|
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
|
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
|
||||||
import org.apache.hadoop.hdfs.util.Holder;
|
import org.apache.hadoop.hdfs.util.Holder;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
|
||||||
|
|
||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
|
|
||||||
@ -659,8 +658,12 @@ private void updateBlocks(FSDirectory fsDir, BlockListUpdatingOp op,
|
|||||||
throw new IOException("Trying to remove more than one block from file "
|
throw new IOException("Trying to remove more than one block from file "
|
||||||
+ path);
|
+ path);
|
||||||
}
|
}
|
||||||
fsDir.unprotectedRemoveBlock(path,
|
Block oldBlock = oldBlocks[oldBlocks.length - 1];
|
||||||
(INodeFileUnderConstruction)file, oldBlocks[oldBlocks.length - 1]);
|
boolean removed = fsDir.unprotectedRemoveBlock(path,
|
||||||
|
(INodeFileUnderConstruction) file, oldBlock);
|
||||||
|
if (!removed && !(op instanceof UpdateBlocksOp)) {
|
||||||
|
throw new IOException("Trying to delete non-existant block " + oldBlock);
|
||||||
|
}
|
||||||
} else if (newBlocks.length > oldBlocks.length) {
|
} else if (newBlocks.length > oldBlocks.length) {
|
||||||
// We're adding blocks
|
// We're adding blocks
|
||||||
for (int i = oldBlocks.length; i < newBlocks.length; i++) {
|
for (int i = oldBlocks.length; i < newBlocks.length; i++) {
|
||||||
|
@ -2578,7 +2578,11 @@ boolean abandonBlock(ExtendedBlock b, String src, String holder)
|
|||||||
// Remove the block from the pending creates list
|
// Remove the block from the pending creates list
|
||||||
//
|
//
|
||||||
INodeFileUnderConstruction file = checkLease(src, holder);
|
INodeFileUnderConstruction file = checkLease(src, holder);
|
||||||
dir.removeBlock(src, file, ExtendedBlock.getLocalBlock(b));
|
boolean removed = dir.removeBlock(src, file,
|
||||||
|
ExtendedBlock.getLocalBlock(b));
|
||||||
|
if (!removed) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
||||||
NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
|
NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
|
||||||
+ b + " is removed from pendingCreates");
|
+ b + " is removed from pendingCreates");
|
||||||
@ -3553,7 +3557,12 @@ void commitBlockSynchronization(ExtendedBlock lastblock,
|
|||||||
INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
|
INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
|
||||||
|
|
||||||
if (deleteblock) {
|
if (deleteblock) {
|
||||||
pendingFile.removeLastBlock(ExtendedBlock.getLocalBlock(lastblock));
|
Block blockToDel = ExtendedBlock.getLocalBlock(lastblock);
|
||||||
|
boolean remove = pendingFile.removeLastBlock(blockToDel);
|
||||||
|
if (!remove) {
|
||||||
|
throw new IOException("Trying to delete non-existant block "
|
||||||
|
+ blockToDel);
|
||||||
|
}
|
||||||
blockManager.removeBlockFromMap(storedBlock);
|
blockManager.removeBlockFromMap(storedBlock);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
@ -157,20 +157,21 @@ protected void assertAllBlocksComplete() {
|
|||||||
* Remove a block from the block list. This block should be
|
* Remove a block from the block list. This block should be
|
||||||
* the last one on the list.
|
* the last one on the list.
|
||||||
*/
|
*/
|
||||||
void removeLastBlock(Block oldblock) throws IOException {
|
boolean removeLastBlock(Block oldblock) throws IOException {
|
||||||
final BlockInfo[] blocks = getBlocks();
|
final BlockInfo[] blocks = getBlocks();
|
||||||
if (blocks == null) {
|
if (blocks == null || blocks.length == 0) {
|
||||||
throw new IOException("Trying to delete non-existant block " + oldblock);
|
return false;
|
||||||
}
|
}
|
||||||
int size_1 = blocks.length - 1;
|
int size_1 = blocks.length - 1;
|
||||||
if (!blocks[size_1].equals(oldblock)) {
|
if (!blocks[size_1].equals(oldblock)) {
|
||||||
throw new IOException("Trying to delete non-last block " + oldblock);
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
//copy to a new list
|
//copy to a new list
|
||||||
BlockInfo[] newlist = new BlockInfo[size_1];
|
BlockInfo[] newlist = new BlockInfo[size_1];
|
||||||
System.arraycopy(blocks, 0, newlist, 0, size_1);
|
System.arraycopy(blocks, 0, newlist, 0, size_1);
|
||||||
setBlocks(newlist);
|
setBlocks(newlist);
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -21,11 +21,12 @@
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import junit.framework.Assert;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
@ -45,7 +46,7 @@ public class TestAbandonBlock {
|
|||||||
static final String FILE_NAME_PREFIX
|
static final String FILE_NAME_PREFIX
|
||||||
= "/" + TestAbandonBlock.class.getSimpleName() + "_";
|
= "/" + TestAbandonBlock.class.getSimpleName() + "_";
|
||||||
private MiniDFSCluster cluster;
|
private MiniDFSCluster cluster;
|
||||||
private FileSystem fs;
|
private DistributedFileSystem fs;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
@ -73,29 +74,34 @@ public void testAbandonBlock() throws IOException {
|
|||||||
fout.hflush();
|
fout.hflush();
|
||||||
|
|
||||||
// Now abandon the last block
|
// Now abandon the last block
|
||||||
DFSClient dfsclient = DFSClientAdapter.getDFSClient((DistributedFileSystem)fs);
|
DFSClient dfsclient = DFSClientAdapter.getDFSClient(fs);
|
||||||
LocatedBlocks blocks =
|
LocatedBlocks blocks =
|
||||||
dfsclient.getNamenode().getBlockLocations(src, 0, Integer.MAX_VALUE);
|
dfsclient.getNamenode().getBlockLocations(src, 0, Integer.MAX_VALUE);
|
||||||
int orginalNumBlocks = blocks.locatedBlockCount();
|
int orginalNumBlocks = blocks.locatedBlockCount();
|
||||||
LocatedBlock b = blocks.getLastLocatedBlock();
|
LocatedBlock b = blocks.getLastLocatedBlock();
|
||||||
dfsclient.getNamenode().abandonBlock(b.getBlock(), src, dfsclient.clientName);
|
dfsclient.getNamenode().abandonBlock(b.getBlock(), src,
|
||||||
|
dfsclient.clientName);
|
||||||
|
|
||||||
|
// call abandonBlock again to make sure the operation is idempotent
|
||||||
|
dfsclient.getNamenode().abandonBlock(b.getBlock(), src,
|
||||||
|
dfsclient.clientName);
|
||||||
|
|
||||||
// And close the file
|
// And close the file
|
||||||
fout.close();
|
fout.close();
|
||||||
|
|
||||||
// Close cluster and check the block has been abandoned after restart
|
// Close cluster and check the block has been abandoned after restart
|
||||||
cluster.restartNameNode();
|
cluster.restartNameNode();
|
||||||
blocks = dfsclient.getNamenode().getBlockLocations(src, 0, Integer.MAX_VALUE);
|
blocks = dfsclient.getNamenode().getBlockLocations(src, 0,
|
||||||
assert orginalNumBlocks == blocks.locatedBlockCount() + 1 :
|
Integer.MAX_VALUE);
|
||||||
"Blocks " + b + " has not been abandoned.";
|
Assert.assertEquals("Blocks " + b + " has not been abandoned.",
|
||||||
|
orginalNumBlocks, blocks.locatedBlockCount() + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
/** Make sure that the quota is decremented correctly when a block is abandoned */
|
/** Make sure that the quota is decremented correctly when a block is abandoned */
|
||||||
public void testQuotaUpdatedWhenBlockAbandoned() throws IOException {
|
public void testQuotaUpdatedWhenBlockAbandoned() throws IOException {
|
||||||
DistributedFileSystem dfs = (DistributedFileSystem)fs;
|
|
||||||
// Setting diskspace quota to 3MB
|
// Setting diskspace quota to 3MB
|
||||||
dfs.setQuota(new Path("/"), HdfsConstants.QUOTA_DONT_SET, 3 * 1024 * 1024);
|
fs.setQuota(new Path("/"), HdfsConstants.QUOTA_DONT_SET, 3 * 1024 * 1024);
|
||||||
|
|
||||||
// Start writing a file with 2 replicas to ensure each datanode has one.
|
// Start writing a file with 2 replicas to ensure each datanode has one.
|
||||||
// Block Size is 1MB.
|
// Block Size is 1MB.
|
||||||
|
Loading…
Reference in New Issue
Block a user