YARN-4405. Support node label store in non-appendable file system. Contributed by Wangda Tan
This commit is contained in:
parent
924a33d02d
commit
755dda8dd8
@ -18,27 +18,22 @@
|
||||
|
||||
package org.apache.hadoop.conf;
|
||||
|
||||
import java.lang.Class;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.Modifier;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
/**
|
||||
* Base class for comparing fields in one or more Configuration classes
|
||||
* against a corresponding .xml file. Usage is intended as follows:
|
||||
@ -331,6 +326,7 @@ public abstract class TestConfigurationFieldsBase {
|
||||
private static Set<String> compareConfigurationToXmlFields(Map<String,String> keyMap1, Map<String,String> keyMap2) {
|
||||
Set<String> retVal = new HashSet<String>(keyMap1.keySet());
|
||||
retVal.removeAll(keyMap2.keySet());
|
||||
|
||||
return retVal;
|
||||
}
|
||||
|
||||
|
@ -589,6 +589,9 @@ Release 2.8.0 - UNRELEASED
|
||||
YARN-4292. ResourceUtilization should be a part of NodeInfo REST API.
|
||||
(Sunil G via wangda)
|
||||
|
||||
YARN-4405. Support node label store in non-appendable file system. (Wangda
|
||||
Tan via jianhe)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
||||
|
@ -2072,6 +2072,12 @@ private static void addDeprecatedKeys() {
|
||||
*/
|
||||
public static final String NODE_LABELS_PREFIX = YARN_PREFIX + "node-labels.";
|
||||
|
||||
/** Node label store implementation class */
|
||||
public static final String FS_NODE_LABELS_STORE_IMPL_CLASS = NODE_LABELS_PREFIX
|
||||
+ "fs-store.impl.class";
|
||||
public static final String DEFAULT_FS_NODE_LABELS_STORE_IMPL_CLASS =
|
||||
"org.apache.hadoop.yarn.nodelabels.FileSystemNodeLabelsStore";
|
||||
|
||||
/** URI for NodeLabelManager */
|
||||
public static final String FS_NODE_LABELS_STORE_ROOT_DIR = NODE_LABELS_PREFIX
|
||||
+ "fs-store.root-dir";
|
||||
|
@ -48,6 +48,8 @@ public void initializeMemberVariables() {
|
||||
errorIfMissingXmlProps = true;
|
||||
|
||||
// Specific properties to skip
|
||||
configurationPropsToSkipCompare
|
||||
.add(YarnConfiguration.DEFAULT_FS_NODE_LABELS_STORE_IMPL_CLASS);
|
||||
configurationPropsToSkipCompare
|
||||
.add(YarnConfiguration.DEFAULT_RM_CONFIGURATION_PROVIDER_CLASS);
|
||||
configurationPropsToSkipCompare
|
||||
|
@ -42,6 +42,7 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
@ -224,10 +225,20 @@ protected void serviceInit(Configuration conf) throws Exception {
|
||||
labelCollections.put(NO_LABEL, new RMNodeLabel(NO_LABEL));
|
||||
}
|
||||
|
||||
boolean isCentralizedConfiguration() {
|
||||
return isCentralizedNodeLabelConfiguration;
|
||||
}
|
||||
|
||||
protected void initNodeLabelStore(Configuration conf) throws Exception {
|
||||
this.store = new FileSystemNodeLabelsStore(this);
|
||||
this.store =
|
||||
ReflectionUtils
|
||||
.newInstance(
|
||||
conf.getClass(YarnConfiguration.FS_NODE_LABELS_STORE_IMPL_CLASS,
|
||||
FileSystemNodeLabelsStore.class, NodeLabelsStore.class),
|
||||
conf);
|
||||
this.store.setNodeLabelsManager(this);
|
||||
this.store.init(conf);
|
||||
this.store.recover(!isCentralizedNodeLabelConfiguration);
|
||||
this.store.recover();
|
||||
}
|
||||
|
||||
// for UT purpose
|
||||
|
@ -52,11 +52,6 @@
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
public class FileSystemNodeLabelsStore extends NodeLabelsStore {
|
||||
|
||||
public FileSystemNodeLabelsStore(CommonNodeLabelsManager mgr) {
|
||||
super(mgr);
|
||||
}
|
||||
|
||||
protected static final Log LOG = LogFactory.getLog(FileSystemNodeLabelsStore.class);
|
||||
|
||||
protected static final String DEFAULT_DIR_NAME = "node-labels";
|
||||
@ -69,8 +64,8 @@ protected enum SerializedLogType {
|
||||
|
||||
Path fsWorkingPath;
|
||||
FileSystem fs;
|
||||
FSDataOutputStream editlogOs;
|
||||
Path editLogPath;
|
||||
private FSDataOutputStream editlogOs;
|
||||
private Path editLogPath;
|
||||
|
||||
private String getDefaultFSNodeLabelsRootDir() throws IOException {
|
||||
// default is in local: /tmp/hadoop-yarn-${user}/node-labels/
|
||||
@ -161,11 +156,39 @@ public void removeClusterNodeLabels(Collection<String> labels)
|
||||
}
|
||||
}
|
||||
|
||||
protected void loadFromMirror(Path newMirrorPath, Path oldMirrorPath)
|
||||
throws IOException {
|
||||
// If mirror.new exists, read from mirror.new,
|
||||
FSDataInputStream is = null;
|
||||
if (fs.exists(newMirrorPath)) {
|
||||
is = fs.open(newMirrorPath);
|
||||
} else if (fs.exists(oldMirrorPath)) {
|
||||
is = fs.open(oldMirrorPath);
|
||||
}
|
||||
|
||||
if (null != is) {
|
||||
List<NodeLabel> labels = new AddToClusterNodeLabelsRequestPBImpl(
|
||||
AddToClusterNodeLabelsRequestProto.parseDelimitedFrom(is))
|
||||
.getNodeLabels();
|
||||
mgr.addToCluserNodeLabels(labels);
|
||||
|
||||
if (mgr.isCentralizedConfiguration()) {
|
||||
// Only load node to labels mapping while using centralized configuration
|
||||
Map<NodeId, Set<String>> nodeToLabels =
|
||||
new ReplaceLabelsOnNodeRequestPBImpl(
|
||||
ReplaceLabelsOnNodeRequestProto.parseDelimitedFrom(is))
|
||||
.getNodeToLabels();
|
||||
mgr.replaceLabelsOnNode(nodeToLabels);
|
||||
}
|
||||
is.close();
|
||||
}
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
* @see org.apache.hadoop.yarn.nodelabels.NodeLabelsStore#recover(boolean)
|
||||
*/
|
||||
@Override
|
||||
public void recover(boolean ignoreNodeToLabelsMappings) throws YarnException,
|
||||
public void recover() throws YarnException,
|
||||
IOException {
|
||||
/*
|
||||
* Steps of recover
|
||||
@ -182,30 +205,12 @@ public void recover(boolean ignoreNodeToLabelsMappings) throws YarnException,
|
||||
Path mirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME);
|
||||
Path oldMirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME + ".old");
|
||||
|
||||
FSDataInputStream is = null;
|
||||
if (fs.exists(mirrorPath)) {
|
||||
is = fs.open(mirrorPath);
|
||||
} else if (fs.exists(oldMirrorPath)) {
|
||||
is = fs.open(oldMirrorPath);
|
||||
}
|
||||
|
||||
if (null != is) {
|
||||
List<NodeLabel> labels =
|
||||
new AddToClusterNodeLabelsRequestPBImpl(
|
||||
AddToClusterNodeLabelsRequestProto.parseDelimitedFrom(is)).getNodeLabels();
|
||||
Map<NodeId, Set<String>> nodeToLabels =
|
||||
new ReplaceLabelsOnNodeRequestPBImpl(
|
||||
ReplaceLabelsOnNodeRequestProto.parseDelimitedFrom(is))
|
||||
.getNodeToLabels();
|
||||
mgr.addToCluserNodeLabels(labels);
|
||||
mgr.replaceLabelsOnNode(nodeToLabels);
|
||||
is.close();
|
||||
}
|
||||
loadFromMirror(mirrorPath, oldMirrorPath);
|
||||
|
||||
// Open and process editlog
|
||||
editLogPath = new Path(fsWorkingPath, EDITLOG_FILENAME);
|
||||
if (fs.exists(editLogPath)) {
|
||||
is = fs.open(editLogPath);
|
||||
FSDataInputStream is = fs.open(editLogPath);
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
@ -233,7 +238,7 @@ public void recover(boolean ignoreNodeToLabelsMappings) throws YarnException,
|
||||
new ReplaceLabelsOnNodeRequestPBImpl(
|
||||
ReplaceLabelsOnNodeRequestProto.parseDelimitedFrom(is))
|
||||
.getNodeToLabels();
|
||||
if (!ignoreNodeToLabelsMappings) {
|
||||
if (mgr.isCentralizedConfiguration()) {
|
||||
/*
|
||||
* In case of Distributed NodeLabels setup,
|
||||
* ignoreNodeToLabelsMappings will be set to true and recover will
|
||||
|
@ -31,11 +31,7 @@
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
||||
public abstract class NodeLabelsStore implements Closeable {
|
||||
protected final CommonNodeLabelsManager mgr;
|
||||
|
||||
public NodeLabelsStore(CommonNodeLabelsManager mgr) {
|
||||
this.mgr = mgr;
|
||||
}
|
||||
protected CommonNodeLabelsManager mgr;
|
||||
|
||||
/**
|
||||
* Store node {@literal ->} label
|
||||
@ -62,16 +58,14 @@ public abstract void removeClusterNodeLabels(Collection<String> labels)
|
||||
* ignoreNodeToLabelsMappings will be set to true and recover will be invoked
|
||||
* as RM will collect the node labels from NM through registration/HB
|
||||
*
|
||||
* @param ignoreNodeToLabelsMappings
|
||||
* @throws IOException
|
||||
* @throws YarnException
|
||||
*/
|
||||
public abstract void recover(boolean ignoreNodeToLabelsMappings)
|
||||
throws IOException, YarnException;
|
||||
public abstract void recover() throws IOException, YarnException;
|
||||
|
||||
public void init(Configuration conf) throws Exception {}
|
||||
|
||||
public CommonNodeLabelsManager getNodeLabelsManager() {
|
||||
return mgr;
|
||||
public void setNodeLabelsManager(CommonNodeLabelsManager mgr) {
|
||||
this.mgr = mgr;
|
||||
}
|
||||
}
|
||||
|
@ -2451,4 +2451,12 @@
|
||||
<name>yarn.am.blacklisting.disable-failure-threshold</name>
|
||||
<value>0.8f</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>
|
||||
Choose different implementation of node label's storage
|
||||
</description>
|
||||
<name>yarn.node-labels.fs-store.impl.class</name>
|
||||
<value>org.apache.hadoop.yarn.nodelabels.FileSystemNodeLabelsStore</value>
|
||||
</property>
|
||||
</configuration>
|
||||
|
@ -36,10 +36,10 @@ public class DummyCommonNodeLabelsManager extends CommonNodeLabelsManager {
|
||||
|
||||
@Override
|
||||
public void initNodeLabelStore(Configuration conf) {
|
||||
this.store = new NodeLabelsStore(this) {
|
||||
this.store = new NodeLabelsStore() {
|
||||
|
||||
@Override
|
||||
public void recover(boolean ignoreNodeToLabelsMappings)
|
||||
public void recover()
|
||||
throws IOException {
|
||||
}
|
||||
|
||||
@ -65,6 +65,8 @@ public void close() throws IOException {
|
||||
// do nothing
|
||||
}
|
||||
};
|
||||
|
||||
this.store.setNodeLabelsManager(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -21,6 +21,7 @@
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -33,13 +34,17 @@
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class TestFileSystemNodeLabelsStore extends NodeLabelTestBase {
|
||||
MockNodeLabelManager mgr = null;
|
||||
Configuration conf = null;
|
||||
String storeClassName = null;
|
||||
|
||||
private static class MockNodeLabelManager extends
|
||||
CommonNodeLabelsManager {
|
||||
@ -59,8 +64,15 @@ protected void stopDispatcher() {
|
||||
}
|
||||
}
|
||||
|
||||
private FileSystemNodeLabelsStore getStore() {
|
||||
return (FileSystemNodeLabelsStore) mgr.store;
|
||||
public TestFileSystemNodeLabelsStore(String className) {
|
||||
this.storeClassName = className;
|
||||
}
|
||||
|
||||
@Parameterized.Parameters
|
||||
public static Collection<String[]> getParameters() {
|
||||
return Arrays.asList(
|
||||
new String[][] { { FileSystemNodeLabelsStore.class.getCanonicalName() },
|
||||
{ NonAppendableFSNodeLabelStore.class.getCanonicalName() } });
|
||||
}
|
||||
|
||||
@Before
|
||||
@ -68,6 +80,7 @@ public void before() throws IOException {
|
||||
mgr = new MockNodeLabelManager();
|
||||
conf = new Configuration();
|
||||
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
|
||||
conf.set(YarnConfiguration.FS_NODE_LABELS_STORE_IMPL_CLASS, storeClassName);
|
||||
File tempDir = File.createTempFile("nlb", ".tmp");
|
||||
tempDir.delete();
|
||||
tempDir.mkdirs();
|
||||
@ -80,7 +93,11 @@ public void before() throws IOException {
|
||||
|
||||
@After
|
||||
public void after() throws IOException {
|
||||
getStore().fs.delete(getStore().fsWorkingPath, true);
|
||||
if (mgr.store instanceof FileSystemNodeLabelsStore) {
|
||||
FileSystemNodeLabelsStore fsStore =
|
||||
((FileSystemNodeLabelsStore) mgr.store);
|
||||
fsStore.fs.delete(fsStore.fsWorkingPath, true);
|
||||
}
|
||||
mgr.stop();
|
||||
}
|
||||
|
||||
@ -324,11 +341,12 @@ public void testSerilizationAfterRecovery() throws Exception {
|
||||
@Test
|
||||
public void testRootMkdirOnInitStore() throws Exception {
|
||||
final FileSystem mockFs = Mockito.mock(FileSystem.class);
|
||||
FileSystemNodeLabelsStore mockStore = new FileSystemNodeLabelsStore(mgr) {
|
||||
FileSystemNodeLabelsStore mockStore = new FileSystemNodeLabelsStore() {
|
||||
void setFileSystem(Configuration conf) throws IOException {
|
||||
fs = mockFs;
|
||||
}
|
||||
};
|
||||
mockStore.setNodeLabelsManager(mgr);
|
||||
mockStore.fs = mockFs;
|
||||
verifyMkdirsCount(mockStore, true, 0);
|
||||
verifyMkdirsCount(mockStore, false, 1);
|
||||
|
@ -37,10 +37,10 @@ public class NullRMNodeLabelsManager extends RMNodeLabelsManager {
|
||||
|
||||
@Override
|
||||
public void initNodeLabelStore(Configuration conf) {
|
||||
this.store = new NodeLabelsStore(this) {
|
||||
this.store = new NodeLabelsStore() {
|
||||
|
||||
@Override
|
||||
public void recover(boolean ignoreNodeToLabelsMappings)
|
||||
public void recover()
|
||||
throws IOException {
|
||||
// do nothing
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user