HDFS-5439. Fix TestPendingReplication. (Contributed by Junping Du, Arpit Agarwal)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1539247 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arpit Agarwal 2013-11-06 06:50:33 +00:00
parent 103fd6c6d8
commit fa5ba6d977
9 changed files with 50 additions and 32 deletions

View File

@ -76,3 +76,6 @@ IMPROVEMENTS:
HDFS-5466. Update storage IDs when the pipeline is updated. (Contributed HDFS-5466. Update storage IDs when the pipeline is updated. (Contributed
by szetszwo) by szetszwo)
HDFS-5439. Fix TestPendingReplication. (Contributed by Junping Du, Arpit
Agarwal)

View File

@ -1332,7 +1332,8 @@ int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
// Move the block-replication into a "pending" state. // Move the block-replication into a "pending" state.
// The reason we use 'pending' is so we can retry // The reason we use 'pending' is so we can retry
// replications that fail after an appropriate amount of time. // replications that fail after an appropriate amount of time.
pendingReplications.increment(block, targets); pendingReplications.increment(block,
DatanodeStorageInfo.toDatanodeDescriptors(targets));
if(blockLog.isDebugEnabled()) { if(blockLog.isDebugEnabled()) {
blockLog.debug( blockLog.debug(
"BLOCK* block " + block "BLOCK* block " + block
@ -1357,7 +1358,7 @@ int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
StringBuilder targetList = new StringBuilder("datanode(s)"); StringBuilder targetList = new StringBuilder("datanode(s)");
for (int k = 0; k < targets.length; k++) { for (int k = 0; k < targets.length; k++) {
targetList.append(' '); targetList.append(' ');
targetList.append(targets[k]); targetList.append(targets[k].getDatanodeDescriptor());
} }
blockLog.info("BLOCK* ask " + rw.srcNode blockLog.info("BLOCK* ask " + rw.srcNode
+ " to replicate " + rw.block + " to " + targetList); + " to replicate " + rw.block + " to " + targetList);
@ -2645,7 +2646,7 @@ void addBlock(DatanodeDescriptor node, String storageID, Block block, String del
// //
// Modify the blocks->datanode map and node's map. // Modify the blocks->datanode map and node's map.
// //
pendingReplications.decrement(block, node, storageID); pendingReplications.decrement(block, node);
processAndHandleReportedBlock(node, storageID, block, ReplicaState.FINALIZED, processAndHandleReportedBlock(node, storageID, block, ReplicaState.FINALIZED,
delHintNode); delHintNode);
} }

View File

@ -496,7 +496,7 @@ protected DatanodeStorageInfo chooseRandom(int numOfReplicas,
builder.setLength(0); builder.setLength(0);
builder.append("["); builder.append("[");
} }
boolean goodTarget = false; boolean badTarget = false;
DatanodeStorageInfo firstChosen = null; DatanodeStorageInfo firstChosen = null;
while(numOfReplicas > 0 && numOfAvailableNodes > 0) { while(numOfReplicas > 0 && numOfAvailableNodes > 0) {
DatanodeDescriptor chosenNode = DatanodeDescriptor chosenNode =
@ -506,26 +506,30 @@ protected DatanodeStorageInfo chooseRandom(int numOfReplicas,
final DatanodeStorageInfo[] storages = DFSUtil.shuffle( final DatanodeStorageInfo[] storages = DFSUtil.shuffle(
chosenNode.getStorageInfos()); chosenNode.getStorageInfos());
for(int i = 0; i < storages.length && !goodTarget; i++) { int i;
for(i = 0; i < storages.length; i++) {
final int newExcludedNodes = addIfIsGoodTarget(storages[i], final int newExcludedNodes = addIfIsGoodTarget(storages[i],
excludedNodes, blocksize, maxNodesPerRack, considerLoad, results, excludedNodes, blocksize, maxNodesPerRack, considerLoad, results,
avoidStaleNodes, storageType); avoidStaleNodes, storageType);
goodTarget = newExcludedNodes >= 0; if (newExcludedNodes >= 0) {
if (goodTarget) {
numOfReplicas--; numOfReplicas--;
if (firstChosen == null) { if (firstChosen == null) {
firstChosen = storages[i]; firstChosen = storages[i];
} }
numOfAvailableNodes -= newExcludedNodes; numOfAvailableNodes -= newExcludedNodes;
break;
} }
} }
// If no candidate storage was found on this DN then set badTarget.
badTarget = (i == storages.length);
} }
} }
if (numOfReplicas>0) { if (numOfReplicas>0) {
String detail = enableDebugLogging; String detail = enableDebugLogging;
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
if (!goodTarget && builder != null) { if (badTarget && builder != null) {
detail = builder.append("]").toString(); detail = builder.append("]").toString();
builder.setLength(0); builder.setLength(0);
} else detail = ""; } else detail = "";

View File

@ -46,6 +46,15 @@ static DatanodeInfo[] toDatanodeInfos(List<DatanodeStorageInfo> storages) {
return datanodes; return datanodes;
} }
static DatanodeDescriptor[] toDatanodeDescriptors(
DatanodeStorageInfo[] storages) {
DatanodeDescriptor[] datanodes = new DatanodeDescriptor[storages.length];
for (int i = 0; i < storages.length; ++i) {
datanodes[i] = storages[i].getDatanodeDescriptor();
}
return datanodes;
}
public static String[] toStorageIDs(DatanodeStorageInfo[] storages) { public static String[] toStorageIDs(DatanodeStorageInfo[] storages) {
String[] storageIDs = new String[storages.length]; String[] storageIDs = new String[storages.length];
for(int i = 0; i < storageIDs.length; i++) { for(int i = 0; i < storageIDs.length; i++) {

View File

@ -76,7 +76,7 @@ void start() {
* @param block The corresponding block * @param block The corresponding block
* @param targets The DataNodes where replicas of the block should be placed * @param targets The DataNodes where replicas of the block should be placed
*/ */
void increment(Block block, DatanodeStorageInfo[] targets) { void increment(Block block, DatanodeDescriptor[] targets) {
synchronized (pendingReplications) { synchronized (pendingReplications) {
PendingBlockInfo found = pendingReplications.get(block); PendingBlockInfo found = pendingReplications.get(block);
if (found == null) { if (found == null) {
@ -95,14 +95,14 @@ void increment(Block block, DatanodeStorageInfo[] targets) {
* *
* @param The DataNode that finishes the replication * @param The DataNode that finishes the replication
*/ */
void decrement(Block block, DatanodeDescriptor dn, String storageID) { void decrement(Block block, DatanodeDescriptor dn) {
synchronized (pendingReplications) { synchronized (pendingReplications) {
PendingBlockInfo found = pendingReplications.get(block); PendingBlockInfo found = pendingReplications.get(block);
if (found != null) { if (found != null) {
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("Removing pending replication for " + block); LOG.debug("Removing pending replication for " + block);
} }
found.decrementReplicas(dn.getStorageInfo(storageID)); found.decrementReplicas(dn);
if (found.getNumReplicas() <= 0) { if (found.getNumReplicas() <= 0) {
pendingReplications.remove(block); pendingReplications.remove(block);
} }
@ -174,12 +174,12 @@ Block[] getTimedOutBlocks() {
*/ */
static class PendingBlockInfo { static class PendingBlockInfo {
private long timeStamp; private long timeStamp;
private final List<DatanodeStorageInfo> targets; private final List<DatanodeDescriptor> targets;
PendingBlockInfo(DatanodeStorageInfo[] targets) { PendingBlockInfo(DatanodeDescriptor[] targets) {
this.timeStamp = now(); this.timeStamp = now();
this.targets = targets == null ? new ArrayList<DatanodeStorageInfo>() this.targets = targets == null ? new ArrayList<DatanodeDescriptor>()
: new ArrayList<DatanodeStorageInfo>(Arrays.asList(targets)); : new ArrayList<DatanodeDescriptor>(Arrays.asList(targets));
} }
long getTimeStamp() { long getTimeStamp() {
@ -190,16 +190,16 @@ void setTimeStamp() {
timeStamp = now(); timeStamp = now();
} }
void incrementReplicas(DatanodeStorageInfo... newTargets) { void incrementReplicas(DatanodeDescriptor... newTargets) {
if (newTargets != null) { if (newTargets != null) {
for (DatanodeStorageInfo dn : newTargets) { for (DatanodeDescriptor dn : newTargets) {
targets.add(dn); targets.add(dn);
} }
} }
} }
void decrementReplicas(DatanodeStorageInfo storage) { void decrementReplicas(DatanodeDescriptor target) {
targets.remove(storage); targets.remove(target);
} }
int getNumReplicas() { int getNumReplicas() {

View File

@ -267,8 +267,6 @@ void reportBadBlocks(ExtendedBlock block,
/** /**
* Report received blocks and delete hints to the Namenode * Report received blocks and delete hints to the Namenode
* TODO: Fix reportReceivedDeletedBlocks to send reports per-volume.
*
* @throws IOException * @throws IOException
*/ */
private void reportReceivedDeletedBlocks() throws IOException { private void reportReceivedDeletedBlocks() throws IOException {

View File

@ -867,7 +867,7 @@ private void initStorage(final NamespaceInfo nsInfo) throws IOException {
final StorageInfo bpStorage = storage.getBPStorage(bpid); final StorageInfo bpStorage = storage.getBPStorage(bpid);
LOG.info("Setting up storage: nsid=" + bpStorage.getNamespaceID() LOG.info("Setting up storage: nsid=" + bpStorage.getNamespaceID()
+ ";bpid=" + bpid + ";lv=" + storage.getLayoutVersion() + ";bpid=" + bpid + ";lv=" + storage.getLayoutVersion()
+ ";nsInfo=" + nsInfo); + ";nsInfo=" + nsInfo + ";dnuuid=" + storage.getDatanodeUuid());
} }
synchronized(this) { synchronized(this) {

View File

@ -82,7 +82,7 @@ public String getAddress() {
public String toString() { public String toString() {
return getClass().getSimpleName() return getClass().getSimpleName()
+ "(" + getIpAddr() + "(" + getIpAddr()
+ ", storageID=" + getDatanodeUuid() + ", datanodeUuid=" + getDatanodeUuid()
+ ", infoPort=" + getInfoPort() + ", infoPort=" + getInfoPort()
+ ", ipcPort=" + getIpcPort() + ", ipcPort=" + getIpcPort()
+ ", storageInfo=" + storageInfo + ", storageInfo=" + storageInfo

View File

@ -67,7 +67,8 @@ public void testPendingReplication() {
Block block = new Block(i, i, 0); Block block = new Block(i, i, 0);
DatanodeStorageInfo[] targets = new DatanodeStorageInfo[i]; DatanodeStorageInfo[] targets = new DatanodeStorageInfo[i];
System.arraycopy(storages, 0, targets, 0, i); System.arraycopy(storages, 0, targets, 0, i);
pendingReplications.increment(block, targets); pendingReplications.increment(block,
DatanodeStorageInfo.toDatanodeDescriptors(targets));
} }
assertEquals("Size of pendingReplications ", assertEquals("Size of pendingReplications ",
10, pendingReplications.size()); 10, pendingReplications.size());
@ -77,18 +78,18 @@ public void testPendingReplication() {
// remove one item and reinsert it // remove one item and reinsert it
// //
Block blk = new Block(8, 8, 0); Block blk = new Block(8, 8, 0);
pendingReplications.decrement(blk, storages[7].getDatanodeDescriptor(), pendingReplications.decrement(blk, storages[7].getDatanodeDescriptor()); // removes one replica
storages[7].getStorageID()); // removes one replica
assertEquals("pendingReplications.getNumReplicas ", assertEquals("pendingReplications.getNumReplicas ",
7, pendingReplications.getNumReplicas(blk)); 7, pendingReplications.getNumReplicas(blk));
for (int i = 0; i < 7; i++) { for (int i = 0; i < 7; i++) {
// removes all replicas // removes all replicas
pendingReplications.decrement(blk, storages[i].getDatanodeDescriptor(), pendingReplications.decrement(blk, storages[i].getDatanodeDescriptor());
storages[i].getStorageID());
} }
assertTrue(pendingReplications.size() == 9); assertTrue(pendingReplications.size() == 9);
pendingReplications.increment(blk, DFSTestUtil.createDatanodeStorageInfos(8)); pendingReplications.increment(blk,
DatanodeStorageInfo.toDatanodeDescriptors(
DFSTestUtil.createDatanodeStorageInfos(8)));
assertTrue(pendingReplications.size() == 10); assertTrue(pendingReplications.size() == 10);
// //
@ -116,7 +117,9 @@ public void testPendingReplication() {
for (int i = 10; i < 15; i++) { for (int i = 10; i < 15; i++) {
Block block = new Block(i, i, 0); Block block = new Block(i, i, 0);
pendingReplications.increment(block, DFSTestUtil.createDatanodeStorageInfos(i)); pendingReplications.increment(block,
DatanodeStorageInfo.toDatanodeDescriptors(
DFSTestUtil.createDatanodeStorageInfos(i)));
} }
assertTrue(pendingReplications.size() == 15); assertTrue(pendingReplications.size() == 15);
@ -198,7 +201,7 @@ public void testBlockReceived() throws Exception {
DatanodeRegistration dnR = datanodes.get(i).getDNRegistrationForBP( DatanodeRegistration dnR = datanodes.get(i).getDNRegistrationForBP(
poolId); poolId);
StorageReceivedDeletedBlocks[] report = { StorageReceivedDeletedBlocks[] report = {
new StorageReceivedDeletedBlocks(dnR.getDatanodeUuid(), new StorageReceivedDeletedBlocks("Fake-storage-ID-Ignored",
new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo( new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(
blocks[0], BlockStatus.RECEIVED_BLOCK, "") }) }; blocks[0], BlockStatus.RECEIVED_BLOCK, "") }) };
cluster.getNameNodeRpc().blockReceivedAndDeleted(dnR, poolId, report); cluster.getNameNodeRpc().blockReceivedAndDeleted(dnR, poolId, report);
@ -215,7 +218,7 @@ public void testBlockReceived() throws Exception {
DatanodeRegistration dnR = datanodes.get(i).getDNRegistrationForBP( DatanodeRegistration dnR = datanodes.get(i).getDNRegistrationForBP(
poolId); poolId);
StorageReceivedDeletedBlocks[] report = StorageReceivedDeletedBlocks[] report =
{ new StorageReceivedDeletedBlocks(dnR.getDatanodeUuid(), { new StorageReceivedDeletedBlocks("Fake-storage-ID-Ignored",
new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo( new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(
blocks[0], BlockStatus.RECEIVED_BLOCK, "") }) }; blocks[0], BlockStatus.RECEIVED_BLOCK, "") }) };
cluster.getNameNodeRpc().blockReceivedAndDeleted(dnR, poolId, report); cluster.getNameNodeRpc().blockReceivedAndDeleted(dnR, poolId, report);