HDFS-14061. Check if the cluster topology supports the EC policy before setting, enabling or adding it. Contributed by Kitti Nanasi.
Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
Parent: 0b91329ed6
Commit: 951cdd7e4c
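The minimums the verifier enforces follow directly from a policy's schema: at least numDataUnits + numParityUnits DataNodes, and at least ceil((numDataUnits + numParityUnits) / numParityUnits) racks, so that an even spread leaves no rack holding more units of a block group than the policy has parity units. A standalone sketch of that arithmetic (illustration only, not part of the patch; the class and method names are made up):

// Standalone illustration of the minimums the verifier derives from a
// policy's schema, mirroring the arithmetic in ECTopologyVerifier below.
public class EcTopologyMath {
  // Minimum DataNodes: one per data or parity unit of the widest policy.
  static int minDataNodes(int dataUnits, int parityUnits) {
    return dataUnits + parityUnits;
  }

  // Minimum racks: enough racks that an even spread puts at most
  // parityUnits units of one block group on any single rack.
  static int minRacks(int dataUnits, int parityUnits) {
    int totalUnits = dataUnits + parityUnits;
    return (int) Math.ceil(totalUnits / (double) parityUnits);
  }

  public static void main(String[] args) {
    // RS-6-3: 6 data + 3 parity units -> at least 9 DataNodes and 3 racks.
    System.out.println(minDataNodes(6, 3)); // 9
    System.out.println(minRacks(6, 3));     // 3
    // RS-3-2: 3 data + 2 parity units -> at least 5 DataNodes and 3 racks.
    System.out.println(minDataNodes(3, 2)); // 5
    System.out.println(minRacks(3, 2));     // 3
  }
}

These numbers match the new tests below: 6 DataNodes fail RS-6-3 on the DataNode count, and 2 racks fail RS-3-2 on the rack count.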
@@ -18,13 +18,15 @@
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.server.namenode.ECTopologyVerifierResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Class for verifying whether the cluster setup can support
@@ -43,58 +45,57 @@ public final class ECTopologyVerifier {
   private ECTopologyVerifier() {}
 
   /**
-   * Verifies whether the cluster setup can support all enabled EC policies.
+   * Verifies whether the cluster setup can support the given EC policies.
    *
    * @param report list of data node descriptors for all data nodes
-   * @param policies all system and user defined erasure coding policies
+   * @param policies erasure coding policies to verify
    * @return the status of the verification
    */
   public static ECTopologyVerifierResult getECTopologyVerifierResult(
-      final DatanodeInfo[] report, final ErasureCodingPolicyInfo[] policies) {
+      final DatanodeInfo[] report, final ErasureCodingPolicy... policies) {
     final int numOfRacks = getNumberOfRacks(report);
-    return getECTopologyVerifierResult(policies, numOfRacks, report.length);
+    return getECTopologyVerifierResult(numOfRacks, report.length, policies);
   }
 
   /**
    * Verifies whether the cluster setup can support all enabled EC policies.
    *
-   * @param policies all system and user defined erasure coding policies
+   * @param policies erasure coding policies to verify
    * @param numOfRacks number of racks
    * @param numOfDataNodes number of data nodes
    * @return the status of the verification
    */
   public static ECTopologyVerifierResult getECTopologyVerifierResult(
-      final ErasureCodingPolicyInfo[] policies, final int numOfRacks,
-      final int numOfDataNodes) {
+      final int numOfRacks, final int numOfDataNodes,
+      final ErasureCodingPolicy... policies) {
     int minDN = 0;
     int minRack = 0;
-    for (ErasureCodingPolicyInfo policy: policies) {
-      if (policy.isEnabled()) {
-        final int policyDN =
-            policy.getPolicy().getNumDataUnits() + policy.getPolicy()
-                .getNumParityUnits();
-        minDN = Math.max(minDN, policyDN);
-        final int policyRack = (int) Math.ceil(
-            policyDN / (double) policy.getPolicy().getNumParityUnits());
-        minRack = Math.max(minRack, policyRack);
-      }
+    for (ErasureCodingPolicy policy: policies) {
+      final int policyDN =
+          policy.getNumDataUnits() + policy
+              .getNumParityUnits();
+      minDN = Math.max(minDN, policyDN);
+      final int policyRack = (int) Math.ceil(
+          policyDN / (double) policy.getNumParityUnits());
+      minRack = Math.max(minRack, policyRack);
     }
     if (minDN == 0 || minRack == 0) {
-      String resultMessage = "No erasure coding policy is enabled.";
+      String resultMessage = "No erasure coding policy is given.";
       LOG.trace(resultMessage);
       return new ECTopologyVerifierResult(true, resultMessage);
     }
-    return verifyECWithTopology(minDN, minRack, numOfRacks, numOfDataNodes);
+    return verifyECWithTopology(minDN, minRack, numOfRacks, numOfDataNodes,
+        getReadablePolicies(policies));
   }
 
   private static ECTopologyVerifierResult verifyECWithTopology(
       final int minDN, final int minRack,
-      final int numOfRacks, final int numOfDataNodes) {
+      final int numOfRacks, final int numOfDataNodes, String readablePolicies) {
     String resultMessage;
     if (numOfDataNodes < minDN) {
       resultMessage = "The number of DataNodes (" + numOfDataNodes
           + ") is less than the minimum required number of DataNodes ("
-          + minDN + ") for enabled erasure coding policy.";
+          + minDN + ") for the erasure coding policies: " + readablePolicies;
       LOG.debug(resultMessage);
       return new ECTopologyVerifierResult(false, resultMessage);
     }
@@ -102,12 +103,14 @@ private static ECTopologyVerifierResult verifyECWithTopology(
     if (numOfRacks < minRack) {
       resultMessage = "The number of racks (" + numOfRacks
           + ") is less than the minimum required number of racks ("
-          + minRack + ") for enabled erasure coding policy.";
+          + minRack + ") for the erasure coding policies: "
+          + readablePolicies;
       LOG.debug(resultMessage);
       return new ECTopologyVerifierResult(false, resultMessage);
     }
     return new ECTopologyVerifierResult(true,
-        "The cluster setup can support all enabled EC policies");
+        "The cluster setup can support EC policies: "
+        + readablePolicies);
   }
 
   private static int getNumberOfRacks(DatanodeInfo[] report) {
@@ -121,4 +124,12 @@ private static int getNumberOfRacks(DatanodeInfo[] report) {
     }
     return racks.size();
   }
+
+  private static String getReadablePolicies(
+      final ErasureCodingPolicy... policies) {
+    return Arrays.asList(policies)
+        .stream()
+        .map(policyInfo -> policyInfo.getName())
+        .collect(Collectors.joining(", "));
+  }
 }
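With this change the verifier takes the policies to check as an ErasureCodingPolicy varargs parameter instead of the full ErasureCodingPolicyInfo[] set, so each caller decides which policies matter. A hypothetical caller of the count-based overload might look like the sketch below; the ECTopologyVerifier package is an assumption (it is not shown in this diff) and the hard-coded counts stand in for values normally taken from a DataNode report.

// Hypothetical caller of the reworked API; assumes the HDFS jars are on the
// classpath. Policy lookup mirrors what the new TestECAdmin tests do.
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.hdfs.server.common.ECTopologyVerifier; // package assumed; not shown in this diff
import org.apache.hadoop.hdfs.server.namenode.ECTopologyVerifierResult;

public class VerifyTopologyExample {
  public static void main(String[] args) {
    ErasureCodingPolicy rs63 = SystemErasureCodingPolicies
        .getByID(SystemErasureCodingPolicies.RS_6_3_POLICY_ID);

    // Counts would normally come from the live cluster (e.g. a DataNode
    // report); hard-coded here purely for illustration.
    int numOfRacks = 3;
    int numOfDataNodes = 9;

    ECTopologyVerifierResult result = ECTopologyVerifier
        .getECTopologyVerifierResult(numOfRacks, numOfDataNodes, rs63);
    System.out.println(result.isSupported() + ": " + result.getResultMessage());
  }
}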
@@ -213,10 +213,10 @@ public ErasureCodingPolicyInfo[] getPersistedPolicies() {
         .toArray(new ErasureCodingPolicyInfo[0]);
   }
 
-  public ErasureCodingPolicyInfo[] getCopyOfPolicies() {
-    ErasureCodingPolicyInfo[] copy;
+  public ErasureCodingPolicy[] getCopyOfEnabledPolicies() {
+    ErasureCodingPolicy[] copy;
     synchronized (this) {
-      copy = Arrays.copyOf(allPolicies, allPolicies.length);
+      copy = Arrays.copyOf(enabledPolicies, enabledPolicies.length);
     }
     return copy;
   }
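getCopyOfEnabledPolicies() hands out a defensive snapshot taken under the manager's lock, so the NameNode's topology check (next hunk) works on a stable view of only the enabled policies rather than all registered ones. A minimal sketch of that snapshot idiom, with illustrative names that are not Hadoop APIs:

import java.util.Arrays;

// Copy the array while holding the lock, then let the caller work on the
// copy without the lock; later updates cannot be observed through it.
class PolicySnapshots {
  private String[] enabledPolicies = {"RS-6-3-1024k", "XOR-2-1-1024k"};

  synchronized String[] copyOfEnabled() {
    return Arrays.copyOf(enabledPolicies, enabledPolicies.length);
  }
}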
@@ -8194,11 +8194,11 @@ public String getVerifyECWithTopologyResult() {
         .getNumOfDataNodes();
     int numOfRacks = getBlockManager().getDatanodeManager()
         .getNetworkTopology().getNumOfRacks();
-    ErasureCodingPolicyInfo[] ecPolicies =
-        getErasureCodingPolicyManager().getCopyOfPolicies();
+    ErasureCodingPolicy[] enabledEcPolicies =
+        getErasureCodingPolicyManager().getCopyOfEnabledPolicies();
     ECTopologyVerifierResult result =
-        ECTopologyVerifier.getECTopologyVerifierResult(ecPolicies,
-            numOfRacks, numOfDataNodes);
+        ECTopologyVerifier.getECTopologyVerifierResult(
+            numOfRacks, numOfDataNodes, enabledEcPolicies);
 
     Map<String, String> resultMap = new HashMap<String, String>();
     resultMap.put("isSupported", Boolean.toString(result.isSupported()));
@@ -16,6 +16,7 @@
  */
 package org.apache.hadoop.hdfs.tools;
 
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
@@ -536,6 +537,13 @@ public int run(Configuration conf, List<String> args) throws IOException {
         dfs.enableErasureCodingPolicy(ecPolicyName);
         System.out.println("Erasure coding policy " + ecPolicyName +
             " is enabled");
+        ECTopologyVerifierResult result =
+            getECTopologyVerifierResultForPolicy(dfs, ecPolicyName);
+        if (!result.isSupported()) {
+          System.err.println("Warning: The cluster setup does not support " +
+              "EC policy " + ecPolicyName + ". Reason: " +
+              result.getResultMessage());
+        }
       } catch (IOException e) {
         System.err.println(AdminHelper.prettifyException(e));
         return 2;
@@ -621,13 +629,7 @@ public int run(Configuration conf, List<String> args) throws IOException {
         return 1;
       }
       final DistributedFileSystem dfs = AdminHelper.getDFS(conf);
-      final ErasureCodingPolicyInfo[] policies =
-          dfs.getClient().getNamenode().getErasureCodingPolicies();
-      final DatanodeInfo[] report = dfs.getClient().getNamenode()
-          .getDatanodeReport(HdfsConstants.DatanodeReportType.ALL);
-
-      ECTopologyVerifierResult result = ECTopologyVerifier
-          .getECTopologyVerifierResult(report, policies);
+      ECTopologyVerifierResult result = getECTopologyVerifierResult(dfs);
       System.out.println(result.getResultMessage());
       if (result.isSupported()) {
         return 0;
@@ -636,6 +638,47 @@ public int run(Configuration conf, List<String> args) throws IOException {
     }
   }
 
+  private static ECTopologyVerifierResult getECTopologyVerifierResult(
+      final DistributedFileSystem dfs) throws IOException {
+    final ErasureCodingPolicyInfo[] policies =
+        dfs.getClient().getNamenode().getErasureCodingPolicies();
+    final DatanodeInfo[] report = dfs.getClient().getNamenode()
+        .getDatanodeReport(HdfsConstants.DatanodeReportType.ALL);
+
+    return ECTopologyVerifier.getECTopologyVerifierResult(report,
+        getEnabledPolicies(policies));
+  }
+
+  private static ECTopologyVerifierResult getECTopologyVerifierResultForPolicy(
+      final DistributedFileSystem dfs, final String policyName)
+      throws IOException {
+    final ErasureCodingPolicy policy =
+        getPolicy(dfs.getClient().getNamenode().getErasureCodingPolicies(),
+            policyName);
+    final DatanodeInfo[] report = dfs.getClient().getNamenode()
+        .getDatanodeReport(HdfsConstants.DatanodeReportType.ALL);
+    return ECTopologyVerifier.getECTopologyVerifierResult(report, policy);
+  }
+
+  private static ErasureCodingPolicy getPolicy(
+      final ErasureCodingPolicyInfo[] policies, final String policyName) {
+    for (ErasureCodingPolicyInfo policy : policies) {
+      if (policyName.equals(policy.getPolicy().getName())) {
+        return policy.getPolicy();
+      }
+    }
+    throw new HadoopIllegalArgumentException("The given erasure coding " +
+        "policy " + policyName + " does not exist.");
+  }
+
+  private static ErasureCodingPolicy[] getEnabledPolicies(
+      final ErasureCodingPolicyInfo[] policies) {
+    return Arrays.asList(policies).stream()
+        .filter(policyInfo -> policyInfo.isEnabled())
+        .map(ErasureCodingPolicyInfo::getPolicy)
+        .toArray(ErasureCodingPolicy[]::new);
+  }
+
   private static final AdminHelper.Command[] COMMANDS = {
       new ListECPoliciesCommand(),
       new AddECPoliciesCommand(),
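On the CLI side, -enablePolicy now prints a warning to stderr when the current topology cannot support the freshly enabled policy, and -verifyClusterSetup checks only the enabled policies. A hypothetical in-process driver, mirroring how the new tests below invoke the commands and assuming an HDFS cluster (or MiniDFSCluster) reachable through the default configuration:

// Drives the ECAdmin CLI in-process, the way the new tests do.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.tools.ECAdmin;

public class EnablePolicyExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    ECAdmin admin = new ECAdmin(conf);

    // Returns 0 on success; a topology warning, if any, goes to stderr.
    int ret = admin.run(new String[] {"-enablePolicy", "-policy", "RS-3-2-1024k"});
    System.out.println("enablePolicy returned " + ret);

    // Returns 0 if the topology supports every enabled policy, 2 otherwise.
    ret = admin.run(new String[] {"-verifyClusterSetup"});
    System.out.println("verifyClusterSetup returned " + ret);
  }
}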
@@ -34,6 +34,7 @@
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -47,7 +48,10 @@ public class TestECAdmin {
   private ECAdmin admin = new ECAdmin(conf);
 
   private final ByteArrayOutputStream out = new ByteArrayOutputStream();
+  private final ByteArrayOutputStream err = new ByteArrayOutputStream();
+
   private static final PrintStream OLD_OUT = System.out;
   private static final PrintStream OLD_ERR = System.err;
+
   @Rule
   public Timeout globalTimeout =
@@ -56,6 +60,7 @@ public class TestECAdmin {
   @Before
   public void setup() throws Exception {
     System.setOut(new PrintStream(out));
+    System.setErr(new PrintStream(err));
   }
 
   @After
@@ -64,8 +69,10 @@ public void tearDown() throws Exception {
       System.out.flush();
+      System.err.flush();
       out.reset();
+      err.reset();
     } finally {
       System.setOut(OLD_OUT);
       System.setErr(OLD_ERR);
     }
 
     if (cluster != null) {
@@ -79,10 +86,13 @@ public void testRS63MinDN() throws Exception {
     cluster = DFSTestUtil.setupCluster(conf, 6, 3, 0);
     String[] args = {"-verifyClusterSetup"};
     final int ret = admin.run(args);
-    LOG.info("Commend stdout: {}", out.toString());
-    assertEquals(2, ret);
-    assertTrue(out.toString()
+    LOG.info("Command stdout: {}", out.toString());
+    LOG.info("Command stderr: {}", err.toString());
+    assertEquals("Return value of the command is not successful", 2, ret);
+    assertTrue("Result of cluster topology verify " +
+        "should be logged correctly", out.toString()
         .contains("less than the minimum required number of DataNodes"));
+    assertTrue("Error output should be empty", err.toString().isEmpty());
   }
 
   @Test
@@ -93,10 +103,13 @@ public void testRS104MinRacks() throws Exception {
         .getByID(SystemErasureCodingPolicies.RS_10_4_POLICY_ID).getName());
     String[] args = {"-verifyClusterSetup"};
     final int ret = admin.run(args);
-    LOG.info("Commend stdout: {}", out.toString());
-    assertEquals(2, ret);
-    assertTrue(out.toString()
+    LOG.info("Command stdout: {}", out.toString());
+    LOG.info("Command stderr: {}", err.toString());
+    assertEquals("Return value of the command is not successful", 2, ret);
+    assertTrue("Result of cluster topology verify " +
+        "should be logged correctly", out.toString()
         .contains("less than the minimum required number of racks"));
+    assertTrue("Error output should be empty", err.toString().isEmpty());
   }
 
   @Test
@@ -110,10 +123,13 @@ public void testXOR21MinRacks() throws Exception {
         .getByID(SystemErasureCodingPolicies.XOR_2_1_POLICY_ID).getName());
     String[] args = {"-verifyClusterSetup"};
     final int ret = admin.run(args);
-    LOG.info("Commend stdout: {}", out.toString());
-    assertEquals(2, ret);
-    assertTrue(out.toString()
+    LOG.info("Command stdout: {}", out.toString());
+    LOG.info("Command stderr: {}", err.toString());
+    assertEquals("Return value of the command is not successful", 2, ret);
+    assertTrue("Result of cluster topology verify " +
+        "should be logged correctly", out.toString()
        .contains("less than the minimum required number of racks"));
+    assertTrue("Error output should be empty", err.toString().isEmpty());
   }
 
   @Test
@@ -127,10 +143,13 @@ public void testRS32MinRacks() throws Exception {
         .getByID(SystemErasureCodingPolicies.RS_3_2_POLICY_ID).getName());
     String[] args = {"-verifyClusterSetup"};
     final int ret = admin.run(args);
-    LOG.info("Commend stdout: {}", out.toString());
-    assertEquals(2, ret);
-    assertTrue(out.toString()
+    LOG.info("Command stdout: {}", out.toString());
+    LOG.info("Command stderr: {}", err.toString());
+    assertEquals("Return value of the command is not successful", 2, ret);
+    assertTrue("Result of cluster topology verify " +
+        "should be logged correctly", out.toString()
         .contains("less than the minimum required number of racks"));
+    assertTrue("Error output should be empty", err.toString().isEmpty());
   }
 
   @Test
@@ -138,8 +157,13 @@ public void testRS63Good() throws Exception {
     cluster = DFSTestUtil.setupCluster(conf, 9, 3, 0);
     String[] args = {"-verifyClusterSetup"};
     final int ret = admin.run(args);
-    LOG.info("Commend stdout: {}", out.toString());
-    assertEquals(0, ret);
+    LOG.info("Command stdout: {}", out.toString());
+    LOG.info("Command stderr: {}", err.toString());
+    assertEquals("Return value of the command is successful", 0, ret);
+    assertTrue("Result of cluster topology verify " +
+        "should be logged correctly", out.toString().contains(
+        "The cluster setup can support EC policies: RS-6-3-1024k"));
+    assertTrue("Error output should be empty", err.toString().isEmpty());
   }
 
   @Test
@@ -150,8 +174,72 @@ public void testNoECEnabled() throws Exception {
         .getByID(SystemErasureCodingPolicies.RS_6_3_POLICY_ID).getName());
     String[] args = {"-verifyClusterSetup"};
     final int ret = admin.run(args);
-    LOG.info("Commend stdout: {}", out.toString());
-    assertEquals(0, ret);
-    assertTrue(out.toString().contains("No erasure coding policy is enabled"));
+    LOG.info("Command stdout: {}", out.toString());
+    LOG.info("Command stderr: {}", err.toString());
+    assertEquals("Return value of the command is successful", 0, ret);
+    assertTrue("Result of cluster topology verify " +
+        "should be logged correctly",
+        out.toString().contains("No erasure coding policy is given"));
+    assertTrue("Error output should be empty", err.toString().isEmpty());
   }
+
+  @Test
+  public void testUnsuccessfulEnablePolicyMessage() throws Exception {
+    cluster = DFSTestUtil.setupCluster(conf, 5, 2, 0);
+    cluster.getFileSystem().disableErasureCodingPolicy(
+        SystemErasureCodingPolicies
+            .getByID(SystemErasureCodingPolicies.RS_6_3_POLICY_ID).getName());
+    String[] args = {"-enablePolicy", "-policy", "RS-3-2-1024k"};
+
+    final int ret = admin.run(args);
+    LOG.info("Command stdout: {}", out.toString());
+    LOG.info("Command stderr: {}", err.toString());
+    assertEquals("Return value of the command is successful", 0, ret);
+    assertTrue("Enabling policy should be logged", out.toString()
+        .contains("Erasure coding policy RS-3-2-1024k is enabled"));
+    assertTrue("Warning about cluster topology should be printed",
+        err.toString().contains("Warning: The cluster setup does not support " +
+            "EC policy RS-3-2-1024k. Reason:"));
+    assertTrue("Warning about cluster topology should be printed",
+        err.toString()
+            .contains("less than the minimum required number of racks"));
+  }
+
+  @Test
+  public void testSuccessfulEnablePolicyMessage() throws Exception {
+    cluster = DFSTestUtil.setupCluster(conf, 5, 3, 0);
+    cluster.getFileSystem().disableErasureCodingPolicy(
+        SystemErasureCodingPolicies
+            .getByID(SystemErasureCodingPolicies.RS_6_3_POLICY_ID).getName());
+    String[] args = {"-enablePolicy", "-policy", "RS-3-2-1024k"};
+
+    final int ret = admin.run(args);
+    LOG.info("Command stdout: {}", out.toString());
+    LOG.info("Command stderr: {}", err.toString());
+    assertEquals("Return value of the command is successful", 0, ret);
+    assertTrue("Enabling policy should be logged", out.toString()
+        .contains("Erasure coding policy RS-3-2-1024k is enabled"));
+    assertFalse("Warning about cluster topology should not be printed",
+        out.toString().contains("Warning: The cluster setup does not support"));
+    assertTrue("Error output should be empty", err.toString().isEmpty());
+  }
+
+  @Test
+  public void testEnableNonExistentPolicyMessage() throws Exception {
+    cluster = DFSTestUtil.setupCluster(conf, 5, 3, 0);
+    cluster.getFileSystem().disableErasureCodingPolicy(
+        SystemErasureCodingPolicies
+            .getByID(SystemErasureCodingPolicies.RS_6_3_POLICY_ID).getName());
+    String[] args = {"-enablePolicy", "-policy", "NonExistentPolicy"};
+
+    final int ret = admin.run(args);
+    LOG.info("Command stdout: {}", out.toString());
+    LOG.info("Command stderr: {}", err.toString());
+    assertEquals("Return value of the command is unsuccessful", 2, ret);
+    assertFalse("Enabling policy should not be logged when " +
+        "it was unsuccessful", out.toString().contains("is enabled"));
+    assertTrue("Error message should be printed",
+        err.toString().contains("RemoteException: The policy name " +
+            "NonExistentPolicy does not exist"));
+  }
 }