HDFS-7596. NameNode should prune dead storages from storageMap. Contributed by Arpit Agarwal.

This commit is contained in:
cnauroth 2015-01-10 09:18:33 -08:00
parent 4d29142100
commit ef3c3a832c
4 changed files with 168 additions and 4 deletions

View File

@ -664,6 +664,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7579. Improve log reporting during block report rpc failure. HDFS-7579. Improve log reporting during block report rpc failure.
(Charles Lamb via cnauroth) (Charles Lamb via cnauroth)
HDFS-7596. NameNode should prune dead storages from storageMap.
(Arpit Agarwal via cnauroth)
Release 2.6.1 - UNRELEASED Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -416,6 +416,46 @@ public void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,
if (checkFailedStorages) { if (checkFailedStorages) {
updateFailedStorage(failedStorageInfos); updateFailedStorage(failedStorageInfos);
} }
if (storageMap.size() != reports.length) {
pruneStorageMap(reports);
}
}
/**
* Remove stale storages from storageMap. We must not remove any storages
* as long as they have associated block replicas.
*/
private void pruneStorageMap(final StorageReport[] reports) {
if (LOG.isDebugEnabled()) {
LOG.debug("Number of storages reported in heartbeat=" + reports.length +
"; Number of storages in storageMap=" + storageMap.size());
}
HashMap<String, DatanodeStorageInfo> excessStorages;
synchronized (storageMap) {
// Init excessStorages with all known storages.
excessStorages = new HashMap<String, DatanodeStorageInfo>(storageMap);
// Remove storages that the DN reported in the heartbeat.
for (final StorageReport report : reports) {
excessStorages.remove(report.getStorage().getStorageID());
}
// For each remaining storage, remove it if there are no associated
// blocks.
for (final DatanodeStorageInfo storageInfo : excessStorages.values()) {
if (storageInfo.numBlocks() == 0) {
storageMap.remove(storageInfo.getStorageID());
LOG.info("Removed storage " + storageInfo + " from DataNode" + this);
} else if (LOG.isDebugEnabled()) {
// This can occur until all block reports are received.
LOG.debug("Deferring removal of stale storage " + storageInfo +
" with " + storageInfo.numBlocks() + " blocks");
}
}
}
} }
private void updateFailedStorage( private void updateFailedStorage(
@ -747,8 +787,6 @@ DatanodeStorageInfo updateStorage(DatanodeStorage s) {
// For backwards compatibility, make sure that the type and // For backwards compatibility, make sure that the type and
// state are updated. Some reports from older datanodes do // state are updated. Some reports from older datanodes do
// not include these fields so we may have assumed defaults. // not include these fields so we may have assumed defaults.
// This check can be removed in the next major release after
// 2.4.
storage.updateFromStorage(s); storage.updateFromStorage(s);
storageMap.put(storage.getStorageID(), storage); storageMap.put(storage.getStorageID(), storage);
} }

View File

@ -571,11 +571,13 @@ public void testSafeModeIBR() throws Exception {
reset(node); reset(node);
bm.getDatanodeManager().registerDatanode(nodeReg); bm.getDatanodeManager().registerDatanode(nodeReg);
verify(node).updateRegInfo(nodeReg); verify(node).updateRegInfo(nodeReg);
assertEquals(0, ds.getBlockReportCount()); // ready for report again
// send block report, should be processed after restart // send block report, should be processed after restart
reset(node); reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()), bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
new BlockListAsLongs(null, null)); new BlockListAsLongs(null, null));
// Reinitialize as registration with empty storage list pruned
// node.storageMap.
ds = node.getStorageInfos()[0];
assertEquals(1, ds.getBlockReportCount()); assertEquals(1, ds.getBlockReportCount());
} }

View File

@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math3.stat.inference.TestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Test;
import java.io.IOException;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
public class TestNameNodePrunesMissingStorages {
static final Log LOG = LogFactory.getLog(TestNameNodePrunesMissingStorages.class);
private static void runTest(final String testCaseName,
final boolean createFiles,
final int numInitialStorages,
final int expectedStoragesAfterTest) throws IOException {
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster
.Builder(conf)
.numDataNodes(1)
.storagesPerDatanode(numInitialStorages)
.build();
cluster.waitActive();
final DataNode dn0 = cluster.getDataNodes().get(0);
// Ensure NN knows about the storage.
final DatanodeID dnId = dn0.getDatanodeId();
final DatanodeDescriptor dnDescriptor =
cluster.getNamesystem().getBlockManager().getDatanodeManager().getDatanode(dnId);
assertThat(dnDescriptor.getStorageInfos().length, is(numInitialStorages));
final String bpid = cluster.getNamesystem().getBlockPoolId();
final DatanodeRegistration dnReg = dn0.getDNRegistrationForBP(bpid);
DataNodeTestUtils.triggerBlockReport(dn0);
if (createFiles) {
final Path path = new Path("/", testCaseName);
DFSTestUtil.createFile(
cluster.getFileSystem(), path, 1024, (short) 1, 0x1BAD5EED);
DataNodeTestUtils.triggerBlockReport(dn0);
}
// Generate a fake StorageReport that is missing one storage.
final StorageReport reports[] =
dn0.getFSDataset().getStorageReports(bpid);
final StorageReport prunedReports[] = new StorageReport[numInitialStorages - 1];
System.arraycopy(reports, 0, prunedReports, 0, prunedReports.length);
// Stop the DataNode and send fake heartbeat with missing storage.
cluster.stopDataNode(0);
cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0, 0);
// Check that the missing storage was pruned.
assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest));
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
/**
* Test that the NameNode prunes empty storage volumes that are no longer
* reported by the DataNode.
* @throws IOException
*/
@Test (timeout=300000)
public void testUnusedStorageIsPruned() throws IOException {
// Run the test with 1 storage, after the text expect 0 storages.
runTest(GenericTestUtils.getMethodName(), false, 1, 0);
}
/**
* Verify that the NameNode does not prune storages with blocks.
* @throws IOException
*/
@Test (timeout=300000)
public void testStorageWithBlocksIsNotPruned() throws IOException {
// Run the test with 1 storage, after the text still expect 1 storage.
runTest(GenericTestUtils.getMethodName(), true, 1, 1);
}
}