MAPREDUCE-3398. Fixed log aggregation to work correctly in secure mode. Contributed by Siddharth Seth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1214429 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 2011-12-14 19:55:22 +00:00
parent 739f8871f2
commit 50fa9b89f4
21 changed files with 606 additions and 62 deletions


@@ -296,6 +296,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3541. Fix broken TestJobQueueClient test. (Ravi Prakash via
mahadev)
MAPREDUCE-3398. Fixed log aggregation to work correctly in secure mode.
(Siddharth Seth via vinodkv)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES


@@ -91,12 +91,7 @@ public class YarnConfiguration extends Configuration {
public static final String RM_CLIENT_THREAD_COUNT =
RM_PREFIX + "client.thread-count";
public static final int DEFAULT_RM_CLIENT_THREAD_COUNT = 10;
/** The expiry interval for application master reporting.*/
public static final String RM_AM_EXPIRY_INTERVAL_MS =
RM_PREFIX + "am.liveness-monitor.expiry-interval-ms";
public static final int DEFAULT_RM_AM_EXPIRY_INTERVAL_MS = 600000;
/** The Kerberos principal for the resource manager.*/
public static final String RM_PRINCIPAL =
RM_PREFIX + "principal";
@@ -126,7 +121,17 @@ public class YarnConfiguration extends Configuration {
public static final int DEFAULT_RM_RESOURCE_TRACKER_PORT = 8025;
public static final String DEFAULT_RM_RESOURCE_TRACKER_ADDRESS =
"0.0.0.0:" + DEFAULT_RM_RESOURCE_TRACKER_PORT;
/** The expiry interval for application master reporting.*/
public static final String RM_AM_EXPIRY_INTERVAL_MS =
YARN_PREFIX + "am.liveness-monitor.expiry-interval-ms";
public static final int DEFAULT_RM_AM_EXPIRY_INTERVAL_MS = 600000;
/** How long to wait until a node manager is considered dead.*/
public static final String RM_NM_EXPIRY_INTERVAL_MS =
YARN_PREFIX + "nm.liveness-monitor.expiry-interval-ms";
public static final int DEFAULT_RM_NM_EXPIRY_INTERVAL_MS = 600000;
/** Are acls enabled.*/
public static final String YARN_ACL_ENABLE =
YARN_PREFIX + "acl.enable";
@@ -160,12 +165,7 @@ public class YarnConfiguration extends Configuration {
/** The keytab for the resource manager.*/
public static final String RM_KEYTAB =
RM_PREFIX + "keytab";
/** How long to wait until a node manager is considered dead.*/
public static final String RM_NM_EXPIRY_INTERVAL_MS =
RM_PREFIX + "nm.liveness-monitor.expiry-interval-ms";
public static final int DEFAULT_RM_NM_EXPIRY_INTERVAL_MS = 600000;
/** How long to wait until a container is considered dead.*/
public static final String RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS =
RM_PREFIX + "rm.container-allocation.expiry-interval-ms";
@@ -293,10 +293,16 @@ public class YarnConfiguration extends Configuration {
public static final String NM_LOG_DIRS = NM_PREFIX + "log-dirs";
public static final String DEFAULT_NM_LOG_DIRS = "/tmp/logs";
/** Interval at which the delayed token removal thread runs */
public static final String RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS =
RM_PREFIX + "delayed.delegation-token.removal-interval-ms";
public static final long DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS =
30000l;
/** Whether to enable log aggregation */
public static final String NM_LOG_AGGREGATION_ENABLED = NM_PREFIX
+ "log-aggregation-enable";
public static final boolean DEFAULT_NM_LOG_AGGREGATION_ENABLED = false;
public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX
+ "log-aggregation-enable";
public static final boolean DEFAULT_LOG_AGGREGATION_ENABLED = false;
/**
* Number of seconds to retain logs on the NodeManager. Only applicable if Log
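For reference, a minimal sketch of how client code reads the relocated keys after this change; only the constant names and getter calls come from the patch, the rest is illustrative:

import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class RenamedKeysCheck {
  public static void main(String[] args) {
    YarnConfiguration conf = new YarnConfiguration();
    // Log aggregation moved from the yarn.nodemanager.* namespace to yarn.*.
    boolean aggregationEnabled =
        conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
            YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
    // The NM liveness expiry moved from yarn.resourcemanager.* to yarn.*.
    int nmExpiryMs =
        conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
            YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
    System.out.println("log aggregation: " + aggregationEnabled
        + ", NM expiry: " + nmExpiryMs + " ms");
  }
}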


@@ -53,8 +53,8 @@ protected void render(Block html) {
logEntity = containerId.toString();
}
if (!conf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ENABLED)) {
if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
html.h1()
._("Aggregation is not enabled. Try the nodemanager at " + nodeId)
._();


@@ -19,6 +19,7 @@
import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -33,6 +34,9 @@ public interface NodeStatus {
public abstract void setContainersStatuses(
List<ContainerStatus> containersStatuses);
public abstract List<ApplicationId> getKeepAliveApplications();
public abstract void setKeepAliveApplications(List<ApplicationId> appIds);
NodeHealthStatus getNodeHealthStatus();
void setNodeHealthStatus(NodeHealthStatus healthStatus);
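A minimal illustration of the new keep-alive accessors, built the same way the NodeStatusUpdater later in this commit builds its records; the RecordFactory and BuilderUtils package names are assumed from the YARN APIs of this release, and the application id is made up:

import java.util.Collections;

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.util.BuilderUtils;

public class KeepAliveStatusDemo {
  public static void main(String[] args) {
    RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
    NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
    ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
    // The heartbeat now piggy-backs the applications whose tokens should stay alive.
    nodeStatus.setKeepAliveApplications(Collections.singletonList(appId));
    System.out.println("keep-alive apps: " + nodeStatus.getKeepAliveApplications());
  }
}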


@@ -23,13 +23,16 @@
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeHealthStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeHealthStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
@@ -37,7 +40,9 @@
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements NodeStatus {
public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements
NodeStatus {
NodeStatusProto proto = NodeStatusProto.getDefaultInstance();
NodeStatusProto.Builder builder = null;
boolean viaProto = false;
@@ -45,6 +50,7 @@ public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements Node
private NodeId nodeId = null;
private List<ContainerStatus> containers = null;
private NodeHealthStatus nodeHealthStatus = null;
private List<ApplicationId> keepAliveApplications = null;
public NodeStatusPBImpl() {
builder = NodeStatusProto.newBuilder();
@@ -55,15 +61,14 @@ public NodeStatusPBImpl(NodeStatusProto proto) {
viaProto = true;
}
public NodeStatusProto getProto() {
public synchronized NodeStatusProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToBuilder() {
private synchronized void mergeLocalToBuilder() {
if (this.nodeId != null) {
builder.setNodeId(convertToProtoFormat(this.nodeId));
}
@@ -73,9 +78,12 @@ private void mergeLocalToBuilder() {
if (this.nodeHealthStatus != null) {
builder.setNodeHealthStatus(convertToProtoFormat(this.nodeHealthStatus));
}
if (this.keepAliveApplications != null) {
addKeepAliveApplicationsToProto();
}
}
private void mergeLocalToProto() {
private synchronized void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
@@ -84,14 +92,14 @@ private void mergeLocalToProto() {
viaProto = true;
}
private void maybeInitBuilder() {
private synchronized void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = NodeStatusProto.newBuilder(proto);
}
viaProto = false;
}
private void addContainersToProto() {
private synchronized void addContainersToProto() {
maybeInitBuilder();
builder.clearContainersStatuses();
if (containers == null)
@@ -124,19 +132,53 @@ public void remove() {
};
builder.addAllContainersStatuses(iterable);
}
private synchronized void addKeepAliveApplicationsToProto() {
maybeInitBuilder();
builder.clearKeepAliveApplications();
if (keepAliveApplications == null)
return;
Iterable<ApplicationIdProto> iterable = new Iterable<ApplicationIdProto>() {
@Override
public Iterator<ApplicationIdProto> iterator() {
return new Iterator<ApplicationIdProto>() {
Iterator<ApplicationId> iter = keepAliveApplications.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public ApplicationIdProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllKeepAliveApplications(iterable);
}
@Override
public int getResponseId() {
public synchronized int getResponseId() {
NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
return p.getResponseId();
}
@Override
public void setResponseId(int responseId) {
public synchronized void setResponseId(int responseId) {
maybeInitBuilder();
builder.setResponseId(responseId);
}
@Override
public NodeId getNodeId() {
public synchronized NodeId getNodeId() {
NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
if (this.nodeId != null) {
return this.nodeId;
@@ -148,8 +190,9 @@ public NodeId getNodeId() {
return this.nodeId;
}
@Override
public void setNodeId(NodeId nodeId) {
public synchronized void setNodeId(NodeId nodeId) {
maybeInitBuilder();
if (nodeId == null)
builder.clearNodeId();
@@ -158,20 +201,35 @@ public void setNodeId(NodeId nodeId) {
}
@Override
public List<ContainerStatus> getContainersStatuses() {
public synchronized List<ContainerStatus> getContainersStatuses() {
initContainers();
return this.containers;
}
@Override
public void setContainersStatuses(List<ContainerStatus> containers) {
public synchronized void setContainersStatuses(
List<ContainerStatus> containers) {
if (containers == null) {
builder.clearContainersStatuses();
}
this.containers = containers;
}
@Override
public synchronized List<ApplicationId> getKeepAliveApplications() {
initKeepAliveApplications();
return this.keepAliveApplications;
}
@Override
public synchronized void setKeepAliveApplications(List<ApplicationId> appIds) {
if (appIds == null) {
builder.clearKeepAliveApplications();
}
this.keepAliveApplications = appIds;
}
private void initContainers() {
private synchronized void initContainers() {
if (this.containers != null) {
return;
}
@@ -185,8 +243,22 @@ private void initContainers() {
}
private synchronized void initKeepAliveApplications() {
if (this.keepAliveApplications != null) {
return;
}
NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
List<ApplicationIdProto> list = p.getKeepAliveApplicationsList();
this.keepAliveApplications = new ArrayList<ApplicationId>();
for (ApplicationIdProto c : list) {
this.keepAliveApplications.add(convertFromProtoFormat(c));
}
}
@Override
public NodeHealthStatus getNodeHealthStatus() {
public synchronized NodeHealthStatus getNodeHealthStatus() {
NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
if (nodeHealthStatus != null) {
return nodeHealthStatus;
@@ -199,7 +271,7 @@ public NodeHealthStatus getNodeHealthStatus() {
}
@Override
public void setNodeHealthStatus(NodeHealthStatus healthStatus) {
public synchronized void setNodeHealthStatus(NodeHealthStatus healthStatus) {
maybeInitBuilder();
if (healthStatus == null) {
builder.clearNodeHealthStatus();
@@ -231,4 +303,12 @@ private ContainerStatusPBImpl convertFromProtoFormat(ContainerStatusProto c) {
private ContainerStatusProto convertToProtoFormat(ContainerStatus c) {
return ((ContainerStatusPBImpl)c).getProto();
}
}
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto c) {
return new ApplicationIdPBImpl(c);
}
private ApplicationIdProto convertToProtoFormat(ApplicationId c) {
return ((ApplicationIdPBImpl)c).getProto();
}
}
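Every accessor above became synchronized because the record keeps local fields that are merged into the protobuf builder only when getProto() is called. A stripped-down, dependency-free sketch of that pattern, with all names invented for illustration:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy version of the PBImpl pattern: mutable local state is folded into an
// immutable snapshot only when the snapshot (the "proto") is requested.
public class LazyRecord {
  private List<String> keepAliveApps = null;               // unmerged local state
  private List<String> snapshot = Collections.emptyList(); // stands in for the proto

  public synchronized void setKeepAliveApplications(List<String> apps) {
    this.keepAliveApps = apps;                              // recorded, not yet merged
  }

  public synchronized List<String> getKeepAliveApplications() {
    if (keepAliveApps == null) {
      keepAliveApps = new ArrayList<String>(snapshot);      // lazily init from snapshot
    }
    return keepAliveApps;
  }

  // Equivalent of getProto(): merge local changes, then publish the snapshot.
  public synchronized List<String> getSnapshot() {
    if (keepAliveApps != null) {
      snapshot = Collections.unmodifiableList(new ArrayList<String>(keepAliveApps));
    }
    return snapshot;
  }
}

Without the synchronization, a heartbeat thread calling getSnapshot() while another thread sets a field could publish a half-merged record.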


@@ -34,6 +34,7 @@ message NodeStatusProto {
optional int32 response_id = 2;
repeated ContainerStatusProto containersStatuses = 3;
optional NodeHealthStatusProto nodeHealthStatus = 4;
repeated ApplicationIdProto keep_alive_applications = 5;
}
message RegistrationResponseProto {


@@ -72,7 +72,7 @@
<property>
<description>The expiry interval for application master reporting.</description>
<name>yarn.resourcemanager.am.liveness-monitor.expiry-interval-ms</name>
<name>yarn.am.liveness-monitor.expiry-interval-ms</name>
<value>600000</value>
</property>
@@ -155,7 +155,7 @@
<property>
<description>How long to wait until a node manager is considered dead.</description>
<name>yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms</name>
<name>yarn.nm.liveness-monitor.expiry-interval-ms</name>
<value>600000</value>
</property>
@@ -210,6 +210,12 @@
<value>10000</value>
</property>
<property>
<description>Interval at which the delayed token removal thread runs</description>
<name>yarn.resourcemanager.delayed.delegation-token.removal-interval-ms</name>
<value>30000</value>
</property>
<!-- Node Manager Configs -->
<property>
<description>address of node manager IPC.</description>
@@ -304,7 +310,7 @@
<property>
<description>Whether to enable log aggregation</description>
<name>yarn.nodemanager.log-aggregation-enable</name>
<name>yarn.log-aggregation-enable</name>
<value>false</value>
</property>
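The same properties can also be set programmatically, which is how the tests later in this commit exercise the feature; a short sketch, with interval values chosen only as examples:

import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class KeepAliveConfigExample {
  public static void main(String[] args) {
    YarnConfiguration conf = new YarnConfiguration();
    // Enable log aggregation (now a cluster-wide yarn.* property).
    conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
    // Keep delegation tokens alive for 6 seconds after an application finishes.
    conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 6000l);
    // Scan for removable tokens every second.
    conf.setLong(
        YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS, 1000l);
    System.out.println(conf.get("yarn.log-aggregation-enable"));
  }
}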


@@ -20,8 +20,12 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Map.Entry;
import org.apache.avro.AvroRuntimeException;
@@ -56,6 +60,7 @@
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.service.AbstractService;
public class NodeStatusUpdaterImpl extends AbstractService implements
NodeStatusUpdater {
@@ -76,6 +81,12 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private byte[] secretKeyBytes = new byte[0];
private boolean isStopped;
private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private boolean tokenKeepAliveEnabled;
private long tokenRemovalDelayMs;
/** Keeps track of when the next keep alive request should be sent for an app*/
private Map<ApplicationId, Long> appTokenKeepAliveMap =
new HashMap<ApplicationId, Long>();
private Random keepAliveDelayRandom = new Random();
private final NodeHealthCheckerService healthChecker;
private final NodeManagerMetrics metrics;
@@ -103,6 +114,13 @@ public synchronized void init(Configuration conf) {
this.totalResource = recordFactory.newRecordInstance(Resource.class);
this.totalResource.setMemory(memoryMb);
metrics.addResource(totalResource);
this.tokenKeepAliveEnabled =
conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)
&& isSecurityEnabled();
this.tokenRemovalDelayMs =
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
super.init(conf);
}
@@ -139,6 +157,10 @@ public synchronized void stop() {
super.stop();
}
protected boolean isSecurityEnabled() {
return UserGroupInformation.isSecurityEnabled();
}
protected ResourceTracker getRMClient() {
Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf);
@@ -188,6 +210,29 @@ public byte[] getRMNMSharedSecret() {
return this.secretKeyBytes.clone();
}
private List<ApplicationId> createKeepAliveApplicationList() {
if (!tokenKeepAliveEnabled) {
return Collections.emptyList();
}
List<ApplicationId> appList = new ArrayList<ApplicationId>();
for (Iterator<Entry<ApplicationId, Long>> i =
this.appTokenKeepAliveMap.entrySet().iterator(); i.hasNext();) {
Entry<ApplicationId, Long> e = i.next();
ApplicationId appId = e.getKey();
Long nextKeepAlive = e.getValue();
if (!this.context.getApplications().containsKey(appId)) {
// Remove if the application has finished.
i.remove();
} else if (System.currentTimeMillis() > nextKeepAlive) {
// KeepAlive list for the next heartbeat.
appList.add(appId);
trackAppForKeepAlive(appId);
}
}
return appList;
}
private NodeStatus getNodeStatus() {
NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
@@ -231,9 +276,29 @@ private NodeStatus getNodeStatus() {
}
nodeStatus.setNodeHealthStatus(nodeHealthStatus);
List<ApplicationId> keepAliveAppIds = createKeepAliveApplicationList();
nodeStatus.setKeepAliveApplications(keepAliveAppIds);
return nodeStatus; return nodeStatus;
} }
private void trackAppsForKeepAlive(List<ApplicationId> appIds) {
if (tokenKeepAliveEnabled && appIds != null && appIds.size() > 0) {
for (ApplicationId appId : appIds) {
trackAppForKeepAlive(appId);
}
}
}
private void trackAppForKeepAlive(ApplicationId appId) {
// Next keepAlive request for app between 0.7 & 0.9 of when the token will
// likely expire.
long nextTime = System.currentTimeMillis()
+ (long) (0.7 * tokenRemovalDelayMs + (0.2 * tokenRemovalDelayMs
* keepAliveDelayRandom.nextInt(100))/100);
appTokenKeepAliveMap.put(appId, nextTime);
}
@Override
public void sendOutofBandHeartBeat() {
synchronized (this.heartbeatMonitor) {
@@ -245,6 +310,7 @@ protected void startStatusUpdater() {
new Thread("Node Status Updater") {
@Override
@SuppressWarnings("unchecked")
public void run() {
int lastHeartBeatID = 0;
while (!isStopped) {
@@ -284,6 +350,8 @@ public void run() {
}
List<ApplicationId> appsToCleanup =
response.getApplicationsToCleanupList();
//Only start tracking for keepAlive on FINISH_APP
trackAppsForKeepAlive(appsToCleanup);
if (appsToCleanup.size() != 0) {
dispatcher.getEventHandler().handle(
new CMgrCompletedAppsEvent(appsToCleanup));
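To see what the randomized scheduling in trackAppForKeepAlive() produces, a small standalone sketch with the same arithmetic; the delay value is the default expiry, chosen here only for illustration:

import java.util.Random;

public class KeepAliveWindowDemo {
  public static void main(String[] args) {
    long tokenRemovalDelayMs = 600000; // assumed: DEFAULT_RM_NM_EXPIRY_INTERVAL_MS
    Random keepAliveDelayRandom = new Random();
    // Same formula as trackAppForKeepAlive(): the next keep-alive lands between
    // 0.7 and 0.9 of the removal delay, comfortably before the token is dropped.
    long offset = (long) (0.7 * tokenRemovalDelayMs + (0.2 * tokenRemovalDelayMs
        * keepAliveDelayRandom.nextInt(100)) / 100);
    System.out.println("next keep-alive in ~" + offset + " ms ("
        + String.format("%.1f", 100.0 * offset / tokenRemovalDelayMs)
        + "% of the delay)");
  }
}

With the default 600000 ms expiry this works out to a keep-alive roughly every seven to nine minutes per application.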


@@ -192,8 +192,8 @@ private void addIfService(Object object) {
protected LogHandler createLogHandler(Configuration conf, Context context,
DeletionService deletionService) {
if (conf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ENABLED)) {
if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
return new LogAggregationService(this.dispatcher, context,
deletionService, dirsHandler);
} else {


@@ -170,6 +170,7 @@ public void run() {
this.writer.closeWriter();
LOG.info("Finished aggregate log-file for app " + this.applicationId);
}
try {
userUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override


@@ -88,8 +88,8 @@ public void logs() {
containerId.getApplicationAttemptId().getApplicationId();
Application app = nmContext.getApplications().get(appId);
if (app == null
&& nmConf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ENABLED)) {
&& nmConf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
String logServerUrl = nmConf.get(YarnConfiguration.YARN_LOG_SERVER_URL);
String redirectUrl = null;
if (logServerUrl == null || logServerUrl.isEmpty()) {


@@ -22,7 +22,9 @@
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
@@ -56,6 +58,7 @@
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
@@ -63,10 +66,12 @@
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.service.Service.STATE;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.mockito.Mockito.mock;
public class TestNodeStatusUpdater {
@@ -216,7 +221,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
HeartbeatResponse response = recordFactory
.newRecordInstance(HeartbeatResponse.class);
response.setResponseId(heartBeatID);
NodeHeartbeatResponse nhResponse = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class);
nhResponse.setHeartbeatResponse(response);
@@ -241,6 +246,48 @@ protected ResourceTracker getRMClient() {
return resourceTracker;
}
}
private class MyNodeStatusUpdater3 extends NodeStatusUpdaterImpl {
public ResourceTracker resourceTracker;
private Context context;
public MyNodeStatusUpdater3(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
ContainerTokenSecretManager containerTokenSecretManager) {
super(context, dispatcher, healthChecker, metrics,
containerTokenSecretManager);
this.context = context;
this.resourceTracker = new MyResourceTracker3(this.context);
}
@Override
protected ResourceTracker getRMClient() {
return resourceTracker;
}
@Override
protected boolean isSecurityEnabled() {
return true;
}
}
private class MyNodeManager extends NodeManager {
private MyNodeStatusUpdater3 nodeStatusUpdater;
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
ContainerTokenSecretManager containerTokenSecretManager) {
this.nodeStatusUpdater =
new MyNodeStatusUpdater3(context, dispatcher, healthChecker, metrics,
containerTokenSecretManager);
return this.nodeStatusUpdater;
}
protected MyNodeStatusUpdater3 getNodeStatusUpdater() {
return this.nodeStatusUpdater;
}
}
//
private class MyResourceTracker2 implements ResourceTracker {
@@ -276,6 +323,65 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
}
}
private class MyResourceTracker3 implements ResourceTracker {
public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
public NodeAction registerNodeAction = NodeAction.NORMAL;
private Map<ApplicationId, List<Long>> keepAliveRequests =
new HashMap<ApplicationId, List<Long>>();
private ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
private final Context context;
MyResourceTracker3(Context context) {
this.context = context;
}
@Override
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnRemoteException {
RegisterNodeManagerResponse response =
recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
RegistrationResponse regResponse =
recordFactory.newRecordInstance(RegistrationResponse.class);
regResponse.setNodeAction(registerNodeAction);
response.setRegistrationResponse(regResponse);
return response;
}
@Override
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnRemoteException {
LOG.info("Got heartBeatId: [" + heartBeatID +"]");
NodeStatus nodeStatus = request.getNodeStatus();
nodeStatus.setResponseId(heartBeatID++);
HeartbeatResponse response =
recordFactory.newRecordInstance(HeartbeatResponse.class);
response.setResponseId(heartBeatID);
response.setNodeAction(heartBeatNodeAction);
if (nodeStatus.getKeepAliveApplications() != null
&& nodeStatus.getKeepAliveApplications().size() > 0) {
for (ApplicationId appId : nodeStatus.getKeepAliveApplications()) {
List<Long> list = keepAliveRequests.get(appId);
if (list == null) {
list = new LinkedList<Long>();
keepAliveRequests.put(appId, list);
}
list.add(System.currentTimeMillis());
}
}
if (heartBeatID == 2) {
LOG.info("Sending FINISH_APP for application: [" + appId + "]");
this.context.getApplications().put(appId, mock(Application.class));
response.addAllApplicationsToCleanup(Collections.singletonList(appId));
}
NodeHeartbeatResponse nhResponse =
recordFactory.newRecordInstance(NodeHeartbeatResponse.class);
nhResponse.setHeartbeatResponse(response);
return nhResponse;
}
}
@Before
public void clearError() {
nmStartError = null;
@@ -456,6 +562,38 @@ public void start() {
verifyNodeStartFailure("Starting of RPC Server failed");
}
@Test
public void testApplicationKeepAlive() throws Exception {
MyNodeManager nm = new MyNodeManager();
try {
YarnConfiguration conf = createNMConfig();
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
4000l);
nm.init(conf);
nm.start();
// HB 2 -> app cancelled by RM.
while (heartBeatID < 12) {
Thread.sleep(1000l);
}
MyResourceTracker3 rt =
(MyResourceTracker3) nm.getNodeStatusUpdater().getRMClient();
rt.context.getApplications().remove(rt.appId);
Assert.assertEquals(1, rt.keepAliveRequests.size());
int numKeepAliveRequests = rt.keepAliveRequests.get(rt.appId).size();
LOG.info("Number of Keep Alive Requests: [" + numKeepAliveRequests + "]");
Assert.assertTrue(numKeepAliveRequests == 2 || numKeepAliveRequests == 3);
while (heartBeatID < 20) {
Thread.sleep(1000l);
}
int numKeepAliveRequests2 = rt.keepAliveRequests.get(rt.appId).size();
Assert.assertEquals(numKeepAliveRequests, numKeepAliveRequests2);
} finally {
if (nm.getServiceState() == STATE.STARTED)
nm.stop();
}
}
private void verifyNodeStartFailure(String errMessage) {
YarnConfiguration conf = createNMConfig();
nm.init(conf);
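For intuition about the "2 or 3" assertion in testApplicationKeepAlive: with RM_NM_EXPIRY_INTERVAL_MS set to 4000 ms, trackAppForKeepAlive schedules each keep-alive 0.7–0.9 × 4000 ≈ 2800–3600 ms after the previous one, and (assuming the test NodeManager's default one-second heartbeat) the FINISH_APP arrives around heartbeat 2 while the check runs after heartbeat 12, a window of roughly ten seconds — room for two, occasionally three, keep-alive requests. Once the application is removed from the context, createKeepAliveApplicationList() drops it, which is why the count must not grow between heartbeats 12 and 20.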


@@ -68,7 +68,7 @@ public void testLogDeletion() {
+ localLogDirs[1].getAbsolutePath();
conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString);
conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, false);
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 0l);
DrainDispatcher dispatcher = createDispatcher(conf);
@@ -142,7 +142,7 @@ public void testDelayedDelete() {
+ localLogDirs[1].getAbsolutePath();
conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString);
conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, false);
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 10800l);


@@ -173,7 +173,7 @@ protected synchronized void finishApplication(ApplicationId applicationId) {
} else {
// Inform the DelegationTokenRenewer
if (UserGroupInformation.isSecurityEnabled()) {
rmContext.getDelegationTokenRenewer().removeApplication(applicationId);
rmContext.getDelegationTokenRenewer().applicationFinished(applicationId);
}
completedApps.add(applicationId);


@@ -272,7 +272,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
// 4. Send status to RMNode, saving the latest response.
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
remoteNodeStatus.getContainersStatuses(), latestResponse));
remoteNodeStatus.getContainersStatuses(),
remoteNodeStatus.getKeepAliveApplications(), latestResponse));
nodeHeartBeatResponse.setHeartbeatResponse(latestResponse);
return nodeHeartBeatResponse;


@@ -414,7 +414,9 @@ public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeUpdateSchedulerEvent(rmNode, newlyLaunchedContainers,
completedContainers));
rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications(
statusEvent.getKeepAliveAppIds());
return RMNodeState.RUNNING;
}
}


@@ -20,6 +20,7 @@
import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -28,15 +29,17 @@
public class RMNodeStatusEvent extends RMNodeEvent {
private final NodeHealthStatus nodeHealthStatus;
private List<ContainerStatus> containersCollection;
private final List<ContainerStatus> containersCollection;
private final HeartbeatResponse latestResponse;
private final List<ApplicationId> keepAliveAppIds;
public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
List<ContainerStatus> collection,
List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
HeartbeatResponse latestResponse) {
super(nodeId, RMNodeEventType.STATUS_UPDATE);
this.nodeHealthStatus = nodeHealthStatus;
this.containersCollection = collection;
this.keepAliveAppIds = keepAliveAppIds;
this.latestResponse = latestResponse;
}
@@ -51,4 +54,8 @@ public List<ContainerStatus> getContainers() {
public HeartbeatResponse getLatestResponse() {
return this.latestResponse;
}
}
public List<ApplicationId> getKeepAliveAppIds() {
return this.keepAliveAppIds;
}
}


@@ -20,14 +20,19 @@
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
@@ -40,6 +45,7 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.service.AbstractService;
/**
@@ -65,7 +71,16 @@ public class DelegationTokenRenewer extends AbstractService {
// appId=>List<tokens>
private Set<DelegationTokenToRenew> delegationTokens =
Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>());
private final ConcurrentMap<ApplicationId, Long> delayedRemovalMap =
new ConcurrentHashMap<ApplicationId, Long>();
private long tokenRemovalDelayMs;
private Thread delayedRemovalThread;
private boolean tokenKeepAliveEnabled;
public DelegationTokenRenewer() {
super(DelegationTokenRenewer.class.getName());
}
@@ -73,6 +88,12 @@ public DelegationTokenRenewer() {
@Override
public synchronized void init(Configuration conf) {
super.init(conf);
this.tokenKeepAliveEnabled =
conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
this.tokenRemovalDelayMs =
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
}
@Override
@@ -81,6 +102,12 @@ public synchronized void start() {
dtCancelThread.start();
renewalTimer = new Timer(true);
if (tokenKeepAliveEnabled) {
delayedRemovalThread =
new Thread(new DelayedTokenRemovalRunnable(getConfig()),
"DelayedTokenCanceller");
delayedRemovalThread.start();
}
}
@Override
@@ -94,6 +121,14 @@ public synchronized void stop() {
} catch (InterruptedException e) {
e.printStackTrace();
}
if (tokenKeepAliveEnabled && delayedRemovalThread != null) {
delayedRemovalThread.interrupt();
try {
delayedRemovalThread.join(1000);
} catch (InterruptedException e) {
LOG.info("Interrupted while joining on delayed removal thread.", e);
}
}
super.stop();
}
@@ -343,12 +378,38 @@ private void removeFailedDelegationToken(DelegationTokenToRenew t) {
if(t.timerTask!=null)
t.timerTask.cancel();
}
/**
* Removing delegation token for completed applications.
* @param applicationId completed application
*/
public void removeApplication(ApplicationId applicationId) {
public void applicationFinished(ApplicationId applicationId) {
if (!tokenKeepAliveEnabled) {
removeApplicationFromRenewal(applicationId);
} else {
delayedRemovalMap.put(applicationId, System.currentTimeMillis()
+ tokenRemovalDelayMs);
}
}
/**
* Add a list of applications to the keep alive list. If an appId already
* exists, update its keep-alive time.
*
* @param appIds
* the list of applicationIds to be kept alive.
*
*/
public void updateKeepAliveApplications(List<ApplicationId> appIds) {
if (tokenKeepAliveEnabled && appIds != null && appIds.size() > 0) {
for (ApplicationId appId : appIds) {
delayedRemovalMap.put(appId, System.currentTimeMillis()
+ tokenRemovalDelayMs);
}
}
}
private void removeApplicationFromRenewal(ApplicationId applicationId) {
synchronized (delegationTokens) {
Iterator<DelegationTokenToRenew> it = delegationTokens.iterator();
while(it.hasNext()) {
@@ -371,4 +432,50 @@ public void removeApplication(ApplicationId applicationId) {
}
}
}
/**
* Takes care of cancelling app delegation tokens after the configured
* cancellation delay, taking into consideration keep-alive requests.
*
*/
private class DelayedTokenRemovalRunnable implements Runnable {
private long waitTimeMs;
DelayedTokenRemovalRunnable(Configuration conf) {
waitTimeMs =
conf.getLong(
YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS);
}
@Override
public void run() {
List<ApplicationId> toCancel = new ArrayList<ApplicationId>();
while (!Thread.currentThread().isInterrupted()) {
Iterator<Entry<ApplicationId, Long>> it =
delayedRemovalMap.entrySet().iterator();
toCancel.clear();
while (it.hasNext()) {
Entry<ApplicationId, Long> e = it.next();
if (e.getValue() < System.currentTimeMillis()) {
toCancel.add(e.getKey());
}
}
for (ApplicationId appId : toCancel) {
removeApplicationFromRenewal(appId);
delayedRemovalMap.remove(appId);
}
synchronized (this) {
try {
wait(waitTimeMs);
} catch (InterruptedException e) {
LOG.info("Delayed Deletion Thread Interrupted. Shutting it down");
return;
}
}
}
}
}
}
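A toy simulation of the delayed-removal bookkeeping introduced above; class and variable names are illustrative, and only the map-plus-deadline idea mirrors the patch:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class DelayedRemovalSketch {
  private final Map<String, Long> delayedRemovalMap = new ConcurrentHashMap<String, Long>();
  private final long tokenRemovalDelayMs;

  DelayedRemovalSketch(long tokenRemovalDelayMs) {
    this.tokenRemovalDelayMs = tokenRemovalDelayMs;
  }

  // Mirrors applicationFinished(): don't cancel immediately, just record a deadline.
  void applicationFinished(String appId) {
    delayedRemovalMap.put(appId, System.currentTimeMillis() + tokenRemovalDelayMs);
  }

  // Mirrors updateKeepAliveApplications(): a keep-alive pushes the deadline out again.
  void keepAlive(String appId) {
    delayedRemovalMap.put(appId, System.currentTimeMillis() + tokenRemovalDelayMs);
  }

  // Mirrors one pass of DelayedTokenRemovalRunnable: collect and drop expired entries.
  List<String> removeExpired() {
    List<String> cancelled = new ArrayList<String>();
    for (Map.Entry<String, Long> e : delayedRemovalMap.entrySet()) {
      if (e.getValue() < System.currentTimeMillis()) {
        cancelled.add(e.getKey());
      }
    }
    for (String appId : cancelled) {
      delayedRemovalMap.remove(appId);   // the real code cancels the tokens here
    }
    return cancelled;
  }

  public static void main(String[] args) throws InterruptedException {
    DelayedRemovalSketch sketch = new DelayedRemovalSketch(200);
    sketch.applicationFinished("app_1");
    Thread.sleep(100);
    sketch.keepAlive("app_1");           // extends the deadline past the original 200 ms
    Thread.sleep(150);
    System.out.println("expired after keep-alive: " + sketch.removeExpired()); // []
    Thread.sleep(100);
    System.out.println("expired later: " + sketch.removeExpired());            // [app_1]
  }
}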


@@ -20,12 +20,12 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -44,6 +44,7 @@
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.After;
import org.junit.Before;
@@ -328,7 +329,7 @@ public void testDTRenewal () throws Exception {
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
delegationTokenRenewer.addApplication(applicationId_1, ts, true);
delegationTokenRenewer.removeApplication(applicationId_1);
delegationTokenRenewer.applicationFinished(applicationId_1);
numberOfExpectedRenewals = Renewer.counter; // number of renewals so far
try {
@@ -343,7 +344,7 @@ public void testDTRenewal () throws Exception {
// also renewing of the cancelled token should fail
try {
token4.renew(conf);
assertTrue("Renewal of canceled token didn't fail", false);
fail("Renewal of cancelled token should have failed");
} catch (InvalidToken ite) {
//expected
}
@@ -377,7 +378,7 @@ public void testDTRenewalWithNoCancel () throws Exception {
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
delegationTokenRenewer.addApplication(applicationId_1, ts, false);
delegationTokenRenewer.removeApplication(applicationId_1);
delegationTokenRenewer.applicationFinished(applicationId_1);
int numberOfExpectedRenewals = Renewer.counter; // number of renewals so far
try {
@@ -393,4 +394,123 @@ public void testDTRenewalWithNoCancel () throws Exception {
// been canceled
token1.renew(conf);
}
/**
* Basic idea of the test:
* 0. Setup token KEEP_ALIVE
* 1. create tokens.
* 2. register them for renewal - to be cancelled on app complete
* 3. Complete app.
* 4. Verify token is alive within the KEEP_ALIVE time
* 5. Verify token has been cancelled after the KEEP_ALIVE_TIME
* @throws IOException
* @throws URISyntaxException
*/
@Test
public void testDTKeepAlive1 () throws Exception {
DelegationTokenRenewer localDtr = new DelegationTokenRenewer();
Configuration lconf = new Configuration(conf);
lconf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
//Keep tokens alive for 6 seconds.
lconf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 6000l);
//Try removing tokens every second.
lconf.setLong(
YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
1000l);
localDtr.init(lconf);
localDtr.start();
MyFS dfs = (MyFS)FileSystem.get(lconf);
LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+lconf.hashCode());
Credentials ts = new Credentials();
// get the delegation tokens
MyToken token1 = dfs.getDelegationToken(new Text("user1"));
String nn1 = DelegationTokenRenewer.SCHEME + "://host1:0";
ts.addToken(new Text(nn1), token1);
// register the tokens for renewal
ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0);
localDtr.addApplication(applicationId_0, ts, true);
localDtr.applicationFinished(applicationId_0);
Thread.sleep(3000l);
//Token should still be around. Renewal should not fail.
token1.renew(lconf);
//Allow the keepalive time to run out
Thread.sleep(6000l);
//The token should have been cancelled at this point. Renewal will fail.
try {
token1.renew(lconf);
fail("Renewal of cancelled token should have failed");
} catch (InvalidToken ite) {}
}
/**
* Basic idea of the test:
* 0. Setup token KEEP_ALIVE
* 1. create tokens.
* 2. register them for renewal - to be cancelled on app complete
* 3. Complete app.
* 4. Verify token is alive within the KEEP_ALIVE time
* 5. Send an explicit KEEP_ALIVE_REQUEST
* 6. Verify token KEEP_ALIVE time is renewed.
* 7. Verify token has been cancelled after the renewed KEEP_ALIVE_TIME.
* @throws IOException
* @throws URISyntaxException
*/
@Test
public void testDTKeepAlive2() throws Exception {
DelegationTokenRenewer localDtr = new DelegationTokenRenewer();
Configuration lconf = new Configuration(conf);
lconf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
//Keep tokens alive for 6 seconds.
lconf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 6000l);
//Try removing tokens every second.
lconf.setLong(
YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
1000l);
localDtr.init(lconf);
localDtr.start();
MyFS dfs = (MyFS)FileSystem.get(lconf);
LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+lconf.hashCode());
Credentials ts = new Credentials();
// get the delegation tokens
MyToken token1 = dfs.getDelegationToken(new Text("user1"));
String nn1 = DelegationTokenRenewer.SCHEME + "://host1:0";
ts.addToken(new Text(nn1), token1);
// register the tokens for renewal
ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0);
localDtr.addApplication(applicationId_0, ts, true);
localDtr.applicationFinished(applicationId_0);
Thread.sleep(4000l);
//Send another keep alive.
localDtr.updateKeepAliveApplications(Collections
.singletonList(applicationId_0));
//Renewal should not fail.
token1.renew(lconf);
//Token should be around after this.
Thread.sleep(4500l);
//Renewal should not fail. - ~1.5 seconds for keepalive timeout.
token1.renew(lconf);
//Allow the keepalive time to run out
Thread.sleep(3000l);
//The token should have been cancelled at this point. Renewal will fail.
try {
token1.renew(lconf);
fail("Renewal of cancelled token should have failed");
} catch (InvalidToken ite) {}
}
}
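A worked timeline for testDTKeepAlive1, with the settings used above: the application finishes at t ≈ 0, so applicationFinished() records a removal deadline of t + 6000 ms, and the removal thread scans every 1000 ms. The renewal at t ≈ 3 s therefore still succeeds; by the second attempt at t ≈ 9 s the deadline has long passed, the token has been cancelled, and renew() throws InvalidToken. testDTKeepAlive2 differs only in that the explicit updateKeepAliveApplications() call at t ≈ 4 s pushes the deadline out to roughly t ≈ 10 s, so the renewal at t ≈ 8.5 s still succeeds and only the final attempt after t ≈ 11.5 s fails.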


@@ -395,7 +395,7 @@ public void testNodesQueryHealthyAndState() throws JSONException, Exception {
nodeHealth.setHealthReport("test health report");
nodeHealth.setIsNodeHealthy(false);
node.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeHealth,
new ArrayList<ContainerStatus>(), null));
new ArrayList<ContainerStatus>(), null, null));
rm.NMwaitForState(nm1.getNodeId(), RMNodeState.UNHEALTHY);
JSONObject json = r.path("ws").path("v1").path("cluster").path("nodes")


@@ -193,6 +193,10 @@ Hadoop MapReduce Next Generation - Cluster Setup
| | | ACLs are of the form <comma-separated-users><space><comma-separated-groups>. |
| | | Defaults to special value of <<*>> which means <anyone>. |
| | | Special value of just <space> means no one has access. |
*-------------------------+-------------------------+------------------------+
| <<<yarn.log-aggregation-enable>>> | | |
| | <false> | |
| | | Configuration to enable or disable log aggregation |
*-------------------------+-------------------------+------------------------+
@@ -260,10 +264,6 @@ Hadoop MapReduce Next Generation - Cluster Setup
| | are written. | |
| | | Multiple paths help spread disk i/o. |
*-------------------------+-------------------------+------------------------+
| <<<yarn.nodemanager.log-aggregation-enable>>> | | |
| | <false> | |
| | | Configuration to enable or disable log aggregation |
*-------------------------+-------------------------+------------------------+
| <<<yarn.nodemanager.log.retain-seconds>>> | | |
| | <10800> | |
| | | Default time (in seconds) to retain log files on the NodeManager |