Revert 1161976 since the log message was incorrectly marked the issue as HDFS-349.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1161991 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
522fe14a89
commit
e680023f8b
@ -59,12 +59,10 @@
|
|||||||
import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
|
import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
|
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
|
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
|
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
|
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
|
||||||
import org.apache.hadoop.net.Node;
|
import org.apache.hadoop.net.Node;
|
||||||
import org.apache.hadoop.util.Daemon;
|
import org.apache.hadoop.util.Daemon;
|
||||||
|
|
||||||
@ -2004,7 +2002,7 @@ private void addToExcessReplicate(DatanodeInfo dn, Block block) {
|
|||||||
* Modify (block-->datanode) map. Possibly generate replication tasks, if the
|
* Modify (block-->datanode) map. Possibly generate replication tasks, if the
|
||||||
* removed block is still valid.
|
* removed block is still valid.
|
||||||
*/
|
*/
|
||||||
public void removeStoredBlock(Block block, DatanodeDescriptor node) {
|
private void removeStoredBlock(Block block, DatanodeDescriptor node) {
|
||||||
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
||||||
NameNode.stateChangeLog.debug("BLOCK* removeStoredBlock: "
|
NameNode.stateChangeLog.debug("BLOCK* removeStoredBlock: "
|
||||||
+ block + " from " + node.getName());
|
+ block + " from " + node.getName());
|
||||||
@ -2123,48 +2121,27 @@ void addBlock(DatanodeDescriptor node, Block block, String delHint)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** The given node is reporting that it received/deleted certain blocks. */
|
/** The given node is reporting that it received a certain block. */
|
||||||
public void blockReceivedAndDeleted(final DatanodeID nodeID,
|
public void blockReceived(final DatanodeID nodeID, final String poolId,
|
||||||
final String poolId,
|
final Block block, final String delHint) throws IOException {
|
||||||
final ReceivedDeletedBlockInfo receivedAndDeletedBlocks[]
|
|
||||||
) throws IOException {
|
|
||||||
namesystem.writeLock();
|
namesystem.writeLock();
|
||||||
int received = 0;
|
|
||||||
int deleted = 0;
|
|
||||||
try {
|
try {
|
||||||
final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
|
final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
|
||||||
if (node == null || !node.isAlive) {
|
if (node == null || !node.isAlive) {
|
||||||
NameNode.stateChangeLog
|
final String s = block + " is received from dead or unregistered node "
|
||||||
.warn("BLOCK* blockReceivedDeleted"
|
+ nodeID.getName();
|
||||||
+ " is received from dead or unregistered node "
|
NameNode.stateChangeLog.warn("BLOCK* blockReceived: " + s);
|
||||||
+ nodeID.getName());
|
throw new IOException(s);
|
||||||
throw new IOException(
|
}
|
||||||
"Got blockReceivedDeleted message from unregistered or dead node");
|
|
||||||
|
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
||||||
|
NameNode.stateChangeLog.debug("BLOCK* blockReceived: " + block
|
||||||
|
+ " is received from " + nodeID.getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < receivedAndDeletedBlocks.length; i++) {
|
addBlock(node, block, delHint);
|
||||||
if (receivedAndDeletedBlocks[i].isDeletedBlock()) {
|
|
||||||
removeStoredBlock(
|
|
||||||
receivedAndDeletedBlocks[i].getBlock(), node);
|
|
||||||
deleted++;
|
|
||||||
} else {
|
|
||||||
addBlock(node, receivedAndDeletedBlocks[i].getBlock(),
|
|
||||||
receivedAndDeletedBlocks[i].getDelHints());
|
|
||||||
received++;
|
|
||||||
}
|
|
||||||
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
|
||||||
NameNode.stateChangeLog.debug("BLOCK* block"
|
|
||||||
+ (receivedAndDeletedBlocks[i].isDeletedBlock() ? "Deleted"
|
|
||||||
: "Received") + ": " + receivedAndDeletedBlocks[i].getBlock()
|
|
||||||
+ " is received from " + nodeID.getName());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} finally {
|
} finally {
|
||||||
namesystem.writeUnlock();
|
namesystem.writeUnlock();
|
||||||
NameNode.stateChangeLog
|
|
||||||
.debug("*BLOCK* NameNode.blockReceivedAndDeleted: " + "from "
|
|
||||||
+ nodeID.getName() + " received: " + received + ", "
|
|
||||||
+ " deleted: " + deleted);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2339,7 +2316,6 @@ public int getTotalBlocks() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void removeBlock(Block block) {
|
public void removeBlock(Block block) {
|
||||||
block.setNumBytes(BlockCommand.NO_ACK);
|
|
||||||
addToInvalidates(block);
|
addToInvalidates(block);
|
||||||
corruptReplicas.removeFromCorruptReplicasMap(block);
|
corruptReplicas.removeFromCorruptReplicasMap(block);
|
||||||
blocksMap.removeBlock(block);
|
blocksMap.removeBlock(block);
|
||||||
|
@ -106,7 +106,6 @@
|
|||||||
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
|
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
|
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
|
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
|
||||||
@ -349,8 +348,6 @@ void refreshNamenodes(Configuration conf)
|
|||||||
ThreadGroup threadGroup = null;
|
ThreadGroup threadGroup = null;
|
||||||
long blockReportInterval;
|
long blockReportInterval;
|
||||||
boolean resetBlockReportTime = true;
|
boolean resetBlockReportTime = true;
|
||||||
long deleteReportInterval;
|
|
||||||
long lastDeletedReport = 0;
|
|
||||||
long initialBlockReportDelay = DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT * 1000L;
|
long initialBlockReportDelay = DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT * 1000L;
|
||||||
long heartBeatInterval;
|
long heartBeatInterval;
|
||||||
private boolean heartbeatsDisabledForTests = false;
|
private boolean heartbeatsDisabledForTests = false;
|
||||||
@ -461,7 +458,6 @@ private void initConfig(Configuration conf) {
|
|||||||
this.heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY,
|
this.heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY,
|
||||||
DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L;
|
DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L;
|
||||||
|
|
||||||
this.deleteReportInterval = 100 * heartBeatInterval;
|
|
||||||
// do we need to sync block file contents to disk when blockfile is closed?
|
// do we need to sync block file contents to disk when blockfile is closed?
|
||||||
this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY,
|
this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY,
|
||||||
DFS_DATANODE_SYNCONCLOSE_DEFAULT);
|
DFS_DATANODE_SYNCONCLOSE_DEFAULT);
|
||||||
@ -647,17 +643,6 @@ protected void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// calls specific to BP
|
|
||||||
protected void notifyNamenodeDeletedBlock(ExtendedBlock block) {
|
|
||||||
BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
|
|
||||||
if (bpos != null) {
|
|
||||||
bpos.notifyNamenodeDeletedBlock(block);
|
|
||||||
} else {
|
|
||||||
LOG.warn("Cannot find BPOfferService for reporting block deleted for bpid="
|
|
||||||
+ block.getBlockPoolId());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void reportBadBlocks(ExtendedBlock block) throws IOException{
|
public void reportBadBlocks(ExtendedBlock block) throws IOException{
|
||||||
BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
|
BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
|
||||||
if(bpos == null || bpos.bpNamenode == null) {
|
if(bpos == null || bpos.bpNamenode == null) {
|
||||||
@ -692,9 +677,8 @@ class BPOfferService implements Runnable {
|
|||||||
private String blockPoolId;
|
private String blockPoolId;
|
||||||
private long lastHeartbeat = 0;
|
private long lastHeartbeat = 0;
|
||||||
private volatile boolean initialized = false;
|
private volatile boolean initialized = false;
|
||||||
private final LinkedList<ReceivedDeletedBlockInfo> receivedAndDeletedBlockList
|
private final LinkedList<Block> receivedBlockList = new LinkedList<Block>();
|
||||||
= new LinkedList<ReceivedDeletedBlockInfo>();
|
private final LinkedList<String> delHints = new LinkedList<String>();
|
||||||
private volatile int pendingReceivedRequests = 0;
|
|
||||||
private volatile boolean shouldServiceRun = true;
|
private volatile boolean shouldServiceRun = true;
|
||||||
private boolean isBlockTokenInitialized = false;
|
private boolean isBlockTokenInitialized = false;
|
||||||
UpgradeManagerDatanode upgradeManager = null;
|
UpgradeManagerDatanode upgradeManager = null;
|
||||||
@ -864,33 +848,41 @@ private void reportBadBlocks(ExtendedBlock block) {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Report received blocks and delete hints to the Namenode
|
* Report received blocks and delete hints to the Namenode
|
||||||
*
|
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private void reportReceivedDeletedBlocks() throws IOException {
|
private void reportReceivedBlocks() throws IOException {
|
||||||
|
//check if there are newly received blocks
|
||||||
// check if there are newly received blocks
|
Block [] blockArray=null;
|
||||||
ReceivedDeletedBlockInfo[] receivedAndDeletedBlockArray = null;
|
String [] delHintArray=null;
|
||||||
int currentReceivedRequestsCounter;
|
synchronized(receivedBlockList) {
|
||||||
synchronized (receivedAndDeletedBlockList) {
|
synchronized(delHints){
|
||||||
currentReceivedRequestsCounter = pendingReceivedRequests;
|
int numBlocks = receivedBlockList.size();
|
||||||
int numBlocks = receivedAndDeletedBlockList.size();
|
if (numBlocks > 0) {
|
||||||
if (numBlocks > 0) {
|
if(numBlocks!=delHints.size()) {
|
||||||
//
|
LOG.warn("Panic: receiveBlockList and delHints are not of " +
|
||||||
// Send newly-received and deleted blockids to namenode
|
"the same length" );
|
||||||
//
|
}
|
||||||
receivedAndDeletedBlockArray = receivedAndDeletedBlockList
|
//
|
||||||
.toArray(new ReceivedDeletedBlockInfo[numBlocks]);
|
// Send newly-received blockids to namenode
|
||||||
|
//
|
||||||
|
blockArray = receivedBlockList.toArray(new Block[numBlocks]);
|
||||||
|
delHintArray = delHints.toArray(new String[numBlocks]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (receivedAndDeletedBlockArray != null) {
|
if (blockArray != null) {
|
||||||
bpNamenode.blockReceivedAndDeleted(bpRegistration, blockPoolId,
|
if(delHintArray == null || delHintArray.length != blockArray.length ) {
|
||||||
receivedAndDeletedBlockArray);
|
LOG.warn("Panic: block array & delHintArray are not the same" );
|
||||||
synchronized (receivedAndDeletedBlockList) {
|
}
|
||||||
for (int i = 0; i < receivedAndDeletedBlockArray.length; i++) {
|
bpNamenode.blockReceived(bpRegistration, blockPoolId, blockArray,
|
||||||
receivedAndDeletedBlockList.remove(receivedAndDeletedBlockArray[i]);
|
delHintArray);
|
||||||
|
synchronized(receivedBlockList) {
|
||||||
|
synchronized(delHints){
|
||||||
|
for(int i=0; i<blockArray.length; i++) {
|
||||||
|
receivedBlockList.remove(blockArray[i]);
|
||||||
|
delHints.remove(delHintArray[i]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
pendingReceivedRequests -= currentReceivedRequestsCounter;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -901,39 +893,23 @@ private void reportReceivedDeletedBlocks() throws IOException {
|
|||||||
* client? For now we don't.
|
* client? For now we don't.
|
||||||
*/
|
*/
|
||||||
void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
|
void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
|
||||||
if (block == null || delHint == null) {
|
if(block==null || delHint==null) {
|
||||||
throw new IllegalArgumentException(block == null ? "Block is null"
|
throw new IllegalArgumentException(
|
||||||
: "delHint is null");
|
block==null?"Block is null":"delHint is null");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!block.getBlockPoolId().equals(blockPoolId)) {
|
if (!block.getBlockPoolId().equals(blockPoolId)) {
|
||||||
LOG.warn("BlockPool mismatch " + block.getBlockPoolId() + " vs. "
|
LOG.warn("BlockPool mismatch " + block.getBlockPoolId() +
|
||||||
+ blockPoolId);
|
" vs. " + blockPoolId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized (receivedAndDeletedBlockList) {
|
synchronized (receivedBlockList) {
|
||||||
receivedAndDeletedBlockList.add(new ReceivedDeletedBlockInfo(block
|
synchronized (delHints) {
|
||||||
.getLocalBlock(), delHint));
|
receivedBlockList.add(block.getLocalBlock());
|
||||||
pendingReceivedRequests++;
|
delHints.add(delHint);
|
||||||
receivedAndDeletedBlockList.notifyAll();
|
receivedBlockList.notifyAll();
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
void notifyNamenodeDeletedBlock(ExtendedBlock block) {
|
|
||||||
if (block == null) {
|
|
||||||
throw new IllegalArgumentException("Block is null");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!block.getBlockPoolId().equals(blockPoolId)) {
|
|
||||||
LOG.warn("BlockPool mismatch " + block.getBlockPoolId() + " vs. "
|
|
||||||
+ blockPoolId);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
synchronized (receivedAndDeletedBlockList) {
|
|
||||||
receivedAndDeletedBlockList.add(new ReceivedDeletedBlockInfo(block
|
|
||||||
.getLocalBlock(), ReceivedDeletedBlockInfo.TODELETE_HINT));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1051,8 +1027,7 @@ private synchronized void cleanUp() {
|
|||||||
* forever calling remote NameNode functions.
|
* forever calling remote NameNode functions.
|
||||||
*/
|
*/
|
||||||
private void offerService() throws Exception {
|
private void offerService() throws Exception {
|
||||||
LOG.info("For namenode " + nnAddr + " using DELETEREPORT_INTERVAL of "
|
LOG.info("For namenode " + nnAddr + " using BLOCKREPORT_INTERVAL of "
|
||||||
+ deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of "
|
|
||||||
+ blockReportInterval + "msec" + " Initial delay: "
|
+ blockReportInterval + "msec" + " Initial delay: "
|
||||||
+ initialBlockReportDelay + "msec" + "; heartBeatInterval="
|
+ initialBlockReportDelay + "msec" + "; heartBeatInterval="
|
||||||
+ heartBeatInterval);
|
+ heartBeatInterval);
|
||||||
@ -1083,11 +1058,8 @@ private void offerService() throws Exception {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (pendingReceivedRequests > 0
|
|
||||||
|| (startTime - lastDeletedReport > deleteReportInterval)) {
|
reportReceivedBlocks();
|
||||||
reportReceivedDeletedBlocks();
|
|
||||||
lastDeletedReport = startTime;
|
|
||||||
}
|
|
||||||
|
|
||||||
DatanodeCommand cmd = blockReport();
|
DatanodeCommand cmd = blockReport();
|
||||||
processCommand(cmd);
|
processCommand(cmd);
|
||||||
@ -1103,10 +1075,10 @@ private void offerService() throws Exception {
|
|||||||
//
|
//
|
||||||
long waitTime = heartBeatInterval -
|
long waitTime = heartBeatInterval -
|
||||||
(System.currentTimeMillis() - lastHeartbeat);
|
(System.currentTimeMillis() - lastHeartbeat);
|
||||||
synchronized(receivedAndDeletedBlockList) {
|
synchronized(receivedBlockList) {
|
||||||
if (waitTime > 0 && pendingReceivedRequests == 0) {
|
if (waitTime > 0 && receivedBlockList.size() == 0) {
|
||||||
try {
|
try {
|
||||||
receivedAndDeletedBlockList.wait(waitTime);
|
receivedBlockList.wait(waitTime);
|
||||||
} catch (InterruptedException ie) {
|
} catch (InterruptedException ie) {
|
||||||
LOG.warn("BPOfferService for block pool="
|
LOG.warn("BPOfferService for block pool="
|
||||||
+ this.getBlockPoolId() + " received exception:" + ie);
|
+ this.getBlockPoolId() + " received exception:" + ie);
|
||||||
|
@ -1182,7 +1182,7 @@ public FSDataset(DataNode datanode, DataStorage storage, Configuration conf)
|
|||||||
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
|
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
|
||||||
roots[idx] = storage.getStorageDir(idx).getCurrentDir();
|
roots[idx] = storage.getStorageDir(idx).getCurrentDir();
|
||||||
}
|
}
|
||||||
asyncDiskService = new FSDatasetAsyncDiskService(this, roots);
|
asyncDiskService = new FSDatasetAsyncDiskService(roots);
|
||||||
registerMBean(storage.getStorageID());
|
registerMBean(storage.getStorageID());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2089,19 +2089,15 @@ public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
|
|||||||
}
|
}
|
||||||
File metaFile = getMetaFile(f, invalidBlks[i].getGenerationStamp());
|
File metaFile = getMetaFile(f, invalidBlks[i].getGenerationStamp());
|
||||||
long dfsBytes = f.length() + metaFile.length();
|
long dfsBytes = f.length() + metaFile.length();
|
||||||
|
|
||||||
// Delete the block asynchronously to make sure we can do it fast enough
|
// Delete the block asynchronously to make sure we can do it fast enough
|
||||||
asyncDiskService.deleteAsync(v, f, metaFile, dfsBytes,
|
asyncDiskService.deleteAsync(v, bpid, f, metaFile, dfsBytes,
|
||||||
new ExtendedBlock(bpid, invalidBlks[i]));
|
invalidBlks[i].toString());
|
||||||
}
|
}
|
||||||
if (error) {
|
if (error) {
|
||||||
throw new IOException("Error in deleting blocks.");
|
throw new IOException("Error in deleting blocks.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void notifyNamenodeDeletedBlock(ExtendedBlock block){
|
|
||||||
datanode.notifyNamenodeDeletedBlock(block);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Turn the block identifier into a filename; ignore generation stamp!!!
|
* Turn the block identifier into a filename; ignore generation stamp!!!
|
||||||
|
@ -28,8 +28,6 @@
|
|||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* This class is a container of multiple thread pools, each for a volume,
|
* This class is a container of multiple thread pools, each for a volume,
|
||||||
@ -49,8 +47,6 @@
|
|||||||
*/
|
*/
|
||||||
class FSDatasetAsyncDiskService {
|
class FSDatasetAsyncDiskService {
|
||||||
|
|
||||||
final FSDataset dataset;
|
|
||||||
|
|
||||||
public static final Log LOG = LogFactory.getLog(FSDatasetAsyncDiskService.class);
|
public static final Log LOG = LogFactory.getLog(FSDatasetAsyncDiskService.class);
|
||||||
|
|
||||||
// ThreadPool core pool size
|
// ThreadPool core pool size
|
||||||
@ -74,8 +70,8 @@ class FSDatasetAsyncDiskService {
|
|||||||
*
|
*
|
||||||
* @param volumes The roots of the data volumes.
|
* @param volumes The roots of the data volumes.
|
||||||
*/
|
*/
|
||||||
FSDatasetAsyncDiskService(FSDataset dataset, File[] volumes) {
|
FSDatasetAsyncDiskService(File[] volumes) {
|
||||||
this.dataset = dataset;
|
|
||||||
// Create one ThreadPool per volume
|
// Create one ThreadPool per volume
|
||||||
for (int v = 0 ; v < volumes.length; v++) {
|
for (int v = 0 ; v < volumes.length; v++) {
|
||||||
final File vol = volumes[v];
|
final File vol = volumes[v];
|
||||||
@ -151,12 +147,13 @@ synchronized void shutdown() {
|
|||||||
* Delete the block file and meta file from the disk asynchronously, adjust
|
* Delete the block file and meta file from the disk asynchronously, adjust
|
||||||
* dfsUsed statistics accordingly.
|
* dfsUsed statistics accordingly.
|
||||||
*/
|
*/
|
||||||
void deleteAsync(FSDataset.FSVolume volume, File blockFile, File metaFile,
|
void deleteAsync(FSDataset.FSVolume volume, String bpid, File blockFile,
|
||||||
long dfsBytes, ExtendedBlock block) {
|
File metaFile, long dfsBytes, String blockName) {
|
||||||
DataNode.LOG.info("Scheduling block " + block.getLocalBlock().toString()
|
DataNode.LOG.info("Scheduling block " + blockName + " file " + blockFile
|
||||||
+ " file " + blockFile + " for deletion");
|
+ " for deletion");
|
||||||
ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(dataset,
|
ReplicaFileDeleteTask deletionTask =
|
||||||
volume, blockFile, metaFile, dfsBytes, block);
|
new ReplicaFileDeleteTask(volume, bpid, blockFile, metaFile, dfsBytes,
|
||||||
|
blockName);
|
||||||
execute(volume.getCurrentDir(), deletionTask);
|
execute(volume.getCurrentDir(), deletionTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -164,21 +161,21 @@ void deleteAsync(FSDataset.FSVolume volume, File blockFile, File metaFile,
|
|||||||
* as decrement the dfs usage of the volume.
|
* as decrement the dfs usage of the volume.
|
||||||
*/
|
*/
|
||||||
static class ReplicaFileDeleteTask implements Runnable {
|
static class ReplicaFileDeleteTask implements Runnable {
|
||||||
final FSDataset dataset;
|
|
||||||
final FSDataset.FSVolume volume;
|
final FSDataset.FSVolume volume;
|
||||||
|
final String blockPoolId;
|
||||||
final File blockFile;
|
final File blockFile;
|
||||||
final File metaFile;
|
final File metaFile;
|
||||||
final long dfsBytes;
|
final long dfsBytes;
|
||||||
final ExtendedBlock block;
|
final String blockName;
|
||||||
|
|
||||||
ReplicaFileDeleteTask(FSDataset dataset, FSDataset.FSVolume volume, File blockFile,
|
ReplicaFileDeleteTask(FSDataset.FSVolume volume, String bpid,
|
||||||
File metaFile, long dfsBytes, ExtendedBlock block) {
|
File blockFile, File metaFile, long dfsBytes, String blockName) {
|
||||||
this.dataset = dataset;
|
|
||||||
this.volume = volume;
|
this.volume = volume;
|
||||||
|
this.blockPoolId = bpid;
|
||||||
this.blockFile = blockFile;
|
this.blockFile = blockFile;
|
||||||
this.metaFile = metaFile;
|
this.metaFile = metaFile;
|
||||||
this.dfsBytes = dfsBytes;
|
this.dfsBytes = dfsBytes;
|
||||||
this.block = block;
|
this.blockName = blockName;
|
||||||
}
|
}
|
||||||
|
|
||||||
FSDataset.FSVolume getVolume() {
|
FSDataset.FSVolume getVolume() {
|
||||||
@ -188,24 +185,21 @@ FSDataset.FSVolume getVolume() {
|
|||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
// Called in AsyncDiskService.execute for displaying error messages.
|
// Called in AsyncDiskService.execute for displaying error messages.
|
||||||
return "deletion of block " + block.getBlockPoolId() + " "
|
return "deletion of block " + blockPoolId + " " + blockName
|
||||||
+ block.getLocalBlock().toString() + " with block file " + blockFile
|
+ " with block file " + blockFile + " and meta file " + metaFile
|
||||||
+ " and meta file " + metaFile + " from volume " + volume;
|
+ " from volume " + volume;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
if ( !blockFile.delete() || ( !metaFile.delete() && metaFile.exists() ) ) {
|
if ( !blockFile.delete() || ( !metaFile.delete() && metaFile.exists() ) ) {
|
||||||
DataNode.LOG.warn("Unexpected error trying to delete block "
|
DataNode.LOG.warn("Unexpected error trying to delete block "
|
||||||
+ block.getBlockPoolId() + " " + block.getLocalBlock().toString()
|
+ blockPoolId + " " + blockName + " at file " + blockFile
|
||||||
+ " at file " + blockFile + ". Ignored.");
|
+ ". Ignored.");
|
||||||
} else {
|
} else {
|
||||||
if(block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){
|
volume.decDfsUsed(blockPoolId, dfsBytes);
|
||||||
dataset.notifyNamenodeDeletedBlock(block);
|
DataNode.LOG.info("Deleted block " + blockPoolId + " " + blockName
|
||||||
}
|
+ " at file " + blockFile);
|
||||||
volume.decDfsUsed(block.getBlockPoolId(), dfsBytes);
|
|
||||||
DataNode.LOG.info("Deleted block " + block.getBlockPoolId() + " "
|
|
||||||
+ block.getLocalBlock().toString() + " at file " + blockFile);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -83,7 +83,6 @@
|
|||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
|
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
|
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
|
||||||
import org.apache.hadoop.io.EnumSetWritable;
|
import org.apache.hadoop.io.EnumSetWritable;
|
||||||
@ -1210,16 +1209,17 @@ public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override // DatanodeProtocol
|
@Override // DatanodeProtocol
|
||||||
public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId,
|
public void blockReceived(DatanodeRegistration nodeReg, String poolId,
|
||||||
ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks) throws IOException {
|
Block blocks[], String delHints[]) throws IOException {
|
||||||
verifyRequest(nodeReg);
|
verifyRequest(nodeReg);
|
||||||
if(stateChangeLog.isDebugEnabled()) {
|
if(stateChangeLog.isDebugEnabled()) {
|
||||||
stateChangeLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: "
|
stateChangeLog.debug("*BLOCK* NameNode.blockReceived: "
|
||||||
+"from "+nodeReg.getName()+" "+receivedAndDeletedBlocks.length
|
+"from "+nodeReg.getName()+" "+blocks.length+" blocks.");
|
||||||
+" blocks.");
|
}
|
||||||
|
for (int i = 0; i < blocks.length; i++) {
|
||||||
|
namesystem.getBlockManager().blockReceived(
|
||||||
|
nodeReg, poolId, blocks[i], delHints[i]);
|
||||||
}
|
}
|
||||||
namesystem.getBlockManager().blockReceivedAndDeleted(
|
|
||||||
nodeReg, poolId, receivedAndDeletedBlocks);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override // DatanodeProtocol
|
@Override // DatanodeProtocol
|
||||||
|
@ -44,16 +44,6 @@
|
|||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public class BlockCommand extends DatanodeCommand {
|
public class BlockCommand extends DatanodeCommand {
|
||||||
|
|
||||||
/**
|
|
||||||
* This constant is used to indicate that the block deletion does not need
|
|
||||||
* explicit ACK from the datanode. When a block is put into the list of blocks
|
|
||||||
* to be deleted, it's size is set to this constant. We assume that no block
|
|
||||||
* would actually have this size. Otherwise, we would miss ACKs for blocks
|
|
||||||
* with such size. Positive number is used for compatibility reasons.
|
|
||||||
*/
|
|
||||||
public static final long NO_ACK = Long.MAX_VALUE;
|
|
||||||
|
|
||||||
String poolId;
|
String poolId;
|
||||||
Block blocks[];
|
Block blocks[];
|
||||||
DatanodeInfo targets[][];
|
DatanodeInfo targets[][];
|
||||||
|
@ -126,19 +126,17 @@ public DatanodeCommand blockReport(DatanodeRegistration registration,
|
|||||||
long[] blocks) throws IOException;
|
long[] blocks) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* blockReceivedAndDeleted() allows the DataNode to tell the NameNode about
|
* blockReceived() allows the DataNode to tell the NameNode about
|
||||||
* recently-received and -deleted block data.
|
* recently-received block data, with a hint for pereferred replica
|
||||||
*
|
* to be deleted when there is any excessive blocks.
|
||||||
* For the case of received blocks, a hint for preferred replica to be
|
|
||||||
* deleted when there is any excessive blocks is provided.
|
|
||||||
* For example, whenever client code
|
* For example, whenever client code
|
||||||
* writes a new Block here, or another DataNode copies a Block to
|
* writes a new Block here, or another DataNode copies a Block to
|
||||||
* this DataNode, it will call blockReceived().
|
* this DataNode, it will call blockReceived().
|
||||||
*/
|
*/
|
||||||
public void blockReceivedAndDeleted(DatanodeRegistration registration,
|
public void blockReceived(DatanodeRegistration registration,
|
||||||
String poolId,
|
String poolId,
|
||||||
ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks)
|
Block blocks[],
|
||||||
throws IOException;
|
String[] delHints) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* errorReport() tells the NameNode about something that has gone
|
* errorReport() tells the NameNode about something that has gone
|
||||||
|
@ -1,101 +0,0 @@
|
|||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.hdfs.server.protocol;
|
|
||||||
|
|
||||||
import java.io.DataInput;
|
|
||||||
import java.io.DataOutput;
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
|
||||||
import org.apache.hadoop.io.Text;
|
|
||||||
import org.apache.hadoop.io.Writable;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A data structure to store Block and delHints together, used to send
|
|
||||||
* received/deleted ACKs.
|
|
||||||
*/
|
|
||||||
public class ReceivedDeletedBlockInfo implements Writable {
|
|
||||||
Block block;
|
|
||||||
String delHints;
|
|
||||||
|
|
||||||
public final static String TODELETE_HINT = "-";
|
|
||||||
|
|
||||||
public ReceivedDeletedBlockInfo() {
|
|
||||||
}
|
|
||||||
|
|
||||||
public ReceivedDeletedBlockInfo(Block blk, String delHints) {
|
|
||||||
this.block = blk;
|
|
||||||
this.delHints = delHints;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Block getBlock() {
|
|
||||||
return this.block;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setBlock(Block blk) {
|
|
||||||
this.block = blk;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getDelHints() {
|
|
||||||
return this.delHints;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setDelHints(String hints) {
|
|
||||||
this.delHints = hints;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean equals(Object o) {
|
|
||||||
if (!(o instanceof ReceivedDeletedBlockInfo)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
ReceivedDeletedBlockInfo other = (ReceivedDeletedBlockInfo) o;
|
|
||||||
return this.block.equals(other.getBlock())
|
|
||||||
&& this.delHints.equals(other.delHints);
|
|
||||||
}
|
|
||||||
|
|
||||||
public int hashCode() {
|
|
||||||
assert false : "hashCode not designed";
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean blockEquals(Block b) {
|
|
||||||
return this.block.equals(b);
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isDeletedBlock() {
|
|
||||||
return delHints.equals(TODELETE_HINT);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void write(DataOutput out) throws IOException {
|
|
||||||
this.block.write(out);
|
|
||||||
Text.writeString(out, this.delHints);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void readFields(DataInput in) throws IOException {
|
|
||||||
this.block = new Block();
|
|
||||||
this.block.readFields(in);
|
|
||||||
this.delHints = Text.readString(in);
|
|
||||||
}
|
|
||||||
|
|
||||||
public String toString() {
|
|
||||||
return block.toString() + ", delHint: " + delHints;
|
|
||||||
}
|
|
||||||
}
|
|
@ -47,7 +47,6 @@
|
|||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.io.EnumSetWritable;
|
import org.apache.hadoop.io.EnumSetWritable;
|
||||||
@ -878,10 +877,10 @@ private int transferBlocks( Block blocks[],
|
|||||||
receivedDNReg.setStorageInfo(
|
receivedDNReg.setStorageInfo(
|
||||||
new DataStorage(nsInfo, dnInfo.getStorageID()));
|
new DataStorage(nsInfo, dnInfo.getStorageID()));
|
||||||
receivedDNReg.setInfoPort(dnInfo.getInfoPort());
|
receivedDNReg.setInfoPort(dnInfo.getInfoPort());
|
||||||
nameNode.blockReceivedAndDeleted(receivedDNReg, nameNode
|
nameNode.blockReceived( receivedDNReg,
|
||||||
.getNamesystem().getBlockPoolId(),
|
nameNode.getNamesystem().getBlockPoolId(),
|
||||||
new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(
|
new Block[] {blocks[i]},
|
||||||
blocks[i], DataNode.EMPTY_DEL_HINT) });
|
new String[] {DataNode.EMPTY_DEL_HINT});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return blocks.length;
|
return blocks.length;
|
||||||
@ -993,10 +992,11 @@ private ExtendedBlock addBlocks(String fileName, String clientName)
|
|||||||
for(DatanodeInfo dnInfo : loc.getLocations()) {
|
for(DatanodeInfo dnInfo : loc.getLocations()) {
|
||||||
int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getName());
|
int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getName());
|
||||||
datanodes[dnIdx].addBlock(loc.getBlock().getLocalBlock());
|
datanodes[dnIdx].addBlock(loc.getBlock().getLocalBlock());
|
||||||
nameNode.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration, loc
|
nameNode.blockReceived(
|
||||||
.getBlock().getBlockPoolId(),
|
datanodes[dnIdx].dnRegistration,
|
||||||
new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(loc
|
loc.getBlock().getBlockPoolId(),
|
||||||
.getBlock().getLocalBlock(), "") });
|
new Block[] {loc.getBlock().getLocalBlock()},
|
||||||
|
new String[] {""});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return prevBlock;
|
return prevBlock;
|
||||||
|
@ -36,7 +36,6 @@
|
|||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
@ -105,12 +104,12 @@ public void testDeadDatanode() throws Exception {
|
|||||||
|
|
||||||
DatanodeProtocol dnp = cluster.getNameNode();
|
DatanodeProtocol dnp = cluster.getNameNode();
|
||||||
|
|
||||||
ReceivedDeletedBlockInfo[] blocks = { new ReceivedDeletedBlockInfo(
|
Block[] blocks = new Block[] { new Block(0) };
|
||||||
new Block(0), "") };
|
String[] delHints = new String[] { "" };
|
||||||
|
|
||||||
// Ensure blockReceived call from dead datanode is rejected with IOException
|
// Ensure blockReceived call from dead datanode is rejected with IOException
|
||||||
try {
|
try {
|
||||||
dnp.blockReceivedAndDeleted(reg, poolId, blocks);
|
dnp.blockReceived(reg, poolId, blocks, delHints);
|
||||||
Assert.fail("Expected IOException is not thrown");
|
Assert.fail("Expected IOException is not thrown");
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
// Expected
|
// Expected
|
||||||
|
Loading…
Reference in New Issue
Block a user