HDFS-12946. Add a tool to check rack configuration against EC policies. Contributed by Kitti Nanasi.

Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
This commit is contained in:
Kitti Nanasi 2018-12-03 09:59:56 -08:00 committed by Wei-Chiu Chuang
parent 3044b78bd0
commit dd5e7c6b72
12 changed files with 479 additions and 17 deletions

View File

@ -706,4 +706,9 @@ public String getNameDirSize() {
public int getNumEncryptionZones() {
return 0;
}
@Override
public String getVerifyECWithTopologyResult() {
return null;
}
}

View File

@ -1253,6 +1253,13 @@ public int getNumDeadDataNodes() {
return getDatanodeListForReport(DatanodeReportType.DEAD).size();
}
/** @return the number of datanodes. */
public int getNumOfDataNodes() {
synchronized (this) {
return datanodeMap.size();
}
}
/** @return list of datanodes where decommissioning is in progress. */
public List<DatanodeDescriptor> getDecommissioningNodes() {
// There is no need to take namesystem reader lock as

View File

@ -0,0 +1,124 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdfs.server.common;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
import org.apache.hadoop.hdfs.server.namenode.ECTopologyVerifierResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
/**
* Class for verifying whether the cluster setup can support
* all enabled EC policies.
*
* Scenarios when the verification fails:
* 1. not enough data nodes compared to EC policy's highest data+parity number
* 2. not enough racks to satisfy BlockPlacementPolicyRackFaultTolerant
*/
@InterfaceAudience.Private
public final class ECTopologyVerifier {
public static final Logger LOG =
LoggerFactory.getLogger(ECTopologyVerifier.class);
private ECTopologyVerifier() {}
/**
* Verifies whether the cluster setup can support all enabled EC policies.
*
* @param report list of data node descriptors for all data nodes
* @param policies all system and user defined erasure coding policies
* @return the status of the verification
*/
public static ECTopologyVerifierResult getECTopologyVerifierResult(
final DatanodeInfo[] report, final ErasureCodingPolicyInfo[] policies) {
final int numOfRacks = getNumberOfRacks(report);
return getECTopologyVerifierResult(policies, numOfRacks, report.length);
}
/**
* Verifies whether the cluster setup can support all enabled EC policies.
*
* @param policies all system and user defined erasure coding policies
* @param numOfRacks number of racks
* @param numOfDataNodes number of data nodes
* @return the status of the verification
*/
public static ECTopologyVerifierResult getECTopologyVerifierResult(
final ErasureCodingPolicyInfo[] policies, final int numOfRacks,
final int numOfDataNodes) {
int minDN = 0;
int minRack = 0;
for (ErasureCodingPolicyInfo policy: policies) {
if (policy.isEnabled()) {
final int policyDN =
policy.getPolicy().getNumDataUnits() + policy.getPolicy()
.getNumParityUnits();
minDN = Math.max(minDN, policyDN);
final int policyRack = (int) Math.ceil(
policyDN / (double) policy.getPolicy().getNumParityUnits());
minRack = Math.max(minRack, policyRack);
}
}
if (minDN == 0 || minRack == 0) {
String resultMessage = "No erasure coding policy is enabled.";
LOG.trace(resultMessage);
return new ECTopologyVerifierResult(true, resultMessage);
}
return verifyECWithTopology(minDN, minRack, numOfRacks, numOfDataNodes);
}
private static ECTopologyVerifierResult verifyECWithTopology(
final int minDN, final int minRack,
final int numOfRacks, final int numOfDataNodes) {
String resultMessage;
if (numOfDataNodes < minDN) {
resultMessage = "The number of DataNodes (" + numOfDataNodes
+ ") is less than the minimum required number of DataNodes ("
+ minDN + ") for enabled erasure coding policy.";
LOG.debug(resultMessage);
return new ECTopologyVerifierResult(false, resultMessage);
}
if (numOfRacks < minRack) {
resultMessage = "The number of racks (" + numOfRacks
+ ") is less than the minimum required number of racks ("
+ minRack + ") for enabled erasure coding policy.";
LOG.debug(resultMessage);
return new ECTopologyVerifierResult(false, resultMessage);
}
return new ECTopologyVerifierResult(true,
"The cluster setup can support all enabled EC policies");
}
private static int getNumberOfRacks(DatanodeInfo[] report) {
final Map<String, Integer> racks = new HashMap<>();
for (DatanodeInfo dni : report) {
Integer count = racks.get(dni.getNetworkLocation());
if (count == null) {
count = 0;
}
racks.put(dni.getNetworkLocation(), count + 1);
}
return racks.size();
}
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Result of the verification whether the current cluster setup can
* support all enabled EC policies.
*/
@InterfaceAudience.Private
public class ECTopologyVerifierResult {
private final String resultMessage;
private final boolean isSupported;
public ECTopologyVerifierResult(boolean isSupported,
String resultMessage) {
this.resultMessage = resultMessage;
this.isSupported = isSupported;
}
public String getResultMessage() {
return resultMessage;
}
public boolean isSupported() {
return isSupported;
}
}

View File

@ -37,6 +37,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@ -204,6 +205,14 @@ public ErasureCodingPolicyInfo[] getPersistedPolicies() {
.toArray(new ErasureCodingPolicyInfo[0]);
}
public ErasureCodingPolicyInfo[] getCopyOfPolicies() {
ErasureCodingPolicyInfo[] copy;
synchronized (this) {
copy = Arrays.copyOf(allPolicies, allPolicies.length);
}
return copy;
}
/**
* Get a {@link ErasureCodingPolicy} by policy ID, including system policy
* and user defined policy.

View File

@ -102,6 +102,7 @@
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.server.common.ECTopologyVerifier;
import org.apache.hadoop.hdfs.server.namenode.metrics.ReplicatedBlocksMBean;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import org.apache.hadoop.util.Time;
@ -8167,6 +8168,24 @@ public int getNumEnteringMaintenanceDataNodes() {
.size();
}
@Override // NameNodeMXBean
public String getVerifyECWithTopologyResult() {
int numOfDataNodes = getBlockManager().getDatanodeManager()
.getNumOfDataNodes();
int numOfRacks = getBlockManager().getDatanodeManager()
.getNetworkTopology().getNumOfRacks();
ErasureCodingPolicyInfo[] ecPolicies =
getErasureCodingPolicyManager().getCopyOfPolicies();
ECTopologyVerifierResult result =
ECTopologyVerifier.getECTopologyVerifierResult(ecPolicies,
numOfRacks, numOfDataNodes);
Map<String, String> resultMap = new HashMap<String, String>();
resultMap.put("isSupported", Boolean.toString(result.isSupported()));
resultMap.put("resultMessage", result.getResultMessage());
return JSON.toString(resultMap);
}
// This method logs operatoinName without super user privilege.
// It should be called without holding FSN lock.
void checkSuperuserPrivilege(String operationName)

View File

@ -303,4 +303,11 @@ public interface NameNodeMXBean {
*/
String getNameDirSize();
/**
* Verifies whether the cluster setup can support all enabled EC policies.
*
* @return the result of the verification
*/
String getVerifyECWithTopologyResult();
}

View File

@ -24,9 +24,13 @@
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.NoECPolicySetException;
import org.apache.hadoop.hdfs.server.common.ECTopologyVerifier;
import org.apache.hadoop.hdfs.server.namenode.ECTopologyVerifierResult;
import org.apache.hadoop.hdfs.util.ECPolicyLoader;
import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
import org.apache.hadoop.tools.TableListing;
@ -588,6 +592,50 @@ public int run(Configuration conf, List<String> args) throws IOException {
}
}
/**
* Command to verify the cluster setup can support all enabled EC policies.
*/
private static class VerifyClusterSetupCommand
implements AdminHelper.Command {
@Override
public String getName() {
return "-verifyClusterSetup";
}
@Override
public String getShortUsage() {
return "[" + getName() + "]\n";
}
@Override
public String getLongUsage() {
return getShortUsage() + "\n"
+ "Verify the cluster setup can support all enabled erasure coding"
+ " policies.\n";
}
@Override
public int run(Configuration conf, List<String> args) throws IOException {
if (args.size() > 0) {
System.err.println(getName() + ": Too many arguments");
return 1;
}
final DistributedFileSystem dfs = AdminHelper.getDFS(conf);
final ErasureCodingPolicyInfo[] policies =
dfs.getClient().getNamenode().getErasureCodingPolicies();
final DatanodeInfo[] report = dfs.getClient().getNamenode()
.getDatanodeReport(HdfsConstants.DatanodeReportType.ALL);
ECTopologyVerifierResult result = ECTopologyVerifier
.getECTopologyVerifierResult(report, policies);
System.out.println(result.getResultMessage());
if (result.isSupported()) {
return 0;
}
return 2;
}
}
private static final AdminHelper.Command[] COMMANDS = {
new ListECPoliciesCommand(),
new AddECPoliciesCommand(),
@ -597,6 +645,7 @@ public int run(Configuration conf, List<String> args) throws IOException {
new UnsetECPolicyCommand(),
new ListECCodecsCommand(),
new EnableECPolicyCommand(),
new DisableECPolicyCommand()
new DisableECPolicyCommand(),
new VerifyClusterSetupCommand()
};
}

