diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 5ca99983c0..edad4d23e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -3480,6 +3480,22 @@ public static boolean isAclEnabled(Configuration conf) { public static final String FS_NODE_LABELS_STORE_ROOT_DIR = NODE_LABELS_PREFIX + "fs-store.root-dir"; + /** + * Node-attribute configurations. + */ + public static final String NODE_ATTRIBUTE_PREFIX = + YARN_PREFIX + "node-attribute."; + /** + * Node attribute store implementation class. + */ + public static final String FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS = + NODE_ATTRIBUTE_PREFIX + "fs-store.impl.class"; + /** + * File system not attribute store directory. + */ + public static final String FS_NODE_ATTRIBUTE_STORE_ROOT_DIR = + NODE_ATTRIBUTE_PREFIX + "fs-store.root-dir"; + /** * Flag to indicate if the node labels feature enabled, by default it's * disabled diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributeStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributeStore.java new file mode 100644 index 0000000000..8e9f9ff9f0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributeStore.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.nodelabels; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +/** + * Interface class for Node label store. + */ +public interface NodeAttributeStore extends Closeable { + + /** + * Replace labels on node. + * + * @param nodeToAttribute node to attribute list. + * @throws IOException + */ + void replaceNodeAttributes(List nodeToAttribute) + throws IOException; + + /** + * Add attribute to node. + * + * @param nodeToAttribute node to attribute list. + * @throws IOException + */ + void addNodeAttributes(List nodeToAttribute) + throws IOException; + + /** + * Remove attribute from node. + * + * @param nodeToAttribute node to attribute list. + * @throws IOException + */ + void removeNodeAttributes(List nodeToAttribute) + throws IOException; + + /** + * Initialize based on configuration and NodeAttributesManager. + * + * @param configuration configuration instance. + * @param mgr nodeattributemanager instance. + * @throws Exception + */ + void init(Configuration configuration, NodeAttributesManager mgr) + throws Exception; + + /** + * Recover store on resourcemanager startup. + * @throws IOException + * @throws YarnException + */ + void recover() throws IOException, YarnException; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java index ffa33cfe96..ec7d30d4f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java @@ -19,11 +19,13 @@ package org.apache.hadoop.yarn.nodelabels; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.Set; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes; /** * This class captures all interactions for Attributes with RM. @@ -101,6 +103,15 @@ public abstract Set getClusterNodeAttributes( public abstract Map getAttributesForNode( String hostName); + /** + * Get All node to Attributes list based on filter. + * + * @return List nodeToAttributes matching filter.If empty + * or null is passed as argument will return all. + */ + public abstract List getNodeToAttributes( + Set prefix); + // futuristic // public set getNodesMatchingExpression(String nodeLabelExp); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/RMNodeAttribute.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/RMNodeAttribute.java index 5a709c629b..3b2bd16268 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/RMNodeAttribute.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/RMNodeAttribute.java @@ -53,12 +53,6 @@ public void setAttribute(NodeAttribute attribute) { this.attribute = attribute; } - public RMNodeAttribute(String attributeName) { - super(attributeName); - attribute = NodeAttribute.newInstance(attributeName, - NodeAttributeType.STRING, CommonNodeLabelsManager.NO_LABEL); - } - public NodeAttributeType getAttributeType() { return attribute.getAttributeType(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/AbstractFSNodeStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/AbstractFSNodeStore.java index a47cacf784..216fc79ecb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/AbstractFSNodeStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/AbstractFSNodeStore.java @@ -64,7 +64,7 @@ protected void initStore(Configuration conf, Path fsStorePath, initFileSystem(conf); // mkdir of root dir path fs.mkdirs(fsWorkingPath); - + LOG.info("Created store directory :" + fsWorkingPath); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/FSStoreOpHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/FSStoreOpHandler.java index 0f7f53dbb4..a626537aa8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/FSStoreOpHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/FSStoreOpHandler.java @@ -17,13 +17,17 @@ */ package org.apache.hadoop.yarn.nodelabels.store; -import static org.apache.hadoop.yarn.nodelabels.store.FSStoreOpHandler - .StoreType.NODE_LABEL_STORE; +import static org.apache.hadoop.yarn.nodelabels.store.FSStoreOpHandler.StoreType.NODE_ATTRIBUTE; +import static org.apache.hadoop.yarn.nodelabels.store.FSStoreOpHandler.StoreType.NODE_LABEL_STORE; import org.apache.hadoop.yarn.nodelabels.store.op.AddClusterLabelOp; +import org.apache.hadoop.yarn.nodelabels.store.op.AddNodeToAttributeLogOp; import org.apache.hadoop.yarn.nodelabels.store.op.FSNodeStoreLogOp; +import org.apache.hadoop.yarn.nodelabels.store.op.NodeAttributeMirrorOp; import org.apache.hadoop.yarn.nodelabels.store.op.NodeLabelMirrorOp; import org.apache.hadoop.yarn.nodelabels.store.op.NodeToLabelOp; import org.apache.hadoop.yarn.nodelabels.store.op.RemoveClusterLabelOp; +import org.apache.hadoop.yarn.nodelabels.store.op.RemoveNodeToAttributeLogOp; +import org.apache.hadoop.yarn.nodelabels.store.op.ReplaceNodeToAttributeLogOp; import java.util.HashMap; import java.util.Map; @@ -39,7 +43,7 @@ public class FSStoreOpHandler { public enum StoreType { NODE_LABEL_STORE, - NODE_LABEL_ATTRIBUTE; + NODE_ATTRIBUTE } static { @@ -47,13 +51,24 @@ public enum StoreType { mirrorOp = new HashMap<>(); // registerLog edit log operation + + //Node Label Operations registerLog(NODE_LABEL_STORE, AddClusterLabelOp.OPCODE, AddClusterLabelOp.class); registerLog(NODE_LABEL_STORE, NodeToLabelOp.OPCODE, NodeToLabelOp.class); registerLog(NODE_LABEL_STORE, RemoveClusterLabelOp.OPCODE, RemoveClusterLabelOp.class); + //NodeAttibute operation + registerLog(NODE_ATTRIBUTE, AddNodeToAttributeLogOp.OPCODE, AddNodeToAttributeLogOp.class); + registerLog(NODE_ATTRIBUTE, RemoveNodeToAttributeLogOp.OPCODE, RemoveNodeToAttributeLogOp.class); + registerLog(NODE_ATTRIBUTE, ReplaceNodeToAttributeLogOp.OPCODE, ReplaceNodeToAttributeLogOp.class); + // registerLog Mirror op + // Node label mirror operation registerMirror(NODE_LABEL_STORE, NodeLabelMirrorOp.class); + //Node attribute mirror operation + registerMirror(NODE_ATTRIBUTE, NodeAttributeMirrorOp.class); + } private static void registerMirror(StoreType type, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/AddNodeToAttributeLogOp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/AddNodeToAttributeLogOp.java new file mode 100644 index 0000000000..4b92bcf9cc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/AddNodeToAttributeLogOp.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.nodelabels.store.op; + +import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos; +import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodesToAttributesMappingRequestPBImpl; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; + +/** + * File system Add Node to attribute mapping. + */ +public class AddNodeToAttributeLogOp + extends FSNodeStoreLogOp { + + private List attributes; + + public static final int OPCODE = 0; + + @Override + public void write(OutputStream os, NodeAttributesManager mgr) + throws IOException { + ((NodesToAttributesMappingRequestPBImpl) NodesToAttributesMappingRequest + .newInstance(AttributeMappingOperationType.ADD, attributes, false)) + .getProto().writeDelimitedTo(os); + } + + @Override + public void recover(InputStream is, NodeAttributesManager mgr) + throws IOException { + NodesToAttributesMappingRequest request = + new NodesToAttributesMappingRequestPBImpl( + YarnServerResourceManagerServiceProtos + .NodesToAttributesMappingRequestProto + .parseDelimitedFrom(is)); + mgr.addNodeAttributes(getNodeToAttributesMap(request)); + } + + public AddNodeToAttributeLogOp setAttributes( + List attributesList) { + this.attributes = attributesList; + return this; + } + + @Override + public int getOpCode() { + return OPCODE; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/FSNodeStoreLogOp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/FSNodeStoreLogOp.java index cd739c025a..bf4d1b9196 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/FSNodeStoreLogOp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/FSNodeStoreLogOp.java @@ -17,10 +17,18 @@ */ package org.apache.hadoop.yarn.nodelabels.store.op; +import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.nodelabels.store.StoreOp; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest; import java.io.InputStream; import java.io.OutputStream; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; /** * Defines all FileSystem editlog operation. All node label and attribute @@ -32,4 +40,13 @@ public abstract class FSNodeStoreLogOp implements StoreOp { public abstract int getOpCode(); + + protected Map> getNodeToAttributesMap( + NodesToAttributesMappingRequest request) { + List attributes = request.getNodesToAttributes(); + Map> nodeToAttrMap = new HashMap<>(); + attributes.forEach((v) -> nodeToAttrMap + .put(v.getNode(), new HashSet<>(v.getNodeAttributes()))); + return nodeToAttrMap; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/NodeAttributeMirrorOp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/NodeAttributeMirrorOp.java new file mode 100644 index 0000000000..dca0555abc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/NodeAttributeMirrorOp.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.nodelabels.store.op; + +import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos; +import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodesToAttributesMappingRequestPBImpl; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +/** + * File System Node Attribute Mirror read and write operation. + */ +public class NodeAttributeMirrorOp + extends FSNodeStoreLogOp { + + @Override + public void write(OutputStream os, NodeAttributesManager mgr) + throws IOException { + ((NodesToAttributesMappingRequestPBImpl) NodesToAttributesMappingRequest + .newInstance(AttributeMappingOperationType.REPLACE, + mgr.getNodeToAttributes( + ImmutableSet.of(NodeAttribute.PREFIX_CENTRALIZED)), false)) + .getProto().writeDelimitedTo(os); + } + + @Override + public void recover(InputStream is, NodeAttributesManager mgr) + throws IOException { + NodesToAttributesMappingRequest request = + new NodesToAttributesMappingRequestPBImpl( + YarnServerResourceManagerServiceProtos + .NodesToAttributesMappingRequestProto + .parseDelimitedFrom(is)); + mgr.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED, + getNodeToAttributesMap(request)); + } + + @Override + public int getOpCode() { + return -1; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/RemoveNodeToAttributeLogOp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/RemoveNodeToAttributeLogOp.java new file mode 100644 index 0000000000..1d13077418 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/RemoveNodeToAttributeLogOp.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.nodelabels.store.op; + +import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos; +import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodesToAttributesMappingRequestPBImpl; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; + +/** + * File system remove node attribute from node operation. + */ +public class RemoveNodeToAttributeLogOp + extends FSNodeStoreLogOp { + + private List attributes; + + public static final int OPCODE = 1; + + @Override + public void write(OutputStream os, NodeAttributesManager mgr) + throws IOException { + ((NodesToAttributesMappingRequestPBImpl) NodesToAttributesMappingRequest + .newInstance(AttributeMappingOperationType.REMOVE, attributes, false)) + .getProto().writeDelimitedTo(os); + } + + @Override + public void recover(InputStream is, NodeAttributesManager mgr) + throws IOException { + NodesToAttributesMappingRequest request = + new NodesToAttributesMappingRequestPBImpl( + YarnServerResourceManagerServiceProtos + .NodesToAttributesMappingRequestProto + .parseDelimitedFrom(is)); + mgr.removeNodeAttributes(getNodeToAttributesMap(request)); + } + + public RemoveNodeToAttributeLogOp setAttributes( + List attrs) { + this.attributes = attrs; + return this; + } + + @Override + public int getOpCode() { + return OPCODE; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/ReplaceNodeToAttributeLogOp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/ReplaceNodeToAttributeLogOp.java new file mode 100644 index 0000000000..54d7651c67 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/ReplaceNodeToAttributeLogOp.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.nodelabels.store.op; + +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos; +import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodesToAttributesMappingRequestPBImpl; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; + +/** + * File system replace node attribute from node operation. + */ +public class ReplaceNodeToAttributeLogOp + extends FSNodeStoreLogOp { + + private List attributes; + public static final int OPCODE = 2; + + @Override + public void write(OutputStream os, NodeAttributesManager mgr) + throws IOException { + ((NodesToAttributesMappingRequestPBImpl) NodesToAttributesMappingRequest + .newInstance(AttributeMappingOperationType.REPLACE, attributes, false)) + .getProto().writeDelimitedTo(os); + } + + @Override + public void recover(InputStream is, NodeAttributesManager mgr) + throws IOException { + NodesToAttributesMappingRequest request = + new NodesToAttributesMappingRequestPBImpl( + YarnServerResourceManagerServiceProtos + .NodesToAttributesMappingRequestProto + .parseDelimitedFrom(is)); + //Only CENTRALIZED is stored to FS system + mgr.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED, + getNodeToAttributesMap(request)); + } + + public ReplaceNodeToAttributeLogOp setAttributes( + List attrs) { + this.attributes = attrs; + return this; + } + + @Override + public int getOpCode() { + return OPCODE; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/package-info.java new file mode 100644 index 0000000000..f6fb3d3eca --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +@InterfaceAudience.Private +package org.apache.hadoop.yarn.nodelabels.store.op; +import org.apache.hadoop.classification.InterfaceAudience; + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index cdc3c09330..0700902e9c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3965,4 +3965,20 @@ yarn.nodemanager.elastic-memory-control.timeout-sec 5 + + + URI for NodeAttributeManager. The default value is + /tmp/hadoop-yarn-${user}/node-attribute/ in the local filesystem. + + yarn.node-attribute.fs-store.root-dir + + + + + + Choose different implementation of node attribute's storage + + yarn.node-attribute.fs-store.impl.class + org.apache.hadoop.yarn.server.resourcemanager.nodelabels.FileSystemNodeAttributeStore + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/FileSystemNodeAttributeStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/FileSystemNodeAttributeStore.java new file mode 100644 index 0000000000..01df25075d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/FileSystemNodeAttributeStore.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.nodelabels; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore; +import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; +import org.apache.hadoop.yarn.nodelabels.store.AbstractFSNodeStore; +import org.apache.hadoop.yarn.nodelabels.store.FSStoreOpHandler; +import org.apache.hadoop.yarn.nodelabels.store.op.AddNodeToAttributeLogOp; +import org.apache.hadoop.yarn.nodelabels.store.op.RemoveNodeToAttributeLogOp; +import org.apache.hadoop.yarn.nodelabels.store.op.ReplaceNodeToAttributeLogOp; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes; + +import java.io.IOException; +import java.util.List; + +/** + * File system node attribute implementation. + */ +public class FileSystemNodeAttributeStore + extends AbstractFSNodeStore + implements NodeAttributeStore { + + protected static final Log LOG = + LogFactory.getLog(FileSystemNodeAttributeStore.class); + + protected static final String DEFAULT_DIR_NAME = "node-attribute"; + protected static final String MIRROR_FILENAME = "nodeattribute.mirror"; + protected static final String EDITLOG_FILENAME = "nodeattribute.editlog"; + + public FileSystemNodeAttributeStore() { + super(FSStoreOpHandler.StoreType.NODE_ATTRIBUTE); + } + + private String getDefaultFSNodeAttributeRootDir() throws IOException { + // default is in local: /tmp/hadoop-yarn-${user}/node-attribute/ + return "file:///tmp/hadoop-yarn-" + UserGroupInformation.getCurrentUser() + .getShortUserName() + "/" + DEFAULT_DIR_NAME; + } + + @Override + public void init(Configuration conf, NodeAttributesManager mgr) + throws Exception { + StoreSchema schema = new StoreSchema(EDITLOG_FILENAME, MIRROR_FILENAME); + initStore(conf, new Path( + conf.get(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR, + getDefaultFSNodeAttributeRootDir())), schema, mgr); + } + + @Override + public void replaceNodeAttributes(List nodeToAttribute) + throws IOException { + ReplaceNodeToAttributeLogOp op = new ReplaceNodeToAttributeLogOp(); + writeToLog(op.setAttributes(nodeToAttribute)); + } + + @Override + public void addNodeAttributes(List nodeAttributeMapping) + throws IOException { + AddNodeToAttributeLogOp op = new AddNodeToAttributeLogOp(); + writeToLog(op.setAttributes(nodeAttributeMapping)); + } + + @Override + public void removeNodeAttributes(List nodeAttributeMapping) + throws IOException { + RemoveNodeToAttributeLogOp op = new RemoveNodeToAttributeLogOp(); + writeToLog(op.setAttributes(nodeAttributeMapping)); + } + + @Override + public void recover() throws IOException, YarnException { + super.recoverFromStore(); + } + + @Override + public void close() throws IOException { + super.closeFSStore(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java index 04d74a87f5..b4686e638a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java @@ -32,24 +32,31 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import java.util.ArrayList; +import java.util.List; import com.google.common.base.Strings; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.nodelabels.AttributeValue; +import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore; import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil; import org.apache.hadoop.yarn.nodelabels.RMNodeAttribute; import org.apache.hadoop.yarn.nodelabels.StringAttributeValue; import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes; /** * Manager holding the attributes to Labels. @@ -63,7 +70,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager { */ public static final String EMPTY_ATTRIBUTE_VALUE = ""; - private Dispatcher dispatcher; + Dispatcher dispatcher; + NodeAttributeStore store; // TODO may be we can have a better collection here. // this will be updated to get the attributeName to NM mapping @@ -121,7 +129,21 @@ protected void serviceStart() throws Exception { } protected void initNodeAttributeStore(Configuration conf) throws Exception { - // TODO to generalize and make use of the FileSystemNodeLabelsStore + this.store =getAttributeStoreClass(conf); + this.store.init(conf, this); + this.store.recover(); + } + + private NodeAttributeStore getAttributeStoreClass(Configuration conf) { + try { + return ReflectionUtils.newInstance( + conf.getClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS, + FileSystemNodeAttributeStore.class, NodeAttributeStore.class), + conf); + } catch (Exception e) { + throw new YarnRuntimeException( + "Could not instantiate Node Attribute Store ", e); + } } private void internalUpdateAttributesOnNodes( @@ -174,7 +196,8 @@ private void internalUpdateAttributesOnNodes( LOG.info(logMsg); - if (null != dispatcher) { + if (null != dispatcher && NodeAttribute.PREFIX_CENTRALIZED + .equals(attributePrefix)) { dispatcher.getEventHandler() .handle(new NodeAttributesStoreEvent(nodeAttributeMapping, op)); } @@ -382,6 +405,32 @@ public Map getAttributesForNode( } } + @Override + public List getNodeToAttributes(Set prefix) { + try { + readLock.lock(); + List nodeToAttributes = new ArrayList<>(); + nodeCollections.forEach((k, v) -> { + List attrs; + if (prefix == null || prefix.isEmpty()) { + attrs = new ArrayList<>(v.getAttributes().keySet()); + } else { + attrs = new ArrayList<>(); + for (Entry nodeAttr : v.attributes + .entrySet()) { + if (prefix.contains(nodeAttr.getKey().getAttributePrefix())) { + attrs.add(nodeAttr.getKey()); + } + } + } + nodeToAttributes.add(NodeToAttributes.newInstance(k, attrs)); + }); + return nodeToAttributes; + } finally { + readLock.unlock(); + } + } + public void activateNode(NodeId nodeId, Resource resource) { try { writeLock.lock(); @@ -524,7 +573,29 @@ public void handle(NodeAttributesStoreEvent event) { // Dispatcher related code protected void handleStoreEvent(NodeAttributesStoreEvent event) { - // TODO Need to extend the File + List mappingList = new ArrayList<>(); + Map> nodeToAttr = + event.getNodeAttributeMappingList(); + nodeToAttr.forEach((k, v) -> mappingList + .add(NodeToAttributes.newInstance(k, new ArrayList<>(v.keySet())))); + try { + switch (event.getOperation()) { + case REPLACE: + store.replaceNodeAttributes(mappingList); + break; + case ADD: + store.addNodeAttributes(mappingList); + break; + case REMOVE: + store.removeNodeAttributes(mappingList); + break; + default: + LOG.warn("Unsupported operation"); + } + } catch (IOException e) { + LOG.error("Failed to store attribute modification to storage"); + throw new YarnRuntimeException(e); + } } @Override @@ -549,7 +620,8 @@ public void removeNodeAttributes( private void processMapping( Map> nodeAttributeMapping, AttributeMappingOperationType mappingType) throws IOException { - processMapping(nodeAttributeMapping, mappingType, null); + processMapping(nodeAttributeMapping, mappingType, + NodeAttribute.PREFIX_CENTRALIZED); } private void processMapping( @@ -564,4 +636,22 @@ private void processMapping( internalUpdateAttributesOnNodes(validMapping, mappingType, newAttributesToBeAdded, attributePrefix); } + + protected void stopDispatcher() { + AsyncDispatcher asyncDispatcher = (AsyncDispatcher) dispatcher; + if (null != asyncDispatcher) { + asyncDispatcher.stop(); + } + } + + @Override + protected void serviceStop() throws Exception { + // finalize store + stopDispatcher(); + + // only close store when we enabled store persistent + if (null != store) { + store.close(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index a29e8a2bd0..adb7fe0802 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.FileSystemNodeAttributeStore; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -828,6 +830,14 @@ public void testNodeHeartbeatWithNodeAttributes() throws Exception { Configuration conf = new Configuration(); conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile.getAbsolutePath()); + conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS, + FileSystemNodeAttributeStore.class, NodeAttributeStore.class); + File tempDir = File.createTempFile("nattr", ".tmp"); + tempDir.delete(); + tempDir.mkdirs(); + tempDir.deleteOnExit(); + conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR, + tempDir.getAbsolutePath()); rm = new MockRM(conf); rm.start(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestFileSystemNodeAttributeStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestFileSystemNodeAttributeStore.java new file mode 100644 index 0000000000..e2ee8b4763 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestFileSystemNodeAttributeStore.java @@ -0,0 +1,260 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.nodelabels; + +import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeAttributeType; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.InlineDispatcher; +import org.apache.hadoop.yarn.nodelabels.AttributeValue; +import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class TestFileSystemNodeAttributeStore { + + private MockNodeAttrbuteManager mgr = null; + private Configuration conf = null; + + private static class MockNodeAttrbuteManager + extends NodeAttributesManagerImpl { + @Override + protected void initDispatcher(Configuration conf) { + super.dispatcher = new InlineDispatcher(); + } + + @Override + protected void startDispatcher() { + //Do nothing + } + + @Override + protected void stopDispatcher() { + //Do nothing + } + } + + @Before + public void before() throws IOException { + mgr = new MockNodeAttrbuteManager(); + conf = new Configuration(); + conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS, + FileSystemNodeAttributeStore.class, NodeAttributeStore.class); + File tempDir = File.createTempFile("nattr", ".tmp"); + tempDir.delete(); + tempDir.mkdirs(); + tempDir.deleteOnExit(); + conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR, + tempDir.getAbsolutePath()); + mgr.init(conf); + mgr.start(); + } + + @After + public void after() throws IOException { + FileSystemNodeAttributeStore fsStore = + ((FileSystemNodeAttributeStore) mgr.store); + fsStore.getFs().delete(fsStore.getFsWorkingPath(), true); + mgr.stop(); + } + + @Test(timeout = 10000) + public void testRecoverWithMirror() throws Exception { + + //------host0---- + // add -GPU & FPGA + // remove -GPU + // replace -Docker + //------host1---- + // add--GPU + NodeAttribute docker = NodeAttribute + .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "DOCKER", + NodeAttributeType.STRING, "docker-0"); + NodeAttribute gpu = NodeAttribute + .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU", + NodeAttributeType.STRING, "nvidia"); + NodeAttribute fpga = NodeAttribute + .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "FPGA", + NodeAttributeType.STRING, "asus"); + + Map> toAddAttributes = new HashMap<>(); + toAddAttributes.put("host0", ImmutableSet.of(gpu, fpga)); + toAddAttributes.put("host1", ImmutableSet.of(gpu)); + // Add node attribute + mgr.addNodeAttributes(toAddAttributes); + + Assert.assertEquals("host0 size", 2, + mgr.getAttributesForNode("host0").size()); + // Add test to remove + toAddAttributes.clear(); + toAddAttributes.put("host0", ImmutableSet.of(gpu)); + mgr.removeNodeAttributes(toAddAttributes); + + // replace nodeattribute + toAddAttributes.clear(); + toAddAttributes.put("host0", ImmutableSet.of(docker)); + mgr.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED, + toAddAttributes); + Map attrs = + mgr.getAttributesForNode("host0"); + Assert.assertEquals(attrs.size(), 1); + Assert.assertEquals(attrs.keySet().toArray()[0], docker); + mgr.stop(); + + // Start new attribute manager with same path + mgr = new MockNodeAttrbuteManager(); + mgr.init(conf); + mgr.start(); + + mgr.getAttributesForNode("host0"); + Assert.assertEquals("host0 size", 1, + mgr.getAttributesForNode("host0").size()); + Assert.assertEquals("host1 size", 1, + mgr.getAttributesForNode("host1").size()); + attrs = mgr.getAttributesForNode("host0"); + Assert.assertEquals(attrs.size(), 1); + Assert.assertEquals(attrs.keySet().toArray()[0], docker); + //------host0---- + // current - docker + // replace - gpu + //----- host1---- + // current - gpu + // add - docker + toAddAttributes.clear(); + toAddAttributes.put("host0", ImmutableSet.of(gpu)); + mgr.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED, + toAddAttributes); + + toAddAttributes.clear(); + toAddAttributes.put("host1", ImmutableSet.of(docker)); + mgr.addNodeAttributes(toAddAttributes); + // Recover from mirror and edit log + mgr.stop(); + + mgr = new MockNodeAttrbuteManager(); + mgr.init(conf); + mgr.start(); + Assert.assertEquals("host0 size", 1, + mgr.getAttributesForNode("host0").size()); + Assert.assertEquals("host1 size", 2, + mgr.getAttributesForNode("host1").size()); + attrs = mgr.getAttributesForNode("host0"); + Assert.assertEquals(attrs.size(), 1); + Assert.assertEquals(attrs.keySet().toArray()[0], gpu); + attrs = mgr.getAttributesForNode("host1"); + Assert.assertTrue(attrs.keySet().contains(docker)); + Assert.assertTrue(attrs.keySet().contains(gpu)); + } + + @Test(timeout = 10000) + public void testRecoverFromEditLog() throws Exception { + NodeAttribute docker = NodeAttribute + .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "DOCKER", + NodeAttributeType.STRING, "docker-0"); + NodeAttribute gpu = NodeAttribute + .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU", + NodeAttributeType.STRING, "nvidia"); + NodeAttribute fpga = NodeAttribute + .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "FPGA", + NodeAttributeType.STRING, "asus"); + + Map> toAddAttributes = new HashMap<>(); + toAddAttributes.put("host0", ImmutableSet.of(gpu, fpga)); + toAddAttributes.put("host1", ImmutableSet.of(docker)); + + // Add node attribute + mgr.addNodeAttributes(toAddAttributes); + + Assert.assertEquals("host0 size", 2, + mgr.getAttributesForNode("host0").size()); + + // Increase editlog operation + for (int i = 0; i < 5; i++) { + // Add gpu host1 + toAddAttributes.clear(); + toAddAttributes.put("host0", ImmutableSet.of(gpu)); + mgr.removeNodeAttributes(toAddAttributes); + + // Add gpu host1 + toAddAttributes.clear(); + toAddAttributes.put("host1", ImmutableSet.of(docker)); + mgr.addNodeAttributes(toAddAttributes); + + // Remove GPU replace + toAddAttributes.clear(); + toAddAttributes.put("host0", ImmutableSet.of(gpu)); + mgr.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED, + toAddAttributes); + + // Add fgpa host1 + toAddAttributes.clear(); + toAddAttributes.put("host1", ImmutableSet.of(gpu)); + mgr.addNodeAttributes(toAddAttributes); + } + mgr.stop(); + + // Start new attribute manager with same path + mgr = new MockNodeAttrbuteManager(); + mgr.init(conf); + mgr.start(); + + Assert.assertEquals("host0 size", 1, + mgr.getAttributesForNode("host0").size()); + Assert.assertEquals("host1 size", 2, + mgr.getAttributesForNode("host1").size()); + + toAddAttributes.clear(); + NodeAttribute replaced = + NodeAttribute.newInstance("GPU2", NodeAttributeType.STRING, "nvidia2"); + toAddAttributes.put("host0", ImmutableSet.of(replaced)); + mgr.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED, + toAddAttributes); + mgr.stop(); + + mgr = new MockNodeAttrbuteManager(); + mgr.init(conf); + mgr.start(); + Map valueMap = + mgr.getAttributesForNode("host0"); + Map.Entry entry = + valueMap.entrySet().iterator().next(); + NodeAttribute attribute = entry.getKey(); + Assert.assertEquals("host0 size", 1, + mgr.getAttributesForNode("host0").size()); + Assert.assertEquals("host1 size", 2, + mgr.getAttributesForNode("host1").size()); + checkNodeAttributeEqual(replaced, attribute); + } + + public void checkNodeAttributeEqual(NodeAttribute atr1, NodeAttribute atr2) { + Assert.assertEquals(atr1.getAttributeType(), atr2.getAttributeType()); + Assert.assertEquals(atr1.getAttributeName(), atr2.getAttributeName()); + Assert.assertEquals(atr1.getAttributePrefix(), atr2.getAttributePrefix()); + Assert.assertEquals(atr1.getAttributeValue(), atr2.getAttributeValue()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java index 07968d41ef..b8c5bc9e30 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java @@ -23,7 +23,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.api.records.NodeAttributeType; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.nodelabels.AttributeValue; +import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore; import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil; import org.junit.Test; @@ -31,6 +33,7 @@ import org.junit.After; import org.junit.Assert; +import java.io.File; import java.io.IOException; import java.util.HashMap; import java.util.HashSet; @@ -49,9 +52,17 @@ public class TestNodeAttributesManager { new String[] {"host1", "host2", "host3"}; @Before - public void init() { + public void init() throws IOException { Configuration conf = new Configuration(); attributesManager = new NodeAttributesManagerImpl(); + conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS, + FileSystemNodeAttributeStore.class, NodeAttributeStore.class); + File tempDir = File.createTempFile("nattr", ".tmp"); + tempDir.delete(); + tempDir.mkdirs(); + tempDir.deleteOnExit(); + conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR, + tempDir.getAbsolutePath()); attributesManager.init(conf); attributesManager.start(); }