YARN-2705. Fixed bugs in ResourceManager node-label manager that were causing test-failures: added a dummy in-memory labels-manager. Contributed by Wangda Tan.
This commit is contained in:
parent
c3de2412eb
commit
e9c66e8fd2
@ -600,6 +600,10 @@ Release 2.6.0 - UNRELEASED
|
||||
YARN-2699. Fixed a bug in CommonNodeLabelsManager that caused tests to fail
|
||||
when using ephemeral ports on NodeIDs. (Wangda Tan via vinodkv)
|
||||
|
||||
YARN-2705. Fixed bugs in ResourceManager node-label manager that were causing
|
||||
test-failures: added a dummy in-memory labels-manager. (Wangda Tan via
|
||||
vinodkv)
|
||||
|
||||
BREAKDOWN OF YARN-1051 SUBTASKS AND RELATED JIRAS
|
||||
|
||||
YARN-1707. Introduce APIs to add/remove/resize queues in the
|
||||
|
@ -1457,10 +1457,16 @@ public class YarnConfiguration extends Configuration {
|
||||
|
||||
public static final String NODE_LABELS_PREFIX = YARN_PREFIX + "node-labels.";
|
||||
|
||||
/**
|
||||
* Class for RMNodeLabelsManager Please note this value should be consistent
|
||||
* in client nodes and RM node(s)
|
||||
*/
|
||||
public static final String RM_NODE_LABELS_MANAGER_CLASS = NODE_LABELS_PREFIX
|
||||
+ "manager-class";
|
||||
|
||||
/** URI for NodeLabelManager */
|
||||
public static final String FS_NODE_LABELS_STORE_URI = NODE_LABELS_PREFIX
|
||||
+ "fs-store.uri";
|
||||
public static final String DEFAULT_FS_NODE_LABELS_STORE_URI = "file:///tmp/";
|
||||
public static final String FS_NODE_LABELS_STORE_ROOT_DIR = NODE_LABELS_PREFIX
|
||||
+ "fs-store.root-dir";
|
||||
public static final String FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC =
|
||||
NODE_LABELS_PREFIX + "fs-store.retry-policy-spec";
|
||||
public static final String DEFAULT_FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC =
|
||||
|
@ -220,9 +220,11 @@ protected void serviceStart() throws Exception {
|
||||
// service init, we don't want to trigger any event handling at that time.
|
||||
initDispatcher(getConfig());
|
||||
|
||||
dispatcher.register(NodeLabelsStoreEventType.class,
|
||||
new ForwardingEventHandler());
|
||||
|
||||
if (null != dispatcher) {
|
||||
dispatcher.register(NodeLabelsStoreEventType.class,
|
||||
new ForwardingEventHandler());
|
||||
}
|
||||
|
||||
startDispatcher();
|
||||
}
|
||||
|
||||
|
@ -32,6 +32,7 @@
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocalFileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto;
|
||||
@ -54,7 +55,7 @@ public FileSystemNodeLabelsStore(CommonNodeLabelsManager mgr) {
|
||||
|
||||
protected static final Log LOG = LogFactory.getLog(FileSystemNodeLabelsStore.class);
|
||||
|
||||
protected static final String ROOT_DIR_NAME = "FSNodeLabelManagerRoot";
|
||||
protected static final String DEFAULT_DIR_NAME = "node-labels";
|
||||
protected static final String MIRROR_FILENAME = "nodelabel.mirror";
|
||||
protected static final String EDITLOG_FILENAME = "nodelabel.editlog";
|
||||
|
||||
@ -63,22 +64,27 @@ protected enum SerializedLogType {
|
||||
}
|
||||
|
||||
Path fsWorkingPath;
|
||||
Path rootDirPath;
|
||||
FileSystem fs;
|
||||
FSDataOutputStream editlogOs;
|
||||
Path editLogPath;
|
||||
|
||||
private String getDefaultFSNodeLabelsRootDir() throws IOException {
|
||||
// default is in local: /tmp/hadoop-yarn-${user}/node-labels/
|
||||
return "file:///tmp/hadoop-yarn-"
|
||||
+ UserGroupInformation.getCurrentUser().getShortUserName() + "/"
|
||||
+ DEFAULT_DIR_NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Configuration conf) throws Exception {
|
||||
fsWorkingPath =
|
||||
new Path(conf.get(YarnConfiguration.FS_NODE_LABELS_STORE_URI,
|
||||
YarnConfiguration.DEFAULT_FS_NODE_LABELS_STORE_URI));
|
||||
rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
|
||||
new Path(conf.get(YarnConfiguration.FS_NODE_LABELS_STORE_ROOT_DIR,
|
||||
getDefaultFSNodeLabelsRootDir()));
|
||||
|
||||
setFileSystem(conf);
|
||||
|
||||
// mkdir of root dir path
|
||||
fs.mkdirs(rootDirPath);
|
||||
fs.mkdirs(fsWorkingPath);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -159,8 +165,8 @@ public void recover() throws IOException {
|
||||
*/
|
||||
|
||||
// Open mirror from serialized file
|
||||
Path mirrorPath = new Path(rootDirPath, MIRROR_FILENAME);
|
||||
Path oldMirrorPath = new Path(rootDirPath, MIRROR_FILENAME + ".old");
|
||||
Path mirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME);
|
||||
Path oldMirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME + ".old");
|
||||
|
||||
FSDataInputStream is = null;
|
||||
if (fs.exists(mirrorPath)) {
|
||||
@ -183,7 +189,7 @@ public void recover() throws IOException {
|
||||
}
|
||||
|
||||
// Open and process editlog
|
||||
editLogPath = new Path(rootDirPath, EDITLOG_FILENAME);
|
||||
editLogPath = new Path(fsWorkingPath, EDITLOG_FILENAME);
|
||||
if (fs.exists(editLogPath)) {
|
||||
is = fs.open(editLogPath);
|
||||
|
||||
@ -224,7 +230,7 @@ public void recover() throws IOException {
|
||||
}
|
||||
|
||||
// Serialize current mirror to mirror.writing
|
||||
Path writingMirrorPath = new Path(rootDirPath, MIRROR_FILENAME + ".writing");
|
||||
Path writingMirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME + ".writing");
|
||||
FSDataOutputStream os = fs.create(writingMirrorPath, true);
|
||||
((AddToClusterNodeLabelsRequestPBImpl) AddToClusterNodeLabelsRequestPBImpl
|
||||
.newInstance(mgr.getClusterNodeLabels())).getProto().writeDelimitedTo(os);
|
||||
|
@ -67,7 +67,7 @@ public void before() throws IOException {
|
||||
tempDir.delete();
|
||||
tempDir.mkdirs();
|
||||
tempDir.deleteOnExit();
|
||||
conf.set(YarnConfiguration.FS_NODE_LABELS_STORE_URI,
|
||||
conf.set(YarnConfiguration.FS_NODE_LABELS_STORE_ROOT_DIR,
|
||||
tempDir.getAbsolutePath());
|
||||
mgr.init(conf);
|
||||
mgr.start();
|
||||
@ -75,7 +75,7 @@ public void before() throws IOException {
|
||||
|
||||
@After
|
||||
public void after() throws IOException {
|
||||
getStore().fs.delete(getStore().rootDirPath, true);
|
||||
getStore().fs.delete(getStore().fsWorkingPath, true);
|
||||
mgr.stop();
|
||||
}
|
||||
|
||||
|
@ -67,6 +67,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.MemoryRMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||
@ -321,8 +322,12 @@ protected AMLivelinessMonitor createAMLivelinessMonitor() {
|
||||
return new AMLivelinessMonitor(this.rmDispatcher);
|
||||
}
|
||||
|
||||
protected RMNodeLabelsManager createNodeLabelManager() {
|
||||
return new RMNodeLabelsManager();
|
||||
protected RMNodeLabelsManager createNodeLabelManager()
|
||||
throws InstantiationException, IllegalAccessException {
|
||||
Class<? extends RMNodeLabelsManager> nlmCls =
|
||||
conf.getClass(YarnConfiguration.RM_NODE_LABELS_MANAGER_CLASS,
|
||||
MemoryRMNodeLabelsManager.class, RMNodeLabelsManager.class);
|
||||
return nlmCls.newInstance();
|
||||
}
|
||||
|
||||
protected DelegationTokenRenewer createDelegationTokenRenewer() {
|
||||
|
@ -25,10 +25,9 @@
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.event.InlineDispatcher;
|
||||
import org.apache.hadoop.yarn.nodelabels.NodeLabelsStore;
|
||||
|
||||
public class DummyRMNodeLabelsManager extends RMNodeLabelsManager {
|
||||
public class MemoryRMNodeLabelsManager extends RMNodeLabelsManager {
|
||||
Map<NodeId, Set<String>> lastNodeToLabels = null;
|
||||
Collection<String> lastAddedlabels = null;
|
||||
Collection<String> lastRemovedlabels = null;
|
||||
@ -68,7 +67,7 @@ public void close() throws IOException {
|
||||
|
||||
@Override
|
||||
protected void initDispatcher(Configuration conf) {
|
||||
super.dispatcher = new InlineDispatcher();
|
||||
super.dispatcher = null;
|
||||
}
|
||||
|
||||
@Override
|
@ -59,7 +59,7 @@
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.DummyRMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.MemoryRMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
@ -115,7 +115,7 @@ public MockRM(Configuration conf, RMStateStore store) {
|
||||
|
||||
@Override
|
||||
protected RMNodeLabelsManager createNodeLabelManager() {
|
||||
RMNodeLabelsManager mgr = new DummyRMNodeLabelsManager();
|
||||
RMNodeLabelsManager mgr = new MemoryRMNodeLabelsManager();
|
||||
mgr.init(getConfig());
|
||||
return mgr;
|
||||
}
|
||||
|
@ -42,11 +42,11 @@ public class TestRMNodeLabelsManager extends NodeLabelTestBase {
|
||||
private final Resource SMALL_RESOURCE = Resource.newInstance(100, 0);
|
||||
private final Resource LARGE_NODE = Resource.newInstance(1000, 0);
|
||||
|
||||
DummyRMNodeLabelsManager mgr = null;
|
||||
MemoryRMNodeLabelsManager mgr = null;
|
||||
|
||||
@Before
|
||||
public void before() {
|
||||
mgr = new DummyRMNodeLabelsManager();
|
||||
mgr = new MemoryRMNodeLabelsManager();
|
||||
mgr.init(new Configuration());
|
||||
mgr.start();
|
||||
}
|
||||
|
@ -84,7 +84,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.Task;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.DummyRMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.MemoryRMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
|
||||
@ -154,7 +154,7 @@ public void setUp() throws Exception {
|
||||
resourceManager = new ResourceManager() {
|
||||
@Override
|
||||
protected RMNodeLabelsManager createNodeLabelManager() {
|
||||
RMNodeLabelsManager mgr = new DummyRMNodeLabelsManager();
|
||||
RMNodeLabelsManager mgr = new MemoryRMNodeLabelsManager();
|
||||
mgr.init(getConfig());
|
||||
return mgr;
|
||||
}
|
||||
@ -1485,7 +1485,7 @@ public void testMoveAppViolateQueueState() throws Exception {
|
||||
resourceManager = new ResourceManager() {
|
||||
@Override
|
||||
protected RMNodeLabelsManager createNodeLabelManager() {
|
||||
RMNodeLabelsManager mgr = new DummyRMNodeLabelsManager();
|
||||
RMNodeLabelsManager mgr = new MemoryRMNodeLabelsManager();
|
||||
mgr.init(getConfig());
|
||||
return mgr;
|
||||
}
|
||||
|
@ -45,7 +45,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.TestFifoScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.DummyRMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.MemoryRMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
@ -81,7 +81,7 @@ public void setUp() throws Exception {
|
||||
conf = new YarnConfiguration();
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
mgr = new DummyRMNodeLabelsManager();
|
||||
mgr = new MemoryRMNodeLabelsManager();
|
||||
mgr.init(conf);
|
||||
}
|
||||
|
||||
@ -446,7 +446,7 @@ private Configuration getComplexConfigurationWithQueueLabels(
|
||||
|
||||
@Test(timeout = 300000)
|
||||
public void testContainerAllocationWithSingleUserLimits() throws Exception {
|
||||
final RMNodeLabelsManager mgr = new DummyRMNodeLabelsManager();
|
||||
final RMNodeLabelsManager mgr = new MemoryRMNodeLabelsManager();
|
||||
mgr.init(conf);
|
||||
|
||||
// set node -> label
|
||||
|
@ -39,7 +39,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.MockAsm;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.DummyRMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.MemoryRMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
@ -179,7 +179,7 @@ public ConcurrentMap<NodeId, RMNode> getRMNodes() {
|
||||
return nodesMap;
|
||||
}
|
||||
};
|
||||
rmContext.setNodeLabelManager(new DummyRMNodeLabelsManager());
|
||||
rmContext.setNodeLabelManager(new MemoryRMNodeLabelsManager());
|
||||
return rmContext;
|
||||
}
|
||||
|
||||
@ -211,7 +211,7 @@ public static CapacityScheduler mockCapacityScheduler() throws IOException {
|
||||
null, new RMContainerTokenSecretManager(conf),
|
||||
new NMTokenSecretManagerInRM(conf),
|
||||
new ClientToAMTokenSecretManagerInRM(), null);
|
||||
rmContext.setNodeLabelManager(new DummyRMNodeLabelsManager());
|
||||
rmContext.setNodeLabelManager(new MemoryRMNodeLabelsManager());
|
||||
cs.setRMContext(rmContext);
|
||||
cs.init(conf);
|
||||
return cs;
|
||||
|
Loading…
Reference in New Issue
Block a user