View File

@ -2398,6 +2398,41 @@ public static HashSet<Path> closeOpenFiles(
return closedFiles;
}
/**
* Setup cluster with desired number of DN, racks, and specified number of
* rack that only has 1 DN. Other racks will be evenly setup with the number
* of DNs.
*
* @param conf the conf object to start the cluster.
* @param numDatanodes number of total Datanodes.
* @param numRacks number of total racks
* @param numSingleDnRacks number of racks that only has 1 DN
* @throws Exception
*/
public static MiniDFSCluster setupCluster(final Configuration conf,
final int numDatanodes,
final int numRacks,
final int numSingleDnRacks)
throws Exception {
assert numDatanodes > numRacks;
assert numRacks > numSingleDnRacks;
assert numSingleDnRacks >= 0;
final String[] racks = new String[numDatanodes];
for (int i = 0; i < numSingleDnRacks; i++) {
racks[i] = "/rack" + i;
}
for (int i = numSingleDnRacks; i < numDatanodes; i++) {
racks[i] =
"/rack" + (numSingleDnRacks + (i % (numRacks - numSingleDnRacks)));
}
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numDatanodes)
.racks(racks)
.build();
cluster.waitActive();
return cluster;
}
/**
* Check the correctness of the snapshotDiff report.
* Make sure all items in the passed entries are in the snapshotDiff

View File

@ -92,23 +92,9 @@ public void setup() {
*/
public void setupCluster(final int numDatanodes, final int numRacks,
final int numSingleDnRacks) throws Exception {
assert numDatanodes > numRacks;
assert numRacks > numSingleDnRacks;
assert numSingleDnRacks >= 0;
final String[] racks = new String[numDatanodes];
for (int i = 0; i < numSingleDnRacks; i++) {
racks[i] = "/rack" + i;
}
for (int i = numSingleDnRacks; i < numDatanodes; i++) {
racks[i] =
"/rack" + (numSingleDnRacks + (i % (numRacks - numSingleDnRacks)));
}
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numDatanodes)
.racks(racks)
.build();
cluster = DFSTestUtil
.setupCluster(conf, numDatanodes, numRacks, numSingleDnRacks);
dfs = cluster.getFileSystem();
cluster.waitActive();
dfs.setErasureCodingPolicy(new Path("/"), ecPolicy.getName());
}

