HDDS-1490. Support configurable container placement policy through 'o… (#903)
This commit is contained in:
parent
e1dfc060f8
commit
8ca58efeec
@ -312,6 +312,10 @@ public final class ScmConfigKeys {
|
||||
public static final String OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY =
|
||||
"ozone.scm.container.placement.impl";
|
||||
|
||||
public static final String OZONE_SCM_CONTAINER_PLACEMENT_IMPL_DEFAULT =
|
||||
"org.apache.hadoop.hdds.scm.container.placement.algorithms." +
|
||||
"SCMContainerPlacementRackAware";
|
||||
|
||||
public static final String OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT =
|
||||
"ozone.scm.pipeline.owner.container.count";
|
||||
public static final int OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT = 3;
|
||||
|
@ -0,0 +1,21 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdds.scm.container.placement.algorithms;
|
||||
/**
|
||||
Contains container placement policy interface definition.
|
||||
**/
|
@ -732,7 +732,7 @@ public String toString() {
|
||||
try {
|
||||
// print the number of leaves
|
||||
int numOfLeaves = clusterTree.getNumOfLeaves();
|
||||
tree.append("Expected number of leaves:");
|
||||
tree.append("Number of leaves:");
|
||||
tree.append(numOfLeaves);
|
||||
tree.append("\n");
|
||||
// print all nodes
|
||||
|
@ -17,6 +17,7 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdds.scm.net;
|
||||
|
||||
import org.apache.commons.io.FilenameUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.w3c.dom.Document;
|
||||
@ -31,7 +32,10 @@
|
||||
import javax.xml.parsers.ParserConfigurationException;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
@ -93,23 +97,50 @@ public List<NodeSchema> getSchemaList() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Load user defined network layer schemas from a XML configuration file.
|
||||
* Load user defined network layer schemas from a XML/YAML configuration file.
|
||||
* @param schemaFilePath path of schema file
|
||||
* @return all valid node schemas defined in schema file
|
||||
*/
|
||||
public NodeSchemaLoadResult loadSchemaFromXml(String schemaFilePath)
|
||||
throws IllegalArgumentException {
|
||||
public NodeSchemaLoadResult loadSchemaFromFile(String schemaFilePath)
|
||||
throws IllegalArgumentException, FileNotFoundException {
|
||||
try {
|
||||
File schemaFile = new File(schemaFilePath);
|
||||
if (!schemaFile.exists()) {
|
||||
String msg = "Network topology layer schema file " + schemaFilePath +
|
||||
" is not found.";
|
||||
LOG.warn(msg);
|
||||
throw new IllegalArgumentException(msg);
|
||||
// try to load with classloader
|
||||
ClassLoader classloader =
|
||||
Thread.currentThread().getContextClassLoader();
|
||||
if (classloader == null) {
|
||||
classloader = NodeSchemaLoader.class.getClassLoader();
|
||||
}
|
||||
if (classloader != null) {
|
||||
URL url = classloader.getResource(schemaFilePath);
|
||||
if (url != null) {
|
||||
schemaFile = new File(url.toURI());
|
||||
}
|
||||
}
|
||||
}
|
||||
return loadSchema(schemaFile);
|
||||
} catch (ParserConfigurationException | IOException | SAXException e) {
|
||||
throw new IllegalArgumentException("Fail to load network topology node"
|
||||
|
||||
if (!schemaFile.exists()) {
|
||||
String msg = "Network topology layer schema file " +
|
||||
schemaFilePath + "[" + schemaFile.getAbsolutePath() +
|
||||
"] is not found.";
|
||||
LOG.warn(msg);
|
||||
throw new FileNotFoundException(msg);
|
||||
}
|
||||
|
||||
LOG.info("Load network topology schema file " +
|
||||
schemaFile.getCanonicalPath());
|
||||
if (FilenameUtils.getExtension(schemaFilePath).toLowerCase()
|
||||
.compareTo("yaml") == 0) {
|
||||
return loadSchemaFromYaml(schemaFile);
|
||||
} else {
|
||||
return loadSchema(schemaFile);
|
||||
}
|
||||
} catch (FileNotFoundException e) {
|
||||
throw e;
|
||||
} catch (ParserConfigurationException | IOException | SAXException |
|
||||
URISyntaxException e) {
|
||||
throw new IllegalArgumentException("Failed to load network topology node"
|
||||
+ " schema file: " + schemaFilePath + " , error:" + e.getMessage());
|
||||
}
|
||||
}
|
||||
@ -167,29 +198,6 @@ private NodeSchemaLoadResult loadSchema(File schemaFile) throws
|
||||
return schemaList;
|
||||
}
|
||||
|
||||
/**
|
||||
* Load user defined network layer schemas from a YAML configuration file.
|
||||
* @param schemaFilePath path of schema file
|
||||
* @return all valid node schemas defined in schema file
|
||||
*/
|
||||
public NodeSchemaLoadResult loadSchemaFromYaml(String schemaFilePath)
|
||||
throws IllegalArgumentException {
|
||||
try {
|
||||
File schemaFile = new File(schemaFilePath);
|
||||
if (!schemaFile.exists()) {
|
||||
String msg = "Network topology layer schema file " + schemaFilePath +
|
||||
" is not found.";
|
||||
LOG.warn(msg);
|
||||
throw new IllegalArgumentException(msg);
|
||||
}
|
||||
return loadSchemaFromYaml(schemaFile);
|
||||
} catch (Exception e) {
|
||||
throw new IllegalArgumentException("Fail to load network topology node"
|
||||
+ " schema file: " + schemaFilePath + " , error:"
|
||||
+ e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Load network topology layer schemas from a YAML configuration file.
|
||||
* @param schemaFile schema file
|
||||
|
@ -19,7 +19,6 @@
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.commons.io.FilenameUtils;
|
||||
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
||||
import org.apache.hadoop.hdds.scm.net.NodeSchemaLoader.NodeSchemaLoadResult;
|
||||
import org.slf4j.Logger;
|
||||
@ -63,20 +62,14 @@ public void init(Configuration conf) {
|
||||
String schemaFile = conf.get(
|
||||
ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE,
|
||||
ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_DEFAULT);
|
||||
|
||||
NodeSchemaLoadResult result;
|
||||
try {
|
||||
if (FilenameUtils.getExtension(schemaFile).toLowerCase()
|
||||
.compareTo("yaml") == 0) {
|
||||
result = NodeSchemaLoader.getInstance().loadSchemaFromYaml(schemaFile);
|
||||
} else {
|
||||
result = NodeSchemaLoader.getInstance().loadSchemaFromXml(schemaFile);
|
||||
}
|
||||
result = NodeSchemaLoader.getInstance().loadSchemaFromFile(schemaFile);
|
||||
allSchema = result.getSchemaList();
|
||||
enforcePrefix = result.isEnforePrefix();
|
||||
maxLevel = allSchema.size();
|
||||
} catch (Throwable e) {
|
||||
String msg = "Fail to load schema file:" + schemaFile
|
||||
String msg = "Failed to load schema file:" + schemaFile
|
||||
+ ", error:" + e.getMessage();
|
||||
LOG.error(msg);
|
||||
throw new RuntimeException(msg);
|
||||
|
@ -815,11 +815,13 @@
|
||||
<property>
|
||||
<name>ozone.scm.container.placement.impl</name>
|
||||
<value>
|
||||
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom
|
||||
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRackAware
|
||||
</value>
|
||||
<tag>OZONE, MANAGEMENT</tag>
|
||||
<description>Placement policy class for containers.
|
||||
Defaults to SCMContainerPlacementRandom.class
|
||||
<description>
|
||||
The full name of class which implements org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy.
|
||||
The class decides which datanode will be used to host the container replica. If not set,
|
||||
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRackAware will be used as default value.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
|
@ -44,7 +44,7 @@ public TestNodeSchemaLoader(String schemaFile, String errMsg) {
|
||||
try {
|
||||
String filePath = classLoader.getResource(
|
||||
"./networkTopologyTestFiles/" + schemaFile).getPath();
|
||||
NodeSchemaLoader.getInstance().loadSchemaFromXml(filePath);
|
||||
NodeSchemaLoader.getInstance().loadSchemaFromFile(filePath);
|
||||
fail("expect exceptions");
|
||||
} catch (Throwable e) {
|
||||
assertTrue(e.getMessage().contains(errMsg));
|
||||
@ -83,7 +83,7 @@ public void testGood() {
|
||||
try {
|
||||
String filePath = classLoader.getResource(
|
||||
"./networkTopologyTestFiles/good.xml").getPath();
|
||||
NodeSchemaLoader.getInstance().loadSchemaFromXml(filePath);
|
||||
NodeSchemaLoader.getInstance().loadSchemaFromFile(filePath);
|
||||
} catch (Throwable e) {
|
||||
fail("should succeed");
|
||||
}
|
||||
@ -94,10 +94,10 @@ public void testNotExist() {
|
||||
String filePath = classLoader.getResource(
|
||||
"./networkTopologyTestFiles/good.xml").getPath() + ".backup";
|
||||
try {
|
||||
NodeSchemaLoader.getInstance().loadSchemaFromXml(filePath);
|
||||
NodeSchemaLoader.getInstance().loadSchemaFromFile(filePath);
|
||||
fail("should fail");
|
||||
} catch (Throwable e) {
|
||||
assertTrue(e.getMessage().contains("file " + filePath + " is not found"));
|
||||
assertTrue(e.getMessage().contains("not found"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -79,7 +79,7 @@ public void testInitFailure() {
|
||||
manager.init(conf);
|
||||
fail("should fail");
|
||||
} catch (Throwable e) {
|
||||
assertTrue(e.getMessage().contains("Fail to load schema file:" +
|
||||
assertTrue(e.getMessage().contains("Failed to load schema file:" +
|
||||
filePath));
|
||||
}
|
||||
}
|
||||
|
@ -44,7 +44,7 @@ public TestYamlSchemaLoader(String schemaFile, String errMsg) {
|
||||
try {
|
||||
String filePath = classLoader.getResource(
|
||||
"./networkTopologyTestFiles/" + schemaFile).getPath();
|
||||
NodeSchemaLoader.getInstance().loadSchemaFromYaml(filePath);
|
||||
NodeSchemaLoader.getInstance().loadSchemaFromFile(filePath);
|
||||
fail("expect exceptions");
|
||||
} catch (Throwable e) {
|
||||
assertTrue(e.getMessage().contains(errMsg));
|
||||
@ -69,7 +69,7 @@ public void testGood() {
|
||||
try {
|
||||
String filePath = classLoader.getResource(
|
||||
"./networkTopologyTestFiles/good.yaml").getPath();
|
||||
NodeSchemaLoader.getInstance().loadSchemaFromYaml(filePath);
|
||||
NodeSchemaLoader.getInstance().loadSchemaFromFile(filePath);
|
||||
} catch (Throwable e) {
|
||||
fail("should succeed");
|
||||
}
|
||||
@ -78,12 +78,12 @@ public void testGood() {
|
||||
@Test
|
||||
public void testNotExist() {
|
||||
String filePath = classLoader.getResource(
|
||||
"./networkTopologyTestFiles/good.xml").getPath() + ".backup";
|
||||
"./networkTopologyTestFiles/good.yaml").getPath() + ".backup";
|
||||
try {
|
||||
NodeSchemaLoader.getInstance().loadSchemaFromXml(filePath);
|
||||
NodeSchemaLoader.getInstance().loadSchemaFromFile(filePath);
|
||||
fail("should fail");
|
||||
} catch (Throwable e) {
|
||||
assertTrue(e.getMessage().contains("file " + filePath + " is not found"));
|
||||
assertTrue(e.getMessage().contains("not found"));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -141,5 +141,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
<testResources>
|
||||
<testResource>
|
||||
<directory>${basedir}/../../hadoop-hdds/common/src/main/resources</directory>
|
||||
</testResource>
|
||||
</testResources>
|
||||
</build>
|
||||
</project>
|
||||
|
@ -0,0 +1,67 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdds.scm.container.placement.algorithms;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
||||
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
||||
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
|
||||
import org.apache.hadoop.hdds.scm.node.NodeManager;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.lang.reflect.Constructor;
|
||||
|
||||
/**
|
||||
* A factory to create container placement instance based on configuration
|
||||
* property ozone.scm.container.placement.classname.
|
||||
*/
|
||||
public final class ContainerPlacementPolicyFactory {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(ContainerPlacementPolicyFactory.class);
|
||||
|
||||
private ContainerPlacementPolicyFactory() {
|
||||
}
|
||||
|
||||
public static ContainerPlacementPolicy getPolicy(Configuration conf,
|
||||
final NodeManager nodeManager, NetworkTopology clusterMap,
|
||||
final boolean fallback) throws SCMException{
|
||||
final Class<? extends ContainerPlacementPolicy> placementClass = conf
|
||||
.getClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
|
||||
SCMContainerPlacementRackAware.class,
|
||||
ContainerPlacementPolicy.class);
|
||||
Constructor<? extends ContainerPlacementPolicy> constructor;
|
||||
try {
|
||||
constructor = placementClass.getDeclaredConstructor(NodeManager.class,
|
||||
Configuration.class, NetworkTopology.class, boolean.class);
|
||||
} catch (NoSuchMethodException e) {
|
||||
String msg = "Failed to find constructor(NodeManager, Configuration, " +
|
||||
"NetworkTopology, boolean) for class " +
|
||||
placementClass.getCanonicalName();
|
||||
LOG.error(msg);
|
||||
throw new SCMException(msg,
|
||||
SCMException.ResultCodes.FAILED_TO_INIT_CONTAINER_PLACEMENT_POLICY);
|
||||
}
|
||||
|
||||
try {
|
||||
return constructor.newInstance(nodeManager, conf, clusterMap, fallback);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Failed to instantiate class " +
|
||||
placementClass.getCanonicalName() + " for " + e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
@ -23,6 +23,7 @@
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
|
||||
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
||||
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
|
||||
import org.apache.hadoop.hdds.scm.node.NodeManager;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
@ -77,7 +78,8 @@ public final class SCMContainerPlacementCapacity extends SCMCommonPolicy {
|
||||
* @param conf Configuration
|
||||
*/
|
||||
public SCMContainerPlacementCapacity(final NodeManager nodeManager,
|
||||
final Configuration conf) {
|
||||
final Configuration conf, final NetworkTopology networkTopology,
|
||||
final boolean fallback) {
|
||||
super(nodeManager, conf);
|
||||
}
|
||||
|
||||
|
@ -20,6 +20,7 @@
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
||||
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
|
||||
import org.apache.hadoop.hdds.scm.node.NodeManager;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.slf4j.Logger;
|
||||
@ -49,7 +50,8 @@ public final class SCMContainerPlacementRandom extends SCMCommonPolicy
|
||||
* @param conf Config
|
||||
*/
|
||||
public SCMContainerPlacementRandom(final NodeManager nodeManager,
|
||||
final Configuration conf) {
|
||||
final Configuration conf, final NetworkTopology networkTopology,
|
||||
final boolean fallback) {
|
||||
super(nodeManager, conf);
|
||||
}
|
||||
|
||||
|
@ -119,6 +119,7 @@ public enum ResultCodes {
|
||||
DUPLICATE_DATANODE,
|
||||
NO_SUCH_DATANODE,
|
||||
NO_REPLICA_FOUND,
|
||||
FAILED_TO_FIND_ACTIVE_PIPELINE
|
||||
FAILED_TO_FIND_ACTIVE_PIPELINE,
|
||||
FAILED_TO_INIT_CONTAINER_PLACEMENT_POLICY
|
||||
}
|
||||
}
|
||||
|
@ -41,6 +41,9 @@
|
||||
import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl;
|
||||
import org.apache.hadoop.hdds.scm.block.PendingDeleteHandler;
|
||||
import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
|
||||
import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory;
|
||||
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
|
||||
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
|
||||
import org.apache.hadoop.hdds.scm.safemode.SafeModeHandler;
|
||||
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
|
||||
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
|
||||
@ -53,7 +56,6 @@
|
||||
import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler;
|
||||
import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
|
||||
import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
|
||||
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
|
||||
import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat;
|
||||
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMMetrics;
|
||||
import org.apache.hadoop.hdds.scm.container.ReplicationManager;
|
||||
@ -206,6 +208,11 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
|
||||
private final SafeModeHandler safeModeHandler;
|
||||
private SCMContainerMetrics scmContainerMetrics;
|
||||
|
||||
/**
|
||||
* Network topology Map.
|
||||
*/
|
||||
private NetworkTopology clusterMap;
|
||||
|
||||
/**
|
||||
* Creates a new StorageContainerManager. Configuration will be
|
||||
* updated with information on the actual listening addresses used
|
||||
@ -277,14 +284,13 @@ public StorageContainerManager(OzoneConfiguration conf,
|
||||
securityProtocolServer = null;
|
||||
}
|
||||
|
||||
|
||||
eventQueue = new EventQueue();
|
||||
long watcherTimeout =
|
||||
conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,
|
||||
HDDS_SCM_WATCHER_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
|
||||
commandWatcherLeaseManager = new LeaseManager<>("CommandWatcher",
|
||||
watcherTimeout);
|
||||
initalizeSystemManagers(conf, configurator);
|
||||
initializeSystemManagers(conf, configurator);
|
||||
|
||||
CloseContainerEventHandler closeContainerHandler =
|
||||
new CloseContainerEventHandler(pipelineManager, containerManager);
|
||||
@ -381,7 +387,7 @@ public StorageContainerManager(OzoneConfiguration conf,
|
||||
* used if needed.
|
||||
* @throws IOException - on Failure.
|
||||
*/
|
||||
private void initalizeSystemManagers(OzoneConfiguration conf,
|
||||
private void initializeSystemManagers(OzoneConfiguration conf,
|
||||
SCMConfigurator configurator)
|
||||
throws IOException {
|
||||
if(configurator.getScmNodeManager() != null) {
|
||||
@ -391,9 +397,10 @@ private void initalizeSystemManagers(OzoneConfiguration conf,
|
||||
conf, scmStorageConfig.getClusterID(), this, eventQueue);
|
||||
}
|
||||
|
||||
//TODO: support configurable containerPlacement policy
|
||||
clusterMap = new NetworkTopologyImpl(conf);
|
||||
ContainerPlacementPolicy containerPlacementPolicy =
|
||||
new SCMContainerPlacementCapacity(scmNodeManager, conf);
|
||||
ContainerPlacementPolicyFactory.getPolicy(conf, scmNodeManager,
|
||||
clusterMap, true);
|
||||
|
||||
if (configurator.getPipelineManager() != null) {
|
||||
pipelineManager = configurator.getPipelineManager();
|
||||
@ -1205,7 +1212,6 @@ public Map<String, Integer> getContainerStateCount() {
|
||||
return nodeStateCount;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Returns the SCM metadata Store.
|
||||
* @return SCMMetadataStore
|
||||
|
@ -97,8 +97,9 @@ public void setup() throws IOException, InterruptedException {
|
||||
|
||||
Mockito.when(containerPlacementPolicy.chooseDatanodes(
|
||||
Mockito.anyListOf(DatanodeDetails.class),
|
||||
Mockito.anyListOf(DatanodeDetails.class), Mockito.anyInt(),
|
||||
Mockito.anyLong())).thenAnswer(invocation -> {
|
||||
Mockito.anyListOf(DatanodeDetails.class),
|
||||
Mockito.anyInt(), Mockito.anyLong()))
|
||||
.thenAnswer(invocation -> {
|
||||
int count = (int) invocation.getArguments()[2];
|
||||
return IntStream.range(0, count)
|
||||
.mapToObj(i -> randomDatanodeDetails())
|
||||
|
@ -0,0 +1,142 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdds.scm.container.placement.algorithms;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
|
||||
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
||||
import org.apache.hadoop.hdds.scm.TestUtils;
|
||||
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
|
||||
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
||||
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
|
||||
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
|
||||
import org.apache.hadoop.hdds.scm.net.NodeSchema;
|
||||
import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
|
||||
import org.apache.hadoop.hdds.scm.node.NodeManager;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
|
||||
import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
|
||||
import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
|
||||
import static org.mockito.Matchers.anyObject;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* Test for scm container placement factory.
|
||||
*/
|
||||
public class TestContainerPlacementFactory {
|
||||
// network topology cluster
|
||||
private NetworkTopology cluster;
|
||||
// datanodes array list
|
||||
private List<DatanodeDetails> datanodes = new ArrayList<>();
|
||||
// node storage capacity
|
||||
private final long storageCapacity = 100L;
|
||||
// configuration
|
||||
private Configuration conf;
|
||||
// node manager
|
||||
private NodeManager nodeManager;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
//initialize network topology instance
|
||||
conf = new OzoneConfiguration();
|
||||
NodeSchema[] schemas = new NodeSchema[]
|
||||
{ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA};
|
||||
NodeSchemaManager.getInstance().init(schemas, true);
|
||||
cluster = new NetworkTopologyImpl(NodeSchemaManager.getInstance());
|
||||
|
||||
// build datanodes, and network topology
|
||||
String rack = "/rack";
|
||||
String hostname = "node";
|
||||
for (int i = 0; i < 15; i++) {
|
||||
// Totally 3 racks, each has 5 datanodes
|
||||
DatanodeDetails node = TestUtils.createDatanodeDetails(
|
||||
hostname + i, rack + (i / 5));
|
||||
datanodes.add(node);
|
||||
cluster.add(node);
|
||||
}
|
||||
|
||||
// create mock node manager
|
||||
nodeManager = Mockito.mock(NodeManager.class);
|
||||
when(nodeManager.getNodes(NodeState.HEALTHY))
|
||||
.thenReturn(new ArrayList<>(datanodes));
|
||||
when(nodeManager.getNodeStat(anyObject()))
|
||||
.thenReturn(new SCMNodeMetric(storageCapacity, 0L, 100L));
|
||||
when(nodeManager.getNodeStat(datanodes.get(2)))
|
||||
.thenReturn(new SCMNodeMetric(storageCapacity, 90L, 10L));
|
||||
when(nodeManager.getNodeStat(datanodes.get(3)))
|
||||
.thenReturn(new SCMNodeMetric(storageCapacity, 80L, 20L));
|
||||
when(nodeManager.getNodeStat(datanodes.get(4)))
|
||||
.thenReturn(new SCMNodeMetric(storageCapacity, 70L, 30L));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testDefaultPolicy() throws IOException {
|
||||
ContainerPlacementPolicy policy = ContainerPlacementPolicyFactory
|
||||
.getPolicy(conf, nodeManager, cluster, true);
|
||||
|
||||
int nodeNum = 3;
|
||||
List<DatanodeDetails> datanodeDetails =
|
||||
policy.chooseDatanodes(null, null, nodeNum, 15);
|
||||
Assert.assertEquals(nodeNum, datanodeDetails.size());
|
||||
Assert.assertTrue(cluster.isSameParent(datanodeDetails.get(0),
|
||||
datanodeDetails.get(1)));
|
||||
Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(0),
|
||||
datanodeDetails.get(2)));
|
||||
Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(1),
|
||||
datanodeDetails.get(2)));
|
||||
}
|
||||
|
||||
/**
|
||||
* A dummy container placement implementation for test.
|
||||
*/
|
||||
public class DummyImpl implements ContainerPlacementPolicy {
|
||||
@Override
|
||||
public List<DatanodeDetails> chooseDatanodes(
|
||||
List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes,
|
||||
int nodesRequired, long sizeRequired) throws IOException {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Test(expected = SCMException.class)
|
||||
public void testConstuctorNotFound() throws SCMException {
|
||||
// set a placement class which does't have the right constructor implemented
|
||||
conf.set(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
|
||||
"org.apache.hadoop.hdds.scm.container.placement.algorithms." +
|
||||
"TestContainerPlacementFactory$DummyImpl");
|
||||
ContainerPlacementPolicyFactory.getPolicy(conf, null, null, true);
|
||||
}
|
||||
|
||||
@Test(expected = RuntimeException.class)
|
||||
public void testClassNotImplemented() throws SCMException {
|
||||
// set a placement class not implemented
|
||||
conf.set(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
|
||||
"org.apache.hadoop.hdds.scm.container.placement.algorithm.HelloWorld");
|
||||
ContainerPlacementPolicyFactory.getPolicy(conf, null, null, true);
|
||||
}
|
||||
}
|
@ -64,7 +64,7 @@ public void chooseDatanodes() throws SCMException {
|
||||
.thenReturn(new SCMNodeMetric(100L, 70L, 30L));
|
||||
|
||||
SCMContainerPlacementCapacity scmContainerPlacementRandom =
|
||||
new SCMContainerPlacementCapacity(mockNodeManager, conf);
|
||||
new SCMContainerPlacementCapacity(mockNodeManager, conf, null, true);
|
||||
|
||||
List<DatanodeDetails> existingNodes = new ArrayList<>();
|
||||
existingNodes.add(datanodes.get(0));
|
||||
|
@ -16,6 +16,7 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdds.scm.container.placement.algorithms;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
@ -23,6 +24,7 @@
|
||||
import org.apache.hadoop.hdds.scm.TestUtils;
|
||||
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
|
||||
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
||||
import org.apache.hadoop.hdds.scm.net.NetConstants;
|
||||
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
|
||||
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
|
||||
import org.apache.hadoop.hdds.scm.net.NodeSchema;
|
||||
@ -47,6 +49,8 @@
|
||||
*/
|
||||
public class TestSCMContainerPlacementRackAware {
|
||||
private NetworkTopology cluster;
|
||||
private Configuration conf;
|
||||
private NodeManager nodeManager;
|
||||
private List<DatanodeDetails> datanodes = new ArrayList<>();
|
||||
// policy with fallback capability
|
||||
private SCMContainerPlacementRackAware policy;
|
||||
@ -58,7 +62,7 @@ public class TestSCMContainerPlacementRackAware {
|
||||
@Before
|
||||
public void setup() {
|
||||
//initialize network topology instance
|
||||
Configuration conf = new OzoneConfiguration();
|
||||
conf = new OzoneConfiguration();
|
||||
NodeSchema[] schemas = new NodeSchema[]
|
||||
{ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA};
|
||||
NodeSchemaManager.getInstance().init(schemas, true);
|
||||
@ -76,7 +80,7 @@ public void setup() {
|
||||
}
|
||||
|
||||
// create mock node manager
|
||||
NodeManager nodeManager = Mockito.mock(NodeManager.class);
|
||||
nodeManager = Mockito.mock(NodeManager.class);
|
||||
when(nodeManager.getNodes(NodeState.HEALTHY))
|
||||
.thenReturn(new ArrayList<>(datanodes));
|
||||
when(nodeManager.getNodeStat(anyObject()))
|
||||
@ -254,4 +258,35 @@ public void testNoInfiniteLoop() throws SCMException {
|
||||
// request storage space larger than node capability
|
||||
policy.chooseDatanodes(null, null, nodeNum, STORAGE_CAPACITY + 15);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDatanodeWithDefaultNetworkLocation() throws SCMException {
|
||||
String hostname = "node";
|
||||
List<DatanodeDetails> dataList = new ArrayList<>();
|
||||
NetworkTopology clusterMap =
|
||||
new NetworkTopologyImpl(NodeSchemaManager.getInstance());
|
||||
for (int i = 0; i < 15; i++) {
|
||||
// Totally 3 racks, each has 5 datanodes
|
||||
DatanodeDetails node = TestUtils.createDatanodeDetails(
|
||||
hostname + i, null);
|
||||
dataList.add(node);
|
||||
clusterMap.add(node);
|
||||
}
|
||||
Assert.assertEquals(dataList.size(), StringUtils.countMatches(
|
||||
clusterMap.toString(), NetConstants.DEFAULT_RACK));
|
||||
|
||||
// choose nodes to host 3 replica
|
||||
int nodeNum = 3;
|
||||
SCMContainerPlacementRackAware newPolicy =
|
||||
new SCMContainerPlacementRackAware(nodeManager, conf, clusterMap, true);
|
||||
List<DatanodeDetails> datanodeDetails =
|
||||
newPolicy.chooseDatanodes(null, null, nodeNum, 15);
|
||||
Assert.assertEquals(nodeNum, datanodeDetails.size());
|
||||
Assert.assertTrue(cluster.isSameParent(datanodeDetails.get(0),
|
||||
datanodeDetails.get(1)));
|
||||
Assert.assertTrue(cluster.isSameParent(datanodeDetails.get(0),
|
||||
datanodeDetails.get(2)));
|
||||
Assert.assertTrue(cluster.isSameParent(datanodeDetails.get(1),
|
||||
datanodeDetails.get(2)));
|
||||
}
|
||||
}
|
@ -59,7 +59,7 @@ public void chooseDatanodes() throws SCMException {
|
||||
.thenReturn(new SCMNodeMetric(100L, 90L, 10L));
|
||||
|
||||
SCMContainerPlacementRandom scmContainerPlacementRandom =
|
||||
new SCMContainerPlacementRandom(mockNodeManager, conf);
|
||||
new SCMContainerPlacementRandom(mockNodeManager, conf, null, true);
|
||||
|
||||
List<DatanodeDetails> existingNodes = new ArrayList<>();
|
||||
existingNodes.add(datanodes.get(0));
|
||||
|
@ -80,9 +80,11 @@ public void testCapacityPlacementYieldsBetterDataDistribution() throws
|
||||
.getStandardDeviation(), 0.001);
|
||||
|
||||
SCMContainerPlacementCapacity capacityPlacer = new
|
||||
SCMContainerPlacementCapacity(nodeManagerCapacity, new Configuration());
|
||||
SCMContainerPlacementCapacity(nodeManagerCapacity, new Configuration(),
|
||||
null, true);
|
||||
SCMContainerPlacementRandom randomPlacer = new
|
||||
SCMContainerPlacementRandom(nodeManagerRandom, new Configuration());
|
||||
SCMContainerPlacementRandom(nodeManagerRandom, new Configuration(),
|
||||
null, true);
|
||||
|
||||
for (int x = 0; x < opsCount; x++) {
|
||||
long containerSize = random.nextInt(100) * OzoneConsts.GB;
|
||||
|
@ -126,4 +126,12 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<type>test-jar</type>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<testResources>
|
||||
<testResource>
|
||||
<directory>${basedir}/../../hadoop-hdds/common/src/main/resources</directory>
|
||||
</testResource>
|
||||
</testResources>
|
||||
</build>
|
||||
</project>
|
||||
|
@ -124,5 +124,13 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
<testResources>
|
||||
<testResource>
|
||||
<directory>${basedir}/../../hadoop-hdds/common/src/main/resources</directory>
|
||||
</testResource>
|
||||
<testResource>
|
||||
<directory>${basedir}/src/test/resources</directory>
|
||||
</testResource>
|
||||
</testResources>
|
||||
</build>
|
||||
</project>
|
||||
|
Loading…
Reference in New Issue
Block a user