HDFS-17599. EC: Fix the mismatch between locations and indices for mover (#6980)
commit a962aa37e0
parent 0837c84a9f
@@ -539,6 +539,10 @@ public void setIndices(byte[] indices) {
       this.indices = indices;
     }
 
+    public byte[] getIndices() {
+      return this.indices;
+    }
+
     /**
      * Adjust EC block indices,it will remove the element of adjustList from indices.
      * @param adjustList the list will be removed from indices
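
The new getIndices() accessor is what lets the Mover (in the newDBlock hunk further below) assert that the indices array and the location list have the same length once adjustIndices(adjustList) has removed the skipped positions. The snippet below is only an illustrative sketch of that removal semantics, not the actual Dispatcher.DBlockStriped implementation; the class name and the hard-coded index values are invented for the example.

import java.util.Arrays;
import java.util.List;

public class AdjustIndicesSketch {
  public static void main(String[] args) {
    // An RS-6-3 block group carries internal block indices 0..8.
    byte[] indices = {0, 1, 2, 3, 4, 5, 6, 7, 8};
    // Positions whose datanode was skipped, e.g. because it is being
    // decommissioned, in maintenance, or excluded on the balancer CLI.
    List<Integer> adjustList = Arrays.asList(3, 7);

    // Drop the skipped positions so that indices lines up again with the
    // locations that were actually added.
    byte[] adjusted = new byte[indices.length - adjustList.size()];
    int k = 0;
    for (int i = 0; i < indices.length; i++) {
      if (!adjustList.contains(i)) {
        adjusted[k++] = indices[i];
      }
    }
    System.out.println(Arrays.toString(adjusted)); // [0, 1, 2, 4, 5, 6, 8]
  }
}
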
@@ -889,8 +893,8 @@ private long getBlockList() throws IOException, IllegalArgumentException {
           if (g != null) { // not unknown
             block.addLocation(g);
           } else if (blkLocs instanceof StripedBlockWithLocations) {
-            // some datanode may not in storageGroupMap due to decommission operation
-            // or balancer cli with "-exclude" parameter
+            // some datanode may not in storageGroupMap due to decommission or maintenance
+            // operation or balancer cli with "-exclude" parameter
             adjustList.add(i);
           }
         }
@@ -49,6 +49,7 @@
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Lists;
+import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
@@ -222,12 +223,27 @@ DBlock newDBlock(LocatedBlock lb, List<MLocation> locations,
     } else {
       db = new DBlock(blk);
     }
-    for(MLocation ml : locations) {
+
+    List<Integer> adjustList = new ArrayList<>();
+    for (int i = 0; i < locations.size(); i++) {
+      MLocation ml = locations.get(i);
       StorageGroup source = storages.getSource(ml);
       if (source != null) {
         db.addLocation(source);
+      } else if (lb.isStriped()) {
+        // some datanode may not in storages due to decommission or maintenance operation
+        // or balancer cli with "-exclude" parameter
+        adjustList.add(i);
       }
     }
+
+    if (!adjustList.isEmpty()) {
+      // block.locations mismatch with block.indices
+      // adjust indices to get correct internalBlock
+      ((DBlockStriped) db).adjustIndices(adjustList);
+      Preconditions.checkArgument(((DBlockStriped) db).getIndices().length
+          == db.getLocations().size());
+    }
     return db;
   }
 
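
For a striped DBlock, the i-th entry of the location list is expected to pair with the i-th entry of the indices array; as the test comment below notes, a mismatch makes DBlockStriped#getInternalBlock compute the wrong blockId. The following is a rough, self-contained illustration of how the pairing drifts when one location is skipped but indices is left untouched; the datanode names and the pairing loop are invented for the example and are not Mover code.

import java.util.ArrayList;
import java.util.List;

public class LocationIndexMismatchSketch {
  public static void main(String[] args) {
    // Internal block indices of a striped block group (RS-6-3): 0..8.
    byte[] indices = {0, 1, 2, 3, 4, 5, 6, 7, 8};

    // Build the location list the way newDBlock does, skipping the entry
    // whose datanode has no usable source (here, position 3).
    List<String> locations = new ArrayList<>();
    for (int i = 0; i < indices.length; i++) {
      if (i == 3) {
        continue; // location skipped, but indices[] still holds 9 entries
      }
      locations.add("dn-" + i);
    }

    // Positional pairing: everything after the skipped slot is off by one,
    // so the wrong internal block (and blockId) would be derived.
    for (int i = 0; i < locations.size(); i++) {
      System.out.println(locations.get(i) + " -> internal block index " + indices[i]);
    }
    // dn-4 pairs with index 3, dn-5 with index 4, and so on. Removing the
    // stale position via adjustIndices first restores the one-to-one pairing
    // that the Preconditions check in the hunk above asserts.
  }
}
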
@@ -23,6 +23,7 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MOVER_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MOVER_KERBEROS_PRINCIPAL_KEY;
@@ -73,6 +74,7 @@
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -82,10 +84,13 @@
 import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
 import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
 import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.mover.Mover.MLocation;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
@@ -98,6 +103,7 @@
 import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.MetricsAsserts;
+import org.apache.hadoop.util.Lists;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.Assert;
 import org.junit.Test;
@@ -1005,6 +1011,148 @@ public void testMoverWithStripedFile() throws Exception {
     }
   }
 
+  @Test(timeout = 300000)
+  public void testMoverWithStripedFileMaintenance() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConfWithStripe(conf);
+
+    // Start 9 datanodes
+    int numOfDatanodes = 9;
+    int storagesPerDatanode = 2;
+    long capacity = 9 * defaultBlockSize;
+    long[][] capacities = new long[numOfDatanodes][storagesPerDatanode];
+    for (int i = 0; i < numOfDatanodes; i++) {
+      for(int j = 0; j < storagesPerDatanode; j++){
+        capacities[i][j] = capacity;
+      }
+    }
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numOfDatanodes)
+        .storagesPerDatanode(storagesPerDatanode)
+        .storageTypes(new StorageType[][]{
+            {StorageType.SSD, StorageType.SSD},
+            {StorageType.SSD, StorageType.SSD},
+            {StorageType.SSD, StorageType.SSD},
+            {StorageType.SSD, StorageType.SSD},
+            {StorageType.SSD, StorageType.SSD},
+            {StorageType.SSD, StorageType.SSD},
+            {StorageType.SSD, StorageType.SSD},
+            {StorageType.SSD, StorageType.SSD},
+            {StorageType.SSD, StorageType.SSD}})
+        .storageCapacities(capacities)
+        .build();
+
+    try {
+      cluster.waitActive();
+      cluster.getFileSystem().enableErasureCodingPolicy(
+          StripedFileTestUtil.getDefaultECPolicy().getName());
+
+      ClientProtocol client = NameNodeProxies.createProxy(conf,
+          cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
+      String barDir = "/bar";
+      client.mkdirs(barDir, new FsPermission((short) 777), true);
+      // Set "/bar" directory with ALL_SSD storage policy.
+      client.setStoragePolicy(barDir, "ALL_SSD");
+      // Set an EC policy on "/bar" directory
+      client.setErasureCodingPolicy(barDir,
+          StripedFileTestUtil.getDefaultECPolicy().getName());
+
+      // Write file to barDir
+      final String fooFile = "/bar/foo";
+      long fileLen = 6 * defaultBlockSize;
+      DFSTestUtil.createFile(cluster.getFileSystem(), new Path(fooFile),
+          fileLen, (short) 3, 0);
+
+      // Verify storage types and locations
+      LocatedBlocks locatedBlocks =
+          client.getBlockLocations(fooFile, 0, fileLen);
+      DatanodeInfoWithStorage location = null;
+      for(LocatedBlock lb : locatedBlocks.getLocatedBlocks()){
+        location = lb.getLocations()[8];
+        for(StorageType type : lb.getStorageTypes()){
+          Assert.assertEquals(StorageType.SSD, type);
+        }
+      }
+
+      // Maintain the last datanode later
+      FSNamesystem ns = cluster.getNamesystem(0);
+      DatanodeManager datanodeManager = ns.getBlockManager().getDatanodeManager();
+      DatanodeDescriptor dn = datanodeManager.getDatanode(location.getDatanodeUuid());
+
+      StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
+          dataBlocks + parityBlocks);
+
+      // Start 5 more datanodes for mover
+      capacities = new long[5][storagesPerDatanode];
+      for (int i = 0; i < 5; i++) {
+        for(int j = 0; j < storagesPerDatanode; j++){
+          capacities[i][j] = capacity;
+        }
+      }
+      cluster.startDataNodes(conf, 5,
+          new StorageType[][]{
+              {StorageType.DISK, StorageType.DISK},
+              {StorageType.DISK, StorageType.DISK},
+              {StorageType.DISK, StorageType.DISK},
+              {StorageType.DISK, StorageType.DISK},
+              {StorageType.DISK, StorageType.DISK}},
+          true, null, null, null, capacities,
+          null, false, false, false, null, null, null);
+      cluster.triggerHeartbeats();
+
+      // Move blocks to DISK
+      client.setStoragePolicy(barDir, "HOT");
+      int rc = ToolRunner.run(conf, new Mover.Cli(),
+          new String[]{"-p", barDir});
+      // Verify the number of DISK storage types
+      waitForLocatedBlockWithDiskStorageType(cluster.getFileSystem(), fooFile, 5);
+
+      // Maintain a datanode that simulates that one node in the location list
+      // is in ENTERING_MAINTENANCE status.
+      datanodeManager.getDatanode(dn.getDatanodeUuid()).startMaintenance();
+      waitNodeState(dn, DatanodeInfo.AdminStates.ENTERING_MAINTENANCE);
+
+      // Move blocks back to SSD.
+      // Without HDFS-17599, locations and indices lengths might not match,
+      // resulting in getting the wrong blockId in DBlockStriped#getInternalBlock,
+      // and mover will fail to run.
+      client.setStoragePolicy(barDir, "ALL_SSD");
+      rc = ToolRunner.run(conf, new Mover.Cli(),
+          new String[]{"-p", barDir});
+
+      Assert.assertEquals("Movement to HOT should be successful", 0, rc);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Wait till DataNode is transitioned to the expected state.
+   */
+  protected void waitNodeState(DatanodeInfo node, DatanodeInfo.AdminStates state) {
+    waitNodeState(Lists.newArrayList(node), state);
+  }
+
+  /**
+   * Wait till all DataNodes are transitioned to the expected state.
+   */
+  protected void waitNodeState(List<DatanodeInfo> nodes, DatanodeInfo.AdminStates state) {
+    for (DatanodeInfo node : nodes) {
+      boolean done = (state == node.getAdminState());
+      while (!done) {
+        LOG.info("Waiting for node " + node + " to change state to "
+            + state + " current state: " + node.getAdminState());
+        try {
+          Thread.sleep(DFS_HEARTBEAT_INTERVAL_DEFAULT * 10);
+        } catch (InterruptedException e) {
+          // nothing
+        }
+        done = (state == node.getAdminState());
+      }
+      LOG.info("node " + node + " reached the state " + state);
+    }
+  }
+
   /**
    * Wait until Namenode reports expected storage type for all blocks of
    * given file.