YARN-60. Fixed a bug in ResourceManager which causes all NMs to get NPEs and thus causes all containers to be rejected. Contributed by Vinod Kumar Vavilapalli.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1379550 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
38d003a6db
commit
45a8e8c5a4
@ -74,3 +74,5 @@ Release 0.23.3 - Unreleased
|
||||
YARN-63. RMNodeImpl is missing valid transitions from the UNHEALTHY state
|
||||
(Jason Lowe via bobby)
|
||||
|
||||
YARN-60. Fixed a bug in ResourceManager which causes all NMs to get NPEs and
|
||||
thus causes all containers to be rejected. (vinodkv)
|
||||
|
@ -18,11 +18,14 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.api.protocolrecords;
|
||||
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||
|
||||
|
||||
public interface NodeHeartbeatRequest {
|
||||
public abstract NodeStatus getNodeStatus();
|
||||
|
||||
public abstract void setNodeStatus(NodeStatus status);
|
||||
|
||||
NodeStatus getNodeStatus();
|
||||
void setNodeStatus(NodeStatus status);
|
||||
|
||||
MasterKey getLastKnownMasterKey();
|
||||
void setLastKnownMasterKey(MasterKey secretKey);
|
||||
}
|
||||
|
@ -18,24 +18,25 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
|
||||
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl;
|
||||
|
||||
|
||||
|
||||
public class NodeHeartbeatRequestPBImpl extends ProtoBase<NodeHeartbeatRequestProto> implements NodeHeartbeatRequest {
|
||||
public class NodeHeartbeatRequestPBImpl extends
|
||||
ProtoBase<NodeHeartbeatRequestProto> implements NodeHeartbeatRequest {
|
||||
NodeHeartbeatRequestProto proto = NodeHeartbeatRequestProto.getDefaultInstance();
|
||||
NodeHeartbeatRequestProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
|
||||
private NodeStatus nodeStatus = null;
|
||||
|
||||
private MasterKey lastKnownMasterKey = null;
|
||||
|
||||
public NodeHeartbeatRequestPBImpl() {
|
||||
builder = NodeHeartbeatRequestProto.newBuilder();
|
||||
@ -57,6 +58,10 @@ private void mergeLocalToBuilder() {
|
||||
if (this.nodeStatus != null) {
|
||||
builder.setNodeStatus(convertToProtoFormat(this.nodeStatus));
|
||||
}
|
||||
if (this.lastKnownMasterKey != null) {
|
||||
builder
|
||||
.setLastKnownMasterKey(convertToProtoFormat(this.lastKnownMasterKey));
|
||||
}
|
||||
}
|
||||
|
||||
private void mergeLocalToProto() {
|
||||
@ -96,6 +101,27 @@ public void setNodeStatus(NodeStatus nodeStatus) {
|
||||
this.nodeStatus = nodeStatus;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MasterKey getLastKnownMasterKey() {
|
||||
NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (this.lastKnownMasterKey != null) {
|
||||
return this.lastKnownMasterKey;
|
||||
}
|
||||
if (!p.hasLastKnownMasterKey()) {
|
||||
return null;
|
||||
}
|
||||
this.lastKnownMasterKey = convertFromProtoFormat(p.getLastKnownMasterKey());
|
||||
return this.lastKnownMasterKey;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setLastKnownMasterKey(MasterKey masterKey) {
|
||||
maybeInitBuilder();
|
||||
if (masterKey == null)
|
||||
builder.clearLastKnownMasterKey();
|
||||
this.lastKnownMasterKey = masterKey;
|
||||
}
|
||||
|
||||
private NodeStatusPBImpl convertFromProtoFormat(NodeStatusProto p) {
|
||||
return new NodeStatusPBImpl(p);
|
||||
}
|
||||
@ -104,6 +130,11 @@ private NodeStatusProto convertToProtoFormat(NodeStatus t) {
|
||||
return ((NodeStatusPBImpl)t).getProto();
|
||||
}
|
||||
|
||||
private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) {
|
||||
return new MasterKeyPBImpl(p);
|
||||
}
|
||||
|
||||
|
||||
private MasterKeyProto convertToProtoFormat(MasterKey t) {
|
||||
return ((MasterKeyPBImpl)t).getProto();
|
||||
}
|
||||
}
|
||||
|
@ -35,6 +35,7 @@ message RegisterNodeManagerResponseProto {
|
||||
|
||||
message NodeHeartbeatRequestProto {
|
||||
optional NodeStatusProto node_status = 1;
|
||||
optional MasterKeyProto last_known_master_key = 2;
|
||||
}
|
||||
|
||||
message NodeHeartbeatResponseProto {
|
||||
|
@ -31,6 +31,7 @@
|
||||
import org.apache.avro.AvroRuntimeException;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.YarnException;
|
||||
@ -111,10 +112,7 @@ public synchronized void init(Configuration conf) {
|
||||
this.totalResource = recordFactory.newRecordInstance(Resource.class);
|
||||
this.totalResource.setMemory(memoryMb);
|
||||
metrics.addResource(totalResource);
|
||||
this.tokenKeepAliveEnabled =
|
||||
conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
|
||||
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)
|
||||
&& isSecurityEnabled();
|
||||
this.tokenKeepAliveEnabled = isTokenKeepAliveEnabled(conf);
|
||||
this.tokenRemovalDelayMs =
|
||||
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
|
||||
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
|
||||
@ -163,10 +161,17 @@ synchronized boolean hasToRebootNode() {
|
||||
return this.hasToRebootNode;
|
||||
}
|
||||
|
||||
protected boolean isSecurityEnabled() {
|
||||
private boolean isSecurityEnabled() {
|
||||
return UserGroupInformation.isSecurityEnabled();
|
||||
}
|
||||
|
||||
@Private
|
||||
protected boolean isTokenKeepAliveEnabled(Configuration conf) {
|
||||
return conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
|
||||
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)
|
||||
&& isSecurityEnabled();
|
||||
}
|
||||
|
||||
protected ResourceTracker getRMClient() {
|
||||
Configuration conf = getConfig();
|
||||
YarnRPC rpc = YarnRPC.create(conf);
|
||||
@ -321,7 +326,11 @@ public void run() {
|
||||
|
||||
NodeHeartbeatRequest request = recordFactory
|
||||
.newRecordInstance(NodeHeartbeatRequest.class);
|
||||
request.setNodeStatus(nodeStatus);
|
||||
request.setNodeStatus(nodeStatus);
|
||||
if (isSecurityEnabled()) {
|
||||
request.setLastKnownMasterKey(NodeStatusUpdaterImpl.this.context
|
||||
.getContainerTokenSecretManager().getCurrentKey());
|
||||
}
|
||||
HeartbeatResponse response =
|
||||
resourceTracker.nodeHeartbeat(request).getHeartbeatResponse();
|
||||
|
||||
|
@ -92,7 +92,6 @@ public synchronized byte[] retrievePassword(
|
||||
containerId.getApplicationAttemptId().getApplicationId();
|
||||
|
||||
MasterKeyData masterKeyToUse = null;
|
||||
|
||||
if (this.previousMasterKey != null
|
||||
&& keyId == this.previousMasterKey.getMasterKey().getKeyId()) {
|
||||
// A container-launch has come in with a token generated off the last
|
||||
|
@ -261,7 +261,7 @@ protected ResourceTracker getRMClient() {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isSecurityEnabled() {
|
||||
protected boolean isTokenKeepAliveEnabled(Configuration conf) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
@ -159,7 +159,7 @@ public synchronized void init(Configuration conf) {
|
||||
DelegationTokenRenewer tokenRenewer = createDelegationTokenRenewer();
|
||||
addService(tokenRenewer);
|
||||
|
||||
this.containerTokenSecretManager = new RMContainerTokenSecretManager(conf);
|
||||
this.containerTokenSecretManager = createContainerTokenSecretManager(conf);
|
||||
|
||||
this.rmContext =
|
||||
new RMContextImpl(this.store, this.rmDispatcher,
|
||||
@ -231,6 +231,11 @@ public synchronized void init(Configuration conf) {
|
||||
super.init(conf);
|
||||
}
|
||||
|
||||
protected RMContainerTokenSecretManager createContainerTokenSecretManager(
|
||||
Configuration conf) {
|
||||
return new RMContainerTokenSecretManager(conf);
|
||||
}
|
||||
|
||||
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
|
||||
return new SchedulerEventDispatcher(this.scheduler);
|
||||
}
|
||||
|
@ -169,14 +169,14 @@ public RegisterNodeManagerResponse registerNodeManager(
|
||||
return response;
|
||||
}
|
||||
|
||||
MasterKey nextMasterKeyForNode = null;
|
||||
if (isSecurityEnabled()) {
|
||||
nextMasterKeyForNode = this.containerTokenSecretManager.getCurrentKey();
|
||||
MasterKey nextMasterKeyForNode =
|
||||
this.containerTokenSecretManager.getCurrentKey();
|
||||
regResponse.setMasterKey(nextMasterKeyForNode);
|
||||
}
|
||||
|
||||
RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
|
||||
resolve(host), capability, nextMasterKeyForNode);
|
||||
resolve(host), capability);
|
||||
|
||||
RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
|
||||
if (oldNode == null) {
|
||||
@ -266,17 +266,18 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
|
||||
latestResponse.addAllApplicationsToCleanup(rmNode.getAppsToCleanup());
|
||||
latestResponse.setNodeAction(NodeAction.NORMAL);
|
||||
|
||||
MasterKey nextMasterKeyForNode = null;
|
||||
|
||||
// Check if node's masterKey needs to be updated and if the currentKey has
|
||||
// roller over, send it across
|
||||
if (isSecurityEnabled()) {
|
||||
|
||||
boolean shouldSendMasterKey = false;
|
||||
MasterKey nodeKnownMasterKey = rmNode.getCurrentMasterKey();
|
||||
nextMasterKeyForNode = this.containerTokenSecretManager.getNextKey();
|
||||
|
||||
MasterKey nextMasterKeyForNode =
|
||||
this.containerTokenSecretManager.getNextKey();
|
||||
if (nextMasterKeyForNode != null) {
|
||||
// nextMasterKeyForNode can be null if there is no outstanding key that
|
||||
// is in the activation period.
|
||||
MasterKey nodeKnownMasterKey = request.getLastKnownMasterKey();
|
||||
if (nodeKnownMasterKey.getKeyId() != nextMasterKeyForNode.getKeyId()) {
|
||||
shouldSendMasterKey = true;
|
||||
}
|
||||
@ -290,8 +291,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
|
||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
|
||||
remoteNodeStatus.getContainersStatuses(),
|
||||
remoteNodeStatus.getKeepAliveApplications(), latestResponse,
|
||||
nextMasterKeyForNode));
|
||||
remoteNodeStatus.getKeepAliveApplications(), latestResponse));
|
||||
|
||||
nodeHeartBeatResponse.setHeartbeatResponse(latestResponse);
|
||||
return nodeHeartBeatResponse;
|
||||
|
@ -28,7 +28,6 @@
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
|
||||
/**
|
||||
* Node managers information on available resources
|
||||
@ -107,6 +106,4 @@ public interface RMNode {
|
||||
public List<ApplicationId> getAppsToCleanup();
|
||||
|
||||
public HeartbeatResponse getLastHeartBeatResponse();
|
||||
|
||||
public MasterKey getCurrentMasterKey();
|
||||
}
|
||||
|
@ -46,7 +46,6 @@
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
|
||||
@ -105,8 +104,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||
private HeartbeatResponse latestHeartBeatResponse = recordFactory
|
||||
.newRecordInstance(HeartbeatResponse.class);
|
||||
|
||||
private MasterKey currentMasterKey;
|
||||
|
||||
private static final StateMachineFactory<RMNodeImpl,
|
||||
NodeState,
|
||||
RMNodeEventType,
|
||||
@ -167,8 +164,7 @@ RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
|
||||
RMNodeEvent> stateMachine;
|
||||
|
||||
public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
|
||||
int cmPort, int httpPort, Node node, Resource capability,
|
||||
MasterKey masterKey) {
|
||||
int cmPort, int httpPort, Node node, Resource capability) {
|
||||
this.nodeId = nodeId;
|
||||
this.context = context;
|
||||
this.hostName = hostName;
|
||||
@ -178,7 +174,6 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
|
||||
this.nodeAddress = hostName + ":" + cmPort;
|
||||
this.httpAddress = hostName + ":" + httpPort;
|
||||
this.node = node;
|
||||
this.currentMasterKey = masterKey;
|
||||
this.nodeHealthStatus.setIsNodeHealthy(true);
|
||||
this.nodeHealthStatus.setHealthReport("Healthy");
|
||||
this.nodeHealthStatus.setLastHealthReportTime(System.currentTimeMillis());
|
||||
@ -312,17 +307,6 @@ public HeartbeatResponse getLastHeartBeatResponse() {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public MasterKey getCurrentMasterKey() {
|
||||
this.readLock.lock();
|
||||
try {
|
||||
return this.currentMasterKey;
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void handle(RMNodeEvent event) {
|
||||
LOG.debug("Processing " + event.getNodeId() + " of type " + event.getType());
|
||||
@ -500,7 +484,6 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
|
||||
|
||||
// Switch the last heartbeatresponse.
|
||||
rmNode.latestHeartBeatResponse = statusEvent.getLatestResponse();
|
||||
rmNode.currentMasterKey = statusEvent.getCurrentMasterKey();
|
||||
|
||||
NodeHealthStatus remoteNodeHealthStatus =
|
||||
statusEvent.getNodeHealthStatus();
|
||||
@ -582,7 +565,6 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
|
||||
|
||||
// Switch the last heartbeatresponse.
|
||||
rmNode.latestHeartBeatResponse = statusEvent.getLatestResponse();
|
||||
rmNode.currentMasterKey = statusEvent.getCurrentMasterKey();
|
||||
NodeHealthStatus remoteNodeHealthStatus = statusEvent.getNodeHealthStatus();
|
||||
rmNode.setNodeHealthStatus(remoteNodeHealthStatus);
|
||||
if (remoteNodeHealthStatus.getIsNodeHealthy()) {
|
||||
|
@ -25,7 +25,6 @@
|
||||
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
|
||||
public class RMNodeStatusEvent extends RMNodeEvent {
|
||||
|
||||
@ -33,17 +32,15 @@ public class RMNodeStatusEvent extends RMNodeEvent {
|
||||
private final List<ContainerStatus> containersCollection;
|
||||
private final HeartbeatResponse latestResponse;
|
||||
private final List<ApplicationId> keepAliveAppIds;
|
||||
private final MasterKey currentMasterKey;
|
||||
|
||||
public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
|
||||
List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
|
||||
HeartbeatResponse latestResponse, MasterKey currentMasterKey) {
|
||||
HeartbeatResponse latestResponse) {
|
||||
super(nodeId, RMNodeEventType.STATUS_UPDATE);
|
||||
this.nodeHealthStatus = nodeHealthStatus;
|
||||
this.containersCollection = collection;
|
||||
this.keepAliveAppIds = keepAliveAppIds;
|
||||
this.latestResponse = latestResponse;
|
||||
this.currentMasterKey = currentMasterKey;
|
||||
}
|
||||
|
||||
public NodeHealthStatus getNodeHealthStatus() {
|
||||
@ -61,8 +58,4 @@ public HeartbeatResponse getLatestResponse() {
|
||||
public List<ApplicationId> getKeepAliveAppIds() {
|
||||
return this.keepAliveAppIds;
|
||||
}
|
||||
|
||||
public MasterKey getCurrentMasterKey() {
|
||||
return this.currentMasterKey;
|
||||
}
|
||||
}
|
@ -89,7 +89,7 @@ public void stop() {
|
||||
* Creates a new master-key and sets it as the primary.
|
||||
*/
|
||||
@Private
|
||||
protected void rollMasterKey() {
|
||||
public void rollMasterKey() {
|
||||
super.writeLock.lock();
|
||||
try {
|
||||
LOG.info("Rolling master-key for container-tokens");
|
||||
@ -97,6 +97,9 @@ protected void rollMasterKey() {
|
||||
this.currentMasterKey = createNewMasterKey();
|
||||
} else {
|
||||
this.nextMasterKey = createNewMasterKey();
|
||||
LOG.info("Going to activate master-key with key-id "
|
||||
+ this.nextMasterKey.getMasterKey().getKeyId() + " in "
|
||||
+ this.activationDelay + "ms");
|
||||
this.timer.schedule(new NextKeyActivator(), this.activationDelay);
|
||||
}
|
||||
} finally {
|
||||
@ -122,7 +125,7 @@ public MasterKey getNextKey() {
|
||||
* Activate the new master-key
|
||||
*/
|
||||
@Private
|
||||
protected void activateNextMasterKey() {
|
||||
public void activateNextMasterKey() {
|
||||
super.writeLock.lock();
|
||||
try {
|
||||
LOG.info("Activating next master key with id: "
|
||||
|
@ -35,7 +35,9 @@
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
@ -46,8 +48,9 @@ public class MockNM {
|
||||
private final int memory;
|
||||
private final ResourceTrackerService resourceTracker;
|
||||
private final int httpPort = 2;
|
||||
private MasterKey currentMasterKey;
|
||||
|
||||
MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) {
|
||||
public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) {
|
||||
this.memory = memory;
|
||||
this.resourceTracker = resourceTracker;
|
||||
String[] splits = nodeIdStr.split(":");
|
||||
@ -72,7 +75,7 @@ public void containerStatus(Container container) throws Exception {
|
||||
nodeHeartbeat(conts, true);
|
||||
}
|
||||
|
||||
public NodeId registerNode() throws Exception {
|
||||
public RegistrationResponse registerNode() throws Exception {
|
||||
RegisterNodeManagerRequest req = Records.newRecord(
|
||||
RegisterNodeManagerRequest.class);
|
||||
req.setNodeId(nodeId);
|
||||
@ -80,13 +83,15 @@ public NodeId registerNode() throws Exception {
|
||||
Resource resource = Records.newRecord(Resource.class);
|
||||
resource.setMemory(memory);
|
||||
req.setResource(resource);
|
||||
resourceTracker.registerNodeManager(req);
|
||||
return nodeId;
|
||||
RegistrationResponse registrationResponse =
|
||||
resourceTracker.registerNodeManager(req).getRegistrationResponse();
|
||||
this.currentMasterKey = registrationResponse.getMasterKey();
|
||||
return registrationResponse;
|
||||
}
|
||||
|
||||
public HeartbeatResponse nodeHeartbeat(boolean b) throws Exception {
|
||||
public HeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception {
|
||||
return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(),
|
||||
b, ++responseId);
|
||||
isHealthy, ++responseId);
|
||||
}
|
||||
|
||||
public HeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId,
|
||||
@ -123,7 +128,15 @@ public HeartbeatResponse nodeHeartbeat(Map<ApplicationId,
|
||||
healthStatus.setLastHealthReportTime(1);
|
||||
status.setNodeHealthStatus(healthStatus);
|
||||
req.setNodeStatus(status);
|
||||
return resourceTracker.nodeHeartbeat(req).getHeartbeatResponse();
|
||||
req.setLastKnownMasterKey(this.currentMasterKey);
|
||||
HeartbeatResponse heartbeatResponse =
|
||||
resourceTracker.nodeHeartbeat(req).getHeartbeatResponse();
|
||||
MasterKey masterKeyFromRM = heartbeatResponse.getMasterKey();
|
||||
this.currentMasterKey =
|
||||
(masterKeyFromRM != null
|
||||
&& masterKeyFromRM.getKeyId() != this.currentMasterKey.getKeyId()
|
||||
? masterKeyFromRM : this.currentMasterKey);
|
||||
return heartbeatResponse;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -25,12 +25,11 @@
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
@ -188,11 +187,6 @@ public List<ApplicationId> getAppsToCleanup() {
|
||||
public HeartbeatResponse getLastHeartBeatResponse() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MasterKey getCurrentMasterKey() {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr) {
|
||||
|
@ -105,7 +105,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
new TestSchedulerEventDispatcher());
|
||||
|
||||
NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
|
||||
node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
|
||||
node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null);
|
||||
|
||||
}
|
||||
|
||||
|
@ -54,6 +54,7 @@
|
||||
import org.w3c.dom.Element;
|
||||
import org.w3c.dom.NodeList;
|
||||
import org.xml.sax.InputSource;
|
||||
|
||||
import com.google.inject.Guice;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.servlet.GuiceServletContextListener;
|
||||
@ -145,7 +146,7 @@ public void testNodesDefaultWithUnHealthyNode() throws JSONException,
|
||||
nodeHealth.setHealthReport("test health report");
|
||||
nodeHealth.setIsNodeHealthy(false);
|
||||
node.handle(new RMNodeStatusEvent(nm3.getNodeId(), nodeHealth,
|
||||
new ArrayList<ContainerStatus>(), null, null, null));
|
||||
new ArrayList<ContainerStatus>(), null, null));
|
||||
rm.NMwaitForState(nm3.getNodeId(), NodeState.UNHEALTHY);
|
||||
|
||||
ClientResponse response =
|
||||
@ -360,7 +361,7 @@ public void testNodesQueryHealthyAndState() throws JSONException, Exception {
|
||||
nodeHealth.setHealthReport("test health report");
|
||||
nodeHealth.setIsNodeHealthy(false);
|
||||
node.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeHealth,
|
||||
new ArrayList<ContainerStatus>(), null, null, null));
|
||||
new ArrayList<ContainerStatus>(), null, null));
|
||||
rm.NMwaitForState(nm1.getNodeId(), NodeState.UNHEALTHY);
|
||||
|
||||
ClientResponse response = r.path("ws").path("v1").path("cluster")
|
||||
|
@ -44,6 +44,12 @@
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-server-resourcemanager</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-server-resourcemanager</artifactId>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
@ -46,7 +46,6 @@
|
||||
import org.apache.hadoop.io.DataInputBuffer;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
@ -222,7 +221,7 @@ public void testMaliceUser() throws IOException, InterruptedException {
|
||||
Resource modifiedResource = BuilderUtils.newResource(2048);
|
||||
ContainerTokenIdentifier modifiedIdentifier = new ContainerTokenIdentifier(
|
||||
dummyIdentifier.getContainerID(), dummyIdentifier.getNmHostAddress(),
|
||||
modifiedResource, Long.MAX_VALUE, 0);
|
||||
modifiedResource, Long.MAX_VALUE, dummyIdentifier.getMasterKeyId());
|
||||
Token<ContainerTokenIdentifier> modifiedToken = new Token<ContainerTokenIdentifier>(
|
||||
modifiedIdentifier.getBytes(), containerToken.getPassword().array(),
|
||||
new Text(containerToken.getKind()), new Text(containerToken
|
||||
@ -250,19 +249,14 @@ public Void run() {
|
||||
+ "it will indicate RPC success");
|
||||
} catch (Exception e) {
|
||||
Assert.assertEquals(
|
||||
java.lang.reflect.UndeclaredThrowableException.class
|
||||
.getCanonicalName(), e.getClass().getCanonicalName());
|
||||
Assert.assertEquals(RemoteException.class.getCanonicalName(), e
|
||||
.getCause().getClass().getCanonicalName());
|
||||
Assert.assertEquals(
|
||||
"org.apache.hadoop.security.token.SecretManager$InvalidToken",
|
||||
((RemoteException) e.getCause()).getClassName());
|
||||
java.lang.reflect.UndeclaredThrowableException.class
|
||||
.getCanonicalName(), e.getClass().getCanonicalName());
|
||||
Assert.assertTrue(e
|
||||
.getCause()
|
||||
.getMessage()
|
||||
.matches(
|
||||
"Given Container container_\\d*_\\d*_\\d\\d_\\d*"
|
||||
+ " seems to have an illegally generated token."));
|
||||
.contains(
|
||||
"DIGEST-MD5: digest response format violation. "
|
||||
+ "Mismatched response."));
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
@ -0,0 +1,120 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestRMNMSecretKeys {
|
||||
|
||||
@Test
|
||||
public void testNMUpdation() throws Exception {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||
"kerberos");
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
// Default rolling and activation intervals are large enough, no need to
|
||||
// intervene
|
||||
|
||||
final DrainDispatcher dispatcher = new DrainDispatcher();
|
||||
ResourceManager rm = new ResourceManager(null) {
|
||||
@Override
|
||||
protected void doSecureLogin() throws IOException {
|
||||
// Do nothing.
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Dispatcher createDispatcher() {
|
||||
return dispatcher;
|
||||
}
|
||||
};
|
||||
rm.init(conf);
|
||||
rm.start();
|
||||
|
||||
MockNM nm = new MockNM("host:1234", 3072, rm.getResourceTrackerService());
|
||||
RegistrationResponse registrationResponse = nm.registerNode();
|
||||
MasterKey masterKey = registrationResponse.getMasterKey();
|
||||
Assert.assertNotNull("Registration should cause a key-update!", masterKey);
|
||||
dispatcher.await();
|
||||
|
||||
HeartbeatResponse response = nm.nodeHeartbeat(true);
|
||||
Assert.assertNull(
|
||||
"First heartbeat after registration shouldn't get any key updates!",
|
||||
response.getMasterKey());
|
||||
dispatcher.await();
|
||||
|
||||
response = nm.nodeHeartbeat(true);
|
||||
Assert
|
||||
.assertNull(
|
||||
"Even second heartbeat after registration shouldn't get any key updates!",
|
||||
response.getMasterKey());
|
||||
dispatcher.await();
|
||||
|
||||
// Let's force a roll-over
|
||||
RMContainerTokenSecretManager secretManager =
|
||||
rm.getRMContainerTokenSecretManager();
|
||||
secretManager.rollMasterKey();
|
||||
|
||||
// Heartbeats after roll-over and before activation should be fine.
|
||||
response = nm.nodeHeartbeat(true);
|
||||
Assert.assertNotNull(
|
||||
"Heartbeats after roll-over and before activation should not err out.",
|
||||
response.getMasterKey());
|
||||
Assert.assertEquals(
|
||||
"Roll-over should have incremented the key-id only by one!",
|
||||
masterKey.getKeyId() + 1, response.getMasterKey().getKeyId());
|
||||
dispatcher.await();
|
||||
|
||||
response = nm.nodeHeartbeat(true);
|
||||
Assert.assertNull(
|
||||
"Second heartbeat after roll-over shouldn't get any key updates!",
|
||||
response.getMasterKey());
|
||||
dispatcher.await();
|
||||
|
||||
// Let's force activation
|
||||
secretManager.activateNextMasterKey();
|
||||
|
||||
response = nm.nodeHeartbeat(true);
|
||||
Assert.assertNull("Activation shouldn't cause any key updates!",
|
||||
response.getMasterKey());
|
||||
dispatcher.await();
|
||||
|
||||
response = nm.nodeHeartbeat(true);
|
||||
Assert.assertNull(
|
||||
"Even second heartbeat after activation shouldn't get any key updates!",
|
||||
response.getMasterKey());
|
||||
dispatcher.await();
|
||||
|
||||
rm.stop();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user