HDFS-5134. Move blockContentsStale, heartbeatedSinceFailover and firstBlockReport from DatanodeDescriptor to DatanodeStorageInfo; and fix a synchronization problem in DatanodeStorageInfo.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1520938 13f79535-47bb-0310-9956-ffa450edef68
Author: Tsz-wo Sze
Date:   2013-09-08 22:53:36 +00:00
commit 282be1b38e
parent bbce64c8c5

9 changed files with 124 additions and 97 deletions

CHANGES_HDFS-2832.txt

@@ -17,3 +17,7 @@ IMPROVEMENTS:
     (Junping Du via szetszwo)
 
   HDFS-5009. Include storage information in the LocatedBlock. (szetszwo)
+
+  HDFS-5134. Move blockContentsStale, heartbeatedSinceFailover and
+    firstBlockReport from DatanodeDescriptor to DatanodeStorageInfo; and
+    fix a synchronization problem in DatanodeStorageInfo. (szetszwo)

BlockInfo.java

@@ -324,14 +324,6 @@ BlockInfo listRemove(BlockInfo head, DatanodeStorageInfo storage) {
     return head;
   }
 
-  int listCount(DatanodeStorageInfo storage) {
-    int count = 0;
-    for(BlockInfo cur = this; cur != null;
-        cur = cur.getNext(cur.findStorageInfo(storage)))
-      count++;
-    return count;
-  }
-
   /**
    * BlockInfo represents a block that is not being constructed.
    * In order to start modifying the block, the BlockInfo should be converted
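listCount was a linear-time walk of the triplet links to count one storage's blocks; it has no callers left because DatanodeStorageInfo.numBlocks() (see DatanodeStorageInfo.java below) now returns an explicitly maintained counter instead of re-counting the list.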

BlockManager.java

@@ -510,7 +510,7 @@ private void dumpBlockMeta(Block block, PrintWriter out) {
       state = "(decommissioned)";
     }
 
-    if (node.areBlockContentsStale()) {
+    if (storage.areBlockContentsStale()) {
       state += " (block deletions maybe out of date)";
     }
     out.print(" " + node + state + " : ");

@@ -993,7 +993,14 @@ void removeBlocksAssociatedTo(final DatanodeDescriptor node) {
       // failover, then we may have been holding up on processing
       // over-replicated blocks because of it. But we can now
       // process those blocks.
-      if (node.areBlockContentsStale()) {
+      boolean stale = false;
+      for(DatanodeStorageInfo storage : node.getStorageInfos()) {
+        if (storage.areBlockContentsStale()) {
+          stale = true;
+          break;
+        }
+      }
+      if (stale) {
        rescanPostponedMisreplicatedBlocks();
      }
    }
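The datanode is now considered stale if any one of its storages is stale, and the scan above is open-coded. A minimal sketch of the same check as a helper, assuming only the DatanodeDescriptor/DatanodeStorageInfo methods this commit introduces; the helper name is hypothetical and is not part of the change:

// Hypothetical helper equivalent to the inlined loop above.
private static boolean hasStaleStorage(DatanodeDescriptor node) {
  for (DatanodeStorageInfo storage : node.getStorageInfos()) {
    if (storage.areBlockContentsStale()) {
      return true; // one stale storage is enough to trigger the rescan
    }
  }
  return false;
}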
@@ -1616,14 +1623,16 @@ public void processReport(final DatanodeID nodeID,
     // To minimize startup time, we discard any second (or later) block reports
     // that we receive while still in startup phase.
-    if (namesystem.isInStartupSafeMode() && !node.isFirstBlockReport()) {
+    final DatanodeStorageInfo storageInfo = node.getStorageInfo(storage.getStorageID());
+    if (namesystem.isInStartupSafeMode()
+        && storageInfo.getBlockReportCount() > 0) {
       blockLog.info("BLOCK* processReport: "
           + "discarded non-initial block report from " + nodeID
           + " because namenode still in startup phase");
       return;
     }
 
-    if (node.numBlocks() == 0) {
+    if (storageInfo.numBlocks() == 0) {
       // The first block report can be processed a lot more efficiently than
       // ordinary block reports. This shortens restart times.
       processFirstBlockReport(node, storage.getStorageID(), newReport);

@@ -1633,9 +1642,9 @@ public void processReport(final DatanodeID nodeID,
     // Now that we have an up-to-date block report, we know that any
     // deletions from a previous NN iteration have been accounted for.
-    boolean staleBefore = node.areBlockContentsStale();
-    node.receivedBlockReport();
-    if (staleBefore && !node.areBlockContentsStale()) {
+    boolean staleBefore = storageInfo.areBlockContentsStale();
+    storageInfo.receivedBlockReport();
+    if (staleBefore && !storageInfo.areBlockContentsStale()) {
       LOG.info("BLOCK* processReport: Received first block report from "
           + node + " after starting up or becoming active. Its block "
           + "contents are no longer considered stale");

@@ -1747,7 +1756,7 @@ private void processFirstBlockReport(final DatanodeDescriptor node,
       final BlockListAsLongs report) throws IOException {
     if (report == null) return;
     assert (namesystem.hasWriteLock());
-    assert (node.numBlocks() == 0);
+    assert (node.getStorageInfo(storageID).numBlocks() == 0);
     BlockReportIterator itBR = report.getBlockReportIterator();
 
     while(itBR.hasNext()) {

@@ -2421,10 +2430,11 @@ private void processOverReplicatedBlock(final Block block,
         .getNodes(block);
     for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
       final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
-      if (cur.areBlockContentsStale()) {
+      if (storage.areBlockContentsStale()) {
         LOG.info("BLOCK* processOverReplicatedBlock: " +
             "Postponing processing of over-replicated " +
-            block + " since datanode " + cur + " does not yet have up-to-date " +
+            block + " since storage + " + storage
+            + "datanode " + cur + " does not yet have up-to-date " +
             "block information.");
         postponeBlock(block);
         return;

@@ -2756,7 +2766,7 @@ public NumberReplicas countNodes(Block b) {
           live++;
         }
       }
-      if (node.areBlockContentsStale()) {
+      if (storage.areBlockContentsStale()) {
        stale++;
      }
    }

DatanodeDescriptor.java

@@ -106,23 +106,6 @@ synchronized void clear() {
   public boolean isAlive = false;
   public boolean needKeyUpdate = false;
 
-  /**
-   * Set to false on any NN failover, and reset to true
-   * whenever a block report is received.
-   */
-  private boolean heartbeatedSinceFailover = false;
-
-  /**
-   * At startup or at any failover, the DNs in the cluster may
-   * have pending block deletions from a previous incarnation
-   * of the NameNode. Thus, we consider their block contents
-   * stale until we have received a block report. When a DN
-   * is considered stale, any replicas on it are transitively
-   * considered stale. If any block has at least one stale replica,
-   * then no invalidations will be processed for this block.
-   * See HDFS-1972.
-   */
-  private boolean blockContentsStale = true;
 
   // A system administrator can tune the balancer bandwidth parameter
   // (dfs.balance.bandwidthPerSec) dynamically by calling

@@ -151,9 +134,6 @@ synchronized void clear() {
   private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
   private int volumeFailures = 0;
 
-  /** Set to false after processing first block report */
-  private boolean firstBlockReport = true;
-
   /**
    * When set to true, the node is not in include list and is not allowed
    * to communicate with the namenode

@@ -234,11 +214,15 @@ public boolean addBlock(String storageID, BlockInfo b) {
     return false;
   }
 
-  DatanodeStorageInfo getStorageInfo(String storageID) {
-    return storageMap.get(storageID);
+  public DatanodeStorageInfo getStorageInfo(String storageID) {
+    synchronized (storageMap) {
+      return storageMap.get(storageID);
+    }
   }
+
   public Collection<DatanodeStorageInfo> getStorageInfos() {
-    return storageMap.values();
+    synchronized (storageMap) {
+      return new ArrayList<DatanodeStorageInfo>(storageMap.values());
+    }
   }
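Note the asymmetry: getStorageInfo does a single lookup under the lock, while getStorageInfos hands back a copy rather than the live values() view. Iterating a live view while updateStorage inserts concurrently can fail with ConcurrentModificationException; a snapshot can be iterated with no lock held. A standalone sketch of the difference (illustrative class, not HDFS code):

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class SnapshotVsLiveView {
  private final Map<String, String> storageMap = new HashMap<String, String>();

  // Unsafe: callers iterate the live view while other threads mutate the map.
  Collection<String> liveView() {
    return storageMap.values();
  }

  // Safe: copy under the lock; the caller iterates the snapshot lock-free.
  Collection<String> snapshot() {
    synchronized (storageMap) {
      return new ArrayList<String>(storageMap.values());
    }
  }
}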
@@ -314,9 +298,8 @@ public void clearBlockQueues() {
   }
 
   public int numBlocks() {
-    // TODO: synchronization
     int blocks = 0;
-    for (DatanodeStorageInfo entry : storageMap.values()) {
+    for (DatanodeStorageInfo entry : getStorageInfos()) {
       blocks += entry.numBlocks();
     }
     return blocks;

@@ -334,7 +317,9 @@ public void updateHeartbeat(long capacity, long dfsUsed, long remaining,
     setXceiverCount(xceiverCount);
     setLastUpdate(Time.now());
     this.volumeFailures = volFailures;
-    this.heartbeatedSinceFailover = true;
+    for(DatanodeStorageInfo storage : getStorageInfos()) {
+      storage.receivedHeartbeat();
+    }
     rollBlocksScheduled(getLastUpdate());
   }

@@ -380,10 +365,10 @@ private void update() {
   }
 
   Iterator<BlockInfo> getBlockIterator() {
-    return new BlockIterator(storageMap.values());
+    return new BlockIterator(getStorageInfos());
   }
 
   Iterator<BlockInfo> getBlockIterator(final String storageID) {
-    return new BlockIterator(storageMap.get(storageID));
+    return new BlockIterator(getStorageInfo(storageID));
   }

@@ -585,7 +570,11 @@ public int getVolumeFailures() {
   @Override
   public void updateRegInfo(DatanodeID nodeReg) {
     super.updateRegInfo(nodeReg);
-    firstBlockReport = true; // must re-process IBR after re-registration
+
+    // must re-process IBR after re-registration
+    for(DatanodeStorageInfo storage : getStorageInfos()) {
+      storage.setBlockReportCount(0);
+    }
   }

@@ -602,26 +591,6 @@ public void setBalancerBandwidth(long bandwidth) {
     this.bandwidth = bandwidth;
   }
 
-  public boolean areBlockContentsStale() {
-    return blockContentsStale;
-  }
-
-  public void markStaleAfterFailover() {
-    heartbeatedSinceFailover = false;
-    blockContentsStale = true;
-  }
-
-  public void receivedBlockReport() {
-    if (heartbeatedSinceFailover) {
-      blockContentsStale = false;
-    }
-    firstBlockReport = false;
-  }
-
-  boolean isFirstBlockReport() {
-    return firstBlockReport;
-  }
-
   @Override
   public String dumpDatanode() {
     StringBuilder sb = new StringBuilder(super.dumpDatanode());

@@ -641,7 +610,8 @@ public String dumpDatanode() {
   }
 
   DatanodeStorageInfo updateStorage(DatanodeStorage s) {
-    DatanodeStorageInfo storage = getStorageInfo(s.getStorageID());
-    if (storage == null) {
-      storage = new DatanodeStorageInfo(this, s);
-      storageMap.put(s.getStorageID(), storage);
+    synchronized (storageMap) {
+      DatanodeStorageInfo storage = storageMap.get(s.getStorageID());
+      if (storage == null) {
+        storage = new DatanodeStorageInfo(this, s);
+        storageMap.put(s.getStorageID(), storage);

@@ -651,3 +621,4 @@ DatanodeStorageInfo updateStorage(DatanodeStorage s) {
       return storage;
     }
   }
+}
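This is most likely the synchronization problem the commit message refers to: the old updateStorage did an unlocked get followed by a put, so two concurrent handlers could each observe a missing entry and register two DatanodeStorageInfo objects for the same storage ID. Holding the lock across the whole lookup-or-insert makes it atomic. The race in miniature (standalone sketch, not HDFS code):

import java.util.HashMap;
import java.util.Map;

public class AtomicGetOrCreate {
  private final Map<String, Object> map = new HashMap<String, Object>();

  // Check-then-act must be one atomic step: without a lock spanning both the
  // get and the put, two threads can both see null and insert distinct
  // values for the same key.
  Object getOrCreate(String key) {
    synchronized (map) {
      Object value = map.get(key);
      if (value == null) {
        value = new Object();
        map.put(key, value);
      }
      return value;
    }
  }
}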

DatanodeManager.java

@@ -713,8 +713,10 @@ boolean checkDecommissionState(DatanodeDescriptor node) {
   /** Start decommissioning the specified datanode. */
   private void startDecommission(DatanodeDescriptor node) {
     if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
-      LOG.info("Start Decommissioning " + node + " with " +
-          node.numBlocks() + " blocks");
+      for (DatanodeStorageInfo storage : node.getStorageInfos()) {
+        LOG.info("Start Decommissioning " + node + " " + storage
+            + " with " + storage.numBlocks() + " blocks");
+      }
       heartbeatManager.startDecommission(node);
       node.decommissioningStatus.setStartTime(now());

@@ -1345,7 +1347,9 @@ public void markAllDatanodesStale() {
     LOG.info("Marking all datandoes as stale");
     synchronized (datanodeMap) {
       for (DatanodeDescriptor dn : datanodeMap.values()) {
-        dn.markStaleAfterFailover();
+        for(DatanodeStorageInfo storage : dn.getStorageInfos()) {
+          storage.markStaleAfterFailover();
+        }
      }
    }
  }
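Because the flag now lives on each storage, a failover marks every storage stale individually, and each one is cleared only by a block report for that storage. Previously a single report flipped the whole datanode back to fresh, even though processReport handles one storage at a time.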

DatanodeStorageInfo.java

@@ -78,6 +78,26 @@ public void remove() {
   private long dfsUsed;
   private long remaining;
   private volatile BlockInfo blockList = null;
+  private int numBlocks = 0;
+
+  /** The number of block reports received */
+  private int blockReportCount = 0;
+
+  /**
+   * Set to false on any NN failover, and reset to true
+   * whenever a block report is received.
+   */
+  private boolean heartbeatedSinceFailover = false;
+
+  /**
+   * At startup or at failover, the storages in the cluster may have pending
+   * block deletions from a previous incarnation of the NameNode. The block
+   * contents are considered as stale until a block report is received. When a
+   * storage is considered as stale, the replicas on it are also considered as
+   * stale. If any block has at least one stale replica, then no invalidations
+   * will be processed for this block. See HDFS-1972.
+   */
+  private boolean blockContentsStale = true;
 
   public DatanodeStorageInfo(DatanodeDescriptor dn, DatanodeStorage s) {
     this.dn = dn;

@@ -86,6 +106,34 @@ public DatanodeStorageInfo(DatanodeDescriptor dn, DatanodeStorage s) {
     this.state = s.getState();
   }
 
+  int getBlockReportCount() {
+    return blockReportCount;
+  }
+
+  void setBlockReportCount(int blockReportCount) {
+    this.blockReportCount = blockReportCount;
+  }
+
+  public boolean areBlockContentsStale() {
+    return blockContentsStale;
+  }
+
+  public void markStaleAfterFailover() {
+    heartbeatedSinceFailover = false;
+    blockContentsStale = true;
+  }
+
+  public void receivedHeartbeat() {
+    heartbeatedSinceFailover = true;
+  }
+
+  public void receivedBlockReport() {
+    if (heartbeatedSinceFailover) {
+      blockContentsStale = false;
+    }
+    blockReportCount++;
+  }
+
   public void setUtilization(long capacity, long dfsUsed, long remaining) {
     this.capacity = capacity;
     this.dfsUsed = dfsUsed;
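The three lifecycle methods above form a small state machine: a failover forces the storage back to stale, and a later block report clears the flag only if a heartbeat has been seen since the failover, so a report cannot un-stale a storage before its datanode has checked in with the new active NameNode. A standalone sketch of the transitions (illustrative class, not HDFS code):

public class StalenessStateMachine {
  private boolean heartbeatedSinceFailover = false;
  private boolean blockContentsStale = true;

  void markStaleAfterFailover() {
    heartbeatedSinceFailover = false;
    blockContentsStale = true;
  }

  void receivedHeartbeat() {
    heartbeatedSinceFailover = true;
  }

  void receivedBlockReport() {
    // A report only counts once a heartbeat has been seen since failover.
    if (heartbeatedSinceFailover) {
      blockContentsStale = false;
    }
  }

  public static void main(String[] args) {
    StalenessStateMachine s = new StalenessStateMachine();
    s.receivedBlockReport();
    System.out.println(s.blockContentsStale);  // true: no heartbeat seen yet
    s.receivedHeartbeat();
    s.receivedBlockReport();
    System.out.println(s.blockContentsStale);  // false: report is trusted now
  }
}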
@@ -127,16 +175,22 @@ public boolean addBlock(BlockInfo b) {
       return false;
     // add to the head of the data-node list
     blockList = b.listInsert(blockList, this);
+    numBlocks++;
     return true;
   }
 
   public boolean removeBlock(BlockInfo b) {
     blockList = b.listRemove(blockList, this);
-    return b.removeStorage(this);
+    if (b.removeStorage(this)) {
+      numBlocks--;
+      return true;
+    } else {
+      return false;
+    }
   }
 
   public int numBlocks() {
-    return blockList == null ? 0 : blockList.listCount(this);
+    return numBlocks;
   }
 
   Iterator<BlockInfo> getBlockIterator() {
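The counter stays consistent because the increment is paired with a successful listInsert, and the decrement only happens when removeStorage reports that this storage actually held the block, so a repeated removeBlock cannot drive numBlocks negative. This is what makes the O(n) BlockInfo.listCount (deleted above) redundant and turns numBlocks() into a constant-time query.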

NameNodeRpcServer.java

@@ -90,7 +90,6 @@
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;

@@ -979,8 +978,6 @@ public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
       bm.processReport(nodeReg, r.getStorage(), poolId, blocks);
     }
 
-    DatanodeDescriptor datanode = bm.getDatanodeManager().getDatanode(nodeReg);
-    datanode.receivedBlockReport();
     if (nn.getFSImage().isUpgradeFinalized() && !nn.isStandbyState())
       return new FinalizeCommand(poolId);
     return null;
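The per-node receivedBlockReport() no longer exists; the bookkeeping moved to DatanodeStorageInfo and is done inside BlockManager.processReport (see above). BlockManager already invoked it before this change, so dropping the RPC-layer call also removes a duplicate invocation that would otherwise double-increment the new blockReportCount.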

(file name not shown in this view)

@@ -20,7 +20,6 @@
 import java.io.IOException;
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Set;
 
 import org.apache.hadoop.hdfs.DFSConfigKeys;

TestBlockManager.java

@@ -523,33 +523,30 @@ public void testSafeModeIBR() throws Exception {
     bm.getDatanodeManager().registerDatanode(nodeReg);
     bm.getDatanodeManager().addDatanode(node); // swap in spy
     assertEquals(node, bm.getDatanodeManager().getDatanode(node));
-    assertTrue(node.isFirstBlockReport());
+    assertEquals(0, ds.getBlockReportCount());
 
     // send block report, should be processed
     reset(node);
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool",
         new BlockListAsLongs(null, null));
-    verify(node).receivedBlockReport();
-    assertFalse(node.isFirstBlockReport());
+    assertEquals(1, ds.getBlockReportCount());
 
     // send block report again, should NOT be processed
     reset(node);
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool",
         new BlockListAsLongs(null, null));
-    verify(node, never()).receivedBlockReport();
-    assertFalse(node.isFirstBlockReport());
+    assertEquals(1, ds.getBlockReportCount());
 
     // re-register as if node restarted, should update existing node
     bm.getDatanodeManager().removeDatanode(node);
     reset(node);
     bm.getDatanodeManager().registerDatanode(nodeReg);
     verify(node).updateRegInfo(nodeReg);
-    assertTrue(node.isFirstBlockReport()); // ready for report again
+    assertEquals(0, ds.getBlockReportCount()); // ready for report again
 
     // send block report, should be processed after restart
     reset(node);
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool",
         new BlockListAsLongs(null, null));
-    verify(node).receivedBlockReport();
-    assertFalse(node.isFirstBlockReport());
+    assertEquals(1, ds.getBlockReportCount());
   }
 
   @Test
@@ -570,13 +567,12 @@ public void testSafeModeIBRAfterIncremental() throws Exception {
     bm.getDatanodeManager().registerDatanode(nodeReg);
     bm.getDatanodeManager().addDatanode(node); // swap in spy
     assertEquals(node, bm.getDatanodeManager().getDatanode(node));
-    assertTrue(node.isFirstBlockReport());
+    assertEquals(0, ds.getBlockReportCount());
 
     // send block report while pretending to already have blocks
     reset(node);
     doReturn(1).when(node).numBlocks();
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool",
         new BlockListAsLongs(null, null));
-    verify(node).receivedBlockReport();
-    assertFalse(node.isFirstBlockReport());
+    assertEquals(1, ds.getBlockReportCount());
   }
 }
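The assertions shift from Mockito interaction checks on the spied descriptor to direct reads of ds.getBlockReportCount(), which both survives the bookkeeping moving off DatanodeDescriptor and verifies the observable counter that the safe-mode gate in processReport actually consults.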