HDFS-11673. [READ] Handle failures of Datanode with PROVIDED storage
This commit is contained in:
parent
55ade54b8e
commit
546b95f484
@ -24,6 +24,7 @@
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockType;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||
@ -188,8 +189,15 @@ DatanodeStorageInfo findStorageInfo(DatanodeDescriptor dn) {
|
||||
int len = getCapacity();
|
||||
for(int idx = 0; idx < len; idx++) {
|
||||
DatanodeStorageInfo cur = getStorageInfo(idx);
|
||||
if(cur != null && cur.getDatanodeDescriptor() == dn) {
|
||||
return cur;
|
||||
if(cur != null) {
|
||||
if (cur.getStorageType() == StorageType.PROVIDED) {
|
||||
//if block resides on provided storage, only match the storage ids
|
||||
if (dn.getStorageInfo(cur.getStorageID()) != null) {
|
||||
return cur;
|
||||
}
|
||||
} else if (cur.getDatanodeDescriptor() == dn) {
|
||||
return cur;
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
|
@ -1514,6 +1514,7 @@ public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
|
||||
|
||||
/** Remove the blocks associated to the given datanode. */
|
||||
void removeBlocksAssociatedTo(final DatanodeDescriptor node) {
|
||||
providedStorageMap.removeDatanode(node);
|
||||
for (DatanodeStorageInfo storage : node.getStorageInfos()) {
|
||||
final Iterator<BlockInfo> it = storage.getBlockIterator();
|
||||
//add the BlockInfos to a new collection as the
|
||||
@ -2462,7 +2463,7 @@ public boolean processReport(final DatanodeID nodeID,
|
||||
// !#! Register DN with provided storage, not with storage owned by DN
|
||||
// !#! DN should still have a ref to the DNStorageInfo
|
||||
DatanodeStorageInfo storageInfo =
|
||||
providedStorageMap.getStorage(node, storage);
|
||||
providedStorageMap.getStorage(node, storage, context);
|
||||
|
||||
if (storageInfo == null) {
|
||||
// We handle this for backwards compatibility.
|
||||
@ -2589,7 +2590,7 @@ void rescanPostponedMisreplicatedBlocks() {
|
||||
}
|
||||
}
|
||||
|
||||
private Collection<Block> processReport(
|
||||
Collection<Block> processReport(
|
||||
final DatanodeStorageInfo storageInfo,
|
||||
final BlockListAsLongs report,
|
||||
BlockReportContext context) throws IOException {
|
||||
|
@ -20,6 +20,7 @@
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.ProvidedStorageMap.ProvidedBlockList;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
|
||||
import org.apache.hadoop.hdfs.util.RwLock;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -52,14 +53,23 @@ void init(RwLock lock, BlockManager bm, DatanodeStorageInfo storage) {
|
||||
* start the processing of block report for provided blocks.
|
||||
* @throws IOException
|
||||
*/
|
||||
void start() throws IOException {
|
||||
void start(BlockReportContext context) throws IOException {
|
||||
assert lock.hasWriteLock() : "Not holding write lock";
|
||||
if (hasDNs) {
|
||||
return;
|
||||
}
|
||||
LOG.info("Calling process first blk report from storage: " + storage);
|
||||
// first pass; periodic refresh should call bm.processReport
|
||||
bm.processFirstBlockReport(storage, new ProvidedBlockList(iterator()));
|
||||
if (storage.getBlockReportCount() == 0) {
|
||||
LOG.info("Calling process first blk report from storage: " + storage);
|
||||
// first pass; periodic refresh should call bm.processReport
|
||||
bm.processFirstBlockReport(storage, new ProvidedBlockList(iterator()));
|
||||
} else {
|
||||
bm.processReport(storage, new ProvidedBlockList(iterator()), context);
|
||||
}
|
||||
hasDNs = true;
|
||||
}
|
||||
|
||||
void stop() {
|
||||
assert lock.hasWriteLock() : "Not holding write lock";
|
||||
hasDNs = false;
|
||||
}
|
||||
}
|
||||
|
@ -40,6 +40,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
|
||||
import org.apache.hadoop.hdfs.util.RwLock;
|
||||
@ -103,17 +104,18 @@ public class ProvidedStorageMap {
|
||||
/**
|
||||
* @param dn datanode descriptor
|
||||
* @param s data node storage
|
||||
* @param context the block report context
|
||||
* @return the {@link DatanodeStorageInfo} for the specified datanode.
|
||||
* If {@code s} corresponds to a provided storage, the storage info
|
||||
* representing provided storage is returned.
|
||||
* @throws IOException
|
||||
*/
|
||||
DatanodeStorageInfo getStorage(DatanodeDescriptor dn, DatanodeStorage s)
|
||||
throws IOException {
|
||||
DatanodeStorageInfo getStorage(DatanodeDescriptor dn, DatanodeStorage s,
|
||||
BlockReportContext context) throws IOException {
|
||||
if (providedEnabled && storageId.equals(s.getStorageID())) {
|
||||
if (StorageType.PROVIDED.equals(s.getStorageType())) {
|
||||
// poll service, initiate
|
||||
blockProvider.start();
|
||||
blockProvider.start(context);
|
||||
dn.injectStorage(providedStorageInfo);
|
||||
return providedDescriptor.getProvidedStorage(dn, s);
|
||||
}
|
||||
@ -134,6 +136,15 @@ public LocatedBlockBuilder newLocatedBlocks(int maxValue) {
|
||||
return new ProvidedBlocksBuilder(maxValue);
|
||||
}
|
||||
|
||||
public void removeDatanode(DatanodeDescriptor dnToRemove) {
|
||||
if (providedDescriptor != null) {
|
||||
int remainingDatanodes = providedDescriptor.remove(dnToRemove);
|
||||
if (remainingDatanodes == 0) {
|
||||
blockProvider.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Builder used for creating {@link LocatedBlocks} when a block is provided.
|
||||
*/
|
||||
@ -282,7 +293,7 @@ DatanodeStorageInfo getProvidedStorage(
|
||||
|
||||
DatanodeStorageInfo createProvidedStorage(DatanodeStorage ds) {
|
||||
assert null == storageMap.get(ds.getStorageID());
|
||||
DatanodeStorageInfo storage = new DatanodeStorageInfo(this, ds);
|
||||
DatanodeStorageInfo storage = new ProvidedDatanodeStorageInfo(this, ds);
|
||||
storage.setHeartbeatedSinceFailover(true);
|
||||
storageMap.put(storage.getStorageID(), storage);
|
||||
return storage;
|
||||
@ -381,6 +392,22 @@ void addBlockToBeReplicated(Block block, DatanodeStorageInfo[] targets) {
|
||||
}
|
||||
}
|
||||
|
||||
int remove(DatanodeDescriptor dnToRemove) {
|
||||
// this operation happens under the FSNamesystem lock;
|
||||
// no additional synchronization required.
|
||||
if (dnToRemove != null) {
|
||||
DatanodeDescriptor storedDN = dns.get(dnToRemove.getDatanodeUuid());
|
||||
if (storedDN != null) {
|
||||
dns.remove(dnToRemove.getDatanodeUuid());
|
||||
}
|
||||
}
|
||||
return dns.size();
|
||||
}
|
||||
|
||||
int activeProvidedDatanodes() {
|
||||
return dns.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
return (this == obj) || super.equals(obj);
|
||||
@ -392,6 +419,25 @@ public int hashCode() {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The DatanodeStorageInfo used for the provided storage.
|
||||
*/
|
||||
static class ProvidedDatanodeStorageInfo extends DatanodeStorageInfo {
|
||||
|
||||
ProvidedDatanodeStorageInfo(ProvidedDescriptor dn, DatanodeStorage ds) {
|
||||
super(dn, ds);
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean removeBlock(BlockInfo b) {
|
||||
ProvidedDescriptor dn = (ProvidedDescriptor) getDatanodeDescriptor();
|
||||
if (dn.activeProvidedDatanodes() == 0) {
|
||||
return super.removeBlock(b);
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Used to emulate block reports for provided blocks.
|
||||
*/
|
||||
|
@ -119,9 +119,9 @@ public void testProvidedStorageMap() throws IOException {
|
||||
|
||||
when(nameSystemLock.hasWriteLock()).thenReturn(true);
|
||||
DatanodeStorageInfo dns1Provided = providedMap.getStorage(dn1,
|
||||
dn1ProvidedStorage);
|
||||
dn1ProvidedStorage, null);
|
||||
DatanodeStorageInfo dns1Disk = providedMap.getStorage(dn1,
|
||||
dn1DiskStorage);
|
||||
dn1DiskStorage, null);
|
||||
|
||||
assertTrue("The provided storages should be equal",
|
||||
dns1Provided == providedMapStorage);
|
||||
@ -131,7 +131,7 @@ public void testProvidedStorageMap() throws IOException {
|
||||
DatanodeStorageInfo dnsDisk = new DatanodeStorageInfo(dn1, dn1DiskStorage);
|
||||
dn1.injectStorage(dnsDisk);
|
||||
assertTrue("Disk storage must match the injected storage info",
|
||||
dnsDisk == providedMap.getStorage(dn1, dn1DiskStorage));
|
||||
dnsDisk == providedMap.getStorage(dn1, dn1DiskStorage, null));
|
||||
|
||||
//create a 2nd datanode
|
||||
DatanodeDescriptor dn2 = createDatanodeDescriptor(5010);
|
||||
@ -142,12 +142,10 @@ public void testProvidedStorageMap() throws IOException {
|
||||
StorageType.PROVIDED);
|
||||
|
||||
DatanodeStorageInfo dns2Provided = providedMap.getStorage(
|
||||
dn2, dn2ProvidedStorage);
|
||||
dn2, dn2ProvidedStorage, null);
|
||||
assertTrue("The provided storages should be equal",
|
||||
dns2Provided == providedMapStorage);
|
||||
assertTrue("The DatanodeDescriptor should contain the provided storage",
|
||||
dn2.getStorageInfo(providedStorageID) == providedMapStorage);
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -45,11 +45,14 @@
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockFormatProvider;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockProvider;
|
||||
import org.apache.hadoop.hdfs.server.common.BlockFormat;
|
||||
import org.apache.hadoop.hdfs.server.common.FileRegionProvider;
|
||||
import org.apache.hadoop.hdfs.server.common.TextFileRegionFormat;
|
||||
import org.apache.hadoop.hdfs.server.common.TextFileRegionProvider;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
|
||||
|
||||
import org.junit.After;
|
||||
@ -406,9 +409,9 @@ public void testSetReplicationForProvidedFiles() throws Exception {
|
||||
createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
|
||||
FixedBlockResolver.class);
|
||||
startCluster(NNDIRPATH, 2, null,
|
||||
new StorageType[][] {
|
||||
{StorageType.PROVIDED},
|
||||
{StorageType.DISK}},
|
||||
new StorageType[][]{
|
||||
{StorageType.PROVIDED},
|
||||
{StorageType.DISK}},
|
||||
false);
|
||||
|
||||
String filename = "/" + filePrefix + (numFiles - 1) + fileSuffix;
|
||||
@ -433,4 +436,67 @@ public void testSetReplicationForProvidedFiles() throws Exception {
|
||||
assertEquals(cluster.getDataNodes().get(0).getDatanodeUuid(),
|
||||
infos[0].getDatanodeUuid());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProvidedDatanodeFailures() throws Exception {
|
||||
createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
|
||||
FixedBlockResolver.class);
|
||||
startCluster(NNDIRPATH, 3, null,
|
||||
new StorageType[][] {
|
||||
{StorageType.PROVIDED},
|
||||
{StorageType.PROVIDED},
|
||||
{StorageType.DISK}},
|
||||
false);
|
||||
|
||||
DataNode providedDatanode1 = cluster.getDataNodes().get(0);
|
||||
DataNode providedDatanode2 = cluster.getDataNodes().get(1);
|
||||
|
||||
DFSClient client = new DFSClient(new InetSocketAddress("localhost",
|
||||
cluster.getNameNodePort()), cluster.getConfiguration(0));
|
||||
|
||||
if (numFiles >= 1) {
|
||||
String filename = "/" + filePrefix + (numFiles - 1) + fileSuffix;
|
||||
|
||||
DatanodeInfo[] dnInfos = getAndCheckBlockLocations(client, filename, 1);
|
||||
//the location should be one of the provided DNs available
|
||||
assertTrue(
|
||||
dnInfos[0].getDatanodeUuid().equals(
|
||||
providedDatanode1.getDatanodeUuid())
|
||||
|| dnInfos[0].getDatanodeUuid().equals(
|
||||
providedDatanode2.getDatanodeUuid()));
|
||||
|
||||
//stop the 1st provided datanode
|
||||
MiniDFSCluster.DataNodeProperties providedDNProperties1 =
|
||||
cluster.stopDataNode(0);
|
||||
|
||||
//make NameNode detect that datanode is down
|
||||
BlockManagerTestUtil.noticeDeadDatanode(
|
||||
cluster.getNameNode(),
|
||||
providedDatanode1.getDatanodeId().getXferAddr());
|
||||
|
||||
//should find the block on the 2nd provided datanode
|
||||
dnInfos = getAndCheckBlockLocations(client, filename, 1);
|
||||
assertEquals(providedDatanode2.getDatanodeUuid(),
|
||||
dnInfos[0].getDatanodeUuid());
|
||||
|
||||
//stop the 2nd provided datanode
|
||||
cluster.stopDataNode(1);
|
||||
// make NameNode detect that datanode is down
|
||||
BlockManagerTestUtil.noticeDeadDatanode(
|
||||
cluster.getNameNode(),
|
||||
providedDatanode2.getDatanodeId().getXferAddr());
|
||||
|
||||
getAndCheckBlockLocations(client, filename, 0);
|
||||
|
||||
//restart the provided datanode
|
||||
cluster.restartDataNode(providedDNProperties1, true);
|
||||
cluster.waitActive();
|
||||
|
||||
//should find the block on the 1st provided datanode now
|
||||
dnInfos = getAndCheckBlockLocations(client, filename, 1);
|
||||
//not comparing UUIDs as the datanode can now have a different one.
|
||||
assertEquals(providedDatanode1.getDatanodeId().getXferAddr(),
|
||||
dnInfos[0].getXferAddr());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user