View File

@ -1008,6 +1008,7 @@ void verifyTotalBlocksMetrics(long expectedTotalReplicatedBlocks,
expectedTotalReplicatedBlocks, totalReplicaBlocks.longValue());
assertEquals("Unexpected total ec block groups!",
expectedTotalECBlockGroups, totalECBlockGroups.longValue());
verifyEcClusterSetupVerifyResult(mbs);
}
private String getEnabledEcPoliciesMetric() throws Exception {
@ -1017,4 +1018,22 @@ private String getEnabledEcPoliciesMetric() throws Exception {
return (String) (mbs.getAttribute(mxbeanName,
"EnabledEcPolicies"));
}
private void verifyEcClusterSetupVerifyResult(MBeanServer mbs)
throws Exception{
ObjectName namenodeMXBeanName = new ObjectName(
"Hadoop:service=NameNode,name=NameNodeInfo");
String result = (String) mbs.getAttribute(namenodeMXBeanName,
"VerifyECWithTopologyResult");
ObjectMapper mapper = new ObjectMapper();
Map<String, String> resultMap = mapper.readValue(result, Map.class);
Boolean isSupported = Boolean.parseBoolean(resultMap.get("isSupported"));
String resultMessage = resultMap.get("resultMessage");
assertFalse("Test cluster does not support all enabled " +
"erasure coding policies.", isSupported);
assertTrue(resultMessage.contains("The number of racks"));
assertTrue(resultMessage.contains("is less than the minimum required " +
"number of racks"));
}
}

