YARN-6585. RM fails to start when upgrading from 2.7 for clusters with node labels. Contributed by Sunil G.
This commit is contained in:
parent
325163f23f
commit
5578af8603
@ -111,6 +111,13 @@ private void initLocalNodeLabels() {
|
||||
for (NodeLabelProto r : attributesProtoList) {
|
||||
this.updatedNodeLabels.add(convertFromProtoFormat(r));
|
||||
}
|
||||
|
||||
if (this.updatedNodeLabels.isEmpty()) {
|
||||
List<String> deprecatedLabelsList = p.getDeprecatedNodeLabelsList();
|
||||
for (String l : deprecatedLabelsList) {
|
||||
this.updatedNodeLabels.add(NodeLabel.newInstance(l));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private NodeLabel convertFromProtoFormat(NodeLabelProto p) {
|
||||
|
@ -26,7 +26,9 @@
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
@ -46,12 +48,17 @@
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.security.authorize.ProxyUsers;
|
||||
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
|
||||
import org.apache.hadoop.yarn.api.records.DecommissionType;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
|
||||
@ -62,6 +69,7 @@
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
@ -79,6 +87,9 @@
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class TestRMAdminService {
|
||||
@ -1463,4 +1474,52 @@ private void checkBadConfiguration(Configuration conf) {
|
||||
e.getMessage().startsWith(HAUtil.BAD_CONFIG_MESSAGE_PREFIX));
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testAdminAddToClusterNodeLabelsWithDeprecatedAPIs()
|
||||
throws Exception, YarnException {
|
||||
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
|
||||
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
|
||||
|
||||
uploadDefaultConfiguration();
|
||||
|
||||
rm = new MockRM(configuration) {
|
||||
protected ClientRMService createClientRMService() {
|
||||
return new ClientRMService(this.rmContext, scheduler, this.rmAppManager,
|
||||
this.applicationACLsManager, this.queueACLsManager,
|
||||
this.getRMContext().getRMDelegationTokenSecretManager());
|
||||
};
|
||||
};
|
||||
rm.init(configuration);
|
||||
rm.start();
|
||||
|
||||
try {
|
||||
List<String> list = new ArrayList<String>();
|
||||
list.add("a");
|
||||
list.add("b");
|
||||
AddToClusterNodeLabelsRequestProto proto = AddToClusterNodeLabelsRequestProto
|
||||
.newBuilder().addAllDeprecatedNodeLabels(list).build();
|
||||
AddToClusterNodeLabelsRequestPBImpl protoImpl = new AddToClusterNodeLabelsRequestPBImpl(
|
||||
proto);
|
||||
rm.adminService
|
||||
.addToClusterNodeLabels((AddToClusterNodeLabelsRequest) protoImpl);
|
||||
} catch (Exception ex) {
|
||||
fail("Could not update node labels." + ex);
|
||||
}
|
||||
|
||||
// Create a client.
|
||||
Configuration conf = new Configuration();
|
||||
YarnRPC rpc = YarnRPC.create(conf);
|
||||
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
|
||||
ApplicationClientProtocol client = (ApplicationClientProtocol) rpc
|
||||
.getProxy(ApplicationClientProtocol.class, rmAddress, conf);
|
||||
|
||||
// Get node labels collection
|
||||
GetClusterNodeLabelsResponse response = client
|
||||
.getClusterNodeLabels(GetClusterNodeLabelsRequest.newInstance());
|
||||
NodeLabel labelX = NodeLabel.newInstance("a");
|
||||
NodeLabel labelY = NodeLabel.newInstance("b");
|
||||
Assert.assertTrue(
|
||||
response.getNodeLabelList().containsAll(Arrays.asList(labelX, labelY)));
|
||||
}
|
||||
}
|
||||
|
@ -24,6 +24,7 @@
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
@ -704,4 +705,52 @@ public void testLabelsToNodesOnNodeActiveDeactive() throws Exception {
|
||||
assertLabelsToNodesEquals(
|
||||
mgr.getLabelsToNodes(), transposeNodeToLabels(mgr.getNodeLabels()));
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testBackwardsCompatableMirror() throws Exception {
|
||||
lmgr = new RMNodeLabelsManager();
|
||||
Configuration conf = new Configuration();
|
||||
File tempDir = File.createTempFile("nlb", ".tmp");
|
||||
tempDir.delete();
|
||||
tempDir.mkdirs();
|
||||
tempDir.deleteOnExit();
|
||||
String tempDirName = tempDir.getAbsolutePath();
|
||||
conf.set(YarnConfiguration.FS_NODE_LABELS_STORE_ROOT_DIR, tempDirName);
|
||||
|
||||
// The following are the contents of a 2.7-formatted levelDB file to be
|
||||
// placed in nodelabel.mirror. There are 3 labels: 'a', 'b', and 'c'.
|
||||
// host1 is labeled with 'a', host2 is labeled with 'b', and c is not
|
||||
// associated with a node.
|
||||
byte[] contents =
|
||||
{
|
||||
0x09, 0x0A, 0x01, 0x61, 0x0A, 0x01, 0x62, 0x0A, 0x01, 0x63, 0x20,
|
||||
0x0A, 0x0E, 0x0A, 0x09, 0x0A, 0x05, 0x68, 0x6F, 0x73, 0x74, 0x32,
|
||||
0x10, 0x00, 0x12, 0x01, 0x62, 0x0A, 0x0E, 0x0A, 0x09, 0x0A, 0x05,
|
||||
0x68, 0x6F, 0x73, 0x74, 0x31, 0x10, 0x00, 0x12, 0x01, 0x61
|
||||
};
|
||||
File file = new File(tempDirName + "/nodelabel.mirror");
|
||||
file.createNewFile();
|
||||
FileOutputStream stream = new FileOutputStream(file);
|
||||
stream.write(contents);
|
||||
stream.close();
|
||||
|
||||
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
|
||||
conf.set(YarnConfiguration.RM_SCHEDULER,
|
||||
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler");
|
||||
Configuration withQueueLabels = getConfigurationWithQueueLabels(conf);
|
||||
|
||||
MockRM rm = initRM(withQueueLabels);
|
||||
Set<String> labelNames = lmgr.getClusterNodeLabelNames();
|
||||
Map<String, Set<NodeId>> labeledNodes = lmgr.getLabelsToNodes();
|
||||
|
||||
Assert.assertTrue(labelNames.contains("a"));
|
||||
Assert.assertTrue(labelNames.contains("b"));
|
||||
Assert.assertTrue(labelNames.contains("c"));
|
||||
Assert.assertTrue(labeledNodes.get("a")
|
||||
.contains(NodeId.newInstance("host1", 0)));
|
||||
Assert.assertTrue(labeledNodes.get("b")
|
||||
.contains(NodeId.newInstance("host2", 0)));
|
||||
|
||||
rm.stop();
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user