HDFS-6481. DatanodeManager#getDatanodeStorageInfos() should check the length of storageIDs. (Contributed by szetszwo)
This commit is contained in:
parent
74a9a51886
commit
0b18e5e8c6
@ -2346,6 +2346,9 @@ Release 2.7.2 - UNRELEASED
|
||||
HDFS-9317. Document fsck -blockId and -storagepolicy options in branch-2.7.
|
||||
(aajisaka)
|
||||
|
||||
HDFS-6481. DatanodeManager#getDatanodeStorageInfos() should check the
|
||||
length of storageIDs. (szetszwo via Arpit Agarwal)
|
||||
|
||||
Release 2.7.1 - 2015-07-06
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -506,8 +506,18 @@ public DatanodeDescriptor getDatanode(DatanodeID nodeID
|
||||
}
|
||||
|
||||
public DatanodeStorageInfo[] getDatanodeStorageInfos(
|
||||
DatanodeID[] datanodeID, String[] storageIDs)
|
||||
throws UnregisteredNodeException {
|
||||
DatanodeID[] datanodeID, String[] storageIDs,
|
||||
String format, Object... args) throws UnregisteredNodeException {
|
||||
if (datanodeID.length != storageIDs.length) {
|
||||
final String err = (storageIDs.length == 0?
|
||||
"Missing storageIDs: It is likely that the HDFS client,"
|
||||
+ " who made this call, is running in an older version of Hadoop"
|
||||
+ " which does not support storageIDs."
|
||||
: "Length mismatched: storageIDs.length=" + storageIDs.length + " != "
|
||||
) + " datanodeID.length=" + datanodeID.length;
|
||||
throw new HadoopIllegalArgumentException(
|
||||
err + ", "+ String.format(format, args));
|
||||
}
|
||||
if (datanodeID.length == 0) {
|
||||
return null;
|
||||
}
|
||||
|
@ -2517,7 +2517,9 @@ LocatedBlock getAdditionalDatanode(String src, long fileId,
|
||||
|
||||
//find datanode storages
|
||||
final DatanodeManager dm = blockManager.getDatanodeManager();
|
||||
chosen = Arrays.asList(dm.getDatanodeStorageInfos(existings, storageIDs));
|
||||
chosen = Arrays.asList(dm.getDatanodeStorageInfos(existings, storageIDs,
|
||||
"src=%s, fileId=%d, blk=%s, clientName=%s, clientMachine=%s",
|
||||
src, fileId, blk, clientName, clientMachine));
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
@ -3319,7 +3321,7 @@ void commitBlockSynchronization(ExtendedBlock oldBlock,
|
||||
+ ", deleteBlock=" + deleteblock
|
||||
+ ")");
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
String src = "";
|
||||
final String src;
|
||||
waitForLoadingFSImage();
|
||||
writeLock();
|
||||
boolean copyTruncate = false;
|
||||
@ -3366,10 +3368,10 @@ void commitBlockSynchronization(ExtendedBlock oldBlock,
|
||||
}
|
||||
long bcId = storedBlock.getBlockCollectionId();
|
||||
INodeFile iFile = ((INode)getBlockCollection(bcId)).asFile();
|
||||
src = iFile.getFullPathName();
|
||||
if (isFileDeleted(iFile)) {
|
||||
throw new FileNotFoundException("File not found: "
|
||||
+ iFile.getFullPathName() + ", likely due to delayed block"
|
||||
+ " removal");
|
||||
+ src + ", likely due to delayed block removal");
|
||||
}
|
||||
if ((!iFile.isUnderConstruction() || storedBlock.isComplete()) &&
|
||||
iFile.getLastBlock().isComplete()) {
|
||||
@ -3443,7 +3445,10 @@ void commitBlockSynchronization(ExtendedBlock oldBlock,
|
||||
DatanodeStorageInfo[] trimmedStorageInfos =
|
||||
blockManager.getDatanodeManager().getDatanodeStorageInfos(
|
||||
trimmedTargets.toArray(new DatanodeID[trimmedTargets.size()]),
|
||||
trimmedStorages.toArray(new String[trimmedStorages.size()]));
|
||||
trimmedStorages.toArray(new String[trimmedStorages.size()]),
|
||||
"src=%s, oldBlock=%s, newgenerationstamp=%d, newlength=%d",
|
||||
src, oldBlock, newgenerationstamp, newlength);
|
||||
|
||||
if(copyTruncate) {
|
||||
iFile.convertLastBlockToUC(truncatedBlock, trimmedStorageInfos);
|
||||
} else {
|
||||
@ -3458,16 +3463,15 @@ void commitBlockSynchronization(ExtendedBlock oldBlock,
|
||||
|
||||
if (closeFile) {
|
||||
if(copyTruncate) {
|
||||
src = closeFileCommitBlocks(iFile, truncatedBlock);
|
||||
closeFileCommitBlocks(src, iFile, truncatedBlock);
|
||||
if(!iFile.isBlockInLatestSnapshot((BlockInfoContiguous) storedBlock)) {
|
||||
blockManager.removeBlock(storedBlock);
|
||||
}
|
||||
} else {
|
||||
src = closeFileCommitBlocks(iFile, storedBlock);
|
||||
closeFileCommitBlocks(src, iFile, storedBlock);
|
||||
}
|
||||
} else {
|
||||
// If this commit does not want to close the file, persist blocks
|
||||
src = iFile.getFullPathName();
|
||||
FSDirWriteFileOp.persistBlocks(dir, src, iFile, false);
|
||||
}
|
||||
} finally {
|
||||
@ -3493,20 +3497,16 @@ void commitBlockSynchronization(ExtendedBlock oldBlock,
|
||||
* @throws IOException on error
|
||||
*/
|
||||
@VisibleForTesting
|
||||
String closeFileCommitBlocks(INodeFile pendingFile, BlockInfo storedBlock)
|
||||
throws IOException {
|
||||
void closeFileCommitBlocks(String src, INodeFile pendingFile,
|
||||
BlockInfo storedBlock) throws IOException {
|
||||
final INodesInPath iip = INodesInPath.fromINode(pendingFile);
|
||||
final String src = iip.getPath();
|
||||
|
||||
// commit the last block and complete it if it has minimum replicas
|
||||
commitOrCompleteLastBlock(pendingFile, iip, storedBlock);
|
||||
|
||||
//remove lease, close file
|
||||
finalizeINodeFileUnderConstruction(src, pendingFile,
|
||||
Snapshot.findLatestSnapshot(pendingFile,
|
||||
Snapshot.CURRENT_STATE_ID));
|
||||
|
||||
return src;
|
||||
Snapshot.findLatestSnapshot(pendingFile, Snapshot.CURRENT_STATE_ID));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -5473,6 +5473,7 @@ private void updatePipelineInternal(String clientName, ExtendedBlock oldBlock,
|
||||
assert hasWriteLock();
|
||||
// check the vadility of the block and lease holder name
|
||||
final INodeFile pendingFile = checkUCBlock(oldBlock, clientName);
|
||||
final String src = pendingFile.getFullPathName();
|
||||
final BlockInfo lastBlock = pendingFile.getLastBlock();
|
||||
assert !lastBlock.isComplete();
|
||||
|
||||
@ -5498,11 +5499,12 @@ private void updatePipelineInternal(String clientName, ExtendedBlock oldBlock,
|
||||
|
||||
// find the DatanodeDescriptor objects
|
||||
final DatanodeStorageInfo[] storages = blockManager.getDatanodeManager()
|
||||
.getDatanodeStorageInfos(newNodes, newStorageIDs);
|
||||
.getDatanodeStorageInfos(newNodes, newStorageIDs,
|
||||
"src=%s, oldBlock=%s, newBlock=%s, clientName=%s",
|
||||
src, oldBlock, newBlock, clientName);
|
||||
lastBlock.getUnderConstructionFeature().setExpectedLocations(lastBlock,
|
||||
storages, lastBlock.isStriped());
|
||||
|
||||
String src = pendingFile.getFullPathName();
|
||||
FSDirWriteFileOp.persistBlocks(dir, src, pendingFile, logRetryCache);
|
||||
}
|
||||
|
||||
|
@ -80,8 +80,8 @@ private FSNamesystem makeNameSystemSpy(Block block, INodeFile file)
|
||||
|
||||
doReturn(blockInfo).when(namesystemSpy).getStoredBlock(any(Block.class));
|
||||
doReturn(blockInfo).when(file).getLastBlock();
|
||||
doReturn("").when(namesystemSpy).closeFileCommitBlocks(
|
||||
any(INodeFile.class), any(BlockInfo.class));
|
||||
doNothing().when(namesystemSpy).closeFileCommitBlocks(
|
||||
any(String.class), any(INodeFile.class), any(BlockInfo.class));
|
||||
doReturn(mock(FSEditLog.class)).when(namesystemSpy).getEditLog();
|
||||
|
||||
return namesystemSpy;
|
||||
|
Loading…
Reference in New Issue
Block a user