HDFS-14039. ec -listPolicies doesn't show correct state for the default policy when the default is not RS(6,3). Contributed by Kitti Nanasi.

Signed-off-by: Xiao Chen <xiao@apache.org>
This commit is contained in:
Kitti Nanasi 2018-11-08 10:00:09 -08:00 committed by Xiao Chen
parent 724c15007b
commit 8d99648c20
8 changed files with 231 additions and 45 deletions

View File

@ -35,6 +35,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -80,6 +81,15 @@ public final class ErasureCodingPolicyManager {
*/
private ErasureCodingPolicyInfo[] allPolicies;
/**
* All policies in the state as it will be persisted in the fsimage.
*
* The difference between persisted policies and all policies is that
* if a default policy is only enabled at startup,
* it will appear as disabled in the persisted policy list and in the fsimage.
*/
private Map<Byte, ErasureCodingPolicyInfo> allPersistedPolicies;
/**
* All enabled policies sorted by name for fast querying, including built-in
* policy, user defined policy.
@ -90,6 +100,7 @@ public final class ErasureCodingPolicyManager {
*/
private ErasureCodingPolicy[] enabledPolicies;
private String defaultPolicyName;
private volatile static ErasureCodingPolicyManager instance = null;
@ -102,14 +113,11 @@ public static ErasureCodingPolicyManager getInstance() {
private ErasureCodingPolicyManager() {}
public void init(Configuration conf) {
// Load erasure coding default policy
final String defaultPolicyName = conf.getTrimmed(
DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY,
DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY_DEFAULT);
public void init(Configuration conf) throws IOException {
this.policiesByName = new TreeMap<>();
this.policiesByID = new TreeMap<>();
this.enabledPoliciesByName = new TreeMap<>();
this.allPersistedPolicies = new TreeMap<>();
/**
* TODO: load user defined EC policy from fsImage HDFS-7859
@ -125,31 +133,12 @@ public void init(Configuration conf) {
final ErasureCodingPolicyInfo info = new ErasureCodingPolicyInfo(policy);
policiesByName.put(policy.getName(), info);
policiesByID.put(policy.getId(), info);
allPersistedPolicies.put(policy.getId(),
new ErasureCodingPolicyInfo(policy));
}
if (!defaultPolicyName.isEmpty()) {
final ErasureCodingPolicyInfo info =
policiesByName.get(defaultPolicyName);
if (info == null) {
String names = policiesByName.values()
.stream().map((pi) -> pi.getPolicy().getName())
.collect(Collectors.joining(", "));
String msg = String.format("EC policy '%s' specified at %s is not a "
+ "valid policy. Please choose from list of available "
+ "policies: [%s]",
defaultPolicyName,
DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY,
names);
throw new HadoopIllegalArgumentException(msg);
}
info.setState(ErasureCodingPolicyState.ENABLED);
enabledPoliciesByName.put(info.getPolicy().getName(), info.getPolicy());
}
enabledPolicies =
enabledPoliciesByName.values().toArray(new ErasureCodingPolicy[0]);
allPolicies =
policiesByName.values().toArray(new ErasureCodingPolicyInfo[0]);
enableDefaultPolicy(conf);
updatePolicies();
maxCellSize = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_MAX_CELLSIZE_KEY,
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_MAX_CELLSIZE_DEFAULT);
@ -200,6 +189,21 @@ public ErasureCodingPolicyInfo[] getPolicies() {
return allPolicies;
}
/**
* Get all system defined policies and user defined policies
* as it is written out in the fsimage.
*
* The difference between persisted policies and all policies is that
* if a default policy is only enabled at startup,
* it will appear as disabled in the persisted policy list and in the fsimage.
*
* @return persisted policies
*/
public ErasureCodingPolicyInfo[] getPersistedPolicies() {
return allPersistedPolicies.values()
.toArray(new ErasureCodingPolicyInfo[0]);
}
/**
* Get a {@link ErasureCodingPolicy} by policy ID, including system policy
* and user defined policy.
@ -299,6 +303,8 @@ public synchronized ErasureCodingPolicy addPolicy(
this.policiesByID.put(policy.getId(), pi);
allPolicies =
policiesByName.values().toArray(new ErasureCodingPolicyInfo[0]);
allPersistedPolicies.put(policy.getId(),
new ErasureCodingPolicyInfo(policy));
return policy;
}
@ -335,7 +341,8 @@ public synchronized void removePolicy(String name) {
}
info.setState(ErasureCodingPolicyState.REMOVED);
LOG.info("Remove erasure coding policy " + name);
allPersistedPolicies.put(ecPolicy.getId(),
createPolicyInfo(ecPolicy, ErasureCodingPolicyState.REMOVED));
/*
* TODO HDFS-12405 postpone the delete removed policy to Namenode restart
* time.
@ -370,6 +377,9 @@ public synchronized boolean disablePolicy(String name) {
enabledPoliciesByName.values().toArray(new ErasureCodingPolicy[0]);
info.setState(ErasureCodingPolicyState.DISABLED);
LOG.info("Disable the erasure coding policy " + name);
allPersistedPolicies.put(info.getPolicy().getId(),
createPolicyInfo(info.getPolicy(),
ErasureCodingPolicyState.DISABLED));
return true;
}
return false;
@ -385,6 +395,12 @@ public synchronized boolean enablePolicy(String name) {
name + " does not exist");
}
if (enabledPoliciesByName.containsKey(name)) {
if (defaultPolicyName.equals(name)) {
allPersistedPolicies.put(info.getPolicy().getId(),
createPolicyInfo(info.getPolicy(),
ErasureCodingPolicyState.ENABLED));
return true;
}
return false;
}
final ErasureCodingPolicy ecPolicy = info.getPolicy();
@ -392,6 +408,8 @@ public synchronized boolean enablePolicy(String name) {
info.setState(ErasureCodingPolicyState.ENABLED);
enabledPolicies =
enabledPoliciesByName.values().toArray(new ErasureCodingPolicy[0]);
allPersistedPolicies.put(ecPolicy.getId(),
createPolicyInfo(info.getPolicy(), ErasureCodingPolicyState.ENABLED));
LOG.info("Enable the erasure coding policy " + name);
return true;
}
@ -414,6 +432,8 @@ private void loadPolicy(ErasureCodingPolicyInfo info) {
if (info.isEnabled()) {
enablePolicy(policy.getName());
}
allPersistedPolicies.put(policy.getId(),
createPolicyInfo(policy, info.getState()));
}
/**
@ -423,11 +443,43 @@ private void loadPolicy(ErasureCodingPolicyInfo info) {
*
*/
public synchronized void loadPolicies(
List<ErasureCodingPolicyInfo> ecPolicies) {
List<ErasureCodingPolicyInfo> ecPolicies, Configuration conf)
throws IOException{
Preconditions.checkNotNull(ecPolicies);
for (ErasureCodingPolicyInfo p : ecPolicies) {
loadPolicy(p);
}
enableDefaultPolicy(conf);
updatePolicies();
}
private void enableDefaultPolicy(Configuration conf) throws IOException {
defaultPolicyName = conf.getTrimmed(
DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY,
DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY_DEFAULT);
if (!defaultPolicyName.isEmpty()) {
final ErasureCodingPolicyInfo info =
policiesByName.get(defaultPolicyName);
if (info == null) {
String names = policiesByName.values()
.stream().map((pi) -> pi.getPolicy().getName())
.collect(Collectors.joining(", "));
String msg = String.format("EC policy '%s' specified at %s is not a "
+ "valid policy. Please choose from list of available "
+ "policies: [%s]",
defaultPolicyName,
DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY,
names);
throw new IOException(msg);
}
info.setState(ErasureCodingPolicyState.ENABLED);
enabledPoliciesByName.put(info.getPolicy().getName(), info.getPolicy());
}
}
private void updatePolicies() {
enabledPolicies =
enabledPoliciesByName.values().toArray(new ErasureCodingPolicy[0]);
allPolicies =
policiesByName.values().toArray(new ErasureCodingPolicyInfo[0]);
}
@ -436,4 +488,11 @@ public String getEnabledPoliciesMetric() {
return StringUtils.join(", ",
enabledPoliciesByName.keySet());
}
private ErasureCodingPolicyInfo createPolicyInfo(ErasureCodingPolicy p,
ErasureCodingPolicyState s) {
ErasureCodingPolicyInfo policyInfo = new ErasureCodingPolicyInfo(p);
policyInfo.setState(s);
return policyInfo;
}
}

View File

@ -380,7 +380,7 @@ private void loadErasureCodingSection(InputStream in)
ecPolicies.add(PBHelperClient.convertErasureCodingPolicyInfo(
s.getPolicies(i)));
}
fsn.getErasureCodingPolicyManager().loadPolicies(ecPolicies);
fsn.getErasureCodingPolicyManager().loadPolicies(ecPolicies, conf);
}
}
@ -601,7 +601,7 @@ private void saveErasureCodingSection(
FileSummary.Builder summary) throws IOException {
final FSNamesystem fsn = context.getSourceNamesystem();
ErasureCodingPolicyInfo[] ecPolicies =
fsn.getErasureCodingPolicyManager().getPolicies();
fsn.getErasureCodingPolicyManager().getPersistedPolicies();
ArrayList<ErasureCodingPolicyProto> ecPolicyProtoes =
new ArrayList<ErasureCodingPolicyProto>();
for (ErasureCodingPolicyInfo p : ecPolicies) {

View File

@ -503,7 +503,7 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY,
DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT);
defaultECPolicyName = conf.get(
defaultECPolicyName = conf.getTrimmed(
DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY,
DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY_DEFAULT);

View File

@ -20,6 +20,8 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyState;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.test.GenericTestUtils;
@ -28,10 +30,16 @@
import org.junit.Test;
import org.junit.rules.Timeout;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
@ -43,14 +51,14 @@ public class TestEnabledECPolicies {
@Rule
public Timeout testTimeout = new Timeout(60000);
private void expectInvalidPolicy(String value) {
private void expectInvalidPolicy(String value) throws IOException {
HdfsConfiguration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY,
value);
try {
ErasureCodingPolicyManager.getInstance().init(conf);
fail("Expected exception when instantiating ECPolicyManager");
} catch (IllegalArgumentException e) {
} catch (IOException e) {
GenericTestUtils.assertExceptionContains("is not a valid policy", e);
}
}
@ -117,6 +125,70 @@ public void testGetPolicies() throws Exception {
testGetPolicies(enabledPolicies);
}
@Test
public void testChangeDefaultPolicy() throws Exception {
final HdfsConfiguration conf = new HdfsConfiguration();
final String testPolicy = "RS-3-2-1024k";
final String defaultPolicy = conf.getTrimmed(
DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY,
DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY_DEFAULT);
assertNotEquals("The default policy and the next default policy " +
"should not be the same!", testPolicy, defaultPolicy);
ErasureCodingPolicyManager manager =
ErasureCodingPolicyManager.getInstance();
// Change the default policy to a new one
conf.set(
DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY,
testPolicy);
manager.init(conf);
// Load policies similar to when fsimage is loaded at namenode startup
manager.loadPolicies(constructAllDisabledInitialPolicies(), conf);
ErasureCodingPolicyInfo[] getPoliciesResult = manager.getPolicies();
boolean isEnabled = isPolicyEnabled(testPolicy, getPoliciesResult);
assertTrue("The new default policy should be " +
"in enabled state!", isEnabled);
ErasureCodingPolicyInfo[] getPersistedPoliciesResult
= manager.getPersistedPolicies();
isEnabled = isPolicyEnabled(testPolicy, getPersistedPoliciesResult);
assertFalse("The new default policy should be " +
"in disabled state in the persisted list!", isEnabled);
manager.disablePolicy(testPolicy);
getPoliciesResult = manager.getPolicies();
isEnabled = isPolicyEnabled(testPolicy, getPoliciesResult);
assertFalse("The new default policy should be " +
"in disabled state!", isEnabled);
getPersistedPoliciesResult
= manager.getPersistedPolicies();
isEnabled = isPolicyEnabled(testPolicy, getPersistedPoliciesResult);
assertFalse("The new default policy should be " +
"in disabled state in the persisted list!", isEnabled);
manager.enablePolicy(testPolicy);
getPoliciesResult = manager.getPolicies();
isEnabled = isPolicyEnabled(testPolicy, getPoliciesResult);
assertTrue("The new default policy should be " +
"in enabled state!", isEnabled);
getPersistedPoliciesResult
= manager.getPersistedPolicies();
isEnabled = isPolicyEnabled(testPolicy, getPersistedPoliciesResult);
assertTrue("The new default policy should be " +
"in enabled state in the persisted list!", isEnabled);
final String emptyPolicy = "";
// Change the default policy to a empty
conf.set(
DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY, emptyPolicy);
manager.init(conf);
// Load policies similar to when fsimage is loaded at namenode startup
manager.loadPolicies(constructAllDisabledInitialPolicies(), conf);
// All the policies are disabled if the default policy is empty
getPoliciesResult = manager.getPolicies();
assertAllPoliciesAreDisabled(getPoliciesResult);
}
private void testGetPolicies(ErasureCodingPolicy[] enabledPolicies)
throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
@ -154,4 +226,31 @@ private void testGetPolicies(ErasureCodingPolicy[] enabledPolicies)
}
}
}
private List<ErasureCodingPolicyInfo> constructAllDisabledInitialPolicies() {
List<ErasureCodingPolicyInfo> policies = new ArrayList<>();
for (ErasureCodingPolicy p: SystemErasureCodingPolicies.getPolicies()) {
policies.add(new ErasureCodingPolicyInfo(p,
ErasureCodingPolicyState.DISABLED));
}
return policies;
}
private boolean isPolicyEnabled(String testPolicy,
ErasureCodingPolicyInfo[] policies) {
for (ErasureCodingPolicyInfo p : policies) {
if (testPolicy.equals(p.getPolicy().getName())) {
return p.isEnabled();
}
}
fail("The result should contain the test policy!");
return false;
}
private void assertAllPoliciesAreDisabled(
ErasureCodingPolicyInfo[] policies) {
for (ErasureCodingPolicyInfo p : policies) {
assertTrue("Policy should be disabled", p.isDisabled());
}
}
}