View File

@ -0,0 +1,157 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.tools;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Tests some ECAdmin scenarios that are hard to test from
* {@link org.apache.hadoop.cli.TestErasureCodingCLI}.
*/
public class TestECAdmin {
public static final Logger LOG = LoggerFactory.getLogger(TestECAdmin.class);
private Configuration conf = new Configuration();
private MiniDFSCluster cluster;
private ECAdmin admin = new ECAdmin(conf);
private final ByteArrayOutputStream out = new ByteArrayOutputStream();
private static final PrintStream OLD_OUT = System.out;
@Rule
public Timeout globalTimeout =
new Timeout(300000, TimeUnit.MILLISECONDS);
@Before
public void setup() throws Exception {
System.setOut(new PrintStream(out));
}
@After
public void tearDown() throws Exception {
try {
System.out.flush();
System.err.flush();
out.reset();
} finally {
System.setOut(OLD_OUT);
}
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
@Test
public void testRS63MinDN() throws Exception {
cluster = DFSTestUtil.setupCluster(conf, 6, 3, 0);
String[] args = {"-verifyClusterSetup"};
final int ret = admin.run(args);
LOG.info("Commend stdout: {}", out.toString());
assertEquals(2, ret);
assertTrue(out.toString()
.contains("less than the minimum required number of DataNodes"));
}
@Test
public void testRS104MinRacks() throws Exception {
cluster = DFSTestUtil.setupCluster(conf, 15, 3, 0);
cluster.getFileSystem().enableErasureCodingPolicy(
SystemErasureCodingPolicies
.getByID(SystemErasureCodingPolicies.RS_10_4_POLICY_ID).getName());
String[] args = {"-verifyClusterSetup"};
final int ret = admin.run(args);
LOG.info("Commend stdout: {}", out.toString());
assertEquals(2, ret);
assertTrue(out.toString()
.contains("less than the minimum required number of racks"));
}
@Test
public void testXOR21MinRacks() throws Exception {
cluster = DFSTestUtil.setupCluster(conf, 5, 2, 0);
cluster.getFileSystem().disableErasureCodingPolicy(
SystemErasureCodingPolicies
.getByID(SystemErasureCodingPolicies.RS_6_3_POLICY_ID).getName());
cluster.getFileSystem().enableErasureCodingPolicy(
SystemErasureCodingPolicies
.getByID(SystemErasureCodingPolicies.XOR_2_1_POLICY_ID).getName());
String[] args = {"-verifyClusterSetup"};
final int ret = admin.run(args);
LOG.info("Commend stdout: {}", out.toString());
assertEquals(2, ret);
assertTrue(out.toString()
.contains("less than the minimum required number of racks"));
}
@Test
public void testRS32MinRacks() throws Exception {
cluster = DFSTestUtil.setupCluster(conf, 5, 2, 0);
cluster.getFileSystem().disableErasureCodingPolicy(
SystemErasureCodingPolicies
.getByID(SystemErasureCodingPolicies.RS_6_3_POLICY_ID).getName());
cluster.getFileSystem().enableErasureCodingPolicy(
SystemErasureCodingPolicies
.getByID(SystemErasureCodingPolicies.RS_3_2_POLICY_ID).getName());
String[] args = {"-verifyClusterSetup"};
final int ret = admin.run(args);
LOG.info("Commend stdout: {}", out.toString());
assertEquals(2, ret);
assertTrue(out.toString()
.contains("less than the minimum required number of racks"));
}
@Test
public void testRS63Good() throws Exception {
cluster = DFSTestUtil.setupCluster(conf, 9, 3, 0);
String[] args = {"-verifyClusterSetup"};
final int ret = admin.run(args);
LOG.info("Commend stdout: {}", out.toString());
assertEquals(0, ret);
}
@Test
public void testNoECEnabled() throws Exception {
cluster = DFSTestUtil.setupCluster(conf, 9, 3, 0);
cluster.getFileSystem().disableErasureCodingPolicy(
SystemErasureCodingPolicies
.getByID(SystemErasureCodingPolicies.RS_6_3_POLICY_ID).getName());
String[] args = {"-verifyClusterSetup"};
final int ret = admin.run(args);
LOG.info("Commend stdout: {}", out.toString());
assertEquals(0, ret);
assertTrue(out.toString().contains("No erasure coding policy is enabled"));
}
}