View File

@ -37,6 +37,7 @@
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyState;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
@ -883,15 +884,19 @@ public void testSaveAndLoadErasureCodingPolicies() throws IOException{
DFSTestUtil.getECPolicyState(ecPolicy));
// Test enable/disable/remove user customized erasure coding policy
testChangeErasureCodingPolicyState(cluster, blockSize, newPolicy);
// Test enable/disable built-in erasure coding policy
testChangeErasureCodingPolicyState(cluster, blockSize, newPolicy, false);
// Test enable/disable default built-in erasure coding policy
testChangeErasureCodingPolicyState(cluster, blockSize,
SystemErasureCodingPolicies.getByID((byte) 1));
SystemErasureCodingPolicies.getByID((byte) 1), true);
// Test enable/disable non-default built-in erasure coding policy
testChangeErasureCodingPolicyState(cluster, blockSize,
SystemErasureCodingPolicies.getByID((byte) 2), false);
}
}
private void testChangeErasureCodingPolicyState(MiniDFSCluster cluster,
int blockSize, ErasureCodingPolicy targetPolicy) throws IOException {
int blockSize, ErasureCodingPolicy targetPolicy, boolean isDefault)
throws IOException {
DistributedFileSystem fs = cluster.getFileSystem();
// 1. Enable an erasure coding policy
@ -920,6 +925,9 @@ private void testChangeErasureCodingPolicyState(MiniDFSCluster cluster,
assertEquals("The erasure coding policy should be of enabled state",
ErasureCodingPolicyState.ENABLED,
DFSTestUtil.getECPolicyState(ecPolicy));
assertTrue("Policy should be in disabled state in FSImage!",
isPolicyEnabledInFsImage(targetPolicy));
// Read file regardless of the erasure coding policy state
DFSTestUtil.readFileAsBytes(fs, filePath);
@ -936,9 +944,18 @@ private void testChangeErasureCodingPolicyState(MiniDFSCluster cluster,
ErasureCodingPolicyManager.getInstance().getByID(targetPolicy.getId());
assertEquals("The erasure coding policy is not found",
targetPolicy, ecPolicy);
assertEquals("The erasure coding policy should be of disabled state",
ErasureCodingPolicyState.DISABLED,
DFSTestUtil.getECPolicyState(ecPolicy));
ErasureCodingPolicyState ecPolicyState =
DFSTestUtil.getECPolicyState(ecPolicy);
if (isDefault) {
assertEquals("The erasure coding policy should be of " +
"enabled state", ErasureCodingPolicyState.ENABLED, ecPolicyState);
} else {
assertEquals("The erasure coding policy should be of " +
"disabled state", ErasureCodingPolicyState.DISABLED, ecPolicyState);
}
assertFalse("Policy should be in disabled state in FSImage!",
isPolicyEnabledInFsImage(targetPolicy));
// Read file regardless of the erasure coding policy state
DFSTestUtil.readFileAsBytes(fs, filePath);
@ -972,4 +989,15 @@ private void testChangeErasureCodingPolicyState(MiniDFSCluster cluster,
DFSTestUtil.readFileAsBytes(fs, filePath);
fs.delete(dirPath, true);
}
private boolean isPolicyEnabledInFsImage(ErasureCodingPolicy testPolicy) {
ErasureCodingPolicyInfo[] persistedPolicies =
ErasureCodingPolicyManager.getInstance().getPersistedPolicies();
for (ErasureCodingPolicyInfo p : persistedPolicies) {
if(p.getPolicy().getName().equals(testPolicy.getName())) {
return p.isEnabled();
}
}
throw new AssertionError("Policy is not found!");
}
}

View File

@ -463,7 +463,7 @@ public void testRetryCacheRebuild() throws Exception {
assertTrue(namesystem.hasRetryCache());
cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) namesystem
.getRetryCache().getCacheSet();
assertEquals("Retry cache size is wrong", 38, cacheSet.size());
assertEquals("Retry cache size is wrong", 39, cacheSet.size());
iter = cacheSet.iterator();
while (iter.hasNext()) {
CacheEntry entry = iter.next();

View File

@ -90,7 +90,7 @@ private static INodeFile createStripedINodeFile() {
public ExpectedException thrown = ExpectedException.none();
@Before
public void init() {
public void init() throws IOException {
Configuration conf = new HdfsConfiguration();
ErasureCodingPolicyManager.getInstance().init(conf);
}

View File

@ -195,7 +195,7 @@ public void testRetryCacheOnStandbyNN() throws Exception {
FSNamesystem fsn1 = cluster.getNamesystem(1);
cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) fsn1
.getRetryCache().getCacheSet();
assertEquals("Retry cache size is wrong", 38, cacheSet.size());
assertEquals("Retry cache size is wrong", 39, cacheSet.size());
iter = cacheSet.iterator();
while (iter.hasNext()) {
CacheEntry entry = iter.next();